基本的结构化操作
模式
模式定义DataFrame的列名以及列的数据类型,它可以由数据源来定义(称作读时定义),也可以由用户自己手动定义(显式定义)。
注意:实际应用场景决定了定义schema的方式。应当用于即席分析时,使用读时模式即可。不过,使用读时模式极有可能出现类型读取错误的问题,如long型读成了int类型,这回导致精度损失,所以最好采用显式的schema定义。
示例:
spark.read.format("json").load("./wj.json").schema 这里是使用了读时模式进行schema的定义。
一个模式由多个字段构成的StructType。
这些字段即为StructField,StructField具有名称、类型、布尔标志(该标志用于标记该列是否可以有缺失值或空值),并且用户可以指定与该列系关联的元数据(metadata)。元数据存储着有关此列的信息(spark的机器学习库中会使用该功能)。
模式还包含其他的StructType(spark的复杂类型)。
示例:
from pyspark.sql.types import StructField, StructType, StringType, LongType, IntegerType
mySchema = StructType([
StructField("name", StringType(), False),
StructField("id", LongType(), False),
StructField("age",IntegerType(),True)
])列和表达式
列
有很多不同的方法来构造和引用列,其中最常用的两种方法是col和column函数。使用这两个函数需要传入列名。
示例:
from pyspark.sql.functions import col, column
col("name")
column("id")在scala中有两种更简短的引用行的方法。
示例:
$"name"
'id符号$将字符串指定为表达式,而符号( ' )指定一个symbol,是scala引用标识符的特殊结构。
显式列引用
如果需要对dataframe的某一列进行引用,则可以直接在该dataframe上使用col方法。当进行连接操作的时候,如果两个dataframe有着同名列,该方法会非常有用。
示例:
df.col("name")表达式
表达式(expression)是对一个dataframe中某一个记录的一个或多个值的一组转换操作。
通常情况下,最简单的方法是通过expr函数创建仅仅对一个dataframe列引用的表达式。即单列应用的情况下expr和col等同。
示例:
expr("name")
col("name")列作为表达式
列提供了表达式功能的一个子集。如果使用col,并想对该列执行转换操作,则必须对该列的引用执行这些转换操作。使用表达式时,expr函数实际上可以将字符串解析成转换操作和列引用,也可以在之后传递到下一步的转换操作。
示例:
expr("age - 5")
col("age")-5
expr("age")-5以上三个方法都是相同的转换操作。spark将他们编译为表示顺序操作的逻辑树。
注意:
- 列只是表达式。
- 列与对这些列的转换操作被编译后生成的逻辑计划,与解析后的表达式的逻辑计划是一样的。
访问dataframe的列
想要在程序中访问所有的列,可以使用columns方法来查询
示例:
df.columns记录和行
在sapark中,dataframe的每一行都是一个记录,而记录是row类型的对象。spark使用列表达式操纵row类型对象。
row对象内部其实是字节数组,但是spark没有提供相应的访问用的接口,所以只能通过列表达式去操纵。
当使用dataframe时,向驱动器请求行的命令总是返回一个或多个Row类型的行数据。
创建行
spark可以基于已知的每列数值去手动实例化一个Row对象来创建行。
手动创建行必须按照该行所附属的DataFrame的顺序来初始化Row对象。
示例:
from pyspapk.spl import Row
val MyRow = Row(“kirito”,2017,17)访问行数据
访问行数据的方法同样简单:只需要像访问数组和列表一样,利用下标访问即可。
示例:
MyRow[0]DataFrame转换操作
DataFrame转换操作的核心为:
- 添加行或列
- 删除行或列
- 将一行转换操作为列
- 根据列中的值更改行的顺序
创建DataFrame
可以使用数据源创建DataFrame,也可以创建临时视图。
示例:
df = spark.read.format("json").load("lxyd.json")
df.createOrReplaceTempiew("dftable")我们也可以通过获取一组行并将他们转换操作为一个DataFrame来即时创建一个Dataframe。
示例:
# 添加此代码
import findspark
findspark.init() # 用于查找本机spark配置位置
from pyspark import SparkConf, SparkContext
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType
conf = SparkConf().setAppName("createCS").setMaster("local[*]")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
mySchame: StructType = StructType([
StructField("name", StringType(), False),
StructField("id", LongType(), False),
StructField("age", IntegerType(), True)
])
myRow: Row = Row("kirito", 1999, 17)
myDf = spark.createDataFrame([myRow], mySchame)
myDf.show() # -->
# --> +------+----+---+
# --> | name| id|age|
# --> +------+----+---+
# --> |kirito|1999| 17|
# --> +------+----+---+select函数和selectExpr函数
select函数和selectExpr函数支持在DataFrame上执行类似数据表的SQL的查询。
单列查询
示例:
# 添加此代码
import findspark
findspark.init() # 用于查找本机spark配置位置
from pyspark import SparkConf, SparkContext
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType
conf = SparkConf().setAppName("createCS").setMaster("local[*]")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
mySchame: StructType = StructType([
StructField("name", StringType(), False),
StructField("id", LongType(), False),
StructField("age", IntegerType(), True)
])
kirito: Row = Row("kirito", 1999, 17)
asina: Row = Row("asina", 2000, 16)
allen: Row = Row("allen", 2001, 17)
myDf = spark.createDataFrame([kirito, asina, allen], mySchame)
# # # # # # # # # # # # # # # # # # #
# 查询部分 select部分 单列查询
# # # # # # # # # # # # # # # # # # #
myDf.select("name").show() # 执行查询 -->
# --> +------+
# --> | name|
# --> +------+
# --> |kirito|
# --> | asina|
# --> | allen|
# --> +------+多列查询
多列查询只需要在select中调用更多的列名即可。
示例:
# 添加此代码
import findspark
findspark.init() # 用于查找本机spark配置位置
from pyspark import SparkConf, SparkContext
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType
conf = SparkConf().setAppName("createCS").setMaster("local[*]")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
mySchame: StructType = StructType([
StructField("name", StringType(), False),
StructField("id", LongType(), False),
StructField("age", IntegerType(), True)
])
kirito: Row = Row("kirito", 1999, 17)
asina: Row = Row("asina", 2000, 16)
allen: Row = Row("allen", 2001, 17)
myDf = spark.createDataFrame([kirito, asina, allen], mySchame)
# # # # # # # # # # # # # # # # # # #
# 查询部分 select部分 多列查询
# # # # # # # # # # # # # # # # # # #
myDf.select("name", "age").show() # 执行查询 -->
# --> +------+---+
# --> | name|age|
# --> +------+---+
# --> |kirito| 17|
# --> | asina| 16|
# --> | allen| 17|
# --> +------+---+等价互换
DataFrame可以通过多种不同的方式引用列,而且这些方式可以等价互换。
示例:
# 添加此代码
import findspark
findspark.init() # 用于查找本机spark配置位置
from pyspark import SparkConf, SparkContext
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import col, column, expr
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType
conf = SparkConf().setAppName("createCS").setMaster("local[*]")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
mySchame: StructType = StructType([
StructField("name", StringType(), False),
StructField("id", LongType(), False),
StructField("age", IntegerType(), True)
])
kirito: Row = Row("kirito", 1999, 17)
asina: Row = Row("asina", 2000, 16)
allen: Row = Row("allen", 2001, 17)
myDf = spark.createDataFrame([kirito, asina, allen], mySchame)
# # # # # # # # # # # # # # # # # # #
# 查询部分 select部分 列引用
# # # # # # # # # # # # # # # # # # #
myDf.select(expr("id"), col("name"), column("age")).show() # 执行查询 -->
# --> +----+------+---+
# --> | id| name|age|
# --> +----+------+---+
# --> |1999|kirito| 17|
# --> |2000| asina| 16|
# --> |2001| allen| 17|
# --> +----+------+---+ 需要注意的是:column对象和字符串类型不能够一起混用,否则会报错。
示例:
# 添加此代码
import findspark
findspark.init() # 用于查找本机spark配置位置
from pyspark import SparkConf, SparkContext
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import col, column, expr
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType
conf = SparkConf().setAppName("createCS").setMaster("local[*]")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
mySchame: StructType = StructType([
StructField("name", StringType(), False),
StructField("id", LongType(), False),
StructField("age", IntegerType(), True)
])
kirito: Row = Row("kirito", 1999, 17)
asina: Row = Row("asina", 2000, 16)
allen: Row = Row("allen", 2001, 17)
myDf = spark.createDataFrame([kirito, asina, allen], mySchame)
# # # # # # # # # # # # # # # # # # #
# 查询部分 select部分 混用
# # # # # # # # # # # # # # # # # # #
myDf.select(col("name"), "age").show() # 这里在python中执行的时候并没有报错,可能是scala和java中特有的情况。操作
expr是相当灵活的引用方式。它能够改引用一列,也能够引用对列进行操作的字符串表达式。
示例:
# 添加此代码
import findspark
findspark.init() # 用于查找本机spark配置位置
from pyspark import SparkConf, SparkContext
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import col, column, expr
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType
conf = SparkConf().setAppName("createCS").setMaster("local[*]")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
mySchame: StructType = StructType([
StructField("name", StringType(), False),
StructField("id", LongType(), False),
StructField("age", IntegerType(), True)
])
kirito: Row = Row("kirito", 1999, 17)
asina: Row = Row("asina", 2000, 16)
allen: Row = Row("allen", 2001, 17)
myDf = spark.createDataFrame([kirito, asina, allen], mySchame)
# # # # # # # # # # # # # # # # # # #
# 查询部分 select部分 expr灵活应用
# # # # # # # # # # # # # # # # # # #
myDf.select(expr("name as actor")).show(0) # 直接引用字符串表达式,通过as方法进行重命名列
myDf.select(expr("name as actor").alias("name")).show(0) # 也可以通过调用方法来进行操作,通过调用alias方法来进行重命名。
# 执行查询 -->
# 执行查询 -->
# --> +-----+
# --> |actor|
# --> +-----+
# --> +----+
# --> |name|
# --> +----+selectExpr
因为select后面使用expr是非常常见的写法,所以spark有一个有效地描述此操作序列的接口:selectExpr。
示例:
# 添加此代码
import findspark
findspark.init() # 用于查找本机spark配置位置
from pyspark import SparkConf, SparkContext
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import col, column, expr
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType
conf = SparkConf().setAppName("createCS").setMaster("local[*]")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
mySchame: StructType = StructType([
StructField("name", StringType(), False),
StructField("id", LongType(), False),
StructField("age", IntegerType(), True)
])
kirito: Row = Row("kirito", 1999, 17)
asina: Row = Row("asina", 2000, 16)
allen: Row = Row("allen", 2001, 17)
myDf = spark.createDataFrame([kirito, asina, allen], mySchame)
# # # # # # # # # # # # # # # # # # #
# 查询部分 select部分 selectExpr的使用
# # # # # # # # # # # # # # # # # # #
myDf.selectExpr("name as actor").show() # 执行查询 -->
# --> +------+
# --> | actor|
# --> +------+
# --> |kirito|
# --> | asina|
# --> | allen|
# --> +------+
myDf.select("name as actor").show() # 执行查询 --> 会报错
# --> pyspark.sql.utils.AnalysisException: "cannot resolve '`name as actor`' given input columns: [name, id, age];;\n'Project ['name as actor]\n+- LogicalRDD [name#0, id#1L, age#2], false\n"转换操作成spark类型(字面量)
有时候需要给spark传递显式的值,它们只是一个值而非新列。这可能是一个常量,或是接下来需要比较的值。
传递值的方式是通过字面量(literal)传递。字面量就是表达式。
示例:
# 添加此代码
import findspark
findspark.init() # 用于查找本机spark配置位置
from pyspark import SparkConf, SparkContext
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType
conf = SparkConf().setAppName("createCS").setMaster("local[*]")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
mySchame: StructType = StructType([
StructField("name", StringType(), False),
StructField("id", LongType(), False),
StructField("age", IntegerType(), True)
])
kirito: Row = Row("kirito", 1999, 17)
asina: Row = Row("asina", 2000, 16)
allen: Row = Row("allen", 2001, 17)
myDf = spark.createDataFrame([kirito, asina, allen], mySchame)
# # # # # # # # # # # # # # # # # # #
# 字面量
# # # # # # # # # # # # # # # # # # #
myDf.select("name", lit(1).alias("value")).show() # -->
# --> +------+-----+
# --> | name|value|
# --> +------+-----+
# --> |kirito| 1|
# --> | asina| 1|
# --> | allen| 1|
# --> +------+-----+ 当需要比较一个值是否大于一个常量或者是程序创建的变量时,就可以使用这个方法。
添加列
使withColumn方法可以为DataFrame添加新列,这种方式也更加规范一些。
注意:
- 常用列对象:' 、$ 、col 、column。
- withColumn的第二个参数要传入已有列的Column对象,否则会报错。
- sql.functions.lit()函数,返回的也是列对象,可以传入任意参数值。
示例:
# 添加此代码
import findspark
findspark.init() # 用于查找本机spark配置位置
from pyspark import SparkConf, SparkContext
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import col, column, expr, lit
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType
conf = SparkConf().setAppName("createCS").setMaster("local[*]")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
mySchame: StructType = StructType([
StructField("name", StringType(), False),
StructField("id", LongType(), False),
StructField("age", IntegerType(), True)
])
kirito: Row = Row("kirito", 1999, 17)
asina: Row = Row("asina", 2000, 16)
allen: Row = Row("allen", 2001, 17)
myDf = spark.createDataFrame([kirito, asina, allen], mySchame)
# # # # # # # # # # # # # # # # # # #
# 添加列
# # # # # # # # # # # # # # # # # # #
myDf.withColumn("atk",lit(1)).show() # -->
# --> +------+----+---+---+
# --> | name| id|age|atk|
# --> +------+----+---+---+
# --> |kirito|1999| 17| 1|
# --> | asina|2000| 16| 1|
# --> | allen|2001| 17| 1|
# --> +------+----+---+---+重命名列
在spark中,有两种重命名列的方法,一种是通过withColumn对列重命名,一种是通过withColumnRenamed方法对列重命名。
withColumn:
# 添加此代码
import findspark
findspark.init() # 用于查找本机spark配置位置
from pyspark import SparkConf, SparkContext
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import lit,expr,col
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType
conf = SparkConf().setAppName("createCS").setMaster("local[*]")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
mySchame: StructType = StructType([
StructField("name", StringType(), False),
StructField("id", LongType(), False),
StructField("age", IntegerType(), True)
])
kirito: Row = Row("kirito", 1999, 17)
asina: Row = Row("asina", 2000, 16)
allen: Row = Row("allen", 2001, 17)
myDf = spark.createDataFrame([kirito, asina, allen], mySchame)
# # # # # # # # # # # # # # # # # # #
# 重命名列
# # # # # # # # # # # # # # # # # # #
myDf.withColumn("actor", expr("name")).show() # -->
# --> +------+----+---+------+
# --> | name| id|age| actor|
# --> +------+----+---+------+
# --> |kirito|1999| 17|kirito|
# --> | asina|2000| 16| asina|
# --> | allen|2001| 17| allen|
# --> +------+----+---+------+withColumnRenamed:
# # # # # # # # # # # # # # # # # # #
# 重命名列
# # # # # # # # # # # # # # # # # # #
myDf.withColumnRenamed("name", "actor").show() # -->
# --> +------+----+---+
# --> | actor| id|age|
# --> +------+----+---+
# --> |kirito|1999| 17|
# --> | asina|2000| 16|
# --> | allen|2001| 17|
# --> +------+----+---+保留字与关键字
在spark程序的开发中,遇到列名中包含空格或者连字符等保留字,要处理这些保留字符号意味着要适当地对列名进行转义。
在spark中,通过使用反引号( ` )字符来实现转义。
示例:
没有使用转义符:
# 添加此代码
import findspark
findspark.init() # 用于查找本机spark配置位置
from pyspark import SparkConf, SparkContext
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import lit,expr,col
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType
conf = SparkConf().setAppName("createCS").setMaster("local[*]")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
mySchame: StructType = StructType([
StructField("actor name", StringType(), False),
StructField("id", LongType(), False),
StructField("age", IntegerType(), True)
])
kirito: Row = Row("kirito", 1999, 17)
asina: Row = Row("asina", 2000, 16)
allen: Row = Row("allen", 2001, 17)
myDf = spark.createDataFrame([kirito, asina, allen], mySchame)
# # # # # # # # # # # # # # # # # # #
# 查询部分 select部分 表达式转义
# # # # # # # # # # # # # # # # # # #
myDf.select(expr("actor name as name")).show() # -->
# --> pyspark.sql.utils.ParseException: "\nmismatched input 'as' expecting <EOF>(line 1, pos 11)\n\n== SQL ==\nactor name as name\n-----------^^^\n"使用转义符:
# # # # # # # # # # # # # # # # # # #
# 查询部分 select部分 表达式转义
# # # # # # # # # # # # # # # # # # #
myDf.select(expr("`actor name` as name")).show() # -->
# --> +------+
# --> | name |
# --> +------+
# --> |kirito|
# --> | asina|
# --> | allen|
# --> +------+注意:
- 显式地引用列,可以直接引用带有保留字符的类。如
col("actor name")。 - 只需要转义使用保留字符或者关键字的表达式。
区分大小写
spark默认是不区分大小写的,但是可以通过配置spark.sql.caseSensitive使spark区分大小写。
示例:
set spark.sql.caseSensitive true删除列
spark可以通过select进行类似于sql的查询操作,同样也可以使用drop方法来进行删除列。
示例:
# 添加此代码
import findspark
findspark.init() # 用于查找本机spark配置位置
from pyspark import SparkConf, SparkContext
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import lit,expr,col
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType
conf = SparkConf().setAppName("createCS").setMaster("local[*]")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
mySchame: StructType = StructType([
StructField("actor name", StringType(), False),
StructField("id", LongType(), False),
StructField("age", IntegerType(), True)
])
kirito: Row = Row("kirito", 1999, 17)
asina: Row = Row("asina", 2000, 16)
allen: Row = Row("allen", 2001, 17)
myDf = spark.createDataFrame([kirito, asina, allen], mySchame)
# # # # # # # # # # # # # # # # # # #
# 删除列
# # # # # # # # # # # # # # # # # # #
myDf.drop("id").show() # -->
# --> +----------+---+
# --> |actor name|age|
# --> +----------+---+
# --> | kirito| 17|
# --> | asina| 16|
# --> | allen| 17|
# --> +----------+---+更改列的类型(强制类型转换)
在spark中,可以通过更改列的类型来转换数据类型。所需要使用到的一个方法是cast方法。
示例:
# 添加此代码
import findspark
findspark.init() # 用于查找本机spark配置位置
from pyspark import SparkConf, SparkContext
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import lit,expr,col
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType
conf = SparkConf().setAppName("createCS").setMaster("local[*]")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
mySchame: StructType = StructType([
StructField("name", StringType(), False),
StructField("id", LongType(), False),
StructField("age", IntegerType(), True)
])
kirito: Row = Row("kirito", 1999, 17)
asina: Row = Row("asina", 2000, 16)
allen: Row = Row("allen", 2001, 17)
myDf = spark.createDataFrame([kirito, asina, allen], mySchame)
# # # # # # # # # # # # # # # # # # #
# 更改列的类型
# # # # # # # # # # # # # # # # # # #
myDf.withColumn("name",col("name").cast("long")).show() # -->
# --> +----+----+---+
# --> |name| id|age|
# --> +----+----+---+
# --> |null|1999| 17|
# --> |null|2000| 16|
# --> |null|2001| 17|
# --> +----+----+---+过滤行
过滤行只需要创建一个表达式来判断这个表达式是true还是false,然后过滤掉返回值为false的表达式。
spark提供了两种实现过滤的方式,分别为where和filter。
示例:
# 添加此代码
import findspark
findspark.init() # 用于查找本机spark配置位置
from pyspark import SparkConf, SparkContext
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import lit,expr,col
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType
conf = SparkConf().setAppName("createCS").setMaster("local[*]")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
mySchame: StructType = StructType([
StructField("name", StringType(), False),
StructField("id", LongType(), False),
StructField("age", IntegerType(), True)
])
kirito: Row = Row("kirito", 1999, 17)
asina: Row = Row("asina", 2000, 16)
allen: Row = Row("allen", 2001, 17)
myDf = spark.createDataFrame([kirito, asina, allen], mySchame)
# # # # # # # # # # # # # # # # # # #
# 过滤列
# # # # # # # # # # # # # # # # # # #
myDf.filter(expr("age < 17")).show() # -->
# --> +-----+----+---+
# --> | name| id|age|
# --> +-----+----+---+
# --> |asina|2000| 16|
# --> +-----+----+---+# # # # # # # # # # # # # # # # # # #
# 过滤列
# # # # # # # # # # # # # # # # # # #
myDf.where(expr("age > 16")).show() # -->
# --> +------+----+---+
# --> | name| id|age|
# --> +------+----+---+
# --> |kirito|1999| 17|
# --> | allen|2001| 17|
# --> +------+----+---+去重
在DataFrame中去重,可以使用distinct方法。
示例:
# 添加此代码
import findspark
findspark.init() # 用于查找本机spark配置位置
from pyspark import SparkConf, SparkContext
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import lit,expr,col
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType
conf = SparkConf().setAppName("createCS").setMaster("local[*]")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
mySchame: StructType = StructType([
StructField("name", StringType(), False),
StructField("id", LongType(), False),
StructField("age", IntegerType(), True)
])
kirito: Row = Row("kirito", 1999, 17)
asina: Row = Row("asina", 2000, 16)
allen: Row = Row("allen", 2001, 17)
shadow: Row = Row("allen", 2001, 17)
myDf = spark.createDataFrame([kirito, asina, allen, shadow], mySchame)
# # # # # # # # # # # # # # # # # # #
# 去重
# # # # # # # # # # # # # # # # # # #
myDf.select("*").distinct().show() # -->
# --> +------+----+---+
# --> | name| id|age|
# --> +------+----+---+
# --> | allen|2001| 17|
# --> | asina|2000| 16|
# --> |kirito|1999| 17|
# --> +------+----+---+随机抽样
在spark中,可以使用sample方法来进行随即抽样的工作。
sample(withReplacement,fraction,seed)有三个参数,一般来说只需要前两个参数。
参数解释:
- withReplacement:该参数表示了是否放回抽样数据,true为放回抽样,false则是不放回(无重复样本抽样)。
- fraction:该参数表示了抽取样本占据数据总数的比例,该参数的值在0-1之间。
- seed:该参数用于取定值,即填入该参数后,sample每一次取出的样本都会是一样的。
示例:
# 添加此代码
import findspark
findspark.init() # 用于查找本机spark配置位置
from pyspark import SparkConf, SparkContext
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import lit,expr,col
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType
conf = SparkConf().setAppName("createCS").setMaster("local[*]")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
mySchame: StructType = StructType([
StructField("name", StringType(), False),
StructField("id", LongType(), False),
StructField("age", IntegerType(), True)
])
kirito: Row = Row("kirito", 1999, 17)
asina: Row = Row("asina", 2000, 16)
allen: Row = Row("allen", 2001, 17)
shadow: Row = Row("allen", 2001, 17)
myDf = spark.createDataFrame([kirito, asina, allen, shadow], mySchame)
# # # # # # # # # # # # # # # # # # #
# 随即抽样
# # # # # # # # # # # # # # # # # # #
seed = 5
withReplacement = False
fraction = 0.3
myDf.sample(withReplacement,fraction,seed).show() # -->
# --> +-----+----+---+
# --> | name| id|age|
# --> +-----+----+---+
# --> |asina|2000| 16|
# --> |allen|2001| 17|
# --> +-----+----+---+随机分割
当需要将DataFrame随机分割为多个分片时,可以使用随机分割(randomSplit)。
randomSplit(list(),send)有两个参数,一般只需要用第一个参数即可。
参数解释:
- list():表示一个列表,列表中的参数的和必须在0-1之间。
- seed:该参数用于取定值,即填入该参数后,randomSplit每一次分割的结果都会是一样的。
示例:
# 添加此代码
import findspark
findspark.init() # 用于查找本机spark配置位置
from pyspark import SparkConf, SparkContext
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import lit, expr, col
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType
conf = SparkConf().setAppName("createCS").setMaster("local[*]")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
mySchame: StructType = StructType([
StructField("name", StringType(), False),
StructField("id", LongType(), False),
StructField("age", IntegerType(), True)
])
kirito: Row = Row("kirito", 1999, 17)
asina: Row = Row("asina", 2000, 16)
allen: Row = Row("allen", 2001, 17)
shadow: Row = Row("allen", 2001, 17)
myDf = spark.createDataFrame([kirito, asina, allen, shadow], mySchame)
# # # # # # # # # # # # # # # # # # #
# 随机分割
# # # # # # # # # # # # # # # # # # #
seed = 1
fgDf = myDf.randomSplit([0.25, 0.75],seed)
fgDf[0].show() # -->
# --> +-----+----+---+
# --> | name| id|age|
# --> +-----+----+---+
# --> |allen|2001| 17|
# --> +-----+----+---+连接和追加行(联合操作)
DataFrame是一个不可变的数据类型。即,在创建了DataFrame后,不能给DataFrame追加行。不过可以通过两个DataFrame联合操作(union)的方式实现该效果。
常见的作法有两种:一是创建新的临时试图,二是注册成一个数据表。
注意:
- 联合操作是基于位置而不是基于schame模式来进行合并的。
- 联合操作不会自动根据列名匹配对齐后再进行合并。
- 两个联合的DataFrame需要完全相同的模式和列数。
示例:
# 添加此代码
import findspark
findspark.init() # 用于查找本机spark配置位置
from pyspark import SparkConf, SparkContext
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import lit, expr, col
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType
conf = SparkConf().setAppName("createCS").setMaster("local[*]")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
mySchame: StructType = StructType([
StructField("name", StringType(), False),
StructField("id", LongType(), False),
StructField("age", IntegerType(), True)
])
kirito: Row = Row("kirito", 1999, 17)
asina: Row = Row("asina", 2000, 16)
allen: Row = Row("allen", 2001, 17)
shadow: Row = Row("allen", 2001, 17)
myDf = spark.createDataFrame([kirito, asina, allen, shadow], mySchame)
youSchame = StructType([
StructField("name",StringType(),False),
StructField("id",LongType(),False),
StructField("age",IntegerType(),True)
])
Edward: Row = Row("Edward", 2333, 17)
Ella: Row = Row("Ella", 2233, 16)
Deval: Row = Row("Deval", 1987, 17)
edmond: Row = Row("edmond", 1132, 17)
youDf = spark.createDataFrame([Edward, Ella, Deval, edmond], youSchame)
# # # # # # # # # # # # # # # # # # #
# 连接和追加行(联合操作)
# # # # # # # # # # # # # # # # # # #
newDf = myDf.union(youDf)
newDf.show() # -->
# --> +------+----+---+
# --> | name| id|age|
# --> +------+----+---+
# --> |kirito|1999| 17|
# --> | asina|2000| 16|
# --> | allen|2001| 17|
# --> | allen|2001| 17|
# --> |Edward|2333| 17|
# --> | Ella|2233| 16|
# --> | Deval|1987| 17|
# --> |edmond|1132| 17|
# --> +------+----+---+小技巧:在scala中,可以使用=!=运算符,该运算符不仅可以比较字符串,也可以比较表达式。
行排序
在DataFrame中进行排序的时候,可以使用sort和orderBy方法。这两个方法是互相等价的,执行的方式也一样。默认设置是按升序排序。如果要更明确地指定排序方式,则需要使用asc函数和desc函数。
提示:
一个高级的技巧:可以指定空值在排序中的位置。
- asc_nulls_first和desc_nulls_first都表示空值排在最前面,asc_nulls_last和desc_nulls_last都表示空值排在最后面。
- 出于性能优化的目的,最好在进行别的转换之前,先对每个分区进行内部排序。可以使用
sortWithinPartitions方法进行这一操作。
注意:
- sort和orderBy方法,这两个方法是互相等价的,执行的方式也一样。
- sort和orderBy方法,两个可以接收字符串也可以接收列表达式。
- sort和orderBy方法,两个方法可以接收多列。
- 默认设置是按升序排序。
- 如果要更明确地指定排序方式,则需要使用asc函数和desc函数。
示例:
# 添加此代码
import findspark
findspark.init() # 用于查找本机spark配置位置
from pyspark import SparkConf, SparkContext
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import lit, expr, col
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType
conf = SparkConf().setAppName("createCS").setMaster("local[*]")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
mySchame: StructType = StructType([
StructField("name", StringType(), False),
StructField("id", LongType(), False),
StructField("age", IntegerType(), True)
])
kirito: Row = Row("kirito", 1999, 17)
asina: Row = Row("asina", 2000, 16)
allen: Row = Row("allen", 2001, 17)
shadow: Row = Row("allen", 2001, 17)
myDf = spark.createDataFrame([kirito, asina, allen, shadow], mySchame)
youSchame = StructType([
StructField("name", StringType(), False),
StructField("id", LongType(), False),
StructField("age", IntegerType(), True)
])
Edward: Row = Row("Edward", 2333, 17)
Ella: Row = Row("Ella", 2233, 16)
Deval: Row = Row("Deval", 1987, 17)
edmond: Row = Row("edmond", 1132, 17)
youDf = spark.createDataFrame([Edward, Ella, Deval, edmond], youSchame)
# # # # # # # # # # # # # # # # # # #
# 行排序 sort和orderBy
# # # # # # # # # # # # # # # # # # #
youDf.sort("age").show() # -->
# --> +------+----+---+
# --> | name| id|age|
# --> +------+----+---+
# --> | Ella|2233| 16|
# --> |Edward|2333| 17|
# --> | Deval|1987| 17|
# --> |edmond|1132| 17|
# --> +------+----+---+
youDf.orderBy("age").show() # -->
# --> +------+----+---+
# --> | name| id|age|
# --> +------+----+---+
# --> | Ella|2233| 16|
# --> | Deval|1987| 17|
# --> |edmond|1132| 17|
# --> |Edward|2333| 17|
# --> +------+----+---+# # # # # # # # # # # # # # # # # # #
# 行排序 使用列表达式
# # # # # # # # # # # # # # # # # # #
youDf.sort(expr("age")).show() # -->
# --> +------+----+---+
# --> | name| id|age|
# --> +------+----+---+
# --> | Ella|2233| 16|
# --> |edmond|1132| 17|
# --> | Deval|1987| 17|
# --> |Edward|2333| 17|
# --> +------+----+---+
youDf.orderBy(col("age")).show() # -->
# --> +------+----+---+
# --> | name| id|age|
# --> +------+----+---+
# --> | Ella|2233| 16|
# --> | Deval|1987| 17|
# --> |Edward|2333| 17|
# --> |edmond|1132| 17|
# --> +------+----+---+# # # # # # # # # # # # # # # # # # #
# 行排序 多列排序
# # # # # # # # # # # # # # # # # # #
youDf.sort("age", "name").show() # -->
# --> +------+----+---+
# --> | name| id|age|
# --> +------+----+---+
# --> | Ella|2233| 16|
# --> | Deval|1987| 17|
# --> |Edward|2333| 17|
# --> |edmond|1132| 17|
# --> +------+----+---+
youDf.orderBy("age", "name").show() # -->
# --> +------+----+---+
# --> | name| id|age|
# --> +------+----+---+
# --> | Ella|2233| 16|
# --> | Deval|1987| 17|
# --> |Edward|2333| 17|
# --> |edmond|1132| 17|
# --> +------+----+---+# # # # # # # # # # # # # # # # # # #
# 行排序 使用asc和desc
# # # # # # # # # # # # # # # # # # #
youDf.sort(asc("age")).show() # -->
# --> +------+----+---+
# --> | name| id|age|
# --> +------+----+---+
# --> | Ella|2233| 16|
# --> |Edward|2333| 17|
# --> | Deval|1987| 17|
# --> |edmond|1132| 17|
# --> +------+----+---+
youDf.orderBy(desc("age")).show() # -->
# --> +------+----+---+
# --> | name| id|age|
# --> +------+----+---+
# --> |edmond|1132| 17|
# --> | Deval|1987| 17|
# --> |Edward|2333| 17|
# --> | Ella|2233| 16|
# --> +------+----+---+
youDf.sort(expr("age").desc()).show() # -->
# --> +------+----+---+
# --> | name| id|age|
# --> +------+----+---+
# --> |edmond|1132| 17|
# --> |Edward|2333| 17|
# --> | Deval|1987| 17|
# --> | Ella|2233| 16|
# --> +------+----+---+# # # # # # # # # # # # # # # # # # #
# 行排序 分区内部排序
# # # # # # # # # # # # # # # # # # #
youDf.repartition(1).sortWithinPartitions("age").show() # -->
# --> +------+----+---+
# --> | name| id|age|
# --> +------+----+---+
# --> | Ella|2233| 16|
# --> |Edward|2333| 17|
# --> | Deval|1987| 17|
# --> |edmond|1132| 17|
# --> +------+----+---+
# 注意,这里已经使用了repartition进行了重新分区,现在所进行的排序是一个分区内的。注意:
- 在pyspark下,
sort(expr("age desc"))写法排序不会起作用。
限制提取limit方法
在dataframe中,可以使用limit方法来限制每次可提取的数据数量。
提示:
- 该方法结合排序方法,可以获取数据排名前多少行或后多少行的数据。
示例:
# # # # # # # # # # # # # # # # # # #
# 限制提取
# # # # # # # # # # # # # # # # # # #
youDf.limit(2).show() # -->
# --> +------+----+---+
# --> | name| id|age|
# --> +------+----+---+
# --> |Edward|2333| 17|
# --> | Ella|2233| 16|
# --> +------+----+---+重划分和合并
重划分
根据一些经常过滤的数据进行分区,是一项重要的优化方案。控制跨集群数据的物理布局,包括分区方案和分区数。
注意:
- 重新分区会导致数据的全面洗牌。
- 如果将来的分区数大于当前的分区数,或是需要基于某一组特定的列来进行分区时,通常只能重新分区。
- 可以使用rdd下的方法
getNumPartitions来获取当前dataframe的分区数。
示例:
# 添加此代码
import findspark
findspark.init() # 用于查找本机spark配置位置
from pyspark import SparkConf, SparkContext
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import lit, expr, col, asc, desc
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType
conf = SparkConf().setAppName("createCS").setMaster("local[*]")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
mySchame: StructType = StructType([
StructField("name", StringType(), False),
StructField("id", LongType(), False),
StructField("age", IntegerType(), True)
])
kirito: Row = Row("kirito", 1999, 17)
asina: Row = Row("asina", 2000, 16)
allen: Row = Row("allen", 2001, 17)
shadow: Row = Row("allen", 2001, 17)
myDf = spark.createDataFrame([kirito, asina, allen, shadow], mySchame)
youSchame = StructType([
StructField("name", StringType(), False),
StructField("id", LongType(), False),
StructField("age", IntegerType(), True)
])
Edward: Row = Row("Edward", 2333, 17)
Ella: Row = Row("Ella", 2233, 16)
Deval: Row = Row("Deval", 1987, 17)
edmond: Row = Row("edmond", 1132, 17)
youDf = spark.createDataFrame([Edward, Ella, Deval, edmond], youSchame)
# # # # # # # # # # # # # # # # # # #
# 重划分和合并 重划分
# # # # # # # # # # # # # # # # # # #
print(myDf.rdd.getNumPartitions()) # --> 8
selfDf = myDf.repartition(2)
print(selfDf.rdd.getNumPartitions()) # --> 2# # # # # # # # # # # # # # # # # # #
# 重划分和合并 根据特定列进行重划分
# # # # # # # # # # # # # # # # # # #
print(myDf.rdd.getNumPartitions()) # --> 8
selfDf = myDf.repartition("age")
print(selfDf.rdd.getNumPartitions()) # --> 200# # # # # # # # # # # # # # # # # # #
# 重划分和合并 根据特定列指定分区数量进行重划分
# # # # # # # # # # # # # # # # # # #
print(myDf.rdd.getNumPartitions()) # --> 8
selfDf = myDf.repartition(2,"age")
print(selfDf.rdd.getNumPartitions()) # --> 2合并
合并操作使用coalesece方法。合并操作不会重新洗牌,但是会尝试合并分区。
注意:
- 合并操作不会使合并的分区大于原本的分区。
示例:
# # # # # # # # # # # # # # # # # # #
# 重划分和合并 合并
# # # # # # # # # # # # # # # # # # #
print(myDf.rdd.getNumPartitions()) # --> 8
selfDf = myDf.repartition("age")
print(selfDf.rdd.getNumPartitions()) # --> 200
mySelfDf = selfDf.coalesce(1)
print(mySelfDf.rdd.getNumPartitions()) # --> 1# # # # # # # # # # # # # # # # # # #
# 重划分和合并
# # # # # # # # # # # # # # # # # # #
print(myDf.rdd.getNumPartitions()) # --> 8
mySelfDf = myDf.coalesce(10)
print(mySelfDf.rdd.getNumPartitions()) # --> 8驱动器获取行
spark的驱动器维护着集群状态,有时候需要让驱动器收集一些数据到本地,以方便在本地处理它们。
这个操作并没有明确的定义,有以下几种方法可以实现这个效果:
- collect:该函数会获取整个Dataframe的数据。
- take:该函数会获取前N行,并使用show打印一些行。
- toLocalIterator:该函数是一个迭代器,会将每个分区的数据返回给驱动器。该函数允许以串行的方式一个一个分区地迭代整个数据集。
注意:
- 将数据集传递给驱动器的代价很高。当数据量很大的时候调用collect函数,可能会导致驱动器崩溃。如果使用toLocalIterator,并且分区很大,则容易使驱动器节点崩溃并丢失应用程序的状态,代价也是巨大的。因此我们可以一个一个分区进行操作,而不是并行运行。
示例:
# 添加此代码
import findspark
findspark.init() # 用于查找本机spark配置位置
from pyspark import SparkConf, SparkContext
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import lit, expr, col, asc, desc
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType
conf = SparkConf().setAppName("createCS").setMaster("local[*]")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
mySchame: StructType = StructType([
StructField("name", StringType(), False),
StructField("id", LongType(), False),
StructField("age", IntegerType(), True)
])
kirito: Row = Row("kirito", 1999, 17)
asina: Row = Row("asina", 2000, 16)
allen: Row = Row("allen", 2001, 17)
shadow: Row = Row("allen", 2001, 17)
myDf = spark.createDataFrame([kirito, asina, allen, shadow], mySchame)
youSchame = StructType([
StructField("name", StringType(), False),
StructField("id", LongType(), False),
StructField("age", IntegerType(), True)
])
Edward: Row = Row("Edward", 2333, 17)
Ella: Row = Row("Ella", 2233, 16)
Deval: Row = Row("Deval", 1987, 17)
edmond: Row = Row("edmond", 1132, 17)
youDf = spark.createDataFrame([Edward, Ella, Deval, edmond], youSchame)
# # # # # # # # # # # # # # # # # # #
# 驱动器获取行
# # # # # # # # # # # # # # # # # # #
print(type(myDf.take(2))) # --> <class 'list'>
print(myDf.take(2)) # --> [Row(name='kirito', id=1999, age=17), Row(name='asina', id=2000, age=16)] 

Comments | NOTHING