这篇文章主要讲解了“flinksql表的查询转换方法”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“flinksql表的查询转换方法”吧!
package com.jd.data;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;
import scala.Tuple3;
import static org.apache.flink.table.api.Expressions.$;
public class TableAipDemo03 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 1、创建表执行环节
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
String path = "/Users/liuhaijing/Desktop/flinktestword/aaa.txt";
tableEnv.connect(new FileSystem().path(path))
.withFormat(new OldCsv()) // 定义格式化方法
.withSchema(new Schema().field("a", DataTypes.STRING()) // 定义表的结构
.field("b", DataTypes.STRING())
.field("c", DataTypes.STRING())
)
.createTemporaryTable("xxx");
// 表的查询与转换
Table xxx = tableEnv.from("xxx");
// 简单查询
Table select = xxx.select("a, b").filter($("a").isEqual("a"));
Table select2 = select.groupBy($("a"))
.select($("a"), $("a").count().as("count"));
select2.printSchema();
// 可撤回的方式 输出结果前面会有一列 boolean true 表示以这次为准, false为作废
tableEnv.toRetractStream(select2, Row.class ).print();
env.execute("job");
}
}
感谢各位的阅读,以上就是“flinksql表的查询转换方法”的内容了,经过本文的学习后,相信大家对flinksql表的查询转换方法这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是天达云,小编将为大家推送更多相关知识点的文章,欢迎关注!