Flink学习(四) Flink Table & SQL 实现wordcount Java版本
Flink Table & SQL WordCount
Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。
一个完整的 Flink SQL 编写的程序包括如下三部分。
Source Operator:是对外部数据源的抽象, 目前 Apache Flink 内置了很多常用的数据源实现,比如 MySQL、Kafka 等。
Transformation Operators:算子操作主要完成比如查询、聚合操作等,目前 Flink SQL 支持了 Union、Join、Projection、Difference、Intersection 及 window 等大多数传统数据库支持的操作。
Sink Operator:是对外结果表的抽象,目前 Apache Flink 也内置了很多常用的结果表的抽象,比如 Kafka Sink 等。
我们也是通过用一个最经典的 WordCount 程序作为入门,上面已经通过 DataSet/DataStream API 开发,那么实现同样的 WordCount 功能, Flink Table & SQL 核心只需要一行代码:
//省略掉初始化环境等公共代码 SELECT word, COUNT(word) FROM table GROUP BY word;
首先,整个工程中我们 pom 中的依赖如下图所示:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>1.10.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11 <version>1.10.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge_2.11</artifactId> <version>1.10.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_2.11</artifactId> <version>1.10.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.11</artifactId> <version>1.10.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_2.11</artifactId> <version>1.10.0</version> </dependency>
第一步,创建上下文环境:
ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);
第二步,读取一行模拟数据作为输入:
String words = "hello flink hello lagou";
String[] split = words.split("\\W+");
ArrayList<WC> list = new ArrayList<>();
for(String word : split){
    WC wc = new WC(word,1);
    list.add(wc);
}
DataSet<WC> input = fbEnv.fromCollection(list);第三步,注册成表,执行 SQL,然后输出:
//DataSet 转sql, 指定字段名
Table table = fbTableEnv.fromDataSet(input, "word,frequency");
table.printSchema();
//注册为一个表
fbTableEnv.createTemporaryView("WordCount", table);
Table table02 = fbTableEnv.sqlQuery("select word as word, sum(frequency) as frequency from WordCount GROUP BY word");
//将表转换DataSet
DataSet<WC> ds3  = fbTableEnv.toDataSet(table02, WC.class);
ds3.printToErr();整体代码结构如下:
package wyh.tableApi;import org.apache.flink.api.java.DataSet;import org.apache.flink.api.java.ExecutionEnvironment;import org.apache.flink.api.java.operators.DataSource;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.java.BatchTableEnvironment;import java.util.ArrayList;public class WCTableApi {    public static void main(String[] args) {        ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();        BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);        String words="hello flink hello shujia";        String[] split = words.split("\\W+");        ArrayList<WC> list = new ArrayList<>();        for (String word : split) {            WC wc = new WC(word, 1L);            list.add(wc);        }//        DataSet<WC> input = fbEnv.fromCollection(list);        DataSource<WC> input = fbEnv.fromCollection(list);        Table table = fbTableEnv.fromDataSet(input, "word,frequency");        table.printSchema();        fbTableEnv.createTemporaryView("wordcount",table);        Table table1 = fbTableEnv.sqlQuery("select word,sum(frequency) as frequency from wordcount group by word");        DataSet<WC> ds3 = fbTableEnv.toDataSet(table1, WC.class);        try {            ds3.printToErr();        } catch (Exception e) {            e.printStackTrace();        }    }    public static class WC{        public String word;        public Long frequency;        public WC() {        }        public WC(String word, Long frequency) {            this.word = word;            this.frequency = frequency;        }        @Override        public String toString() {            return "WC{" +                    "word=‘" + word + ‘\‘‘ +                    ", frequency=" + frequency +                    ‘}‘;        }    }}
 我们直接运行该程序,在控制台可以看到输出结果:

相关推荐
  xiaoyutongxue    2020-05-27  
   raidtest    2020-10-09  
   匆匆那些年    2020-06-27  
   oXiaoChong    2020-06-20  
   yuchuanchen    2020-06-16  
   Spark高级玩法    2020-06-14  
   Leonwey    2020-06-11  
   Spark高级玩法    2020-06-09  
   文报    2020-06-09  
   xorxos    2020-06-07  
   yuchuanchen    2020-05-27  
   阿尼古    2020-05-26  
   千慧    2020-05-18  
   yuchuanchen    2020-05-17  
   yuchuanchen    2020-05-16  
   Spark高级玩法    2020-05-11  
   yuchuanchen    2020-05-11