前言

在使用FlinkSQL考虑到做指标统计,心中疑问好多:
状态能恢复吗?
状态数据越来越大怎么办,怎么去清除之前数据?
我怎么按天去统计数据?
如果遇到主表要join很早的数据怎么办?……
心中想到这些问题,一万个xxx而过。所以需要做个热身准备,不然真的不敢上手开发业务。
1. 验证状态数据恢复
2. 验证状态的清除策略与时效
3. 验证按天计算数据
4. 遇到Join表怎么办?(其他方案替代)
5. 「附带验证过程的」Bug记录
如果其他疑点需要验证或者调研的,可以评论或者留言给我。欢迎转发给需要的朋友……

Flink环境配置

def getTable(): StreamTableEnvironment ={
val bsEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
bsEnv.enableCheckpointing(10000,CheckpointingMode.EXACTLY_ONCE)
bsEnv.getCheckpointConfig.setCheckpointTimeout(60000)
bsEnv.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
bsEnv.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
bsEnv.setStateBackend(new EmbeddedRocksDBStateBackend)
val dir =”file:///D:/hdfs/ bsEnv.getCheckpointConfig.setCheckpointStorage(dir)
val bsSettings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val bsTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)
bsTableEnv
}

主体逻辑

从source kafka 到 sink kafka:做简单的pv和uv的
val source =
s”””
|CREATE TABLE pageviews (
| user_id BIGINT,
| page_id BIGINT,
| user_region STRING
|) WITH (
| ‘connector’ = ‘kafka’,
| ‘topic’ = ‘testtopic’,
| ‘properties.bootstrap.servers’ = ‘devcdh1:9092,devcdh2:9092,devcdh3:9092’,
| ‘format’ = ‘json’
|)
“””.stripMargin

val sink =
s”””
|CREATE TABLE pageviews_per_region (
| user_region STRING,
| pv BIGINT,
| uv BIGINT,
| PRIMARY KEY (user_region) NOT ENFORCED
|) WITH (
| ‘connector’ = ‘upsert-kafka’,
| ‘topic’ = ‘test’,
| ‘properties.bootstrap.servers’ = ‘xxxx’,
| ‘properties.group.id’ = ‘test’,
| ‘key.format’ = ‘json’,
| ‘value.format’ = ‘json’
|)
“””.stripMargin
tableEnv.executeSql(source)
tableEnv.executeSql(sink)
val insert =
s”””
|INSERT INTO pageviews_per_region
|SELECT
| user_region,
| COUNT(*),
| COUNT(DISTINCT user_id)
|FROM pageviews
|GROUP BY user_region
“””.stripMargin
tableEnv.executeSql(insert)

1. 验证状态数据恢复

往期对状态的详细解读,这里不再细说:
Flink的Checkpoint机制详解
Flink的一致性保证
source数据
>{“user_id”:1002,”page_id”:1002,”user_region”:”U002”}
>{“user_id”:1002,”page_id”:1002,”user_region”:”U002”}
>{“user_id”:1002,”page_id”:1002,”user_region”:”U002”}
>{“user_id”:1002,”page_id”:1002,”user_region”:”U002”}
>{“user_id”:1002,”page_id”:1002,”user_region”:”U002”}
>{“user_id”:1002,”page_id”:1002,”user_region”:”U002”}
>{“user_id”:1002,”page_id”:1002,”user_region”:”U002”}
>{“user_id”:1002,”page_id”:1002,”user_region”:”U002”}
>{“user_id”:1002,”page_id”:1002,”user_region”:”U003”}
>{“user_id”:1002,”page_id”:1002,”user_region”:”U003”}
>{“user_id”:1002,”page_id”:1002,”user_region”:”U003”}
>{“user_id”:1002,”page_id”:1002,”user_region”:”U003”}
>{“user_id”:1002,”page_id”:1002,”user_region”:”U003”}
>{“user_id”:1002,”page_id”:1002,”user_region”:”U002”}
>{“user_id”:1002,”page_id”:1002,”user_region”:”U002”}
>{“user_id”:1002,”page_id”:1002,”user_region”:”U002”}
sink结果
kill前的{“user_region”:”U002”,”pv”:1,”uv”:1}
{“user_region”:”U002”,”pv”:2,”uv”:1}
{“user_region”:”U002”,”pv”:3,”uv”:1}
{“user_region”:”U002”,”pv”:1,”uv”:1}
{“user_region”:”U002”,”pv”:2,”uv”:1}
{“user_region”:”U003”,”pv”:1,”uv”:1}
{“user_region”:”U003”,”pv”:2,”uv”:1}
{“user_region”:”U003”,”pv”:3,”uv”:1}
{“user_region”:”U003”,”pv”:4,”uv”:1}
{“user_region”:”U003”,”pv”:5,”uv”:1}
{“user_region”:”U002”,”pv”:3,”uv”:1}
恢复后的
{“user_region”:”U002”,”pv”:4,”uv”:1}
{“user_region”:”U002”,”pv”:5,”uv”:1}

