1. Read from Kafka, write to Kafka
1.1 Code implementation
import com.wsy.util.SparkStreamingQueryListener
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

import scala.concurrent.duration._

object Kafka2Kafka extends Logging {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("yarn")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.default.parallelism", "300")
      .config("spark.sql.shuffle.partitions", "200")
      .config("enableSendEmailOnTaskFail", "true")
      .config("spark.extraListeners", "com.wsy.util.SparkAppListener")
      .getOrCreate()
    spark.streams.addListener(new SparkStreamingQueryListener(this.getClass.getSimpleName))
    import spark.implicits._

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "127.0.0.1:9092")
      .option("subscribe", "mytopic")
      .load()

    // The Kafka value is assumed to be a JSON string such as {"name":"...","age":"...","value":1.0};
    // parse it so that the name/age/value columns used in the aggregation below actually exist.
    val schema = StructType(Seq(
      StructField("name", StringType),
      StructField("age", StringType),
      StructField("value", DoubleType)
    ))
    val ds = df.selectExpr("CAST(value AS STRING) AS json")
      .select(from_json($"json", schema).as("data"))
      .select("data.*")

    // For the supported time-unit strings (hour, minute, second, ...) see
    // org.apache.spark.unsafe.types.CalendarInterval
    val dsKVagg = ds.groupBy(window(current_timestamp(), "1 hour", "1 hour"), $"name", $"age")
      .agg(
        count("value").as("count"),
        max("value").as("max"),
        min("value").as("min"),
        avg("value").as("avg"),
        variance("value").as("variance"),
        stddev("value").as("stddev")
      )

    // The Kafka sink expects a string column named "value", so serialize each result row to JSON.
    val query = dsKVagg.select(to_json(struct(col("*"))).as("value"))
      .writeStream
      .outputMode(OutputMode.Update())
      .option("checkpointLocation", "hdfs:///data/checkpoint/directory")
      .format("kafka")
      .option("kafka.bootstrap.servers", "127.0.0.1:9092")
      .option("topic", "topic")
      .trigger(Trigger.ProcessingTime(30.minutes))
      .queryName(this.getClass.getSimpleName)
      .start()
    query.awaitTermination()
  }
}
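Both jobs register a com.wsy.util.SparkStreamingQueryListener, which is not shown in this post. A minimal sketch of what such a listener might look like, assuming it only needs to log query lifecycle and progress (the constructor argument and log messages are illustrative, not the project's actual implementation):

import org.apache.spark.internal.Logging
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Hypothetical sketch: logs the lifecycle of the query registered under `appName`.
class SparkStreamingQueryListener(appName: String) extends StreamingQueryListener with Logging {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    logInfo(s"[$appName] query ${event.name} started, runId=${event.runId}")

  override def onQueryProgress(event: QueryProgressEvent): Unit =
    logInfo(s"[$appName] batch ${event.progress.batchId}: ${event.progress.numInputRows} input rows")

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    logWarning(s"[$appName] query terminated, exception=${event.exception.getOrElse("none")}")
}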
2. Read from Kafka, write to HBase
2.1 Create an HBase connection pool
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}
import org.apache.hadoop.hbase.HBaseConfiguration

object HbaseConnectionPool {
  // `_` initializes the var to null; @volatile is required for safe double-checked locking
  @volatile private var conn: Connection = _

  def getConnection: Connection = {
    if (conn == null) {
      this.synchronized {
        if (conn == null) {
          createConnection()
        }
      }
    }
    conn
  }

  private def createConnection(): Unit = {
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "localhost")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    conn = ConnectionFactory.createConnection(hbaseConf)
  }
}
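A quick usage sketch of the pool (the table name, row key, and cell contents are placeholders): borrow a Table from the shared Connection and close only the Table, so the Connection stays cached for the next caller:

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes

object HbaseConnectionPoolDemo {
  def main(args: Array[String]): Unit = {
    // Borrow a Table handle from the shared Connection; "table_name" is a placeholder.
    val table = HbaseConnectionPool.getConnection.getTable(TableName.valueOf("table_name"))
    try {
      val put = new Put(Bytes.toBytes("rowkey-1"))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("all"), Bytes.toBytes("some value"))
      table.put(put)
    } finally {
      // Close the Table but not the Connection: the Connection is shared process-wide.
      table.close()
    }
  }
}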
2.2 Create the ForeachWriter for writing to HBase
import java.text.SimpleDateFormat
import java.util.Date

import com.wsy.util.Message
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{Put, Table}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.internal.Logging
import org.apache.spark.sql.ForeachWriter

class HbaseForeachWriter extends ForeachWriter[(String, String)] with Logging {
  private var table: Table = _
  private val hbaseTableName: String = "table_name"

  override def open(partitionId: Long, version: Long): Boolean = {
    table = HbaseConnectionPool.getConnection.getTable(TableName.valueOf(hbaseTableName))
    true
  }

  override def process(value: (String, String)): Unit = {
    val (name, age) = value
    if (name == null || name == "null") {
      // Missing row key: alert by mail instead of writing a bad row to HBase.
      val date = new SimpleDateFormat("yyyyMMdd").format(new Date())
      val mailTo = "cloud@huawei.com"
      val mailTitle = s"$date: name == null"
      val mailText = "from project com/wsy"
      val mailInfo = Array(mailTo, mailTitle, mailText)
      Message.sendMail(mailInfo)
    } else {
      val put = new Put(Bytes.toBytes(name))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("all"), Bytes.toBytes(age))
      table.put(put)
    }
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (table != null) {
      table.close()
    }
  }
}
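The Message.sendMail helper invoked above comes from the project's com.wsy.util package and is not shown here. A minimal placeholder sketch, assuming it takes an Array(recipient, title, body) and, for this demo, only needs to log instead of sending real mail:

import org.apache.spark.internal.Logging

// Hypothetical placeholder for the project's mail utility; it only logs the message.
// A real implementation would hand these fields to an SMTP client.
object Message extends Logging {
  def sendMail(mailInfo: Array[String]): Unit = {
    val Array(to, title, text) = mailInfo
    logWarning(s"mail to=$to, title=$title, text=$text")
  }
}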
2.3 Code implementation
import com.wsy.writer.HbaseForeachWriter
import com.wsy.util.SparkStreamingQueryListener
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

import scala.concurrent.duration._

object Kafka2Hbase extends Logging {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("yarn")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.default.parallelism", "200")
      .config("spark.sql.shuffle.partitions", "100")
      .config("enableSendEmailOnTaskFail", "true")
      .config("spark.extraListeners", "com.wsy.util.SparkAppListener")
      .getOrCreate()
    spark.streams.addListener(new SparkStreamingQueryListener(this.getClass.getSimpleName))
    import spark.implicits._

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "127.0.0.1:9092")
      .option("subscribe", "topic")
      .load()

    // The Kafka value is assumed to be a JSON string such as {"name":"...","age":"..."};
    // parse it so that the (name, age) pair fed to the ForeachWriter actually exists.
    val schema = StructType(Seq(
      StructField("name", StringType),
      StructField("age", StringType)
    ))
    val ds = df.selectExpr("CAST(value AS STRING) AS json")
      .select(from_json($"json", schema).as("data"))
      .select($"data.name", $"data.age")
      .as[(String, String)]

    val query = ds.writeStream
      .outputMode(OutputMode.Update())
      .option("checkpointLocation", "hdfs:///data/checkpoint/directory")
      .foreach(new HbaseForeachWriter)
      .trigger(Trigger.ProcessingTime(10.seconds))
      .queryName(this.getClass.getSimpleName)
      .start()
    query.awaitTermination()
  }
}
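Both jobs also set spark.extraListeners to com.wsy.util.SparkAppListener together with a custom enableSendEmailOnTaskFail flag; that listener is likewise not shown in this post. A rough sketch of what it could look like, assuming it only needs to alert on failed tasks (the flag handling and the mail call reuse the placeholder Message above and are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.TaskFailedReason
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Hypothetical sketch: Spark instantiates extra listeners with the SparkConf (or a no-arg
// constructor), so the enableSendEmailOnTaskFail flag set in the job is visible here.
class SparkAppListener(conf: SparkConf) extends SparkListener with Logging {
  private val sendMailOnTaskFail = conf.getBoolean("enableSendEmailOnTaskFail", defaultValue = false)

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = taskEnd.reason match {
    case reason: TaskFailedReason if sendMailOnTaskFail =>
      logWarning(s"task failed: ${reason.toErrorString}")
      Message.sendMail(Array("cloud@huawei.com", "task failed", reason.toErrorString))
    case _ => // successful tasks need no action
  }
}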