source
kafka
val properties = new Properties()properties.setProperty("bootstrap.servers", url)properties.setProperty("auto.offset.reset", "latest")env.addSource(new FlinkKafkaConsumer011[String](topic,new SimpleStringSchema[String],properties))
TransFrom
Sink
官网地址: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/
kafka
- 导入kafka 依赖
 
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.11_2.11</artifactId><version>1.7.2</version></dependency>
- sink-kafka ( scala )
 
val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.fromCollection( List("a","b","c")).addSink(new FlinkKafkaProducer011[String]("hadoop-master:9092","flink",new SimpleStringSchema()) )env.execute()
redis
- pom 依赖
 
<dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.0</version></dependency>
- 代码实现
 
class RedisExampleMapper extends RedisMapper[(String, String)]{override def getCommandDescription: RedisCommandDescription = {new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME")}override def getKeyFromData(data: (String, String)): String = data._1override def getValueFromData(data: (String, String)): String = data._2}val conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build()stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))
ElasticSearch
- 导入依赖
 
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-elasticsearch6 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch6_2.12</artifactId><version>1.9.2</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient --><dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpclient</artifactId><version>4.5.5</version></dependency>
- ES 中创建 Index
 
PUT test02{"mappings": {"_doc": {"properties": {"name": {"type": "keyword"},"id": {"type": "keyword"}}}}}
代码实现
/**将 List 中的数据 写入到 ES 中/def esSink() = {// new HttpHostsval httpHosts = new util.ArrayList[HttpHost]()httpHosts.add(new HttpHost("192.168.1.10",9200))// new ES sinkval esSink = new ElasticsearchSink.Builder[String](httpHosts,new ElasticsearchSinkFunction[String] {override def process(t: String, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {val json = new util.HashMap[String, String]()json.put("data", t)val indexRequest = Requests.indexRequest().index("test02").`type`("_doc").source(json)requestIndexer.add(indexRequest)}})// 产生一个 List 数据, 写入 ES 中env.fromCollection( List("a","b","c") ).addSink(esSink.build())env.execute()}
自定义 ( mysql )
- pom
 
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.44</version></dependency>
- 创建表
 
CREATE TABLE `student` (`name` varchar(255) DEFAULT NULL,`age` int(11) DEFAULT NULL,`id` int(20) NOT NULL AUTO_INCREMENT,PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8
- 代码实现
 
def MysqlSink() = {env.fromCollection( List( ("a",1),("b",2),("c",3))).addSink(new MySQLSink() )env.execute()}
MySQLSink Class 的代码实现
package com.ylb.myClussimport java.sql.{Connection, DriverManager, PreparedStatement}import org.apache.flink.configuration.Configurationimport org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}/*** @author yanglibin* @create 2020-03-03 21:22*/class MySQLSink extends RichSinkFunction[(String,Int)] {var connection:Connection = _var preparedStatement:PreparedStatement = _override def invoke(value: (String,Int), context: SinkFunction.Context[_]): Unit = {// 设置preparedStatement.setString(1,value._1)preparedStatement.setInt(2,value._2)// 执行preparedStatement.execute()}// 建立链接override def open(parameters: Configuration): Unit = {// Class.forName("")connection = DriverManager.getConnection("jdbc:mysql://hadoop-node-02:3306/flink-test","root", "123")val sql = "INSERT INTO student(name,age) VALUES (?,?)"preparedStatement = connection.prepareStatement(sql)}// closeoverride def close(): Unit = {if (preparedStatement != null) preparedStatement.close()if (connection != null ) connection.close()}}
