Batch WordCount
%flink
val data = benv.fromElements("hello world", "hello flink", "hello hadoop")
data.flatMap(line => line.split("\\s"))
  .map(w => (w, 1))
  .groupBy(0)
  .sum(1)
  .print()
Streaming WordCount
%flink
val data = senv.fromElements("hello world", "hello flink", "hello hadoop")
data.flatMap(line => line.split("\\s"))
  .map(w => (w, 1))
  .keyBy(0)
  .sum(1)
  .print()
senv.execute()
Read HDFS File
%flink
val data = benv.readTextFile("hdfs:///tmp/bank.csv")
data.print()
%flink.conf
# Use event time as the time characteristic of the job
#pipeline.time-characteristic EventTime
# Checkpoint interval (ms)
#execution.checkpointing.interval 10000
# Minimum pause between checkpoints (ms)
#execution.checkpointing.min-pause 60000
# Checkpoint timeout (ms)
#execution.checkpointing.timeout 60000
# Retain the checkpoint files on HDFS after the job is cancelled
#execution.checkpointing.externalized-checkpoint-retention RETAIN_ON_CANCELLATION
Configure Kafka source
%flink.ssql
-- https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/create.html
-- https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connect.html#table-formats
DROP TABLE IF EXISTS ods__log;
CREATE TABLE ods__log (
  `database` STRING,
  `table` STRING,
  `type` STRING,
  `data` ROW(id BIGINT, server_id INT, log_user STRING, f1 STRING, f2 STRING, log_time TIMESTAMP(3)),
  `log_ts` AS IF(data.log_time IS NULL, TO_TIMESTAMP('1970-01-01 00:00:00'), data.log_time),
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka',                  -- use the kafka connector
  'topic' = 'ylyh_test',                  -- kafka topic
  'properties.group.id' = 'k2q',
  'properties.bootstrap.servers' = 'kafka1:9092,kafka2:9092,kafka3:9092',  -- kafka broker addresses
  'format' = 'json',                      -- the data format is json
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true'
);
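A minimal sanity-check sketch for the source (column names come from the DDL above; the query is only for previewing and can be cancelled once a few rows appear):

%flink.ssql(type=update)
-- Preview a few rows to verify that the Kafka source parses correctly
SELECT `database`, `table`, `type`, data.id, log_ts FROM ods__log LIMIT 10;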
Create Kudu statistics (rpt) sink
%flink.ssql
-- https://www.jianshu.com/p/bb94fe99df87
DROP TABLE IF EXISTS fs_table;
CREATE TABLE fs_table (
  `data` STRING,
  `e` STRING,
  `dt` STRING,
  `hour` STRING
) PARTITIONED BY (`e`, `dt`, `hour`) WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs:///user/hive/warehouse/test.db/fs_table',
  'format' = 'parquet',
  'sink.partition-commit.delay' = '1 h',
  'sink.partition-commit.policy.kind' = 'success-file',
  'auto-compaction' = 'true'
);
File sink
%flink.ssql
-- https://www.jianshu.com/p/bb94fe99df87
DROP TABLE IF EXISTS fs_table;
CREATE TABLE fs_table (
  `data` STRING,
  `e` STRING,
  `dt` STRING,
  `hour` STRING
) PARTITIONED BY (`e`, `dt`, `hour`) WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs:///user/hive/warehouse/test.db/fs_table',
  'format' = 'parquet',
  'sink.partition-commit.delay' = '1 h',
  'sink.partition-commit.policy.kind' = 'success-file'
);
Uppercase UDF
%flink
// https://blog.csdn.net/wangpei1949/article/details/103444412
import org.apache.flink.table.functions.ScalarFunction

class ScalaUpper extends ScalarFunction {
  def eval(str: String) = str.toUpperCase
}
stenv.registerFunction("scala_upper", new ScalaUpper())
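A minimal usage sketch, assuming the ods__log source defined above (the `table` column is just a convenient string to uppercase):

%flink.ssql(type=update)
-- Hypothetical check: apply the scala_upper UDF to the `table` column
SELECT `table`, scala_upper(`table`) FROM ods__log LIMIT 10;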
Convert to JSON UDF
%flink
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row
import com.google.gson.Gson

class ScalaToJson extends ScalarFunction {
  def eval(obj: Any): String = {
    val gson = new Gson
    gson.toJson(obj)
  }
}
stenv.registerFunction("tojson", new ScalaToJson())
Get JSON object UDF
%flink
import org.apache.flink.table.functions.ScalarFunction
import com.google.gson.JsonObject
import com.google.gson.JsonParser

class ScalaGetJsonObject extends ScalarFunction {
  def eval(jsonString: String, key: String): String = {
    val parser = new JsonParser()
    val obj = parser.parse(jsonString).asInstanceOf[JsonObject]
    obj.get(key).getAsString()
  }
}
stenv.registerFunction("get_json_object", new ScalaGetJsonObject())
Bitmap-based exact distinct count
%flink
// https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/functions/udfs.html#%E8%81%9A%E5%90%88%E5%87%BD%E6%95%B0
// https://gitbook.cn/books/5e4a6cc338499d0e64018856/index.html
import org.apache.flink.table.functions.AggregateFunction
import org.roaringbitmap.longlong.Roaring64NavigableMap

class PreciseDistinct extends AggregateFunction[Long, Roaring64NavigableMap] {
  override def createAccumulator(): Roaring64NavigableMap = new Roaring64NavigableMap()

  def accumulate(accumulator: Roaring64NavigableMap, id: Long): Unit = {
    accumulator.addLong(id)
  }

  override def getValue(accumulator: Roaring64NavigableMap): Long = accumulator.getLongCardinality()
}
stenv.registerFunction("precise_distinct", new PreciseDistinct())
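A minimal usage sketch, assuming ids arrive on the ods__log source above (data.id is BIGINT, matching the UDAF's Long argument):

%flink.ssql(type=update)
-- Hypothetical check: exact distinct count of data.id per source table
SELECT `table`, precise_distinct(data.id) AS distinct_ids FROM ods__log GROUP BY `table`;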
Connect to Redis
%flink
import org.apache.flink.table.functions.FunctionContext
import org.apache.flink.table.functions.ScalarFunction
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

class RedisIdMap() extends ScalarFunction {
  private var jedis: Jedis = null
  private val count_key: String = "_internal_count_"

  @throws[Exception]
  override def open(context: FunctionContext) = {
    super.open(context)
    val redisHost = context.getJobParameter("redis.host", "master")
    val redisPort = Integer.valueOf(context.getJobParameter("redis.port", "6379"))
    val redisPwd = context.getJobParameter("redis.password", "123456")
    jedis = new Jedis(redisHost, redisPort, 5)
    jedis.auth(redisPwd)
  }

  @throws[Exception]
  override def close() = {
    super.close()
    jedis.close()
  }

  def eval(key: String, group: String): Long = {
    val id = jedis.hget(group, key)
    if (id == null) {
      // First time this key is seen: allocate a new id from the per-group counter
      val new_id = jedis.hincrBy(group, count_key, 1)
      jedis.hset(group, key, String.valueOf(new_id))
      return new_id
    }
    id.toLong
  }
}
stenv.registerFunction("id_map", new RedisIdMap())
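A minimal usage sketch, assuming the ods__log source above; the Redis hash name 'log_user_ids' is hypothetical, and redis.host / redis.port / redis.password are read from the job parameters in open():

%flink.ssql(type=update)
-- Hypothetical check: map each log_user string to a compact numeric id stored in Redis
SELECT data.log_user, id_map(data.log_user, 'log_user_ids') AS user_id FROM ods__log LIMIT 10;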
%flink.ssql(type=update)
-- https://www.jianshu.com/p/bb94fe99df87
-- streaming select: preview parsed records and extracted JSON fields from ods__log
SELECT `data`, tojson(`data`), get_json_object(tojson(`data`), 'kind') FROM ods__log LIMIT 10;
Query data and insert into the result table
%flink.ssql(type=update,parallelism=5,savepointDir=hdfs:///user/flink_zeppelin/savepoints/zeppelin/k2q,resumeFromSavepoint=false,resumeFromLatestCheckpoint=false)
-- Paragraph properties (note: values need no quotes): http://zeppelin.apache.org/docs/0.9.0-preview2/interpreter/flink.html
-- savepointDir      savepoint directory
-- refreshInterval   refresh interval
-- parallelism       job parallelism
-- template          html template
-- maxParallelism    maximum parallelism
-- streaming sql, insert into file system table
INSERT INTO fs_table
SELECT tojson(`data`), `table`, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH')
FROM ods__log
WHERE `table` LIKE 'log_%';
