1. Reading from Kafka, writing to Kafka

1.1 Code implementation

  import com.wsy.util.SparkStreamingQueryListener
  import org.apache.spark.internal.Logging
  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions._
  import org.apache.spark.sql.streaming.{OutputMode, Trigger}
  import org.apache.spark.sql.types.{DoubleType, StringType, StructType}

  import scala.concurrent.duration._

  object Kafka2Kafka extends Logging {

    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder()
        .appName(this.getClass.getSimpleName)
        .master("yarn")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.default.parallelism", "300")
        .config("spark.sql.shuffle.partitions", "200")
        .config("enableSendEmailOnTaskFail", "true")
        .config("spark.extraListeners", "com.wsy.util.SparkAppListener")
        .getOrCreate()

      // Report the lifecycle and per-batch progress of the streaming query
      spark.streams.addListener(new SparkStreamingQueryListener(this.getClass.getSimpleName))

      import spark.implicits._

      // Read the source topic; each record value is assumed to be a JSON string
      // carrying the fields name, age and value
      val df = spark
        .readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "127.0.0.1:9092")
        .option("subscribe", "mytopic")
        .load()

      // Parse the payload so that name, age and value are available as columns
      val schema = new StructType()
        .add("name", StringType)
        .add("age", StringType)
        .add("value", DoubleType)
      val ds = df.selectExpr("CAST(value AS STRING) AS json")
        .select(from_json($"json", schema).as("data"))
        .select("data.*")

      // For the supported time units (hour, minute, second, ...) see
      // org.apache.spark.unsafe.types.CalendarInterval
      val dsKVagg = ds.groupBy(window(current_timestamp(), "1 hour", "1 hour"), $"name", $"age")
        .agg(
          count("value").as("count"),
          max("value").as("max"),
          min("value").as("min"),
          avg("value").as("avg"),
          variance("value").as("variance"),
          stddev("value").as("stddev")
        )

      // The Kafka sink requires a string column named "value"
      val query = dsKVagg
        .select(to_json(struct(dsKVagg.columns.map(col): _*)).as("value"))
        .writeStream
        .outputMode(OutputMode.Update())
        .option("checkpointLocation", "hdfs:///data/checkpoint/directory")
        .format("kafka")
        .option("kafka.bootstrap.servers", "127.0.0.1:9092")
        .option("topic", "topic")
        .trigger(Trigger.ProcessingTime(30.minutes))
        .queryName(this.getClass.getSimpleName)
        .start()

      query.awaitTermination()
    }
  }
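
The SparkStreamingQueryListener registered above lives in the project's own com.wsy.util package and is not shown. A minimal sketch of what such a listener could look like, assuming it only logs lifecycle events and per-batch progress (the real class presumably also raises alerts):

  import org.apache.spark.internal.Logging
  import org.apache.spark.sql.streaming.StreamingQueryListener
  import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

  // Hypothetical sketch of com.wsy.util.SparkStreamingQueryListener: log-only
  class SparkStreamingQueryListener(appName: String) extends StreamingQueryListener with Logging {

    override def onQueryStarted(event: QueryStartedEvent): Unit =
      logInfo(s"[$appName] query started: ${event.name} (${event.id})")

    override def onQueryProgress(event: QueryProgressEvent): Unit =
      logInfo(s"[$appName] batch ${event.progress.batchId}, numInputRows=${event.progress.numInputRows}")

    override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
      logWarning(s"[$appName] query terminated: ${event.id}, exception=${event.exception}")
  }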

2. Reading from Kafka, writing to HBase

2.1 Creating an HBase connection pool

  import org.apache.hadoop.hbase.HBaseConfiguration
  import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}

  object HbaseConnectionPool {

    // _ initializes the field to null; @volatile is needed for the
    // double-checked locking in getConnection to be safe
    @volatile private var conn: Connection = _

    def getConnection: Connection = {
      if (conn == null) {
        this.synchronized {
          if (conn == null) {
            createConnection()
          }
        }
      }
      conn
    }

    private def createConnection(): Unit = {
      val hbaseConf = HBaseConfiguration.create()
      hbaseConf.set("hbase.zookeeper.quorum", "localhost")
      hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
      conn = ConnectionFactory.createConnection(hbaseConf)
    }
  }
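
A quick standalone way to sanity-check the pool, shown below as a minimal sketch; the table name, column family and row key are placeholders, not from the project:

  import org.apache.hadoop.hbase.TableName
  import org.apache.hadoop.hbase.client.Get
  import org.apache.hadoop.hbase.util.Bytes

  object HbaseConnectionPoolDemo {

    def main(args: Array[String]): Unit = {
      // The shared Connection is created lazily on first access and then reused
      val conn = HbaseConnectionPool.getConnection
      // Table handles are lightweight: open one per use and close it afterwards
      val table = conn.getTable(TableName.valueOf("table_name"))
      try {
        val result = table.get(new Get(Bytes.toBytes("some_row_key")))
        println(s"row found: ${!result.isEmpty}")
      } finally {
        table.close()
        // The heavy Connection is closed only when this demo application exits
        conn.close()
      }
    }
  }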

2.2 Creating the ForeachWriter for the sink

  import java.text.SimpleDateFormat
  import java.util.Date

  import com.wsy.util.Message
  import org.apache.hadoop.hbase.TableName
  import org.apache.hadoop.hbase.client.{Put, Table}
  import org.apache.hadoop.hbase.util.Bytes
  import org.apache.spark.internal.Logging
  import org.apache.spark.sql.ForeachWriter

  class HbaseForeachWriter extends ForeachWriter[(String, String)] with Logging {

    private var table: Table = _
    private val hbase_table_name: String = "table_name"

    // Called once per partition and trigger; returning true means the partition is processed
    override def open(partitionId: Long, version: Long): Boolean = {
      table = HbaseConnectionPool.getConnection.getTable(TableName.valueOf(hbase_table_name))
      true
    }

    override def process(value: (String, String)): Unit = {
      val (name, age) = value
      if (name == null || name == "null") {
        // Alert by mail when the row key would be null
        val date = new SimpleDateFormat("yyyyMMdd").format(new Date())
        val mailTo = "cloud@huawei.com"
        val mailTitle = s"$date: name == null"
        val mailText = "From project com/wsy"
        val mailInfo = Array(mailTo, mailTitle, mailText)
        Message.sendMail(mailInfo)
      } else {
        // Use name as the row key and write age into column family "cf", qualifier "all"
        val put = new Put(Bytes.toBytes(name))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("all"), Bytes.toBytes(age))
        table.put(put)
      }
    }

    override def close(errorOrNull: Throwable): Unit = {
      if (table != null) {
        table.close()
      }
    }
  }
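
Message is another project utility that is not shown here. A hypothetical stand-in that matches the call above but only logs instead of sending mail:

  import org.apache.spark.internal.Logging

  // Hypothetical stand-in for com.wsy.util.Message: the real utility sends an
  // email; this sketch only logs the (recipient, title, text) triple it receives.
  object Message extends Logging {
    def sendMail(mailInfo: Array[String]): Unit = {
      val Array(to, title, text) = mailInfo
      logWarning(s"ALERT mail -> to=$to, title=$title, text=$text")
    }
  }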

2.3 Code implementation

  import com.wsy.util.SparkStreamingQueryListener
  import com.wsy.writer.HbaseForeachWriter
  import org.apache.spark.internal.Logging
  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions._
  import org.apache.spark.sql.streaming.{OutputMode, Trigger}
  import org.apache.spark.sql.types.{StringType, StructType}

  import scala.concurrent.duration._

  object Kafka2Hbase extends Logging {

    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder()
        .appName(this.getClass.getSimpleName)
        .master("yarn")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.default.parallelism", "200")
        .config("spark.sql.shuffle.partitions", "100")
        .config("enableSendEmailOnTaskFail", "true")
        .config("spark.extraListeners", "com.wsy.util.SparkAppListener")
        .getOrCreate()

      spark.streams.addListener(new SparkStreamingQueryListener(this.getClass.getSimpleName))

      import spark.implicits._

      // Read the source topic; each record value is assumed to be a JSON string
      // carrying the fields name and age
      val df = spark
        .readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "127.0.0.1:9092")
        .option("subscribe", "topic")
        .load()

      val schema = new StructType()
        .add("name", StringType)
        .add("age", StringType)
      val ds = df.selectExpr("CAST(value AS STRING) AS json")
        .select(from_json($"json", schema).as("data"))
        .select("data.name", "data.age")
        .as[(String, String)]

      // Write each (name, age) pair to HBase through the ForeachWriter defined above
      val query = ds.writeStream
        .outputMode(OutputMode.Update())
        .option("checkpointLocation", "hdfs:///data/checkpoint/directory")
        .foreach(new HbaseForeachWriter)
        .trigger(Trigger.ProcessingTime(10.seconds))
        .queryName(this.getClass.getSimpleName)
        .start()

      query.awaitTermination()
    }
  }
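
Both jobs also register com.wsy.util.SparkAppListener through spark.extraListeners, and that class is not shown either. A hypothetical sketch of such a listener that only logs failed tasks (the real one presumably mails an alert when enableSendEmailOnTaskFail is set):

  import org.apache.spark.Success
  import org.apache.spark.internal.Logging
  import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

  // Hypothetical sketch of com.wsy.util.SparkAppListener: log-only on task failure.
  // A zero-argument constructor is enough for spark.extraListeners to instantiate it.
  class SparkAppListener extends SparkListener with Logging {
    override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
      if (taskEnd.reason != Success) {
        logWarning(s"Task failed in stage ${taskEnd.stageId}: ${taskEnd.reason}")
      }
    }
  }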