操作

第一次启动flink run -m yarn-cluster -ys 1 -p 1 -ynm flink-sql /data/flink_jars/indicator-sql.jar
kill后的再启动
flink run -s hdfs://nameservice//user/flink/checkpoint/8d80fe1615666f65342b2348d3c39044/chk-18/_metadata -m yarn-cluster -ys 1 -p 1 -ynm flink-sql /data/flink_jars/indicator-sql.jar

2. 验证状态的清除(如何限制state)

「说明」:实时计算不断地去充实内存保存状态,达到一定量的时候肯定撑不住,现需要定期去清除状态。
TableEnvironmentTableConfig tConfig = tableEnv.getConfig();

tConfig.setIdleStateRetentionTime(Time.hours(5), Time.hours(10));
对setIdleStateRetentionTime两个参数的理解
这两个参数的含义:
minIdleStateRetentionTime: key被移除前state最少的空闲时间;
maxIdleStateRetentionTime:key被移除前state最长的空闲时间。


第一次:我们假设在时间为0的时候来了一条数据,那么会根据这个数据的key以及时当前系统时间为其注册一个Timer,这个Timer是用来清除state的,那么注册的Timer的时间是什么呢?
long cleanupTime = currentTime + maxRetentionTime;
其实Timer的时间就是 0 + 10 =10,也就是在10min后会回调相应的方法,这个方法主要就是用来清除state的。


第二次:在时间为2的时候来了第二条数据,这个数据的key与第一个一样,那么会操作同一个state,那么这个数据是否会去注册Timer吗?
答案是肯定会去注册Timer,只不过引入了一个if条件,只有满足这个条件才会去把之前的Timer删掉,然后重新注册一个Timer,否则维持之前的Timer不变。
(currentTime + minRetentionTime) > curCleanupTime
currentTime = 2
minRetentionTime = 5
curCleanupTime = 10
curCleanupTime我们可以理解为当前这个key对应的Timer的时间,也就是第一条数据注册的Timer的时间 10,因为2 + 5 > 10 条件不成立,所以不会对这个key的Timer做任何操作。


第三次:在时间为10的时候来了第三条数据,并且key与前两个数据一样,我们看一下此时条件是否成立吗?
currentTime = 10
minRetentionTime = 5
curCleanupTime = 10
那么10 +5 > 10 成立,会对这个key的Timer进行更新,把之前的10的Timer删掉,然后重新注册一个Timer时间是
currentTime = 10
maxRetentionTime = 10
所以最新的Timer时间 = 10 + 10= 20


第四次:继续,第四条数据在当前时间是12到来,12 + 5 > 20不成立,不去做任和Timer的更新操作。


第五次:继续,第五条相同key的数据到来,此时时间是16,但是在20的时候这个key的Timer执行了回调,进行了state的清除操作,所以这条数据已经找不到之前的state了,相当于重新开始,创建state,然后去注册Timer,Timer的时间是20 + 10 = 30。


