1. Read from Kafka, write to Kafka
1.1 Code implementation
import com.wsy.util.SparkStreamingQueryListener
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

import scala.concurrent.duration._

object Kafka2Kafka extends Logging {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("yarn")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.default.parallelism", "300")
      .config("spark.sql.shuffle.partitions", "200")
      .config("enableSendEmailOnTaskFail", "true")
      .config("spark.extraListeners", "com.wsy.util.SparkAppListener")
      .getOrCreate()
    spark.streams.addListener(new SparkStreamingQueryListener(this.getClass.getSimpleName))
    import spark.implicits._

    // Source: subscribe to the input topic
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "127.0.0.1:9092")
      .option("subscribe", "mytopic")
      .load()

    // Assumes the Kafka record value is a JSON string with "name", "age" and a numeric "value" field
    val schema = StructType(Seq(
      StructField("name", StringType),
      StructField("age", StringType),
      StructField("value", DoubleType)
    ))
    val ds = df.selectExpr("CAST(value AS STRING) AS value")
      .select(from_json($"value", schema).as("data"))
      .select("data.*")

    // For supported time units (hour, minute, second, ...),
    // see org.apache.spark.unsafe.types.CalendarInterval
    val dsKVagg = ds.groupBy(window(current_timestamp(), "1 hour", "1 hour"), $"name", $"age")
      .agg(
        count("value").as("count"),
        max("value").as("max"),
        min("value").as("min"),
        avg("value").as("avg"),
        variance("value").as("variance"),
        stddev("value").as("stddev")
      )

    // The Kafka sink expects a "value" column, so serialize each aggregated row to JSON
    val query = dsKVagg.selectExpr("to_json(struct(*)) AS value")
      .writeStream
      .outputMode(OutputMode.Update())
      .option("checkpointLocation", "hdfs:///data/checkpoint/directory")
      .format("kafka")
      .option("kafka.bootstrap.servers", "127.0.0.1:9092")
      .option("topic", "topic")
      .trigger(Trigger.ProcessingTime(30.minutes))
      .queryName(this.getClass.getSimpleName)
      .start()
    query.awaitTermination()
  }
}
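Both jobs register com.wsy.util.SparkStreamingQueryListener and com.wsy.util.SparkAppListener, which are project utilities not shown in this section. A minimal sketch of what the query listener might look like, assuming it only logs lifecycle and progress events through Spark's StreamingQueryListener API (any alerting the real class performs is omitted):

import org.apache.spark.internal.Logging
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Hypothetical sketch of the listener registered above; the real com.wsy.util class is not shown here
class SparkStreamingQueryListener(appName: String) extends StreamingQueryListener with Logging {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    logInfo(s"[$appName] query started, id=${event.id}, runId=${event.runId}")

  override def onQueryProgress(event: QueryProgressEvent): Unit =
    logInfo(s"[$appName] progress: ${event.progress.prettyJson}")

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    logInfo(s"[$appName] query terminated, id=${event.id}, exception=${event.exception}")
}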
2. Read from Kafka, write to HBase
2.1 Create an HBase connection pool
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}

object HbaseConnectionPool {
  // "_" initializes the field to null; @volatile is required for the double-checked locking below
  @volatile private var conn: Connection = _

  def getConnection: Connection = {
    if (conn == null) {
      this.synchronized {
        if (conn == null) {
          createConnection()
        }
      }
    }
    conn
  }

  private def createConnection(): Unit = {
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "localhost")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    conn = ConnectionFactory.createConnection(hbaseConf)
  }
}
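The pool hands out a single shared Connection per JVM; callers take short-lived Table handles from it and close only the table. As an illustration that is not part of the original code, and assuming the same "table_name" / "cf" layout used later, the pool can be exercised outside of Spark like this:

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes

// Hypothetical standalone check of the connection pool; table and column family names are assumptions
object HbaseConnectionPoolCheck {
  def main(args: Array[String]): Unit = {
    val table = HbaseConnectionPool.getConnection.getTable(TableName.valueOf("table_name"))
    val put = new Put(Bytes.toBytes("rowkey-1"))
    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("all"), Bytes.toBytes("42"))
    table.put(put)
    table.close() // close the Table handle only; the shared Connection stays open for reuse
  }
}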
2.2 Create the ForeachWriter sink
import java.text.SimpleDateFormat
import java.util.Date

import com.wsy.util.Message
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{Put, Table}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.internal.Logging
import org.apache.spark.sql.ForeachWriter

class HbaseForeachWriter extends ForeachWriter[(String, String)] with Logging {
  private var table: Table = _
  private val hbaseTableName: String = "table_name"

  // Called once per partition per trigger: fetch a Table handle from the shared connection
  override def open(partitionId: Long, version: Long): Boolean = {
    table = HbaseConnectionPool.getConnection.getTable(TableName.valueOf(hbaseTableName))
    true
  }

  override def process(value: (String, String)): Unit = {
    val (name, age) = value
    if (name == null || name == "null") {
      // The row key is missing: send an alert mail instead of writing a bad row
      val date = new SimpleDateFormat("yyyyMMdd").format(new Date())
      val mailTo = "cloud@huawei.com"
      val mailTitle = s"$date: name == null"
      val mailText = "From project com/wsy"
      val mailInfo = Array(mailTo, mailTitle, mailText)
      Message.sendMail(mailInfo)
    } else {
      val put = new Put(Bytes.toBytes(name))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("all"), Bytes.toBytes(age))
      table.put(put)
    }
  }

  // Close only the Table handle; the shared Connection stays open for the next trigger
  override def close(errorOrNull: Throwable): Unit = {
    if (table != null) {
      table.close()
    }
  }
}
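Message.sendMail is a project utility whose shape can only be guessed from the call site above. A minimal sketch, assuming it wraps the javax.mail API and takes Array(to, title, text); the SMTP host and sender address are placeholders:

import java.util.Properties
import javax.mail.{Session, Transport, Message => JavaMailMessage}
import javax.mail.internet.{InternetAddress, MimeMessage}

// Hypothetical sketch of com.wsy.util.Message; not the original implementation
object Message {
  def sendMail(mailInfo: Array[String]): Unit = {
    val Array(to, title, text) = mailInfo
    val props = new Properties()
    props.put("mail.smtp.host", "smtp.example.com")
    val session = Session.getInstance(props)
    val msg = new MimeMessage(session)
    msg.setFrom(new InternetAddress("noreply@example.com"))
    msg.setRecipients(JavaMailMessage.RecipientType.TO, to)
    msg.setSubject(title)
    msg.setText(text)
    Transport.send(msg)
  }
}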
2.3 Code implementation
import com.wsy.util.SparkStreamingQueryListener
import com.wsy.writer.HbaseForeachWriter
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

import scala.concurrent.duration._

object Kafka2Hbase extends Logging {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("yarn")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.default.parallelism", "200")
      .config("spark.sql.shuffle.partitions", "100")
      .config("enableSendEmailOnTaskFail", "true")
      .config("spark.extraListeners", "com.wsy.util.SparkAppListener")
      .getOrCreate()
    spark.streams.addListener(new SparkStreamingQueryListener(this.getClass.getSimpleName))
    import spark.implicits._

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "127.0.0.1:9092")
      .option("subscribe", "topic")
      .load()

    // Assumes the Kafka record value is a JSON string carrying "name" and "age" fields
    val schema = StructType(Seq(StructField("name", StringType), StructField("age", StringType)))
    val ds = df.selectExpr("CAST(value AS STRING) AS value")
      .select(from_json($"value", schema).as("data"))
      .select("data.*")
      .as[(String, String)]

    val query = ds.writeStream
      .outputMode(OutputMode.Update())
      .option("checkpointLocation", "hdfs:///data/checkpoint/directory")
      .foreach(new HbaseForeachWriter)
      .trigger(Trigger.ProcessingTime(10.seconds))
      .queryName(this.getClass.getSimpleName)
      .start()
    query.awaitTermination()
  }
}
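The ForeachWriter above writes one Put per record. On Spark 2.4+, foreachBatch is an alternative that lets each micro-batch be written with one batched put() call per partition. A sketch of that variant, replacing the ds.writeStream block in the main method above; it is not part of the original code and reuses the assumed "table_name" / "cf" layout:

import scala.collection.JavaConverters._

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.Dataset

// Hypothetical foreachBatch variant of the sink; the explicitly typed function value
// avoids the Scala/Java foreachBatch overload ambiguity on Scala 2.12.
val writeBatchToHbase: (Dataset[(String, String)], Long) => Unit = (batch, _) => {
  batch.rdd.foreachPartition { rows =>
    val table = HbaseConnectionPool.getConnection.getTable(TableName.valueOf("table_name"))
    val puts = rows.collect { case (name, age) if name != null && name != "null" =>
      val put = new Put(Bytes.toBytes(name))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("all"), Bytes.toBytes(age))
      put
    }.toList
    if (puts.nonEmpty) table.put(puts.asJava) // one batched write per partition
    table.close()
  }
}

val batchQuery = ds.writeStream
  .option("checkpointLocation", "hdfs:///data/checkpoint/directory_batch")
  .foreachBatch(writeBatchToHbase)
  .trigger(Trigger.ProcessingTime(10.seconds))
  .queryName("Kafka2HbaseForeachBatch")
  .start()
batchQuery.awaitTermination()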