sparkRDD编程


spark RDD编程

image-20220417094827722.png

spark-shell2.x版本中,默认将SparkContext类实例化为了sc,将SparkSession实例化为了spark。使用时直接调用sc和spark即可。

创建RDD

1. 从内存中创建

​ 在spark中,可以使用parallelize方法和makeRDD两种方法从内存中创建RDD。

  • parallelize方法是将seq(即序列)转化为RDD

  • makeRDD方法是从已有的RDD创建为新的RDD

(1) parallelize

parallelize(seq, num)

​ 该方法可以将seq集合转化为RDD。

参数解释:

  • seq表示要转化集合。(seq是一种序列。所谓序列是指一类具有一定长度的可迭代访问的对象,其中每一个元素均带一个固定索引,该索引从0开始计数)
  • num表示分区数。不设分区,则默认为该application分配到的资源的CPU数。

格式:

val 常量名 = sc.parallelize(seq, num);

示例:

var datas = Array(1,2,3,4,5);
val lx = sc.parallelize(datas);
println(lx.partitions.size) // --> 8
var datas = Array(1,2,3,4,5);
val lx = sc.parallelize(datas,3);
println(lx.partitions.size) // --> 3

提示:使用rdd.partitions.size方法可以输出rdd所用分区数。

注意:这里第一次输出8,是因为分配了cpu的8个核心资源。

(2)makeRDD

makeRDD(seq)

​ 该方法是从已有的RDD创建为新的RDD。

​ 该方法创建的RDD的分区数取决于集合元素。

参数解释:

  • seq表示要转化集合。(seq是一种序列。所谓序列是指一类具有一定长度的可迭代访问的对象,其中每一个元素均带一个固定索引,该索引从0开始计数)

格式:

val 常量名 = sc.makeRDD(seq)

示例:

var datas = Array(1,2,3,4,5);
val lx = sc.makeRDD(datas);

从外存中创建

​ 在spark中,使用textFile方法可以从外存中读取数据集。该方法支持多种类型的数据集,如目录、文本文件、压缩文件、和通配符匹配的文件等,并且允许设定分区个数。

textFile(path, num)

参数解释:

  • path就是外存文件的获取路径。默认使用hdfs协议,访问本地时需要使用file协议。
  • num即分区数。
(1)从HDFS文件创建RDD

这种方法最常用。textFile默认读取HDFS中的文件。

格式:

val 常量名 = sc.textFile("文件路径")
val 常量名 = sc.textFile("hdfs://文件路径")

示例:

val tests = sc.textFile("/lxyd/test.txt")
val tests = sc.textFile("hdfs://主节点:端口号/lxyd/test.txt")

image-20220417113955153.png

image-20220417113035551.png

(2)从本地创建RDD

同样通过textFile方法读取文件。

格式:

val 常量名 = sc.textFile("file://文件路径")

示例:

val tests = sc.textFile("file:///lxyd/test.txt")

image-20220417114033536.png

转换

spark提供了大量的转换RDD用的方法。

map

map(fun)

​ 该方法通过参数中的函数将RDD中的各个元素处理后,返回一个新的RDD。

参数解释:

  • fun是一个函数表达式。map会将调用map方法的RDD中的元素一一作为参数传递给fun,fun返回的值又会被map组合为一个新的RDD返回。

格式:

val 常量名 = rdd.map(fun)

示例:

val ageArr = Array(12,13,13,15)
val age = sc.parallelize(ageArr)
val newAge = age.map(x=>x+1)
newAge.collect // --> (12,14,14,16)

image-20220417155749820.png

flatMap

flatMap(fun)

​ 该方法与map方法类似。将元素处理,然后再转换为同级别,最后组成一个新的RDD返回。

参数解释:

  • fun是一个函数表达式。flatMap会将调用采用类似map的方法将RDD中的元素一一作为参数传递给fun,fun返回的值又会被flatMap转换为同级,组合为一个新的RDD返回。

