This article explains Flink's Transformation operators for DataStream processing, with short Scala examples for each operator.
Transformation Data Processing
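All of the code fragments below assume a StreamExecutionEnvironment named env and the Flink Scala DataStream API import; a minimal runnable skeleton (the object and job names are illustrative) looks like this:
import org.apache.flink.streaming.api.scala._

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    // Create the streaming execution environment shared by the examples below
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Build a sample dataset and apply a transformation
    val dataStream = env.fromElements(("a", 3), ("d", 4), ("c", 2), ("c", 5), ("a", 5))
    dataStream.map(t => (t._1, t._2 + 1)).print()
    // Trigger job execution
    env.execute("transformation demo")
  }
}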
SingleDataStream
Map
val dataStream = env.fromElements(("a",3),("d",4),("c",4),("c",5),("a",5))
// Approach 1: lambda function
val mapStream1: DataStream[(String, Int)] = dataStream.map(t => (t._1, t._2 + 1))
// Approach 2: implement the MapFunction interface
val mapStream2: DataStream[(String, Int)] = dataStream.map(new MapFunction[(String, Int), (String, Int)] {
  override def map(t: (String, Int)): (String, Int) = {
    (t._1, t._2 + 1)
  }
})
FlatMap
val dataStream: DataStream[String] = env.fromCollection(List("flink spark", "flink hadoop"))
val resultStream: DataStream[String] = dataStream.flatMap { str => str.split(" ") }
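Like map, flatMap can also be written against an explicit FlatMapFunction that emits results through a Collector; a minimal sketch reusing the same dataStream:
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.util.Collector

val resultStream2: DataStream[String] = dataStream.flatMap(new FlatMapFunction[String, String] {
  // Split each line and emit every word individually
  override def flatMap(value: String, out: Collector[String]): Unit = {
    value.split(" ").foreach(w => out.collect(w))
  }
})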
Filter
// Underscore placeholder syntax
val filter: DataStream[Int] = dataStream.filter { _ % 2 == 0 }
// Explicit lambda expression
val filter2: DataStream[Int] = dataStream.filter { x => x % 2 == 0 }
KeyBy
val dataStream = env.fromElements((1,5),(2,2),(2,4),(1,3))
// Use the first tuple field as the partition key
val keyedStream: KeyedStream[(Int, Int), Tuple] = dataStream.keyBy(0)
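keyBy also accepts a key selector function instead of a field position, which keeps the key type concrete; for example:
// Key by the first tuple field via a key selector; the key type is Int
val keyedStream2: KeyedStream[(Int, Int), Int] = dataStream.keyBy(_._1)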
Reduce
val dataStream = env.fromElements(("a",3),("d",4),("c",2),("c",5),("a",5))
// Use the first tuple field as the partition key
val keyedStream: KeyedStream[(String, Int), Tuple] = dataStream.keyBy(0)
// Option 1: rolling reduce that sums the second field with a lambda
val reduceStream = keyedStream.reduce { (t1, t2) => (t1._1, t1._2 + t2._2) }
// Option 2: implement the ReduceFunction interface
val reduceStream1 = keyedStream.reduce(new ReduceFunction[(String, Int)] {
  override def reduce(t1: (String, Int), t2: (String, Int)): (String, Int) = {
    (t1._1, t1._2 + t2._2)
  }
})
The output is (c,2)(c,7)(a,3)(d,4)(a,8): reduce is a rolling aggregation, so rather than emitting only the final sums, every incoming record produces the accumulated result for its key so far.
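To reproduce that rolling output end to end, the fragment can be wired into a complete job (the job name is illustrative):
val env = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream = env.fromElements(("a", 3), ("d", 4), ("c", 2), ("c", 5), ("a", 5))
// Rolling sum of the second field per key; each input record emits the current accumulated value
dataStream
  .keyBy(0)
  .reduce((t1, t2) => (t1._1, t1._2 + t2._2))
  .print()
env.execute("reduce demo")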
Aggregations
val dataStream = env.fromElements((1,5),(2,2),(2,4),(1,3))
// Use the first tuple field as the partition key
val keyedStream: KeyedStream[(Int, Int), Tuple] = dataStream.keyBy(0)
// Sum the second field
val sumStream: DataStream[(Int,Int)] = keyedStream.sum(1)
// Print the aggregated results
sumStream.print()
// Rolling minimum of the second field for each key
val minStream: DataStream[(Int,Int)] = keyedStream.min(1)
// Rolling maximum of the second field for each key
val maxStream: DataStream[(Int,Int)] = keyedStream.max(1)
// Rolling minimum of the second field, returning the element that holds the minimum
val minByStream: DataStream[(Int,Int)] = keyedStream.minBy(1)
// Rolling maximum of the second field, returning the element that holds the maximum
val maxByStream: DataStream[(Int,Int)] = keyedStream.maxBy(1)
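The difference between min/max and minBy/maxBy is worth noting: min only tracks the minimum of the aggregated field, while minBy emits the whole element that currently holds the minimum (the same applies to max and maxBy). A small sketch on illustrative data:
val stream = env.fromElements((1, 5), (1, 3), (1, 4))
// min(1): rolling minimum of field 1; the other fields are not guaranteed to come from the element holding the minimum
stream.keyBy(0).min(1).print()
// minBy(1): emits the complete element that currently holds the minimum of field 1
stream.keyBy(0).minBy(1).print()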
MultiDataStream
Union
// Create several datasets
val dataStream1: DataStream[(String, Int)] = env.fromElements(("a",3),("d",4),("c",2),("c",5),("a",5))
val dataStream2: DataStream[(String, Int)] = env.fromElements(("d",1),("s",2),("a",4),("e",5),("a",6))
val dataStream3: DataStream[(String, Int)] = env.fromElements(("a",2),("d",1),("s",2),("c",3),("b",1))
// Merge two datasets
val unionStream = dataStream1.union(dataStream2)
// Merge multiple datasets
val allUnionStream = dataStream1.union(dataStream2, dataStream3)
Connect, CoMap, CoFlatMap
This operator merges two or more datasets of different types, and the original type of each dataset is preserved after the merge. A connect operation also allows state to be shared, meaning the functions processing the connected streams can read and update each other's state.
Example: dataStream1 is a (String, Int) tuple dataset and dataStream2 is an Int dataset. Connecting them via connect produces a ConnectedStreams dataset whose internal data is the mixed type [(String, Int), Int], preserving both original data types.
val dataStream1: DataStream [(String ,Int)]= env.fromElements(("a",3),("d",4),("c",2),("c",5),("a",5))
val dataStream2: DataStream [Int]= env.fromElements(1,2,4,5,6)
// Connect the two datasets
val connectedStream: ConnectedStreams[(String, Int), Int] = dataStream1.connect(dataStream2)
val resultStream = connectedStream.map(new CoMapFunction[(String, Int), Int, (Int, String)] {
  // Processing logic for the first input stream (dataStream1)
  override def map1(in1: (String, Int)): (Int, String) = {
    (in1._2, in1._1)
  }
  // Processing logic for the second input stream (dataStream2)
  override def map2(in2: Int): (Int, String) = {
    (in2, "default")
  }
})
val resultStream2 = connectedStream.flatMap(new CoFlatMapFunction[(String, Int), Int, (String, Int, Int)] {
  // Shared variable visible to both flatMap methods
  var number = 0
  // Processing logic for the first input stream
  override def flatMap1(in1: (String, Int), collector: Collector[(String, Int, Int)]): Unit = {
    collector.collect((in1._1, in1._2, number))
  }
  // Processing logic for the second input stream: update the shared variable
  override def flatMap2(in2: Int, collector: Collector[(String, Int, Int)]): Unit = {
    number = in2
  }
})
// Key the two connected datasets on the specified field positions via keyBy
val keyedConnect: ConnectedStreams[(String, Int), Int] = dataStream1.connect(dataStream2).keyBy(1, 0)
// Associate the two datasets via broadcast
val broadcastConnect: BroadcastConnectedStream[(String, Int), Int] = dataStream1.connect(dataStream2.broadcast())
Split
// Create the dataset
val dataStream1: DataStream[(String, Int)] = env.fromElements(("a",3),("d",4),("c",2),("c",5),("a",5))
// Split the dataset into "even" and "odd" outputs based on the second field
val splitStream: SplitStream[(String, Int)] = dataStream1.split(t => if (t._2 % 2 == 0) Seq("even") else Seq("odd"))
Select
// Select the even dataset
val evenStream: DataStream[(String, Int)] = splitStream.select("even")
// Select the odd dataset
val oddStream: DataStream[(String, Int)] = splitStream.select("odd")
// Select both the even and odd datasets
val allStream: DataStream[(String, Int)] = splitStream.select("even", "odd")
Iterate
// Create the dataset; the map rebalances the data across partitions at the default parallelism
val dataStream = env.fromElements(3, 1, 2, 1, 5).map { t: Int => t }
val iterated = dataStream.iterate((input: ConnectedStreams[Int, String]) => {
  // Co-map over the connected input: the first function handles the original Int records, the second handles the fed-back String values
  val head = input.map(i => (i + 1).toString, s => s)
  // Return (feedback stream, output stream): values equal to "2" are fed back into the iteration, the rest are emitted downstream
  (head.filter(_ == "2"), head.filter(_ != "2"))
}, 1000) // Terminate the iteration if no feedback data arrives within 1000 ms
Physical Partitioning
Random Partitioning
val shuffleStream=dataStream.shuffle
Round-Robin Partitioning
val rebalanceStream = dataStream.rebalance
Rescaling Partitioning
// Rescaling partitioning via the rescale method of the DataStream API
val rescaleStream = dataStream.rescale
Broadcasting
// Broadcast partitioning via the broadcast method of the DataStream API
val broadcastStream = dataStream.broadcast
Custom Partitioning
import org.apache.flink.api.common.functions.Partitioner

object customPartitioner extends Partitioner[String] {
  // Random number generator
  val r = scala.util.Random
  override def partition(key: String, numPartitions: Int): Int = {
    // Partitioning strategy: keys containing "flink" go to partition 0, everything else to a random partition
    if (key.contains("flink")) 0 else r.nextInt(numPartitions)
  }
}
// Specify the partition field by field name
dataStream.partitionCustom(customPartitioner, "field_name")
// Specify the partition field by field index
dataStream.partitionCustom(customPartitioner, 0)
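partitionCustom can also take a key selector function instead of a field name or position; a small sketch, assuming dataStream here is a DataStream[(String, Int)]:
// Route records by the first tuple field using the custom partitioner defined above
val partitionedStream = dataStream.partitionCustom(customPartitioner, (t: (String, Int)) => t._1)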
That covers Flink's main Transformation operators; the best way to get comfortable with them is to run the examples above yourself.