Source
kafka
val properties = new Properties()
properties.setProperty("bootstrap.servers", url)
properties.setProperty("auto.offset.reset", "latest")
env.addSource(new FlinkKafkaConsumer011[String](
  topic, new SimpleStringSchema(), properties
))
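The snippet leaves `url` and `topic` as placeholders. A minimal self-contained sketch of the same source (broker address, group id and topic name below are assumptions for illustration):
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

val env = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty("bootstrap.servers", "hadoop-master:9092") // assumed broker address
properties.setProperty("group.id", "flink-demo")                  // assumed consumer group
properties.setProperty("auto.offset.reset", "latest")

env
  .addSource(new FlinkKafkaConsumer011[String]("flink", new SimpleStringSchema(), properties))
  .print()

env.execute("kafka-source-demo")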
Transform
Sink
Official docs: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/
kafka
- Import the Kafka connector dependency
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>1.7.2</version>
</dependency>
- Kafka sink (Scala)
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.fromCollection(List("a", "b", "c"))
  .addSink(new FlinkKafkaProducer011[String](
    "hadoop-master:9092",     // broker list
    "flink",                  // target topic
    new SimpleStringSchema()  // serialization schema
  ))
env.execute()
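The constructor above takes the broker list, the target topic and the schema directly; the connector also has a constructor that takes a Properties object instead, which leaves room for additional producer settings (a sketch):
val producerProps = new Properties()
producerProps.setProperty("bootstrap.servers", "hadoop-master:9092")

env.fromCollection(List("a", "b", "c"))
  .addSink(new FlinkKafkaProducer011[String](
    "flink",                  // target topic
    new SimpleStringSchema(), // serialization schema
    producerProps             // producer configuration
  ))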
redis
- pom dependency
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
- Code implementation
class RedisExampleMapper extends RedisMapper[(String, String)] {
  override def getCommandDescription: RedisCommandDescription = {
    // write every record with HSET into the hash "HASH_NAME"
    new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME")
  }
  override def getKeyFromData(data: (String, String)): String = data._1
  override def getValueFromData(data: (String, String)): String = data._2
}
val conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build()
stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))
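The `stream` used above is not defined in the snippet; a minimal way to build one and run the job (a sketch, assuming the Bahir connector imports and a local Redis on the default port; the RedisMapper types used by the class above live in org.apache.flink.streaming.connectors.redis.common.mapper):
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig

val env = StreamExecutionEnvironment.getExecutionEnvironment
// each (key, value) tuple becomes one field/value pair inside the hash "HASH_NAME"
val stream = env.fromCollection(List(("k1", "v1"), ("k2", "v2")))

val conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build()
stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))

env.execute("redis-sink-demo")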
Elasticsearch
- Import the dependencies
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-elasticsearch6 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.12</artifactId>
<version>1.9.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.5</version>
</dependency>
- Create the index in ES
PUT test02
{
  "mappings": {
    "_doc": {
      "properties": {
        "name": { "type": "keyword" },
        "id":   { "type": "keyword" }
      }
    }
  }
}
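The mapping can be read back to confirm the index was created as intended:
GET test02/_mapping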
- Code implementation
/**
 * Write the data from a List into ES.
 */
def esSink() = {
  // hosts of the ES cluster
  val httpHosts = new util.ArrayList[HttpHost]()
  httpHosts.add(new HttpHost("192.168.1.10", 9200))
  // build the ES sink: each element is indexed as a document into "test02"
  val esSink = new ElasticsearchSink.Builder[String](
    httpHosts,
    new ElasticsearchSinkFunction[String] {
      override def process(t: String, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
        val json = new util.HashMap[String, String]()
        json.put("data", t)
        val indexRequest = Requests.indexRequest()
          .index("test02")
          .`type`("_doc")
          .source(json)
        requestIndexer.add(indexRequest)
      }
    }
  )
  // generate a List of data and write it to ES
  env.fromCollection(List("a", "b", "c"))
    .addSink(esSink.build())
  env.execute()
}
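The builder also exposes bulk-flush settings; for a small demo it can help to flush every record immediately instead of buffering. A sketch, to be called on the builder before `build()`:
// flush after every single action so demo records show up in ES right away
esSink.setBulkFlushMaxActions(1)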
Custom sink (MySQL)
- pom dependency
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.44</version>
</dependency>
- Create the table
CREATE TABLE `student` (
`name` varchar(255) DEFAULT NULL,
`age` int(11) DEFAULT NULL,
`id` int(20) NOT NULL AUTO_INCREMENT,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8
- Code implementation
def mysqlSink() = {
  env.fromCollection(List(("a", 1), ("b", 2), ("c", 3)))
    .addSink(new MySQLSink())
  env.execute()
}
Implementation of the MySQLSink class
package com.ylb.myCluss

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

/**
 * @author yanglibin
 * @create 2020-03-03 21:22
 */
class MySQLSink extends RichSinkFunction[(String, Int)] {
  var connection: Connection = _
  var preparedStatement: PreparedStatement = _

  // called once per element: bind the parameters and run the insert
  override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
    preparedStatement.setString(1, value._1)
    preparedStatement.setInt(2, value._2)
    preparedStatement.execute()
  }

  // establish the JDBC connection and prepare the statement
  override def open(parameters: Configuration): Unit = {
    // Class.forName("")
    connection = DriverManager.getConnection("jdbc:mysql://hadoop-node-02:3306/flink-test",
      "root", "123")
    val sql = "INSERT INTO student(name,age) VALUES (?,?)"
    preparedStatement = connection.prepareStatement(sql)
  }

  // release resources
  override def close(): Unit = {
    if (preparedStatement != null) preparedStatement.close()
    if (connection != null) connection.close()
  }
}
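Because MySQLSink is a RichSinkFunction, open() and close() run once per parallel subtask, so each subtask holds its own connection and prepared statement. For a small demo the sink can be pinned to a single connection (a sketch):
env.fromCollection(List(("a", 1), ("b", 2), ("c", 3)))
  .addSink(new MySQLSink())
  .setParallelism(1) // one subtask => one JDBC connection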