格式:

val 常量名 = rdd.flatMap(fun)

示例:(采用map于其做比较)

var wordsArr = Array("holle world", "I is test", "我只是 一个 测试")
val words = sc.parallelize(wordsArr)
val newWords = words.map(x=>x.split(" "))
newWords.collect // -->Array(Array(holle, world), Array(I, is, test), Array(我只是, 一个, 测试))
var wordsArr = Array("holle world", "I is test", "我只是 一个 测试")
val words = sc.parallelize(wordsArr)
val newWords = words.flatMap(x=>x.split(" "))
newWords.collect // -->Array(holle, world, I, is, test, 我只是, 一个, 测试)

image-20220417161258775.png

filter

filter(fun)

​ 该方法用于过滤RDD中不需要的元素。

参数解释:

  • fun是一个函数表达式。filter会将调用该方法的RDD的元素一一作为参数传入fun,符合fun函数条件的元素会被处理后返回给filter,filter会将最后留下的元素组合为一个新的RDD并返回。

格式:

val 常量名 = rdd.filter(fun)

示例:

var ageArr = Array(12,13,14,14,51,15)
val age = sc.parallelize(ageArr)
val newAge = age.filter(x=>x<20)
newAge.collect // -->Array(12, 13, 14, 14, 15)

distinct

distinct()

​ 该方法可以去除RDD中的重复值,并返回一个新的RDD

格式:

val 常量名 = rdd.distinct()

示例:

var actorArr = Array("kirito", "kirito", "asina", "asina")
val actor = sc.parallelize(actorArr)
val newActor = actor.distinct()
newActor.collect // -->Array(kirito, asina)

union

union(RDD)

​ 该方法用于合并多个RDD,并返回一个新的RDD。

参数解释:

  • RDD即直接填入一个RDD即可。(一次只能填入一个)

格式:

val 常量名 = rdd.union(rdd)

示例:

var ageoArr = Array(12,13,14)
var agetArr = Array(15,16,17)
val ageo = sc.parallelize(ageoArr)
val aget = sc.parallelize(agetArr)
val newAge = ageo.union(aget)
newAge.collect // --> Array(12, 13, 14, 15, 16, 17)

intersection

intersection(RDD)

​ 该方法用于获取两个RDD之间相同的元素,并返回一个新的RDD

参数解释:

  • RDD即直接填入一个RDD即可。(一次只能填入一个)

格式:

val 常量名 = rdd.intersection(rdd)

示例:

var ageoArr = Array(12,14,15,13,12,12)
var agetArr = Array(12,12,15)
val ageo = sc.parallelize(ageoArr)
val aget = sc.parallelize(agetArr)
val newAge = ageo.intersection(aget)
newAge.collect // --> Array(12,15)

subtract

subtract(RDD)

​ 该方法的调用,调用该方法的RDD会被删去已在参数中RDD中出现过的元素,并将剩下的元素组成新的RDD返回

参数解释:

  • RDD即直接填入一个RDD即可。(一次只能填入一个)

格式:

val 常量名 = rdd.subtract(rdd)

示例:

var ageoArr = Array(12,13,14)
var agetArr = Array(12,15,16)
val ageo = sc.parallelize(ageoArr)
val aget = sc.parallelize(agetArr)
val newAgeo = ageo.subtract(aget)
newAgeo.collect // --> Array(13,14)
val newAget = aget.subtract(ageo)
newAget.collect // --> Array(15,16)

cartesian

cartesian(RDD)

​ 该方法会将两个RDD中元素组合为笛卡尔积,并组成新的RDD返回。

​ 假设RDDA有4个元素,RDDB有4个元素,RDDA中的每一个元素都会与RDDB中的每个元素组成一组,最后会返回16个元素。

参数解释:

  • RDD即直接填入一个RDD即可。(一次只能填入一个)

格式:

val 常量名 = rdd.cartesian(rdd)

示例:

