Flink已经拥有了强大的DataStream/DataSetAPI,可以基本满足流计算和批计算中的所有需求。为什么还需要Table&SQLAPI呢?
  首先TableAPI是一种关系型API,类SQL的API,用户可以像操作表一样地操作数据,非常的直观和方便。用户只需要说需要什么东西,系统会自动地帮你决定如何高效地计算它,而不需要像DataStream一样写一大堆Function,优化还得纯靠手工调优。另外,SQL作为一个“人所皆知”的语言,如果一个引擎提供SQL,它将很容易被人们接受。这已经是业界很常见的现象了。值得学习的是,Flink的TableAPI与SQLAPI的实现,有80%的代码是共用的。所以当我们讨论TableAPI时,常常是指Table&SQLAPI。
  Table&SQLAPI还有另一个职责,是流处理和批处理统一的API层。Flink在runtime层是统一的,因为Flink将批任务看做流的一种特例来执行,这也是Flink向外鼓吹的一点。然而在编程模型上,Flink却为批和流提供了两套API(DataSet和DataStream)。为什么runtime统一,而编程模型不统一呢?在我看来,这是本末倒置的事情。用户才不管你runtime层是否统一,用户更关心的是写一套代码。这也是为什么现在ApacheBeam能这么火的原因。所以Table&SQLAPI扛起了统一API的大旗,批上的查询会随着输入数据的结束而结束并生成有限结果集,流上的查询会一直运行并生成结果流。Table&SQLAPI做到了批与流上的查询具有同样的语法,因此不用改代码能同时在批和流上跑。

  聊聊历史
  TableAPI始于Flink0.9,Flink0.9是一个类库百花齐放的版本,众所周知的TableAPI,Gelly,FlinkML都是在这个版本加进去的。Flink0.9大概是在2015年6月正式发布的,在Flink0.9发布之前,社区对SQL展开过好几次争论,不过当时社区认为应该首先完善TableAPI的功能,再去搞SQL,如果两头一起搞很容易什么都做不好。而且在整个Hadoop生态圈中已经有大量的所谓“SQL-on-Hadoop”的解决方案,譬如ApacheHive,ApacheDrill,ApacheImpala。”SQL-on-Flink”的事情也可以像Hadoop一样丢给其他社区去搞。
  不过,随着Flink0.9的发布,意味着抽象语法树、代码生成、运行时函数等都已经成熟,这为SQL的集成铺好了前进道路。另一方面,用户对SQL的呼声越来越高。2015年下半年,Timo大神也加入了dataArtisans,于是对TableAPI的改造开始了。2016年初的时候,改造基本上完成了。我们也是在这个时间点发现了TableAPI的潜力,并加入了社区。经过这一年的努力,Flink已经发展成Apache中火热的项目之一,而Flink中活跃的类库目前非TableAPI莫属。这其中离不开国内公司的支持,TableAPI的贡献者绝大多数都来自于阿里巴巴和华为,并且主导着TableAPI的发展方向,这是非常令国人自豪的。而我在社区贡献了一年后,幸运地成为了FlinkCommitter。
  TableAPI&SQL长什么样?
  这里不会详细介绍TableAPI&SQL的使用,只是做一个展示。更多使用细节方面的问题请访问官网文档。
  下面这个例子展示了如何用TableAPI处理温度传感器数据。计算每天每个以room开头的location的平均温度。例子中涉及了如何使用window,event-time等。
