Reference: https://www.jianshu.com/p/ce737d2fc8fc

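    Ways to increase throughput with the Receiver-based approach:

    • Increase parallelism: adjust the block interval (spark.streaming.blockInterval). Each receiver produces roughly batch interval / block interval partitions per batch.
    • Increase the amount of data read from Kafka per batch interval (provided Kafka's own throughput is sufficient): create multiple input streams, then union them into one.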

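      import java.util.ArrayList;
      import java.util.List;
      import org.apache.spark.storage.StorageLevel;
      import org.apache.spark.streaming.api.java.JavaPairDStream;

      // jssc, kafkaParams, topics and receiverNum are defined elsewhere.
      // receiverNum is the number of Kafka receivers (input streams) to create.
      JavaPairDStream<String, String> streaming;
      if (receiverNum > 1) {
          // Create one receiver-based stream per receiver, then union them into a single DStream.
          List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<>(receiverNum);
          for (int i = 0; i < receiverNum; ++i) {
              kafkaStreams.add(KafkaUtilsExt.createStream(jssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER()));
          }
          streaming = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
      } else {
          streaming = KafkaUtilsExt.createStream(jssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER());
      }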
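
The partition formula from the first bullet can be checked with a little arithmetic. The sketch below is only an illustration: the class and method names are hypothetical, and the 2 s batch interval, 200 ms block interval and 3 receivers are assumed example values, not settings from the referenced article. It relies on each block becoming one partition of the batch RDD and on a union adding up the partitions of its inputs, so the result is an upper bound (block intervals in which no data arrives produce no block).

```java
// Hypothetical helper, for illustration only: estimates how many partitions
// a receiver-based batch will have for a given batch/block interval.
final class ReceiverParallelism {

    // Partitions produced by ONE receiver per batch:
    // batch interval / block interval (integer division).
    static long partitionsPerReceiver(long batchIntervalMs, long blockIntervalMs) {
        if (batchIntervalMs <= 0 || blockIntervalMs <= 0) {
            throw new IllegalArgumentException("intervals must be positive");
        }
        return batchIntervalMs / blockIntervalMs;
    }

    // After a union of several receiver streams, the partitions of the
    // individual streams add up.
    static long totalPartitions(long batchIntervalMs, long blockIntervalMs, int receiverNum) {
        if (receiverNum <= 0) {
            throw new IllegalArgumentException("receiverNum must be positive");
        }
        return partitionsPerReceiver(batchIntervalMs, blockIntervalMs) * receiverNum;
    }

    public static void main(String[] args) {
        // Assumed example values: 2 s batch, 200 ms block interval, 3 receivers.
        System.out.println(partitionsPerReceiver(2000, 200)); // prints 10
        System.out.println(totalPartitions(2000, 200, 3));    // prints 30
    }
}
```

Halving the block interval doubles the partition count per receiver, while adding receivers both adds partitions and raises the volume pulled from Kafka per batch, which is why the two bullets are listed as separate levers.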