var ageoArr = Array(1,2,3,2)
var agetArr = Array(1,5,7,9)
val ageo = sc.parallelize(ageoArr)
val aget = sc.parallelize(agetArr)
val newAge = ageo.cartesian(aget)
newAge.collect // -->Array((1,1), (1,5), (1,7), (1,9), (2,1), (2,5), (2,7), (2,9), (3,1), (3,5), (3,7), (3,9), (2,1), (2,5), (2,7), (2,9))

reduce

reduce(fun)

​ 该方法会将RDD中的元素一一取出,进行处理,并直接返回结果。

参数解释:

  • fun表示函数表达式。默认有两个参数,用于处理并返回。

格式:

val 常量名 = rdd.reduce(fun)

示例:

var ageArr = Array(1,2,3,4,5,6,7)
val age = sc.parallelize(ageArr)
val newAge = age.reduce((a,b)=>a*b)
println(newAge) // --> 5040

reduceByKey

reduceByKey(fun)

​ 当转化为RDD的数据集为键值对形式的时候,该方法会将相同键的元素的值传入fun函数中,fun处理完毕后,将新的元素组合为RDD并返回。

参数解释:

  • fun表示函数表达式。默认有两个参数,用于处理并返回。

格式:

val 常量名 = rdd.reduceByKey(fun)

示例:

var actorArr = Array(("kirito",11),("kirito",11),("asina",12),("asina",14))
val actor = sc.parallelize(actorArr)
val newActor = actor.reduceByKey((a,b)=>a*b)
newActor.collect // --> Array((kirito,121), (asina,168))

groupByKey

groupByKey()

​ 当转化为RDD的数据集为键值对形式的时候,该方法会将相同键的元素的值划分为一组(以Iterable的方式存放),并组合为新的RDD返回。

格式:

val 常量名 = rdd.groupByKey()

示例:

var actorArr = Array(("kirito",1),("kirito",12),("asina",43),("asina",32))
val actor = sc.parallelize(actorArr)
val newActor = actor.groupByKey()
newActor.collect // --> Array((kirito,CompactBuffer(1, 12)), (asina,CompactBuffer(43, 32)))

join

join(RDD)

​ 当转化为RDD的数据集为键值对形式的时候,该方法会提取两个RDD键相同的值组合在一起,组合成一个新的RDD并返回。

参数解释:

  • RDD即直接填入一个RDD即可。(一次只能填入一个)

格式:

val 常量名 = rdd.join(rdd)

示例:

var lxoArr = Array(("a",1),("b",2),("c",3))
var lxtArr = Array(("a",2),("d",2),("e",3))
val lxo = sc.parallelize(lxoArr)
val lxt = sc.parallelize(lxtArr)
val newLx= lxo.join(lxt)
newLx.collect // --> Array(("a",(1,2)))

leftOuterJoin

RDDA leftOuterJoin RDDB

​ 对两个RDD进行左连接。设RDD1对RDD2进行左连接,则将两个RDD键相同的值组合在一起,连同RDD1的其他键值对一起组合成一个新的RDD并返回。

格式:

val 常量名 = rdd1 leftOuterJoin rdd2

示例:

var lxoArr = Array(("a",1),("b",2),("c",3))
var lxtArr = Array(("a",2),("d",2),("e",3))
val lxo = sc.parallelize(lxoArr)
val lxt = sc.parallelize(lxtArr)
val newLxo = lxo leftOuterJoin lxt
newLxo.collect // --> Array((a,(1,Some(2))), (b,(2,None)), (c,(3,None)))
val newLxt = lxt leftOuterJoin lxo
newLxt.collect // --> Array((a,(2,Some(1))), (d,(2,None)), (e,(3,None)))

rightOuterJoin

RDDA rightOuterJoin RDDB

​ 对两个RDD进行左连接。设RDD1对RDD2进行左连接,则将两个RDD键相同的值组合在一起,连同RDD2的其他键值对一起组合成一个新的RDD并返回。