valsensorData:DataStream[(String,Long,Double)]=???
//convertDataSetintoTable
valsensorTable:Table=sensorData
.toTable(tableEnv,'location,'time,'tempF)
//definequeryonTable
valavgTempCTable:Table=sensorTable
.window(Tumbleover1.dayon'rowtimeas'w)
.groupBy('location,'w)
.select('w.startas'day,'location,(('tempF.avg-32)*0.556)as'avgTempC)
.where('locationlike"room%")
  下面的例子是展示了如何用SQL来实现。
valsensorData:DataStream[(String,Long,Double)]=???
//registerDataStream
tableEnv.registerDataStream("sensorData",sensorData,'location,’time,'tempF)
//queryregisteredTable
valavgTempCTable:Table=tableEnv.sql("""
SELECTFLOOR(rowtime()TODAY)ASday,location,
AVG((tempF-32)*0.556)ASavgTempC
FROMsensorData
WHERElocationLIKE'room%'
GROUPBYlocation,FLOOR(rowtime()TODAY)""")
  TableAPI&SQL原理
  Flink非常明智,没有像Spark那样重复造轮子(SparkCatalyst),而是将SQL校验、SQL解析以及SQL优化交给了ApacheCalcite。Calcite在其他很多开源项目里也都应用到了,譬如ApacheHive,ApacheDrill,ApacheKylin,Cascading。Calcite在新的架构中处于核心的地位,如下图所示。

  新的架构中,构建抽象语法树的事情全部交给了Calcite去做。SQLquery会经过Calcite解析器转变成SQL节点树,通过验证后构建成Calcite的抽象语法树(也是图中的LogicalPlan)。另一边,TableAPI上的调用会构建成TableAPI的抽象语法树,并通过Calcite提供的RelBuilder转变成Calcite的抽象语法树。
  以上面的温度计代码为样例,TableAPI和SQL的转换流程如下,绿色的节点代表FlinkTableNodes,蓝色的节点代表CalciteLogicalNodes。终都转化成了相同的LogicalPlan表现形式。

  之后会进入优化器,Calcite会基于优化规则来优化这些LogicalPlan,根据运行环境的不同会应用不同的优化规则(Flink提供了批的优化规则,和流的优化规则)。这里的优化规则分为两类,一类是Calcite提供的内置优化规则(如条件下推,剪枝等),另一类是是将LogicalNode转变成FlinkNode的规则。这两类规则的应用体现为下图中的①和②步骤,这两步骤都属于Calcite的优化阶段。得到的DataStreamPlan封装了如何将节点翻译成对应DataStream/DataSet程序的逻辑。步骤③是将不同的DataStream/DataSetNode通过代码生成(CodeGen)翻译成终可执行的DataStream/DataSet程序。

  代码生成是TableAPI&SQL中核心的一块内容。表达式、条件、内置函数等等是需要CodeGen出具体的Function代码的,这部分跟SparkSQL的结构很相似。CodeGen出的Function以字符串的形式存在。在提交任务后会分发到各个TaskManager中运行,在运行时会使用Janino编译器编译代码后运行。
  TableAPI&SQL现状
  目前TableAPI对于批和流都已经支持了基本的Selection,Projection,Union,以及Window操作(包括固定窗口、滑动窗口、会话窗口)。SQL的话由于Calcite在近的版本中才支持Window语法,所以目前FlinkSQL还不支持Window的语法。并且TableAPI和SQL都支持了UDF,UDTF,UDAF(开发中)。
  TableAPI&SQL未来
  1、DynamicTables

  DynamicTable是传统意义上的表,只不过表中的数据是会变化更新的。Flink提出Stream<–>DynamicTable之间是可以等价转换的。不过这需要引入Retraction机制。有机会的话,我会专门写一篇文章来介绍。
  2、Joins
  包括了支持流与流的Join,以及流与表的Join。
  3、SQL客户端
  目前SQL是需要内嵌到Java/Scala代码中运行的,不是纯SQL的使用方式。未来需要支持SQL客户端执行提交SQL纯文本运行任务。
  4、并行度设置
  目前TableAPI&SQL是无法设置并行度的,这使得TableAPI看起来仍像个玩具。
  在我看来,Flink的Table&SQLAPI是走在时代前沿的,在很多方面在做着定义业界标准的事情,比如SQL上Window的表达,时间语义的表达,流和批语义的统一等。在我看来,SQL拥有更天然的流与批统一的特性,并且能够自动帮用户做很多SQL优化(下推、剪枝等),这是Beam所做不到的地方。当然,未来如果Table&SQLAPI发展成熟的话,剥离出来作为业界标准的流与批统一的API也不是不可能(叫BeamTable,BeamSQL?),哈哈。这也是我非常看好Table&SQLAPI,认为其大有潜力的一个原因。当然目前来说,需要走的路还很长,TableAPI现在还只是个玩具。