本篇内容主要讲解“flink1.2版本时间、水位线的介绍和用法”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“flink1.2版本时间、水位线的介绍和用法”吧!
水位线
种类
顺序事件中的Watermarks
乱序事件中的Watermarks
并行数据流中的Watermarks
时间概念
Event Time
watermark
// 1、创建flink运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3); // 设置并行度
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); //处理模式设定:流或批
// 生成 watermark 的时间间隔(每 n 毫秒),设置周期性的产生水位线的时间间隔。当数据流很大的时候,如果每个事件都产生水位线,会影响性能。
//env.getConfig().setAutoWatermarkInterval(1000); // 自动水印时间间隔 12版本不用设置,有默认
指定Timestamps
SingleOutputStreamOperator<Tuple3<String,String, Integer>> formatData =text.map(new MapFunction<String, Tuple3<String, String, Integer>>() {
// 数据格式转换
private static final long serialVersionUID = 1L;
@Override
public Tuple3<String, String, Integer> map(String value) throws Exception {
Tuple3<String, String, Integer> data = new Tuple3<String, String, Integer>();
String[] dataTmp = value.split("\\|");
data.f0 = dataTmp[0];
data.f1 = dataTmp[1];
data.f2 = Integer.parseInt(dataTmp[2]);
return data;
}
});
SingleOutputStreamOperator<Tuple3<String,String, Integer>> orderDSWithWatemark=formatData
.assignTimestampsAndWatermarks( // 设置watermark watemark = 最大事件时间 - 最大延迟或乱序时间
WatermarkStrategy.<Tuple3<String, String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3)) //指定maxOutOfOrderness最大无序度时间即最大延迟时间/乱序时间
.withTimestampAssigner((data,timestamp) -> Long.parseLong(DateUtil.dateToUTC(data.f0))*1000) //时间为毫秒级
);
SingleOutputStreamOperator<Tuple3<String,String, Integer>> result=orderDSWithWatemark.keyBy(one -> one.f1)
.window(TumblingEventTimeWindows.of(Time.seconds(10))) // 设定窗口大小
// .allowedLateness(Time.seconds(1)) //延时处理时间
// .sideOutputLateData(lateOutputTag) //侧输出
.reduce(new ReduceFunction<Tuple3<String, String, Integer>>() { // 处理逻辑
private static final long serialVersionUID = -6695049408336015245L;
@Override
public Tuple3<String, String, Integer> reduce(Tuple3<String, String, Integer> value1,
Tuple3<String, String, Integer> value2) throws Exception {
Tuple3<String, String, Integer> data = new Tuple3<String, String, Integer>();
data.f0 = value2.f0;
data.f1 = value1.f1;
data.f2 = value1.f2 + value2.f2;
System.out.println(data);
return data;
}
});
result.print("滚动事件时间");
env.execute();
总结
到此,相信大家对“flink1.2版本时间、水位线的介绍和用法”有了更深的了解,不妨来实际操作一番吧!这里是天达云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!