Batch WordCount

    1. %flink
    2. val data = benv.fromElements("hello world", "hello flink", "hello hadoop")
    3. data.flatMap(line => line.split("\\s"))
    4. .map(w => (w, 1))
    5. .groupBy(0)
    6. .sum(1)
    7. .print()

    Streaming WordCount

    1. %flink
    2. val data = senv.fromElements("hello world", "hello flink", "hello hadoop")
    3. data.flatMap(line => line.split("\\s"))
    4. .map(w => (w, 1))
    5. .keyBy(0)
    6. .sum(1)
    7. .print
    8. senv.execute()

    Read HDFS File

    1. %flink
    2. val data = benv.readTextFile("hdfs:///tmp/bank.csv")
    3. data.print()
    1. %flink.conf
    2. # 设置任务使用的时间属性是eventtime
    3. #pipeline.time-characteristic EventTime
    4. # 设置checkpoint的时间间隔
    5. #execution.checkpointing.interval 10000
    6. # 确保检查点之间的间隔
    7. #execution.checkpointing.min-pause 60000
    8. # 设置checkpoint的超时时间
    9. #execution.checkpointing.timeout 60000
    10. # 设置任务取消后保留hdfs上的checkpoint文件
    11. #execution.checkpointing.externalized-checkpoint-retention RETAIN_ON_CANCELLATION

    配置 kafka source

    1. %flink.ssql
    2. -- https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/create.html
    3. -- https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connect.html#table-formats
    4. DROP TABLE IF EXISTS ods__log;
    5. CREATE TABLE ods__log (
    6. `database` STRING,
    7. `table` STRING ,
    8. `type` STRING,
    9. `data` ROW(id BIGINT,server_id INT,log_user STRING, f1 STRING,f2 STRING,log_time TIMESTAMP(3)),
    10. `log_ts` as IF(data.log_time is NULL,TO_TIMESTAMP('1970-01-01 00:00:00'),data.log_time) ,
    11. proctime AS PROCTIME()
    12. ) WITH (
    13. 'connector' = 'kafka', -- using kafka connector
    14. 'topic' = 'ylyh_test', -- kafka topic
    15. 'properties.group.id' = 'k2q',
    16. 'properties.bootstrap.servers' = 'kafka1:9092,kafka2:9092,kafka3:9092', -- kafka broker address
    17. 'format' = 'json', -- the data format is json
    18. 'json.fail-on-missing-field' = 'false',
    19. 'json.ignore-parse-errors' = 'true'
    20. );

    Create 统计 rpt sink(注意:下方示例实际使用 filesystem connector,而非 kudu)

    1. %flink.ssql
    2. -- https://www.jianshu.com/p/bb94fe99df87
    3. DROP TABLE IF EXISTS fs_table;
    4. CREATE TABLE fs_table (
    5. `data` STRING,
    6. `e` STRING,
    7. `dt` STRING,
    8. `hour` STRING
    9. ) PARTITIONED BY (`e`,`dt`, `hour`) WITH (
    10. 'connector'='filesystem',
    11. 'path'='hdfs:///user/hive/warehouse/test.db/fs_table',
    12. 'format'='parquet',
    13. 'sink.partition-commit.delay'='1 h',
    14. 'sink.partition-commit.policy.kind'='success-file',
    15. 'auto-compaction'='true'
    16. );

    file sink

    1. %flink.ssql
    2. -- https://www.jianshu.com/p/bb94fe99df87
    3. DROP TABLE IF EXISTS fs_table;
    4. CREATE TABLE fs_table (
    5. `data` STRING,
    6. `e` STRING,
    7. `dt` STRING,
    8. `hour` STRING
    9. ) PARTITIONED BY (`e`,`dt`, `hour`) WITH (
    10. 'connector'='filesystem',
    11. 'path'='hdfs:///user/hive/warehouse/test.db/fs_table',
    12. 'format'='parquet',
    13. 'sink.partition-commit.delay'='1 h',
    14. 'sink.partition-commit.policy.kind'='success-file'
    15. );

    转大写 UDF

    1. %flink
    2. // https://blog.csdn.net/wangpei1949/article/details/103444412
    3. class ScalaUpper extends ScalarFunction {
    4. def eval(str: String) = str.toUpperCase
    5. }
    6. stenv.registerFunction("scala_upper", new ScalaUpper())

    解析 JSON UDF

    1. %flink
    2. import org.apache.flink.types.Row
    3. import com.google.gson.Gson
    4. class ScalaToJson extends ScalarFunction {
    5. def eval(obj:Any):String ={
    6. val gson = new Gson
    7. val jsonString = gson.toJson(obj)
    8. return jsonString
    9. }
    10. }
    11. stenv.registerFunction("tojson", new ScalaToJson())

    获取 JSON UDF

    1. %flink
    2. import com.google.gson.JsonObject
    3. import com.google.gson.JsonParser
    4. class ScalaGetJsonObject extends ScalarFunction {
    5. def eval(jsonString:String,key:String):String ={
    6. val json = new JsonParser()
    7. val obj = json.parse(jsonString).asInstanceOf[JsonObject]
    8. return obj.get(key).getAsString()
    9. }
    10. }
    11. stenv.registerFunction("get_json_object", new ScalaGetJsonObject())

    bitmap 精确去重

    1. %flink
    2. import org.roaringbitmap.longlong.Roaring64NavigableMap
    3. // https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/functions/udfs.html#%E8%81%9A%E5%90%88%E5%87%BD%E6%95%B0
    4. // https://gitbook.cn/books/5e4a6cc338499d0e64018856/index.html
    5. class PreciseDistinct extends AggregateFunction[Long, Roaring64NavigableMap] {
    6. override def createAccumulator():Roaring64NavigableMap = {
    7. return new Roaring64NavigableMap()
    8. }
    9. def accumulate(accumulator: Roaring64NavigableMap, id: Long): Unit = {
    10. accumulator.addLong(id);
    11. }
    12. override def getValue(accumulator: Roaring64NavigableMap): Long = {
    13. return accumulator.getLongCardinality();
    14. }
    15. }
    16. stenv.registerFunction("precise_distinct", new PreciseDistinct())

    连接 redis

    1. %flink
    2. import org.apache.flink.table.functions.FunctionContext
    3. import org.apache.flink.table.functions.ScalarFunction
    4. import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
    5. class RedisIdMap() extends ScalarFunction {
    6. private var jedis:Jedis = null
    7. private var count_key:String = "_internal_count_"
    8. @throws[Exception]
    9. override def open(context: FunctionContext) = {
    10. super.open(context)
    11. val redisHost = context.getJobParameter("redis.host", "master")
    12. val redisPort = Integer.valueOf(context.getJobParameter("redis.port", "6379"))
    13. var redisPwd = context.getJobParameter("redis.password", "123456")
    14. jedis = new Jedis(redisHost, redisPort,5)
    15. jedis.auth(redisPwd)
    16. }
    17. @throws[Exception]
    18. override def close() = {
    19. super.close()
    20. jedis.close()
    21. }
    22. def eval(key: String,group:String):Long = {
    23. var id = jedis.hget(group,key)
    24. if (id == null){
    25. var new_id = jedis.hincrBy(group,count_key,1)
    26. jedis.hset(group,key,String.valueOf(new_id))
    27. return new_id
    28. }
    29. return id.toLong
    30. }
    31. }
    32. stenv.registerFunction("id_map", new RedisIdMap())
    1. %flink.ssql(type=update)
    2. -- https://www.jianshu.com/p/bb94fe99df87
    3. -- batch sql, select with partition pruning
    4. SELECT data,tojson(`data`),get_json_object(tojson(`data`),'kind') FROM ods__log limit 10;

    查询数据插入结果表

    1. %flink.ssql(type=update,parallelism=5,savepointDir=hdfs:///user/flink_zeppelin/savepoints/zeppelin/k2q,resumeFromSavepoint=false,resumeFromLatestCheckpoint=false)
    2. -- 参数说明(注意不需要加引号):http://zeppelin.apache.org/docs/0.9.0-preview2/interpreter/flink.html
    3. -- savepointDir 设置
    4. -- refreshInterval 刷新间隔
    5. -- parallelism 并行度
    6. -- template html模板
    7. -- maxParallelism 最大并行度
    8. -- streaming sql, insert into file system table
    9. INSERT INTO fs_table
    10. SELECT tojson(`data`),`table`, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM ods__log where `table` like 'log_%';