格式:

val 常量名 = rdd rightOuterJoin rdd

示例:

var lxoArr = Array(("a",1),("b",2),("c",3))
var lxtArr = Array(("a",2),("d",2),("e",3))
val lxo = sc.parallelize(lxoArr)
val lxt = sc.parallelize(lxtArr)
val newLxo = lxo rightOuterJoin lxt
newLxo.collect // -->Array((a,(Some(1),2)), (d,(None,2)), (e,(None,3)))
val newLxt = lxt rightOuterJoin lxo
newLxt.collect // -->Array((a,(Some(2),1)), (b,(None,2)), (c,(None,3)))

zip

zip(RDD)

​ 该方法可以将两个长度一致的RDD压缩到一起,以键值对的形式组合为一个新的RDD并返回。

参数解释:

  • RDD即直接填入一个RDD即可。(一次只能填入一个)

注意:

  • 该方法的两个rdd长度必须一致

格式:

val 常量名 = rdd.zip(rdd)

示例:

val nameArr = Array("a","b","c","d","e")
val ageArr = Array(1,2,3,4,5)
val name = sc.parallelize(nameArr)
val age = sc.parallelize(ageArr)
val actor = name.zip(age)
actor.collect // --> Array((a,1), (b,2), (c,3), (d,4), (e,5))

combineByKey

combineByKey(createCombiner,mergeValue,mergeCombiners)

​ 该方法用于将相同键的数据聚合,并允许返回类型与输入数据不同的返回值。

参数解释:

  • createCombiner: V => C ,这个函数把当前的值作为参数,此时我们可以对其做些附加操作(类型转换)并把它返回 (这一步类似于初始化操作)
  • mergeValue: (C, V) => C,该函数把元素V合并到之前的元素C(createCombiner)上 (这个操作在每个分区内进行)(如果分区太多,每个分区只有一个参数就不会执行该操作)
  • mergeCombiners: (C, C) => C,该函数把2个元素C合并 (这个操作在不同分区间进行)

注意:

这玩意儿一定要注意分区,如果分区数高于元素数量,则会直接执行mergeCombiners。

格式:

val 常量名 = actor.combineByKey(
    V=>C,
    (获取C的变量名:Int,获取V的变量名:Int)=>C,
    (获取新C值的变量名:Int,获取C值的变量名:Int)=>C
)

示例:

val nameArr = Array("a","a","a","a","c","c")
val ageArr = Array(1,2,3,4,5,6)
val name = sc.parallelize(nameArr)
val age = sc.parallelize(ageArr)
val actor = name.zip(age)
val newActor = actor.combineByKey(
    x=>{println(s"x是$x,x+1=${x+1}");x+1},
    (x:Int,acc:Int)=>{println(s"$acc + $x");acc+x},
    (x:Int,acc:Int)=>{println(s"$acc - $x");acc-x}
)
newActor.collect 
 // --> x是1,x+1=2
 // --> x是3,x+1=4
 // --> x是6,x+1=7
 // --> x是2,x+1=3
 // --> x是4,x+1=5
 // --> x是5,x+1=6
 // --> 3 - 2
 // --> 7 - 6
 // --> 4 - 1
 // --> 5 - 3
 // --> Array[(String, Int)] = Array((a,2), (c,1))
val nameArr = Array("a","a","a","a","c","c")
val ageArr = Array(1,2,3,4,5,6)
val name = sc.parallelize(nameArr,3)
val age = sc.parallelize(ageArr,3)
val actor = name.zip(age)
val newActor = actor.combineByKey(
    x=>{println(s"x是$x,x+1=${x+1}");x+1},
    (x:Int,acc:Int)=>{println(s"$acc + $x");acc+x},
    (x:Int,acc:Int)=>{println(s"$acc - $x");acc-x}
)
newActor.collect 
 // --> x是5,x+1=6
 // --> x是3,x+1=4
 // --> x是1,x+1=2
 // --> 6 + 6
 // --> 4 + 4
 // --> 2 + 2
 // --> 8 - 4
 // --> Array[(String, Int)] = Array((c,12), (a,4))