对上面几次小结:通过上面可以发现,第3条数据到来时间是10,第四条数据到来时间是12,然后20min时候state清除,所以这个state的空闲时间是 20 - 12 =8,我们可以说这个state空闲了8,然后被删除,完全符合最少5,最长10的定义。
我们假设第4条数据没来过,那么第3条数据到来时间是10,然后20时候state被清除,期间一直无这个key的数据,所以state存活时间是10,同样完全符合我们定义的最小时间5以及最大时间10的范围。


「源码」
public void setIdleStateRetentionTime(Time minTime, Time maxTime) {
if (maxTime.toMilliseconds() - minTime.toMilliseconds() >= 300000L || maxTime.toMilliseconds() == 0L && minTime.toMilliseconds() == 0L) {
this.minIdleStateRetentionTime = minTime.toMilliseconds();
this.maxIdleStateRetentionTime = maxTime.toMilliseconds();
} else {
throw new IllegalArgumentException(“Difference between minTime: “ + minTime.toString() + “ and maxTime: “ + maxTime.toString() + “shoud be at least 5 minutes.”);
}
}

public interface CleanupState {
default void registerProcessingCleanupTimer(
ValueState cleanupTimeState,
long currentTime,
long minRetentionTime,
long maxRetentionTime,
TimerService timerService)
throws Exception {
// last registered timer Long curCleanupTime = cleanupTimeState.value();

// check if a cleanup timer is registered and // that the current cleanup timer won’t delete state we need to keep if (curCleanupTime == null || (currentTime + minRetentionTime) > curCleanupTime) {
// we need to register a new (later) timer long cleanupTime = currentTime + maxRetentionTime;
// register timer and remember clean-up time timerService.registerProcessingTimeTimer(cleanupTime);
// delete expired timer if (curCleanupTime != null) {
timerService.deleteProcessingTimeTimer(curCleanupTime);
}
cleanupTimeState.update(cleanupTime);
}
}
}


「总结」
对于上面,最好的理解是:
「最长体现在」: 如果在0的时间进来一条数据,然后10内没有数据进来,这个时候会触发清除操作,也就是所谓的key被移除前state最长的空闲时间。
「最短体现在」:第一次进来数据时间是5.1(刚过5),5+5.1>10成立,Timer的时间更新为15.1,这个时候闲置5min+,即5.1+5+5+ >15.1 然后被清除,也就是所谓的key被移除前state最短的空闲时间。


验证

setIdleStateRetentionTime 方法在此版本已经过时,新版本已经没有maxIdleStateRetentionTime,变为取决于minIdleStateRetentionTime
@Deprecated public void setIdleStateRetentionTime(Time minTime, Time maxTime) {
if (maxTime.toMilliseconds() - minTime.toMilliseconds() >= 300000L || maxTime.toMilliseconds() == 0L && minTime.toMilliseconds() == 0L) {
this.setIdleStateRetention(Duration.ofMillis(minTime.toMilliseconds()));
} else {
throw new IllegalArgumentException(“Difference between minTime: “ + minTime.toString() + “ and maxTime: “ + maxTime.toString() + “ should be at least 5 minutes.”);
}
}

@Deprecated public long getMaxIdleStateRetentionTime() {
return this.getMinIdleStateRetentionTime() 3L / 2L;
}

public void setIdleStateRetention(Duration duration) {
this.configuration.set(ExecutionConfigOptions.IDLE_STATE_RETENTION, duration);
}

所以,用setIdleStateRetention方法即可,原理和上面说的一样
bsTableEnv
.getConfig
.setIdleStateRetention(
Duration.ofMillis(
Time.minutes(1L).toMilliseconds
)
)
*「首次输入一批数据」

{“user_id”:1002,”page_id”:1001,”user_region”:”U001”}

{“user_id”:1001,”page_id”:1002,”user_region”:”U001”}

{“user_id”:1002,”page_id”:1001,”user_region”:”U002”}

{“user_id”:1001,”page_id”:1002,”user_region”:”U001”}

