本文共 3519 字,大约阅读时间需要 11 分钟。
本文为您介绍Flink SQL支持的Event Time和Processing Time数据类型,以及watermark和计算列。
EventTime也称为rowtime。EventTime时间属性必须在源表DDL中声明,可以将源表中的某一字段声明成 EventTime。目前只支持将 Timestamp 类型(将来会支持 Long 类型)声明成 rowtime 字段。如果不是 Timestamp 类型,需要借助计算列,基于现有列构造出一个 Timestamp 列。
但由于数据本身会有乱序,加之网络抖动或其它原因,rowtime 到达的顺序和被处理的顺序可能是不一致的(乱序)。因此定义一个rowtime字段,需要显示地定义一个 Watermark计算方法。
Watremark
Watermark是一种衡量Event Time进展的机制,它是数据本身的一个隐藏属性,Watermark的定义是source表DDL定义的一部分。Flink提供了如下语法定义Watermark。
WATERMARK [watermarkName] FORAS withOffset( , offset)
watermarkName
标识Watermark的名字,可选。<rowtime_field>
必须是表中已定义的一列(当前仅支持为Timestamp
类型),含义是基于该列生成Watermark,并且标识该列为Event Time列,可以在后续query中用来定义窗口。withOffset
是目前提供的Watermark的生成策略,是根据<rowtime_field> - offset
生成Watermark的值。withOffset的第一个参数必须是<rowtime_field>
。offset
单位为毫秒,含义为Watermark值与event time值的偏移量。通常一条记录中的某个字段就代表了该记录的发生时间。例如,表中有个rowtime字段,类型为Timestamp,其中某个字段为1501750584000(2017-08-03 08:56:24.000)
。定义一个基于该rowtime列,策略为偏移4秒的Watermark,示例如下。
WATERMARK FOR rowtime AS withOffset(rowtime, 4000)
在这种情况下,这条数据的Watermark时间为 1501750584000 - 4000 = 1501750580000(2017-08-03 08:56:20.000)
。这条数据的Watermark时间含义即:timestamp小于1501750580000(2017-08-03 08:56:20.000)
的数据,都已经到达了。
CREATE TABLE tt_stream( a varchar, b varchar, c timeStamp, WATERMARK wk1 FOR c as withOffset(c, 1000)) with ( type = 'SLS', topic = 'blink_tt2tt_test', accessId = '0622174XXXXXXTS', accessKey = 'a62cfe86-bXXXXXXXb9fad2618e7b');CREATE TABLE rds_output( id varchar, c TIMESTAMP, f TIMESTAMP, cnt BIGINT) with ( type = 'rds', url = 'jdbc:mysql://XXXXXXXX3306/test', tableName = 'datahub2rds', userName = 'XXXXXt', password = '1XXXXX');INSERT INTO rds_outputSELECT a AS id, SESSION_START(c, INTERVAL '1' SECOND) AS c, CAST(SESSION_END(c, INTERVAL '1' SECOND) AS TIMESTAMP) AS f, COUNT(a) AS cntFROM tt_streamGROUP BY SESSION(c, INTERVAL '1' SECOND), a
由于目前Watermark的rowtime列,只支持Timestamp类型(未来会支持Long类型),如果不是Timestamp类型,就需要借助计算列 ,基于现有列构造出一个Timestamp列。计算列的表达式非常灵活,可以使用任意表达式、内置函数、或是自定义函数,灵活度与 SELECT中的表达式一样。计算列在Flink SQ中可以像普通字段一样被使用。
<computed_column_definition> ::= column_name AS computed_column_expression
CREATE TABLE sls_stream( a INT, b BIGINT, c VARCHAR, ts AS to_timestamp(c), WATERMARK FOR ts AS withOffset(ts, 1000)) with ( type = 'sls', ...);
如上示例中所示,源表数据中的字段c包含时间信息,但是是字符串类型。使用TO_TIMESTAMP内置函数将字符串转成了Timestamp类型,并用该计算列作为Watermark 的rowtime字段。
processing time是系统产生的,不在您的原始数据中,需要显式的定义一个processing time列。
filedName as PROCTIME()
这个定义需要在source的DDL中显式指明,示例如下:
CREATE TABLE tt_stream ( a varchar, b varchar, c BIGINT, d AS PROCTIME()) with ( type = 'tt', ... ); CREATE TABLE rds_output ( id varchar, c TIMESTAMP, f TIMESTAMP, cnt BIGINT ) with ( type = 'rds', ... );INSERT INTO rds_output SELECT a AS id, SESSION_START(d, INTERVAL '1' SECOND) AS c, SESSION_END(d, INTERVAL '1' SECOND) AS f, COUNT(a) AS cnt FROM tt_stream GROUP BY SESSION(d, INTERVAL '1' SECOND), a
本文转自实时计算——
转载地址:http://bvpja.baihongyu.com/