代码解释:

​ 这里被分为三个分区。第一个分区是(a,1)(a,2)第二个分区是(a,3),(a,4)。第一个分区执行的时候先传入(a,1),然后执行x=>{println(s"x是$x,x+1=${x+1}");x+1}
这个加1的操作,变成了2,这里只会执行一次。然后会执行第二条
(x:Int,acc:Int)=>{println(s"$acc + $x");acc+x},这里会把上一条的2和(a,2)里的2一起作为参数传进来,变成4。然后这个分区没有别的元素了,所以就直接返回出来。然后是第二个分区也执行一遍相同的操作。所以一号分区的值是4,二号分区的值是8。接着就会执行第三条也就是分区之间的操作
(x:Int,acc:Int)=>{println(s"$acc - $x");acc-x},8-4,得到最终结果4。

查询

collect

collect()

​ 该方法是一个动作操作,会把RDD所有元素转换为数组并返回到Driver端,适用于小数据处理。

格式:

var 常量名 = rdd.collect

示例:

var ageArr = Array(1,2,3,4,5)
val age = sc.parallelize(ageArr)
var a = age.collect
println(a(2)) // --> 3

take

take(num)

​ 该方法可以获取RDD的前num个元素,并返回数组

参数解释:

  • num表示需要返回的元素个数。(超出范围只按最大范围算)

格式:

var 常量名 = rdd.take(num)

示例:

var ageArr = Array(1,2,3,4,5)
val age = sc.parallelize(ageArr)
val a = age.take(3) // --> Array(1,2,3)
for(b:Int<-a){
    println(b)
} // --> 1 2 3
var ageArr = Array(1,2,3,4,5)
val age = sc.parallelize(ageArr)
val a = age.take(6) // --> Array(1,2,3)
for(b:Int<-a){
    println(b)
} // --> 1 2 3 4 5

lookup

lookup(args)

​ 该方法用于获取键值对类型RDD中键匹配的所有值。

参数解释:

  • args表示键值对中的键。

格式:

var 变量名 = rdd.lookup(args)

示例:

var actorArr = Array(("a",1),("b",2),("a",3),("c",4))
val actor = sc.parallelize(actorArr)
val a = actor.lookup("a") // -->WrappedArray(1, 3)
println(a)
for(b:Int<-a){
    println(b)
} // --> 1 3

排序

sortBy

sortBy(f(T)=>K,ascending,numPartitions)

​ 该方法可以对标准RDD进行排序。

参数解释:

  • f(T)=>k,f(T)表示要被排序对象的每一个元素,右边则是返回元素中要进行排序的值。
  • ascending,表示排序的顺序,参数为true或是flase,默认是true即升序。
  • numPartitions,表示排序后的RDD的分区个数,默认排序分区后的分区个数和排序之前的个数相同,即为this.partitions.size

格式:

val 常量名 = rdd.sortBy(f(T)=>k,ascending,numPartitions)

示例:

var ageArr = Array(3,45,13,88,13)
val age = sc.parallelize(ageArr)
val newAge = age.sortBy(x=>x,true,3)
newAge.collect // --> Array(3, 13, 13, 45, 88)
newAge.partitions.size // --> 3
var ageArr = Array((3,1),(43,4),(3,4),(23,5),(23,9))
val age = sc.parallelize(ageArr,3)
val newAge = age.sortBy(x=>x._1,true)
newAge.collect // --> Array((3,1), (3,4), (23,5), (23,9), (43,4))
newAge.partitions.size // --> 3

写入磁盘

json

声明:一代明君的小屋|版权所有,违者必究|如未注明,均为原创|本网站采用BY-NC-SA协议进行授权

转载:转载请注明原文链接 - sparkRDD编程


欢迎来到我的小屋