source

kafka

     // Kafka consumer properties
     val properties = new Properties()
     properties.setProperty("bootstrap.servers", url)
     properties.setProperty("auto.offset.reset", "latest")
     // consume the given topic as a stream of Strings
     env.addSource(new FlinkKafkaConsumer011[String](
       topic, new SimpleStringSchema(), properties
     ))
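
A minimal end-to-end sketch of the source above. The imports, broker address, consumer group and topic name are filled in as assumptions (reusing the hadoop-master:9092 / flink values from the sink example further down):

     import java.util.Properties
     import org.apache.flink.api.common.serialization.SimpleStringSchema
     import org.apache.flink.streaming.api.scala._
     import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val properties = new Properties()
     properties.setProperty("bootstrap.servers", "hadoop-master:9092") // assumed broker
     properties.setProperty("group.id", "flink-demo")                  // assumed consumer group
     properties.setProperty("auto.offset.reset", "latest")
     // print every record read from the assumed topic "flink"
     env.addSource(new FlinkKafkaConsumer011[String]("flink", new SimpleStringSchema(), properties))
       .print()
     env.execute("kafka-source-demo")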

Transform
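
The original notes leave this stage without an example. A minimal sketch of a few common operators, assuming a DataStream[String] named stream (for example the Kafka source above); the word-count pipeline itself is only illustrative:

     // requires: import org.apache.flink.streaming.api.scala._
     val wordCounts = stream
       .flatMap(_.split(" "))    // split each line into words
       .filter(_.nonEmpty)       // drop empty tokens
       .map(word => (word, 1))   // pair each word with an initial count of 1
       .keyBy(_._1)              // partition by the word itself
       .sum(1)                   // rolling count per word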

Sink

Official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/

kafka

  1. Import the Kafka connector dependency
     <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 -->
     <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
         <version>1.7.2</version>
     </dependency>
  2. Kafka sink (Scala)
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     // write a small test collection to the topic "flink" on hadoop-master:9092
     env.fromCollection(List("a", "b", "c"))
       .addSink(new FlinkKafkaProducer011[String](
         "hadoop-master:9092",   // broker list
         "flink",                // target topic
         new SimpleStringSchema()
       ))
     env.execute()
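
For producer settings beyond the broker list, FlinkKafkaProducer011 also has a constructor that takes a Properties object. A brief sketch; the property values are assumptions:

     // configure the producer through Properties instead of a plain broker list
     val producerProps = new Properties()
     producerProps.setProperty("bootstrap.servers", "hadoop-master:9092") // assumed broker
     val kafkaSink = new FlinkKafkaProducer011[String](
       "flink",                    // target topic
       new SimpleStringSchema(),   // serialize each String element
       producerProps
     )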

redis

  1. POM dependency
     <dependency>
         <groupId>org.apache.bahir</groupId>
         <artifactId>flink-connector-redis_2.11</artifactId>
         <version>1.0</version>
     </dependency>
  2. Implementation
     // map each (key, value) pair to an HSET command against the hash "HASH_NAME"
     class RedisExampleMapper extends RedisMapper[(String, String)] {
       override def getCommandDescription: RedisCommandDescription = {
         new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME")
       }
       override def getKeyFromData(data: (String, String)): String = data._1
       override def getValueFromData(data: (String, String)): String = data._2
     }

     val conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build()
     stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))
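
The snippet assumes an existing DataStream[(String, String)] named stream. A minimal sketch of the surrounding job together with the imports from the Bahir connector; the sample data is made up:

     import org.apache.flink.streaming.api.scala._
     import org.apache.flink.streaming.connectors.redis.RedisSink
     import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
     import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

     val env = StreamExecutionEnvironment.getExecutionEnvironment
     // each pair becomes one field of the Redis hash "HASH_NAME"
     val stream = env.fromCollection(List(("a", "1"), ("b", "2"), ("c", "3")))
     val conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build()
     stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))
     env.execute("redis-sink-demo")

After the job runs, the written fields can be inspected with HGETALL HASH_NAME in redis-cli.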

Elasticsearch

  1. Import the dependencies
     <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-elasticsearch6 -->
     <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-connector-elasticsearch6_2.12</artifactId>
         <version>1.9.2</version>
     </dependency>
     <!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient -->
     <dependency>
         <groupId>org.apache.httpcomponents</groupId>
         <artifactId>httpclient</artifactId>
         <version>4.5.5</version>
     </dependency>
  2. Create the index in ES
     PUT test02
     {
       "mappings": {
         "_doc": {
           "properties": {
             "name": { "type": "keyword" },
             "id":   { "type": "keyword" }
           }
         }
       }
     }
  3. Implementation

     /**
      * Write the elements of a List into ES
      */
     def esSink() = {
       // the ES host(s) to connect to
       val httpHosts = new util.ArrayList[HttpHost]()
       httpHosts.add(new HttpHost("192.168.1.10", 9200))
       // build the ES sink
       val esSink = new ElasticsearchSink.Builder[String](
         httpHosts,
         new ElasticsearchSinkFunction[String] {
           override def process(t: String, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
             val json = new util.HashMap[String, String]()
             json.put("data", t)
             val indexRequest = Requests.indexRequest()
               .index("test02")
               .`type`("_doc")
               .source(json)
             requestIndexer.add(indexRequest)
           }
         }
       )
       // generate a small List and write it into ES
       env.fromCollection(List("a", "b", "c"))
         .addSink(esSink.build())
       env.execute()
     }
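
The builder batches index requests into bulk operations by default, so a tiny test collection may not show up immediately. A common tweak (a sketch; the call goes on the builder before .build()) is to flush after every element:

     // flush each index request immediately; only sensible for small tests
     esSink.setBulkFlushMaxActions(1)

The written documents can then be checked with GET test02/_search.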

Custom sink (MySQL)

  1. POM dependency
     <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
     <dependency>
         <groupId>mysql</groupId>
         <artifactId>mysql-connector-java</artifactId>
         <version>5.1.44</version>
     </dependency>
  2. Create the table
     CREATE TABLE `student` (
       `name` varchar(255) DEFAULT NULL,
       `age` int(11) DEFAULT NULL,
       `id` int(20) NOT NULL AUTO_INCREMENT,
       PRIMARY KEY (`id`)
     ) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;
  3. Implementation
     def mysqlSink() = {
       // write a few (name, age) pairs to the student table via the custom sink below
       env.fromCollection(List(("a", 1), ("b", 2), ("c", 3)))
         .addSink(new MySQLSink())
       env.execute()
     }

Implementation of the MySQLSink class

     package com.ylb.myCluss

     import java.sql.{Connection, DriverManager, PreparedStatement}
     import org.apache.flink.configuration.Configuration
     import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

     /**
      * @author yanglibin
      * @create 2020-03-03 21:22
      */
     class MySQLSink extends RichSinkFunction[(String, Int)] {

       var connection: Connection = _
       var preparedStatement: PreparedStatement = _

       // called once per element
       override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
         // bind the statement parameters
         preparedStatement.setString(1, value._1)
         preparedStatement.setInt(2, value._2)
         // execute the insert
         preparedStatement.execute()
       }

       // open the connection once when the sink instance starts
       override def open(parameters: Configuration): Unit = {
         // Class.forName("")
         connection = DriverManager.getConnection("jdbc:mysql://hadoop-node-02:3306/flink-test",
           "root", "123")
         val sql = "INSERT INTO student(name, age) VALUES (?, ?)"
         preparedStatement = connection.prepareStatement(sql)
       }

       // release resources
       override def close(): Unit = {
         if (preparedStatement != null) preparedStatement.close()
         if (connection != null) connection.close()
       }
     }
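
The commented-out Class.forName call is usually unnecessary, since mysql-connector-java 5.1.x registers its driver through the JDBC service loader. If DriverManager still fails to find a suitable driver, it can be loaded explicitly at the top of open(); the class name below is the standard driver class of the 5.1.x connector:

     // only needed if DriverManager cannot locate the MySQL driver on its own
     Class.forName("com.mysql.jdbc.Driver")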