{“user_id”:1002,”page_id”:1002,”user_region”:”U001”}
「首次结果」
{“user_region”:”U001”,”pv”:1,”uv”:1}
{“user_region”:”U001”,”pv”:2,”uv”:1}
{“user_region”:”U002”,”pv”:1,”uv”:1}
{“user_region”:”U001”,”pv”:3,”uv”:2}
{“user_region”:”U001”,”pv”:4,”uv”:2}
「一分钟后再次输入数据」
{“user_id”:1001,”page_id”:1002,”user_region”:”U001”}
{“user_id”:1002,”page_id”:1002,”user_region”:”U001”}
「一分钟后再次输入数据后的结果(从头计算)」
{“user_region”:”U001”,”pv”:1,”uv”:1}
{“user_region”:”U001”,”pv”:2,”uv”:2}
「验证状态的清除成功」


3. 验证按天计算数据

「逻辑SQL」
val source =
s”””
|CREATE TABLE pageviews (
| user_id BIGINT,
| page_id BIGINT,
| user_region STRING,
| app_time BIGINT,
| dt AS FROM_UNIXTIME(app_time / 1000, ‘yyyy-MM-dd’)
|) WITH (
| ‘connector’ = ‘kafka’,
| ‘topic’ = ‘testtopic’,
| ‘properties.bootstrap.servers’ = ‘xxxx’,
| ‘format’ = ‘json’
|)
“””.stripMargin

  1. val sink =<br /> s"""<br /> |CREATE TABLE pageviews_per_region (<br /> | dt STRING,<br /> | user_region STRING,<br /> | pv BIGINT,<br /> | uv BIGINT,<br /> | PRIMARY KEY (user_region) NOT ENFORCED<br /> |) WITH (<br /> | 'connector' = 'upsert-kafka',<br /> | 'topic' = 'test',<br /> | 'properties.bootstrap.servers' = 'xxxx',<br /> | 'properties.group.id' = 'test',<br /> | 'key.format' = 'json',<br /> | 'value.format' = 'json'<br /> |)<br /> """.stripMargin<br /> tableEnv.executeSql(source)<br /> tableEnv.executeSql(sink)
  2. val insert =<br /> s"""<br /> |INSERT INTO pageviews_per_region<br /> |SELECT<br /> | dt,<br /> | user_region,<br /> | COUNT(*),<br /> | COUNT(DISTINCT user_id)<br /> |FROM pageviews<br /> |GROUP BY dt,user_region<br /> """.stripMargin<br /> tableEnv.executeSql(insert)<br />**「测试数据」**<br />2021-08-04数据<br />>{"user_id":1002,"page_id":1002,"user_region":"U001","app_time":1628061421071}

{“user_id”:1002,”page_id”:1002,”user_region”:”U001”,”app_time”:1628061421071}

{“user_id”:1002,”page_id”:1002,”user_region”:”U001”,”app_time”:1628006400000}

2021-08-03数据
>{“user_id”:1002,”page_id”:1002,”user_region”:”U001”,”app_time”:1627920000000}

{“user_id”:1002,”page_id”:1002,”user_region”:”U001”,”app_time”:1627920000000}
「测试结果」
{“dt”:”2021-08-04”,”user_region”:”U001”,”pv”:1,”uv”:1}

{“dt”:”2021-08-04”,”user_region”:”U001”,”pv”:2,”uv”:1}

{“dt”:”2021-08-04”,”user_region”:”U001”,”pv”:3,”uv”:1}

{“dt”:”2021-08-03”,”user_region”:”U001”,”pv”:1,”uv”:1}

{“dt”:”2021-08-03”,”user_region”:”U001”,”pv”:2,”uv”:1}


4. 替代join方式

「方案一 UDF」
先实现一个简单的
class JoinFunction extends ScalarFunction {
def eval(str:String): String ={
str+”_hello” }

}
tableEnv.createTemporarySystemFunction(“test_function”,new JoinFunction())

