spark RDD编程
spark-shell2.x版本中,默认将SparkContext类实例化为了sc,将SparkSession实例化为了spark。使用时直接调用sc和spark即可。
创建RDD
1. 从内存中创建
在spark中,可以使用parallelize方法和makeRDD两种方法从内存中创建RDD。
parallelize方法是将seq(即序列)转化为RDD
makeRDD方法是从已有的RDD创建为新的RDD
(1) parallelize
parallelize(seq, num)
该方法可以将seq集合转化为RDD。
参数解释:
- seq表示要转化集合。(seq是一种序列。所谓序列是指一类具有一定长度的可迭代访问的对象,其中每一个元素均带一个固定索引,该索引从0开始计数)
- num表示分区数。不设分区,则默认为该application分配到的资源的CPU数。
格式:
val 常量名 = sc.parallelize(seq, num);
示例:
var datas = Array(1,2,3,4,5);
val lx = sc.parallelize(datas);
println(lx.partitions.size) // --> 8
var datas = Array(1,2,3,4,5);
val lx = sc.parallelize(datas,3);
println(lx.partitions.size) // --> 3
提示:使用rdd.partitions.size
方法可以输出rdd所用分区数。
注意:这里第一次输出8,是因为分配了cpu的8个核心资源。
(2)makeRDD
makeRDD(seq)
该方法是从已有的RDD创建为新的RDD。
该方法创建的RDD的分区数取决于集合元素。
参数解释:
- seq表示要转化集合。(seq是一种序列。所谓序列是指一类具有一定长度的可迭代访问的对象,其中每一个元素均带一个固定索引,该索引从0开始计数)
格式:
val 常量名 = sc.makeRDD(seq)
示例:
var datas = Array(1,2,3,4,5);
val lx = sc.makeRDD(datas);
从外存中创建
在spark中,使用textFile
方法可以从外存中读取数据集。该方法支持多种类型的数据集,如目录、文本文件、压缩文件、和通配符匹配的文件等,并且允许设定分区个数。
textFile(path, num)
参数解释:
- path就是外存文件的获取路径。默认使用hdfs协议,访问本地时需要使用file协议。
- num即分区数。
(1)从HDFS文件创建RDD
这种方法最常用。textFile默认读取HDFS中的文件。
格式:
val 常量名 = sc.textFile("文件路径")
val 常量名 = sc.textFile("hdfs://文件路径")
示例:
val tests = sc.textFile("/lxyd/test.txt")
val tests = sc.textFile("hdfs://主节点:端口号/lxyd/test.txt")
(2)从本地创建RDD
同样通过textFile方法读取文件。
格式:
val 常量名 = sc.textFile("file://文件路径")
示例:
val tests = sc.textFile("file:///lxyd/test.txt")
转换
spark提供了大量的转换RDD用的方法。
map
map(fun)
该方法通过参数中的函数将RDD中的各个元素处理后,返回一个新的RDD。
参数解释:
- fun是一个函数表达式。map会将调用map方法的RDD中的元素一一作为参数传递给fun,fun返回的值又会被map组合为一个新的RDD返回。
格式:
val 常量名 = rdd.map(fun)
示例:
val ageArr = Array(12,13,13,15)
val age = sc.parallelize(ageArr)
val newAge = age.map(x=>x+1)
newAge.collect // --> (12,14,14,16)
flatMap
flatMap(fun)
该方法与map方法类似。将元素处理,然后再转换为同级别,最后组成一个新的RDD返回。
参数解释:
- fun是一个函数表达式。flatMap会将调用采用类似map的方法将RDD中的元素一一作为参数传递给fun,fun返回的值又会被flatMap转换为同级,组合为一个新的RDD返回。
格式:
val 常量名 = rdd.flatMap(fun)
示例:(采用map于其做比较)
var wordsArr = Array("holle world", "I is test", "我只是 一个 测试")
val words = sc.parallelize(wordsArr)
val newWords = words.map(x=>x.split(" "))
newWords.collect // -->Array(Array(holle, world), Array(I, is, test), Array(我只是, 一个, 测试))
var wordsArr = Array("holle world", "I is test", "我只是 一个 测试")
val words = sc.parallelize(wordsArr)
val newWords = words.flatMap(x=>x.split(" "))
newWords.collect // -->Array(holle, world, I, is, test, 我只是, 一个, 测试)
filter
filter(fun)
该方法用于过滤RDD中不需要的元素。
参数解释:
- fun是一个函数表达式。filter会将调用该方法的RDD的元素一一作为参数传入fun,符合fun函数条件的元素会被处理后返回给filter,filter会将最后留下的元素组合为一个新的RDD并返回。
格式:
val 常量名 = rdd.filter(fun)
示例:
var ageArr = Array(12,13,14,14,51,15)
val age = sc.parallelize(ageArr)
val newAge = age.filter(x=>x<20)
newAge.collect // -->Array(12, 13, 14, 14, 15)
distinct
distinct()
该方法可以去除RDD中的重复值,并返回一个新的RDD
格式:
val 常量名 = rdd.distinct()
示例:
var actorArr = Array("kirito", "kirito", "asina", "asina")
val actor = sc.parallelize(actorArr)
val newActor = actor.distinct()
newActor.collect // -->Array(kirito, asina)
union
union(RDD)
该方法用于合并多个RDD,并返回一个新的RDD。
参数解释:
- RDD即直接填入一个RDD即可。(一次只能填入一个)
格式:
val 常量名 = rdd.union(rdd)
示例:
var ageoArr = Array(12,13,14)
var agetArr = Array(15,16,17)
val ageo = sc.parallelize(ageoArr)
val aget = sc.parallelize(agetArr)
val newAge = ageo.union(aget)
newAge.collect // --> Array(12, 13, 14, 15, 16, 17)
intersection
intersection(RDD)
该方法用于获取两个RDD之间相同的元素,并返回一个新的RDD
参数解释:
- RDD即直接填入一个RDD即可。(一次只能填入一个)
格式:
val 常量名 = rdd.intersection(rdd)
示例:
var ageoArr = Array(12,14,15,13,12,12)
var agetArr = Array(12,12,15)
val ageo = sc.parallelize(ageoArr)
val aget = sc.parallelize(agetArr)
val newAge = ageo.intersection(aget)
newAge.collect // --> Array(12,15)
subtract
subtract(RDD)
该方法的调用,调用该方法的RDD会被删去已在参数中RDD中出现过的元素,并将剩下的元素组成新的RDD返回
参数解释:
- RDD即直接填入一个RDD即可。(一次只能填入一个)
格式:
val 常量名 = rdd.subtract(rdd)
示例:
var ageoArr = Array(12,13,14)
var agetArr = Array(12,15,16)
val ageo = sc.parallelize(ageoArr)
val aget = sc.parallelize(agetArr)
val newAgeo = ageo.subtract(aget)
newAgeo.collect // --> Array(13,14)
val newAget = aget.subtract(ageo)
newAget.collect // --> Array(15,16)
cartesian
cartesian(RDD)
该方法会将两个RDD中元素组合为笛卡尔积,并组成新的RDD返回。
假设RDDA有4个元素,RDDB有4个元素,RDDA中的每一个元素都会与RDDB中的每个元素组成一组,最后会返回16个元素。
参数解释:
- RDD即直接填入一个RDD即可。(一次只能填入一个)
格式:
val 常量名 = rdd.cartesian(rdd)
示例:
var ageoArr = Array(1,2,3,2)
var agetArr = Array(1,5,7,9)
val ageo = sc.parallelize(ageoArr)
val aget = sc.parallelize(agetArr)
val newAge = ageo.cartesian(aget)
newAge.collect // -->Array((1,1), (1,5), (1,7), (1,9), (2,1), (2,5), (2,7), (2,9), (3,1), (3,5), (3,7), (3,9), (2,1), (2,5), (2,7), (2,9))
reduce
reduce(fun)
该方法会将RDD中的元素一一取出,进行处理,并直接返回结果。
参数解释:
- fun表示函数表达式。默认有两个参数,用于处理并返回。
格式:
val 常量名 = rdd.reduce(fun)
示例:
var ageArr = Array(1,2,3,4,5,6,7)
val age = sc.parallelize(ageArr)
val newAge = age.reduce((a,b)=>a*b)
println(newAge) // --> 5040
reduceByKey
reduceByKey(fun)
当转化为RDD的数据集为键值对形式的时候,该方法会将相同键的元素的值传入fun函数中,fun处理完毕后,将新的元素组合为RDD并返回。
参数解释:
- fun表示函数表达式。默认有两个参数,用于处理并返回。
格式:
val 常量名 = rdd.reduceByKey(fun)
示例:
var actorArr = Array(("kirito",11),("kirito",11),("asina",12),("asina",14))
val actor = sc.parallelize(actorArr)
val newActor = actor.reduceByKey((a,b)=>a*b)
newActor.collect // --> Array((kirito,121), (asina,168))
groupByKey
groupByKey()
当转化为RDD的数据集为键值对形式的时候,该方法会将相同键的元素的值划分为一组(以Iterable的方式存放),并组合为新的RDD返回。
格式:
val 常量名 = rdd.groupByKey()
示例:
var actorArr = Array(("kirito",1),("kirito",12),("asina",43),("asina",32))
val actor = sc.parallelize(actorArr)
val newActor = actor.groupByKey()
newActor.collect // --> Array((kirito,CompactBuffer(1, 12)), (asina,CompactBuffer(43, 32)))
join
join(RDD)
当转化为RDD的数据集为键值对形式的时候,该方法会提取两个RDD键相同的值组合在一起,组合成一个新的RDD并返回。
参数解释:
- RDD即直接填入一个RDD即可。(一次只能填入一个)
格式:
val 常量名 = rdd.join(rdd)
示例:
var lxoArr = Array(("a",1),("b",2),("c",3))
var lxtArr = Array(("a",2),("d",2),("e",3))
val lxo = sc.parallelize(lxoArr)
val lxt = sc.parallelize(lxtArr)
val newLx= lxo.join(lxt)
newLx.collect // --> Array(("a",(1,2)))
leftOuterJoin
RDDA leftOuterJoin RDDB
对两个RDD进行左连接。设RDD1对RDD2进行左连接,则将两个RDD键相同的值组合在一起,连同RDD1的其他键值对一起组合成一个新的RDD并返回。
格式:
val 常量名 = rdd1 leftOuterJoin rdd2
示例:
var lxoArr = Array(("a",1),("b",2),("c",3))
var lxtArr = Array(("a",2),("d",2),("e",3))
val lxo = sc.parallelize(lxoArr)
val lxt = sc.parallelize(lxtArr)
val newLxo = lxo leftOuterJoin lxt
newLxo.collect // --> Array((a,(1,Some(2))), (b,(2,None)), (c,(3,None)))
val newLxt = lxt leftOuterJoin lxo
newLxt.collect // --> Array((a,(2,Some(1))), (d,(2,None)), (e,(3,None)))
rightOuterJoin
RDDA rightOuterJoin RDDB
对两个RDD进行左连接。设RDD1对RDD2进行左连接,则将两个RDD键相同的值组合在一起,连同RDD2的其他键值对一起组合成一个新的RDD并返回。
格式:
val 常量名 = rdd rightOuterJoin rdd
示例:
var lxoArr = Array(("a",1),("b",2),("c",3))
var lxtArr = Array(("a",2),("d",2),("e",3))
val lxo = sc.parallelize(lxoArr)
val lxt = sc.parallelize(lxtArr)
val newLxo = lxo rightOuterJoin lxt
newLxo.collect // -->Array((a,(Some(1),2)), (d,(None,2)), (e,(None,3)))
val newLxt = lxt rightOuterJoin lxo
newLxt.collect // -->Array((a,(Some(2),1)), (b,(None,2)), (c,(None,3)))
zip
zip(RDD)
该方法可以将两个长度一致的RDD压缩到一起,以键值对的形式组合为一个新的RDD并返回。
参数解释:
- RDD即直接填入一个RDD即可。(一次只能填入一个)
注意:
- 该方法的两个rdd长度必须一致
格式:
val 常量名 = rdd.zip(rdd)
示例:
val nameArr = Array("a","b","c","d","e")
val ageArr = Array(1,2,3,4,5)
val name = sc.parallelize(nameArr)
val age = sc.parallelize(ageArr)
val actor = name.zip(age)
actor.collect // --> Array((a,1), (b,2), (c,3), (d,4), (e,5))
combineByKey
combineByKey(createCombiner,mergeValue,mergeCombiners)
该方法用于将相同键的数据聚合,并允许返回类型与输入数据不同的返回值。
参数解释:
- createCombiner: V => C ,这个函数把当前的值作为参数,此时我们可以对其做些附加操作(类型转换)并把它返回 (这一步类似于初始化操作)
- mergeValue: (C, V) => C,该函数把元素V合并到之前的元素C(createCombiner)上 (这个操作在每个分区内进行)(如果分区太多,每个分区只有一个参数就不会执行该操作)
- mergeCombiners: (C, C) => C,该函数把2个元素C合并 (这个操作在不同分区间进行)
注意:
这玩意儿一定要注意分区,如果分区数高于元素数量,则会直接执行mergeCombiners。
格式:
val 常量名 = actor.combineByKey(
V=>C,
(获取C的变量名:Int,获取V的变量名:Int)=>C,
(获取新C值的变量名:Int,获取C值的变量名:Int)=>C
)
示例:
val nameArr = Array("a","a","a","a","c","c")
val ageArr = Array(1,2,3,4,5,6)
val name = sc.parallelize(nameArr)
val age = sc.parallelize(ageArr)
val actor = name.zip(age)
val newActor = actor.combineByKey(
x=>{println(s"x是$x,x+1=${x+1}");x+1},
(x:Int,acc:Int)=>{println(s"$acc + $x");acc+x},
(x:Int,acc:Int)=>{println(s"$acc - $x");acc-x}
)
newActor.collect
// --> x是1,x+1=2
// --> x是3,x+1=4
// --> x是6,x+1=7
// --> x是2,x+1=3
// --> x是4,x+1=5
// --> x是5,x+1=6
// --> 3 - 2
// --> 7 - 6
// --> 4 - 1
// --> 5 - 3
// --> Array[(String, Int)] = Array((a,2), (c,1))
val nameArr = Array("a","a","a","a","c","c")
val ageArr = Array(1,2,3,4,5,6)
val name = sc.parallelize(nameArr,3)
val age = sc.parallelize(ageArr,3)
val actor = name.zip(age)
val newActor = actor.combineByKey(
x=>{println(s"x是$x,x+1=${x+1}");x+1},
(x:Int,acc:Int)=>{println(s"$acc + $x");acc+x},
(x:Int,acc:Int)=>{println(s"$acc - $x");acc-x}
)
newActor.collect
// --> x是5,x+1=6
// --> x是3,x+1=4
// --> x是1,x+1=2
// --> 6 + 6
// --> 4 + 4
// --> 2 + 2
// --> 8 - 4
// --> Array[(String, Int)] = Array((c,12), (a,4))
代码解释:
这里被分为三个分区。第一个分区是(a,1)(a,2)第二个分区是(a,3),(a,4)。第一个分区执行的时候先传入(a,1),然后执行x=>{println(s"x是$x,x+1=${x+1}");x+1}
这个加1的操作,变成了2,这里只会执行一次。然后会执行第二条(x:Int,acc:Int)=>{println(s"$acc + $x");acc+x}
,这里会把上一条的2和(a,2)里的2一起作为参数传进来,变成4。然后这个分区没有别的元素了,所以就直接返回出来。然后是第二个分区也执行一遍相同的操作。所以一号分区的值是4,二号分区的值是8。接着就会执行第三条也就是分区之间的操作(x:Int,acc:Int)=>{println(s"$acc - $x");acc-x}
,8-4,得到最终结果4。
查询
collect
collect()
该方法是一个动作操作,会把RDD所有元素转换为数组并返回到Driver端,适用于小数据处理。
格式:
var 常量名 = rdd.collect
示例:
var ageArr = Array(1,2,3,4,5)
val age = sc.parallelize(ageArr)
var a = age.collect
println(a(2)) // --> 3
take
take(num)
该方法可以获取RDD的前num个元素,并返回数组
参数解释:
- num表示需要返回的元素个数。(超出范围只按最大范围算)
格式:
var 常量名 = rdd.take(num)
示例:
var ageArr = Array(1,2,3,4,5)
val age = sc.parallelize(ageArr)
val a = age.take(3) // --> Array(1,2,3)
for(b:Int<-a){
println(b)
} // --> 1 2 3
var ageArr = Array(1,2,3,4,5)
val age = sc.parallelize(ageArr)
val a = age.take(6) // --> Array(1,2,3)
for(b:Int<-a){
println(b)
} // --> 1 2 3 4 5
lookup
lookup(args)
该方法用于获取键值对类型RDD中键匹配的所有值。
参数解释:
- args表示键值对中的键。
格式:
var 变量名 = rdd.lookup(args)
示例:
var actorArr = Array(("a",1),("b",2),("a",3),("c",4))
val actor = sc.parallelize(actorArr)
val a = actor.lookup("a") // -->WrappedArray(1, 3)
println(a)
for(b:Int<-a){
println(b)
} // --> 1 3
排序
sortBy
sortBy(f(T)=>K,ascending,numPartitions)
该方法可以对标准RDD进行排序。
参数解释:
- f(T)=>k,f(T)表示要被排序对象的每一个元素,右边则是返回元素中要进行排序的值。
- ascending,表示排序的顺序,参数为true或是flase,默认是true即升序。
- numPartitions,表示排序后的RDD的分区个数,默认排序分区后的分区个数和排序之前的个数相同,即为this.partitions.size
格式:
val 常量名 = rdd.sortBy(f(T)=>k,ascending,numPartitions)
示例:
var ageArr = Array(3,45,13,88,13)
val age = sc.parallelize(ageArr)
val newAge = age.sortBy(x=>x,true,3)
newAge.collect // --> Array(3, 13, 13, 45, 88)
newAge.partitions.size // --> 3
var ageArr = Array((3,1),(43,4),(3,4),(23,5),(23,9))
val age = sc.parallelize(ageArr,3)
val newAge = age.sortBy(x=>x._1,true)
newAge.collect // --> Array((3,1), (3,4), (23,5), (23,9), (43,4))
newAge.partitions.size // --> 3
Comments | NOTHING