1. Read from Kafka, write to Kafka
1.1 Code implementation
import com.wsy.util.SparkStreamingQueryListener
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.types.{StringType, StructType}

import scala.concurrent.duration._

object Kafka2Kafka extends Logging {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("yarn")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.default.parallelism", "300")
      .config("spark.sql.shuffle.partitions", "200")
      .config("enableSendEmailOnTaskFail", "true")
      .config("spark.extraListeners", "com.wsy.util.SparkAppListener")
      .getOrCreate()

    spark.streams.addListener(new SparkStreamingQueryListener(this.getClass.getSimpleName))

    import spark.implicits._

    // Source: subscribe to the Kafka topic
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "127.0.0.1:9092")
      .option("subscribe", "mytopic")
      .load()

    // Parse the message value as JSON so the name and age fields used below exist
    // (this schema is an assumption about the message format)
    val schema = new StructType()
      .add("name", StringType)
      .add("age", StringType)
    val ds = df.selectExpr("CAST(value AS STRING) AS value")
      .withColumn("data", from_json($"value", schema))
      .select($"value", $"data.name", $"data.age")

    // Window durations use the same units as org.apache.spark.unsafe.types.CalendarInterval
    // (hour, minute, second, ...)
    val dsKVagg = ds
      .groupBy(window(current_timestamp(), "1 hour", "1 hour"), $"name", $"age")
      .agg(
        count("value").as("count"),
        max("value").as("max"),
        min("value").as("min"),
        avg("value").as("avg"),
        variance("value").as("variance"),
        stddev("value").as("stddev")
      )

    // Sink: the Kafka sink requires a value column, so serialize each result row to JSON
    val query = dsKVagg.selectExpr("to_json(struct(*)) AS value")
      .writeStream
      .outputMode(OutputMode.Update())
      .option("checkpointLocation", "hdfs:///data/checkpoint/directory")
      .format("kafka")
      .option("kafka.bootstrap.servers", "127.0.0.1:9092")
      .option("topic", "topic")
      .trigger(Trigger.ProcessingTime(30.minutes))
      .queryName(this.getClass.getSimpleName)
      .start()

    query.awaitTermination()
  }
}
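To build and submit the job above, the Kafka connector for Structured Streaming has to be on the classpath in addition to Spark SQL. A minimal build.sbt sketch, assuming Spark 2.4.x on Scala 2.11 (adjust the versions to match your cluster):

// build.sbt (version numbers are assumptions; align them with your cluster)
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"            % "2.4.0" % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0"
)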
2. Read from Kafka, write to HBase
2.1 Create an HBase connection pool
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}

object HbaseConnectionPool {

  // `_` initializes the var to null;
  // @volatile so the double-checked locking below is safe across threads
  @volatile private var conn: Connection = _

  // Lazily create one shared Connection per JVM
  def getConnection: Connection = {
    if (conn == null) {
      this.synchronized {
        if (conn == null) {
          createConnection()
        }
      }
    }
    conn
  }

  private def createConnection(): Unit = {
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "localhost")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    conn = ConnectionFactory.createConnection(hbaseConf)
  }
}
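The pool hands out a single shared Connection per executor JVM (HBase Connection objects are heavyweight and thread-safe), while Table handles are cheap and are closed by whoever obtained them. A minimal usage sketch, assuming a table named table_name with column family cf exists:

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes

object HbaseConnectionPoolDemo {
  def main(args: Array[String]): Unit = {
    // Reuse the shared Connection; only the Table handle is per-caller
    val table = HbaseConnectionPool.getConnection.getTable(TableName.valueOf("table_name"))
    try {
      val put = new Put(Bytes.toBytes("rowkey"))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("all"), Bytes.toBytes("value"))
      table.put(put)
    } finally {
      table.close() // close the Table handle, not the shared Connection
    }
  }
}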
2.2 Create a ForeachWriter for writing to HBase
import java.text.SimpleDateFormat
import java.util.Date

import com.wsy.util.Message
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{Put, Table}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.internal.Logging
import org.apache.spark.sql.ForeachWriter

class HbaseForeachWriter extends ForeachWriter[(String, String)] with Logging {

  private var table: Table = _
  private val hbaseTableName: String = "table_name"

  // Called once per partition per trigger; get a Table handle from the shared connection
  override def open(partitionId: Long, version: Long): Boolean = {
    table = HbaseConnectionPool.getConnection.getTable(TableName.valueOf(hbaseTableName))
    true
  }

  override def process(value: (String, String)): Unit = {
    val (name, age) = value
    if (name == null || name == "null") {
      // Alert by mail when the row key is missing
      val date = new SimpleDateFormat("yyyyMMdd").format(new Date())
      val mailTo = "cloud@huawei.com"
      val mailTitle = s"$date: name == null"
      val mailText = "From project com/wsy"
      val mailInfo = Array(mailTo, mailTitle, mailText)
      Message.sendMail(mailInfo)
    } else {
      val put = new Put(Bytes.toBytes(name))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("all"), Bytes.toBytes(age))
      table.put(put)
    }
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (table != null) {
      table.close()
    }
  }
}
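For context on when the three callbacks above run: Structured Streaming calls open() once for each partition of each trigger, process() once per row when open() returned true, and close() at the end of the partition even if an error occurred; the (partitionId, version) pair can be used to deduplicate output on retries. A minimal logging-only sketch of the same contract (illustration, not part of the job):

import org.apache.spark.sql.ForeachWriter

// Illustrates the ForeachWriter lifecycle; it writes nothing, it only logs
class LoggingForeachWriter extends ForeachWriter[(String, String)] {

  override def open(partitionId: Long, version: Long): Boolean = {
    println(s"open: partition=$partitionId version=$version")
    true // returning false would skip this partition for this trigger
  }

  override def process(value: (String, String)): Unit =
    println(s"process: $value")

  override def close(errorOrNull: Throwable): Unit =
    println(s"close: error=$errorOrNull")
}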
2.3 Code implementation
import com.wsy.util.SparkStreamingQueryListener
import com.wsy.writer.HbaseForeachWriter
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.types.{StringType, StructType}

import scala.concurrent.duration._

object Kafka2Hbase extends Logging {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("yarn")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.default.parallelism", "200")
      .config("spark.sql.shuffle.partitions", "100")
      .config("enableSendEmailOnTaskFail", "true")
      .config("spark.extraListeners", "com.wsy.util.SparkAppListener")
      .getOrCreate()

    spark.streams.addListener(new SparkStreamingQueryListener(this.getClass.getSimpleName))

    import spark.implicits._

    // Source: subscribe to the Kafka topic
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "127.0.0.1:9092")
      .option("subscribe", "topic")
      .load()

    // Parse the message value as JSON to obtain the (name, age) pair written to HBase
    // (this schema is an assumption about the message format)
    val schema = new StructType()
      .add("name", StringType)
      .add("age", StringType)
    val ds = df.selectExpr("CAST(value AS STRING) AS value")
      .select(from_json($"value", schema).as("data"))
      .select($"data.name", $"data.age")
      .as[(String, String)]

    // Sink: write each record to HBase through the ForeachWriter defined above
    val query = ds.writeStream
      .outputMode(OutputMode.Update())
      .option("checkpointLocation", "hdfs:///data/checkpoint/directory")
      .foreach(new HbaseForeachWriter)
      .trigger(Trigger.ProcessingTime(10.seconds))
      .queryName(this.getClass.getSimpleName)
      .start()

    query.awaitTermination()
  }
}