本人是个资深彩民,每周都会在彩票上花上50-100块钱买彩票,虽说一直没中。 上网时,经常听到别人讨论说,彩票是8点钟禁售,9点15分开奖,很多人都会想,这一个半时内,福彩中心会不会算一个最小人买的彩票呢。 刚好,最近在学买流式计算,尝试着用这个来算一下最小得奖。 当然,写这个东东没有说彩票造假,也没有特别的意思,只是想将学到的东西用起来的尝试。

    • 设计:
    1. 把各个彩票站做一个客户端,所有彩票站都发送到一个消息队列(kafak),发送单注消息到kafka中。有人会说,彩票有复式的,这里我不管,因为复式也可以拆成多个单式.
    2. flink读取kafka的彩票数据,进行统计,统计最小的得奖结果,并输出结果

    image.png
    人员架构.jpg

    • 算法: 如何用最简单的办法,算出哪一注是最小人买 的呢。 这里我换了个概念,具体如下:
    1. 将所有有可能选择的彩票结果都初始为1,都认为有人买了,共1107568组合
    2. 随机生成彩票,发送到kafka中
    3. flink把所有彩票都当成字符串(注意这个字符串是有序的),进行统计,统计出现最小的彩票 这样彩票的统计就变成统计最小的词频,这个统计词频的例子在flink里就有了。

    随机生成彩票 示例生成代码如下:

    1. public class BallCase {
    2. /**
    3. * 初始化所有红球
    4. */
    5. private static ArrayList<Integer> redBalls = new ArrayList<Integer>();
    6. /*
    7. 初始化所有蓝球
    8. */
    9. private static ArrayList<Integer> blueBalls = new ArrayList<Integer>();
    10. /**
    11. * 初始化数据
    12. */
    13. static {
    14. for (int i = 1; i <= 33; i++) {
    15. redBalls.add(i);
    16. }
    17. for (int i = 1; i <= 16; i++) {
    18. blueBalls.add(i);
    19. }
    20. }
    21. private static Properties kafkaProps = new Properties();
    22. static {
    23. kafkaProps.put("bootstrap.servers", "127.0.0.1:9092");
    24. kafkaProps.put("acks", "all");
    25. kafkaProps.put("retries", 0);
    26. kafkaProps.put("batch.size", 16384);
    27. kafkaProps.put("linger.ms", 1);
    28. kafkaProps.put("buffer.memory", 33554432);
    29. kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    30. kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    31. }
    32. /**
    33. * 随机生成6个红球
    34. * @return
    35. */
    36. private List<String> chooseRedBall() {
    37. List<Integer> resultRedBalls = new ArrayList<>();
    38. List<Integer> redBallsClone = (ArrayList<Integer>) redBalls.clone();
    39. int[] ballorder = {33, 32, 31, 30, 29, 28};
    40. for (int i = 0; i < ballorder.length; i++) {
    41. int choseRed = this.chooseBall(ballorder[i]);
    42. resultRedBalls.add(redBallsClone.get(choseRed));
    43. redBallsClone.remove(choseRed);
    44. }
    45. return resultRedBalls.stream().sorted().map(String::valueOf).collect(Collectors.toList());
    46. }
    47. /**
    48. * 随机生成1个蓝球
    49. * @return
    50. */
    51. private String chooseBlueBall() {
    52. int choseBlue = this.chooseBall(16);
    53. return blueBalls.get(choseBlue) + "";
    54. }
    55. /**
    56. * 生成随机数
    57. * @param ranger
    58. * @return
    59. */
    60. private Integer chooseBall(int ranger) {
    61. Random random = new Random();
    62. return random.nextInt(ranger);
    63. }
    64. /**
    65. * 获取随机双色球
    66. * @return
    67. */
    68. private String getRandomBall() {
    69. List<String> balls = this.chooseRedBall();
    70. balls.add(this.chooseBlueBall());
    71. return String.join(",", balls);
    72. }
    73. public static void main(String[] args) {
    74. BallCase ballCase = new BallCase();
    75. Long statTime = Instant.now().getEpochSecond();
    76. //初始化所有双色球选项
    77. for (int ball1 = 1; ball1 < 29; ball1++) {
    78. for (int ball2 = ball1+1; ball2 < 30; ball2++) {
    79. for (int ball3 = ball2+1; ball3 < 31; ball3++) {
    80. for (int ball4 = ball3+1; ball4 < 32; ball4++) {
    81. for (int ball5 = ball4+1; ball5 < 33; ball5++) {
    82. for (int ball6 = ball5+1; ball6 < 34; ball6++) {
    83. for (int ball7 =1; ball7<=16; ball7++) {
    84. ballCase.sendKafka(ball1+","+ball2+","+ball3+","+ball4+","+ball5+","+ball6+","+ball7 );
    85. }
    86. }
    87. }
    88. }
    89. }
    90. }
    91. }
    92. Long endTime = Instant.now().getEpochSecond();
    93. System.out.println(endTime - statTime);
    94. System.out.println(statTime);
    95. System.out.println(endTime);
    96. System.out.println("开始时间:"+new Date());
    97. //随机生成双色球数目
    98. for (int ball1 = 1; ball1 < 186864001; ball1++) {
    99. ballCase.sendKafka(ballCase.getRandomBall());
    100. }
    101. System.out.println("结束时间:"+new Date());
    102. }
    103. /**
    104. * 发送到kafka
    105. * @param balls
    106. */
    107. public void sendKafka(String balls) {
    108. System.out.println(balls);
    109. Producer<String, String> producer = new KafkaProducer<>(kafkaProps);
    110. producer.send(new ProducerRecord<>("test", balls), new Callback() {
    111. @Override
    112. public void onCompletion(RecordMetadata metadata, Exception exception) {
    113. if (exception != null) {
    114. System.out.println("Failed to send message with exception " + exception);
    115. }
    116. }
    117. });
    118. producer.close();
    119. }
    120. }

    流式计算统计最小复奖 flink写这个很简单,代码如下:

    1. public class KafkaDeme {
    2. public static void main(String[] args) throws Exception {
    3. // set up the streaming execution environment
    4. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    5. // 告诉系统按照 EventTime 处理
    6. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    7. // 为了打印到控制台的结果不乱序,我们配置全局的并发为1,改变并发对结果正确性没有影响
    8. env.setParallelism(1);
    9. //默认情况下,检查点被禁用。要启用检查点,请在StreamExecutionEnvironment上调用enableCheckpointing(n)方法,
    10. // 其中n是以毫秒为单位的检查点间隔。每隔5000 ms进行启动一个检查点,则下一个检查点将在上一个检查点完成后5秒钟内启动
    11. env.enableCheckpointing(5000);
    12. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    13. Properties properties = new Properties();
    14. properties.setProperty("bootstrap.servers", "127.0.0.1:9092");//kafka的节点的IP或者hostName,多个使用逗号分隔
    15. properties.setProperty("zookeeper.connect", "127.0.0.1:2181");//zookeeper的节点的IP或者hostName,多个使用逗号进行分隔
    16. properties.setProperty("group.id", "test-consumer-group");//flink consumer flink的消费者的group.id
    17. FlinkKafkaConsumer09<String> myConsumer = new FlinkKafkaConsumer09<String>("test", new SimpleStringSchema(),
    18. properties);//test0是kafka中开启的topic
    19. DataStream<String> keyedStream = env.addSource(myConsumer);//将kafka生产者发来的数据进行处理,本例子我进任何处理
    20. //计数
    21. DataStream<Tuple2<String, Integer>> ds= keyedStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
    22. @Override
    23. public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
    24. System.out.println(s);
    25. collector.collect(new Tuple2<String, Integer>(s, 1));
    26. }
    27. });
    28. DataStream<Tuple2<String, Integer>> wcount = ds
    29. .keyBy(0) //按照Tuple2<String, Integer>的第一个元素为key,也就是单词
    30. .window(SlidingProcessingTimeWindows.of(Time.minutes(2),Time.minutes(1)))
    31. //key之后的元素进入一个总时间长度为600s,每20s向后滑动一次的滑动窗口
    32. .sum(1);;
    33. DataStream<Tuple2<String, Integer>> ret = wcount
    34. .windowAll(TumblingProcessingTimeWindows.of(Time.minutes(1)))
    35. //所有key元素进入一个20s长的窗口(选20秒是因为上游窗口每20s计算一轮数据,topN窗口一次计算只统计一个窗口时间内的变化)
    36. .process(new TopNAllFunction(1));//计算该窗口TopN
    37. ret.writeAsText("D:\\logs\\log.txt",FileSystem.WriteMode.OVERWRITE);
    38. // execute program
    39. env.execute("Flink Streaming Java API Skeleton");
    40. }
    41. private static class TopNAllFunction
    42. extends ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> {
    43. private int topSize = 10;
    44. public TopNAllFunction(int topSize) {
    45. this.topSize = topSize;
    46. }
    47. @Override
    48. public void process(
    49. ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>.Context arg0,
    50. Iterable<Tuple2<String, Integer>> input,
    51. Collector<Tuple2<String, Integer>> out) throws Exception {
    52. TreeMap<Integer, Tuple2<String, Integer>> treemap = new TreeMap<Integer, Tuple2<String, Integer>>(
    53. new Comparator<Integer>() {
    54. @Override
    55. public int compare(Integer y, Integer x) {
    56. return (x < y) ? 1 : -1;
    57. }
    58. }); //treemap按照key降序排列,相同count值不覆盖
    59. for (Tuple2<String, Integer> element : input) {
    60. treemap.put(element.f1, element);
    61. if (treemap.size() > topSize) { //只保留前面TopN个元素
    62. treemap.pollLastEntry();
    63. }
    64. }
    65. for (Map.Entry<Integer, Tuple2<String, Integer>> entry : treemap
    66. .entrySet()) {
    67. out.collect(entry.getValue());
    68. }
    69. }
    70. }
    71. }

    验证

    1. 当数据很小时,很快就算出结果,基本是秒出。
    2. 当以大数据处理时,并且时间窗口调成一个小时,发现kafka的数据处理不过来了。当然这个可能是我本机即是服务端,客户端有关系.如果有更好的机子,估计会更快. flink是刚学习的,不一定正确,只是个玩票的,如果有发现问题,请留言。 文章写完了,赶紧再去买几注彩票压压惊。