本篇内容介绍了“Transformation和Action怎么使用”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成! 一、Transformation算子演示val conf =
new SparkConf().setAppName("Test").setMaster("local") val sc =
new SparkContext(conf)
//通过并行化生成rdd val rdd = sc.parallelize(List(5,6,4,7,3,8,2,9,10))
//map:对rdd里面每一个元乘以2然后排序 val rdd2: RDD[Int] = rdd.map(_ *
2) //collect以数组的形式返回数据集的所有元素(是Action算子) println(rdd2.collect().toBuffer)
//filter:该RDD由经过func函数计算后返回值为true的输入元素组成 val rdd3: RDD[Int] = rdd2.filter(_ >
10) println(rdd3.collect().toBuffer)
val rdd4 = sc.parallelize(Array("a b c","b c d")) //flatMap:将rdd4中的元素进行切分后压平 val rdd5: RDD[String] = rdd4.flatMap(_.split(" ")) println(rdd5.collect().toBuffer) //假如: List(List(" a,b" ,"b c"),List("e c"," i o")) //压平
flatMap(_.flatMap(_.split(" "))) //sample随机抽样 //withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样 //fraction抽样比例例如30%
即0.3
但是这个值是一个浮动的值不准确 //seed用于指定随机数生成器种子 默认参数不传 val rdd5_1 = sc.parallelize(1 to
10) val sample = rdd.sample(false,0.5) println(sample.collect().toBuffer)
//union:求并集 val rdd6 = sc.parallelize(List(5,6,7,8)) val rdd7 = sc.parallelize(List(1,2,5,6)) val rdd8 = rdd6 union rdd7 println(rdd8.collect.toBuffer)
//intersection:求交集 val rdd9 = rdd6 intersection rdd7 println(rdd9.collect.toBuffer)
//distinct:去重出重复 println(rdd8.distinct.collect.toBuffer)
//join相同的key会被合并 val rdd10_1 = sc.parallelize(List(("tom",1),("jerry" ,3),("kitty",2))) val rdd10_2 = sc.parallelize(List(("jerry" ,2),("tom",2),("dog",10))) val rdd10_3 = rdd10_1 join rdd10_2 println(rdd10_3.collect().toBuffer) //左连接和右连接 //除基准值外是Option类型,因为可能存在空值所以使用Option val rdd10_4 = rdd10_1 leftOuterJoin rdd10_2 //以左边为基准没有是null val rdd10_5 = rdd10_1 rightOuterJoin rdd10_2 //以右边为基准没有是null println(rdd10_4.collect().toList) println(rdd10_5.collect().toBuffer)
val rdd11_1 = sc.parallelize(List(("tom",1),("jerry" ,3),("kitty",2))) val rdd11_2 = sc.parallelize(List(("jerry" ,2),("tom",2),("dog",10))) //笛卡尔积 val rdd11_3 = rdd11_1 cartesian rdd11_2 println(rdd11_3.collect.toBuffer) //根据传入的参数进行分组 val rdd11_5_1 = rdd11_4.groupBy(_._1) println(rdd11_5_1.collect().toList)
//按照相同key进行分组,并且可以制定分区 val rdd11_5_2 = rdd11_4.groupByKey println(rdd11_5_2.collect().toList)
//根据相同key进行分组[分组的话需要二元组] //cogroup
和
groupBykey的区别 //cogroup不需要对数据先进行合并就以进行分组 得到的结果是 同一个key
和不同数据集中的数据集合 //groupByKey是需要先进行合并然后在根据相同key进行分组 val rdd11_6: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd11_1 cogroup rdd11_2 println(rdd11_6) |
二、Action算子演示 val conf =
new SparkConf().setAppName("Test").setMaster("local[*]") val sc =
new SparkContext(conf) /* Action
算子*/ //集合函数 val rdd1 = sc.parallelize(List(2,1,3,6,5),2) val rdd1_1 = rdd1.reduce(_+_) println(rdd1_1) //以数组的形式返回数据集的所有元素 println(rdd1.collect().toBuffer) //返回RDD的元素个数 println(rdd1.count()) //取出对应数量的值 默认降序,
若输入0
会返回一个空数组 println(rdd1.top(3).toBuffer) //顺序取出对应数量的值 println(rdd1.take(3).toBuffer) //顺序取出对应数量的值 默认生序 println(rdd1.takeOrdered(3).toBuffer) //获取第一个值 等价于
take(1) println(rdd1.first()) //将处理过后的数据写成文件(存储在HDFS或本地文件系统) //rdd1.saveAsTextFile("dir/file1") //统计key的个数并生成map k是key名
v是key的个数 val rdd2 = sc.parallelize(List(("key1",2),("key2",1),("key3",3),("key4",6),("key5",5)),2) val rdd2_1: collection.Map[String, Long] = rdd2.countByKey() println(rdd2_1) //遍历数据 rdd1.foreach(x =>
println(x))
/*其他算子*/ //统计value的个数 但是会将集合中的一个元素看做是一个vluae val value: collection.Map[(String, Int), Long] = rdd2.countByValue println(value) //filterByRange:对RDD中的元素进行过滤,返回指定范围内的数据 val rdd3 = sc.parallelize(List(("e",5),("c",3),("d",4),("c",2),("a",1))) val rdd3_1: RDD[(String, Int)] = rdd3.filterByRange("c","e")//包括开始和结束的 println(rdd3_1.collect.toList) //flatMapValues对参数进行扁平化操作,是value的值 val rdd3_2 = sc.parallelize(List(("a","1 2"),("b","3 4"))) println( rdd3_2.flatMapValues(_.split(" ")).collect.toList) //foreachPartition
循环的是分区数据 // foreachPartiton一般应用于数据的持久化,存入数据库,可以进行分区的数据存储 val rdd4 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),3) rdd4.foreachPartition(x =>
println(x.reduce(_+_))) //keyBy
以传入的函数返回值作为key ,RDD中的元素为value
新的元组 val rdd5 = sc.parallelize(List("dog","cat","pig","wolf","bee"),3) val rdd5_1: RDD[(Int, String)] = rdd5.keyBy(_.length) println(rdd5_1.collect.toList) //keys获取所有的key values
获取所有的values println(rdd5_1.keys.collect.toList) println(rdd5_1.values.collect.toList) //collectAsMap 将需要的二元组转换成Map val map: collection.Map[String, Int] = rdd2.collectAsMap() println(map) |
“Transformation和Action怎么使用”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注天达云网站,小编将为大家输出更多高质量的实用文章!
|