问题导读:
    1、如何使用Flink导入数据?
    2、如何使用Spark导入数据?
    3、如何从MySQL中导入数据?
    4、如何从Hive中导入数据?

    本文分享主要是ClickHouse的数据导入方式,本文主要介绍如何使用Flink、Spark、Kafka、MySQL、Hive将数据导入ClickHouse,具体内容包括:

    • 使用Flink导入数据
    • 使用Spark导入数据
    • 从Kafka中导入数据
    • 从MySQL中导入数据
    • 从Hive中导入数据

    使用Flink导入数据

    本文介绍使用 flink-jdbc将数据导入ClickHouse,Maven依赖为:

    1. <dependency>
    2. <groupId>org.apache.flink</groupId>
    3. <artifactId>flink-jdbc_${scala.binary.version}</artifactId>
    4. <version>1.10.1</version>
    5. </dependency>

    示例

    本示例使用Kafka connector,通过Flink将Kafka数据实时导入到ClickHouse

    1. public class FlinkSinkClickHouse {
    2. public static void main(String[] args) throws Exception {
    3. String url = "jdbc:clickhouse://192.168.10.203:8123/default";
    4. String user = "default";
    5. String passwd = "hOn0d9HT";
    6. String driver = "ru.yandex.clickhouse.ClickHouseDriver";
    7. int batchsize = 500; // 设置batch size,测试的话可以设置小一点,这样可以立刻看到数据被写入
    8. // 创建执行环境
    9. EnvironmentSettings settings = EnvironmentSettings
    10. .newInstance()
    11. .useBlinkPlanner()
    12. .inStreamingMode()
    13. .build();
    14. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    15. StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
    16. String kafkaSource11 = "" +
    17. "CREATE TABLE user_behavior ( " +
    18. " `user_id` BIGINT, -- 用户id\n" +
    19. " `item_id` BIGINT, -- 商品id\n" +
    20. " `cat_id` BIGINT, -- 品类id\n" +
    21. " `action` STRING, -- 用户行为\n" +
    22. " `province` INT, -- 用户所在的省份\n" +
    23. " `ts` BIGINT, -- 用户行为发生的时间戳\n" +
    24. " `proctime` AS PROCTIME(), -- 通过计算列产生一个处理时间列\n" +
    25. " `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件时间\n" +
    26. " WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND -- 在eventTime上定义watermark\n" +
    27. ") WITH ( 'connector' = 'kafka', -- 使用 kafka connector\n" +
    28. " 'topic' = 'user_behavior', -- kafka主题\n" +
    29. " 'scan.startup.mode' = 'earliest-offset', -- 偏移量,从起始 offset 开始读取\n" +
    30. " 'properties.group.id' = 'group1', -- 消费者组\n" +
    31. " 'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092', -- kafka broker 地址\n" +
    32. " 'format' = 'json', -- 数据源格式为 json\n" +
    33. " 'json.fail-on-missing-field' = 'true',\n" +
    34. " 'json.ignore-parse-errors' = 'false'" +
    35. ")";
    36. // Kafka Source
    37. tEnv.executeSql(kafkaSource11);
    38. String query = "SELECT user_id,item_id,cat_id,action,province,ts FROM user_behavior";
    39. Table table = tEnv.sqlQuery(query);
    40. String insertIntoCkSql = "INSERT INTO behavior_mergetree(user_id,item_id,cat_id,action,province,ts)\n" +
    41. "VALUES(?,?,?,?,?,?)";
    42. //将数据写入 ClickHouse Sink
    43. JDBCAppendTableSink sink = JDBCAppendTableSink
    44. .builder()
    45. .setDrivername(driver)
    46. .setDBUrl(url)
    47. .setUsername(user)
    48. .setPassword(passwd)
    49. .setQuery(insertIntoCkSql)
    50. .setBatchSize(batchsize)
    51. .setParameterTypes(Types.LONG, Types.LONG,Types.LONG, Types.STRING,Types.INT,Types.LONG)
    52. .build();
    53. String[] arr = {"user_id","item_id","cat_id","action","province","ts"};
    54. TypeInformation[] type = {Types.LONG, Types.LONG,Types.LONG, Types.STRING,Types.INT,Types.LONG};
    55. tEnv.registerTableSink(
    56. "sink",
    57. arr,
    58. type,
    59. sink
    60. );
    61. tEnv.insertInto(table, "sink");
    62. tEnv.execute("Flink Table API to ClickHouse Example");
    63. }
    64. }

    Note:

    • 由于 ClickHouse 单次插入的延迟比较高,我们需要设置 BatchSize 来批量插入数据,提高性能。
    • 在 JDBCAppendTableSink 的实现中,若最后一批数据的数目不足 BatchSize,则不会插入剩余数据。

    使用Spark导入数据

    本文主要介绍如何通过Spark程序写入数据到Clickhouse中。

    1. <dependency>
    2. <groupId>ru.yandex.clickhouse</groupId>
    3. <artifactId>clickhouse-jdbc</artifactId>
    4. <version>0.2.4</version>
    5. </dependency>
    6. <!-- 如果报错:Caused by: java.lang.ClassNotFoundException: com.google.common.escape.Escapers,则添加下面的依赖 -->
    7. <dependency>
    8. <groupId>com.google.guava</groupId>
    9. <artifactId>guava</artifactId>
    10. <version>28.0-jre</version>
    11. </dependency>

    示例

    1. object Spark2ClickHouseExample {
    2. val properties = new Properties()
    3. properties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
    4. properties.put("user", "default")
    5. properties.put("password", "hOn0d9HT")
    6. properties.put("batchsize", "1000")
    7. properties.put("socket_timeout", "300000")
    8. properties.put("numPartitions", "8")
    9. properties.put("rewriteBatchedStatements", "true")
    10. case class Person(name: String, age: Long)
    11. private def runDatasetCreationExample(spark: SparkSession): Dataset[Person] = {
    12. import spark.implicits._
    13. // DataFrames转成DataSet
    14. val path = "file:///e:/people.json"
    15. val peopleDS = spark.read.json(path)
    16. peopleDS.createOrReplaceTempView("people")
    17. val ds = spark.sql("SELECT name,age FROM people").as[Person]
    18. ds.show()
    19. ds
    20. }
    21. def main(args: Array[String]) {
    22. val url = "jdbc:clickhouse://kms-1:8123/default"
    23. val table = "people"
    24. val spark = SparkSession
    25. .builder()
    26. .appName("Spark Example")
    27. .master("local") //设置为本地运行
    28. .getOrCreate()
    29. val ds = runDatasetCreationExample(spark)
    30. ds.write.mode(SaveMode.Append).option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 100000).jdbc(url, table, properties)
    31. spark.stop()
    32. }
    33. }

    从Kafka中导入数据

    主要是使用ClickHouse的表引擎。

    使用方式

    1. CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
    2. (
    3. name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    4. name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    5. ...
    6. ) ENGINE = Kafka()
    7. SETTINGS
    8. kafka_broker_list = 'host:port',
    9. kafka_topic_list = 'topic1,topic2,...',
    10. kafka_group_name = 'group_name',
    11. kafka_format = 'data_format'[,]
    12. [kafka_row_delimiter = 'delimiter_symbol',]
    13. [kafka_schema = '',]
    14. [kafka_num_consumers = N,]
    15. [kafka_max_block_size = 0,]
    16. [kafka_skip_broken_messages = N,]
    17. [kafka_commit_every_batch = 0,]
    18. [kafka_thread_per_consumer = 0]
    19. kafka_broker_list :逗号分隔的brokers地址 (localhost:9092).
    20. kafka_topic_list Kafka 主题列表,多个主题用逗号分隔.
    21. kafka_group_name :消费者组.
    22. kafka_format Message format. 比如JSONEachRowJSONCSV等等

    使用示例

    在kafka中创建user_behavior主题,并向该主题写入数据,数据示例为:

    1. {"user_id":63401,"item_id":6244,"cat_id":143,"action":"pv","province":3,"ts":1573445919}
    2. {"user_id":9164,"item_id":2817,"cat_id":611,"action":"fav","province":28,"ts":1573420486}
    3. {"user_id":63401,"item_id":6244,"cat_id":143,"action":"pv","province":3,"ts":1573445919}

    在ClickHouse中创建表,选择表引擎为Kafka(),如下:

    1. CREATE TABLE kafka_user_behavior (
    2. user_id UInt64 COMMENT '用户id',
    3. item_id UInt64 COMMENT '商品id',
    4. cat_id UInt16 COMMENT '品类id',
    5. action String COMMENT '行为',
    6. province UInt8 COMMENT '省份id',
    7. ts UInt64 COMMENT '时间戳'
    8. ) ENGINE = Kafka()
    9. SETTINGS
    10. kafka_broker_list = 'cdh04:9092',
    11. kafka_topic_list = 'user_behavior',
    12. kafka_group_name = 'group1',
    13. kafka_format = 'JSONEachRow'
    14. ;
    15. -- 查询
    16. cdh04 :) select * from kafka_user_behavior ;
    17. -- 再次查看数据,发现数据为空
    18. cdh04 :) select count(*) from kafka_user_behavior;
    19. SELECT count(*)
    20. FROM kafka_user_behavior
    21. ┌─count()─┐
    22. 0
    23. └─────────┘

    通过物化视图将kafka数据导入ClickHouse

    当我们一旦查询完毕之后,ClickHouse会删除表内的数据,其实Kafka表引擎只是一个数据管道,我们可以通过物化视图的方式访问Kafka中的数据。

    • 首先创建一张Kafka表引擎的表,用于从Kafka中读取数据
    • 然后再创建一张普通表引擎的表,比如MergeTree,面向终端用户使用
    • 最后创建物化视图,用于将Kafka引擎表实时同步到终端用户所使用的表中
    1. -- 创建Kafka引擎表
    2. CREATE TABLE kafka_user_behavior_src (
    3. user_id UInt64 COMMENT '用户id',
    4. item_id UInt64 COMMENT '商品id',
    5. cat_id UInt16 COMMENT '品类id',
    6. action String COMMENT '行为',
    7. province UInt8 COMMENT '省份id',
    8. ts UInt64 COMMENT '时间戳'
    9. ) ENGINE = Kafka()
    10. SETTINGS
    11. kafka_broker_list = 'cdh04:9092',
    12. kafka_topic_list = 'user_behavior',
    13. kafka_group_name = 'group1',
    14. kafka_format = 'JSONEachRow'
    15. ;
    16. -- 创建一张终端用户使用的表
    17. CREATE TABLE kafka_user_behavior (
    18. user_id UInt64 COMMENT '用户id',
    19. item_id UInt64 COMMENT '商品id',
    20. cat_id UInt16 COMMENT '品类id',
    21. action String COMMENT '行为',
    22. province UInt8 COMMENT '省份id',
    23. ts UInt64 COMMENT '时间戳'
    24. ) ENGINE = MergeTree()
    25. ORDER BY user_id
    26. ;
    27. -- 创建物化视图,同步数据
    28. CREATE MATERIALIZED VIEW user_behavior_consumer TO kafka_user_behavior
    29. AS SELECT * FROM kafka_user_behavior_src ;
    30. -- 查询,多次查询,已经被查询的数据依然会被输出
    31. cdh04 :) select * from kafka_user_behavior;
    32. Note:
    33. Kafka消费表不能直接作为结果表使用。Kafka消费表只是用来消费Kafka数据,没有真正的存储所有数据。

    从MySQL中导入数据

    同kafka中导入数据类似,ClickHouse同样支持MySQL表引擎,即映射一张MySQL中的表到ClickHouse中。
    数据类型对应关系

    MySQL中数据类型与ClickHouse类型映射关系如下表。
    一文讲解从Flink、Spark、Kafka、MySQL、Hive导入数据到ClickHouse - 图1

    使用方式

    1. CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
    2. (
    3. name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
    4. name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
    5. ...
    6. ) ENGINE = MySQL('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause']);

    使用示例

    1. -- 连接MySQLclickhouse数据库的test
    2. CREATE TABLE mysql_users(
    3. id Int32,
    4. name String
    5. ) ENGINE = MySQL(
    6. '192.168.10.203:3306',
    7. 'clickhouse',
    8. 'users',
    9. 'root',
    10. '123qwe');
    11. -- 查询数据
    12. cdh04 :) SELECT * FROM mysql_users;
    13. SELECT *
    14. FROM mysql_users
    15. ┌─id─┬─name──┐
    16. 1 tom
    17. 2 jack
    18. 3 lihua
    19. └────┴───────┘
    20. -- 插入数据,会将数据插入MySQL对应的表中
    21. -- 所以当查询MySQL数据时,会发现新增了一条数据
    22. INSERT INTO users VALUES(4,'robin');
    23. -- 再次查询
    24. cdh04 :) select * from mysql_users;
    25. SELECT *
    26. FROM mysql_users
    27. ┌─id─┬─name──┐
    28. 1 tom
    29. 2 jack
    30. 3 lihua
    31. 4 robin
    32. └────┴───────┘

    注意:对于MySQL表引擎,不支持UPDATE和DELETE操作,比如执行下面命令时,会报错:

    1. -- 执行更新
    2. ALTER TABLE mysql_users UPDATE name = 'hanmeimei' WHERE id = 1;
    3. -- 执行删除
    4. ALTER TABLE mysql_users DELETE WHERE id = 1;
    5. -- 报错
    6. DB::Exception: Mutations are not supported by storage MySQL.

    从Hive中导入数据

    本文使用Waterdrop进行数据导入,Waterdrop是一个非常易用,高性能,能够应对海量数据的实时数据处理产品,它构建在Spark之上。Waterdrop拥有着非常丰富的插件,支持从Kafka、HDFS、Kudu中读取数据,进行各种各样的数据处理,并将结果写入ClickHouse、Elasticsearch或者Kafka中。
    我们仅需要编写一个Waterdrop Pipeline的配置文件即可完成数据的导入。配置文件包括四个部分,分别是Spark、Input、filter和Output。
    关于Waterdrop的安装,十分简单,只需要下载ZIP文件,解压即可。使用Waterdrop需要安装Spark。

    1. Waterdrop安装目录的config/文件夹下创建配置文件:hive_table_batch.conf,内容如下。主要包括四部分:SparkInputfilterOutput
    • Spark部分是Spark的相关配置,主要配置Spark执行时所需的资源大小。
    • Input部分是定义数据源,其中pre_sql是从Hive中读取数据SQL,table_name是将读取后的数据,注册成为Spark中临时表的表名,可为任意字段。
    • filter部分配置一系列的转化,比如过滤字段
    • Output部分是将处理好的结构化数据写入ClickHouse,ClickHouse的连接配置。
    • 需要注意的是,必须保证hive的metastore是在服务状态。
    1. spark {
    2. spark.app.name = "Waterdrop_Hive2ClickHouse"
    3. spark.executor.instances = 2
    4. spark.executor.cores = 1
    5. spark.executor.memory = "1g"
    6. // 这个配置必需填写
    7. spark.sql.catalogImplementation = "hive"
    8. }
    9. input {
    10. hive {
    11. pre_sql = "select * from default.users"
    12. table_name = "hive_users"
    13. }
    14. }
    15. filter {}
    16. output {
    17. clickhouse {
    18. host = "kms-1:8123"
    19. database = "default"
    20. table = "users"
    21. fields = ["id", "name"]
    22. username = "default"
    23. password = "hOn0d9HT"
    24. }
    25. }
    1. 执行任务
    1. [kms@kms-1 waterdrop-1.5.1]$ bin/start-waterdrop.sh --config config/hive_table_batch.conf --master yarn --deploy-mode cluster

    这样就会启动一个Spark作业执行数据的抽取,等执行完成之后,查看ClickHouse的数据。

    总结
    本文主要介绍了如何通过Flink、Spark、Kafka、MySQL以及Hive,将数据导入到ClickHouse,对每一种方式都出了详细的示例,希望对你有所帮。