val sql =
s”””

  1. |SELECT<br /> | dt,<br /> | test_function(user_region),<br /> | COUNT(*),<br /> | COUNT(DISTINCT user_id)<br /> |FROM pageviews<br /> |GROUP BY dt,user_region<br /> """.stripMargin <br />tableEnv.executeSql(sql).print()<br />+----+--------------------------------+--------------------------------+----------------------+----------------------+<br />| op | dt | EXPR$1 | EXPR$2 | EXPR$3 |<br />+----+--------------------------------+--------------------------------+----------------------+----------------------+<br />| +I | 2021-08-03 | U001_hello | 1 | 1 |

去库里查找(类join取值)
class JoinFunction extends ScalarFunction {
def eval(column:String): String ={
println(s”进来的参数:column:${column}”)
val sql=s”select app_code from default.test_flink where order_number=’$column’” val maps: util.List[util.Map[String, AnyRef]] = ImpalaDevUtil.executeQuery(sql, null)
println(maps)
println(!maps.isEmpty && maps.size()>0)
if (!maps.isEmpty && maps.size()>0) {
val app_code: String = maps.get(0).get(“app_code”).toString
println(s”获取的值:$app_code”)
return app_code
}
null
}

}
注册函数与逻辑
tableEnv.createTemporarySystemFunction(“join_function”,new JoinFunction())
“””
|SELECT
| dt,
| join_function(user_region) appcode,
| COUNT(),
| COUNT(DISTINCT user_id)
|FROM pageviews
|GROUP BY dt,user_region
“””.stripMargin
数据
{“user_id”:1002,”page_id”:1002,”user_region”:”1556666”,”app_time”:1627920000000}
*结果

+——+————————————————+————————————————+———————————+———————————+
| op | dt | appcode | EXPR$2 | EXPR$3 |
+——+————————————————+————————————————+———————————+———————————+
| +I | 2021-08-03 | 25656660 | 1 | 1 |


方案2:实现其他库的connectors(可以用Join)
JDBC 现支持Derby,MySQL,Postgres,对于不同的数据源需要自定义:比如Impala
1. ImpalaRowConverter 修改converterName即可
public class ImpalaRowConverter extends AbstractJdbcRowConverter {
private static final long serialVersionUID = 1L;

  1. public String converterName() {<br /> return "Impala";<br /> }
  2. public ImpalaRowConverter(RowType rowType) {<br /> super(rowType);<br /> }<br />}<br />**2. 实现 ImpalaDialect**<br />**并重写getUpsertStatement 或者直接复制MySQLDialect微修改就可以**<br />ImpalaDialect extends AbstractDialect<br />**3. 增添Impala**<br />public final class JdbcDialects {

private static final List DIALECTS = Arrays.asList(new DerbyDialect(), new MySQLDialect(), new PostgresDialect());
}
4. 重新打包flink-connector-jdbc替换Flink集群/lib下面原生的jar包


5. Bug记录

FlinkSQL 找不到 kafka connector
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: ‘connector’=’kafka’ at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:467)
at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:441)
at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:133)
… 53 more

Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier ‘kafka’ that implements ‘org.apache.flink.table.factories.DynamicTableFactory’ in the classpath.

Available factory identifiers are:

blackhole
datagen
filesystem
解决:下载到lib包下


org.apache.flink
flink-sql-connector-kafka_2.11
1.13.1
provided


kafka目前只能写入append的数据
TableSink doesn’t support consuming update changes which is produced by node GroupAggregate
解决:这个是正常现象。如果你用了普通的group by的话,那么它的结果就是有更新的,所以需要sink支持写入update的结果, 但是kafka目前只能写入append的数据,所以会报上面的错误。
o 使用upsert-kafka
WITH (‘connector’ = ‘upsert-kafka’,’topic’ = ‘test’,’properties.bootstrap.servers’ = ‘xxxx’,’properties.group.id’ = ‘test’,
‘key.format’ = ‘json’,
‘value.format’ = ‘json’)
o 或者使用window group


总结

通过以上验证,最起码前期准备工作已经做完,可以用SQL去开发业务了。其他性能问题或者业务复杂度问题根据情况去解决。
如果其他疑点需要验证或者调研的,可以评论或者留言给我……