本人是个资深彩民,每周都会在彩票上花上50-100块钱买彩票,虽说一直没中。 上网时,经常听到别人讨论说,彩票是8点钟禁售,9点15分开奖,很多人都会想,这一个半时内,福彩中心会不会算一个最小人买的彩票呢。 刚好,最近在学买流式计算,尝试着用这个来算一下最小得奖。 当然,写这个东东没有说彩票造假,也没有特别的意思,只是想将学到的东西用起来的尝试。
- 设计:
- 把各个彩票站做一个客户端,所有彩票站都发送到一个消息队列(kafak),发送单注消息到kafka中。有人会说,彩票有复式的,这里我不管,因为复式也可以拆成多个单式.
- flink读取kafka的彩票数据,进行统计,统计最小的得奖结果,并输出结果
人员架构.jpg
- 算法: 如何用最简单的办法,算出哪一注是最小人买 的呢。 这里我换了个概念,具体如下:
- 将所有有可能选择的彩票结果都初始为1,都认为有人买了,共1107568组合
- 随机生成彩票,发送到kafka中
- flink把所有彩票都当成字符串(注意这个字符串是有序的),进行统计,统计出现最小的彩票 这样彩票的统计就变成统计最小的词频,这个统计词频的例子在flink里就有了。
随机生成彩票 示例生成代码如下:
public class BallCase {
/**
* 初始化所有红球
*/
private static ArrayList<Integer> redBalls = new ArrayList<Integer>();
/*
初始化所有蓝球
*/
private static ArrayList<Integer> blueBalls = new ArrayList<Integer>();
/**
* 初始化数据
*/
static {
for (int i = 1; i <= 33; i++) {
redBalls.add(i);
}
for (int i = 1; i <= 16; i++) {
blueBalls.add(i);
}
}
private static Properties kafkaProps = new Properties();
static {
kafkaProps.put("bootstrap.servers", "127.0.0.1:9092");
kafkaProps.put("acks", "all");
kafkaProps.put("retries", 0);
kafkaProps.put("batch.size", 16384);
kafkaProps.put("linger.ms", 1);
kafkaProps.put("buffer.memory", 33554432);
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
}
/**
* 随机生成6个红球
* @return
*/
private List<String> chooseRedBall() {
List<Integer> resultRedBalls = new ArrayList<>();
List<Integer> redBallsClone = (ArrayList<Integer>) redBalls.clone();
int[] ballorder = {33, 32, 31, 30, 29, 28};
for (int i = 0; i < ballorder.length; i++) {
int choseRed = this.chooseBall(ballorder[i]);
resultRedBalls.add(redBallsClone.get(choseRed));
redBallsClone.remove(choseRed);
}
return resultRedBalls.stream().sorted().map(String::valueOf).collect(Collectors.toList());
}
/**
* 随机生成1个蓝球
* @return
*/
private String chooseBlueBall() {
int choseBlue = this.chooseBall(16);
return blueBalls.get(choseBlue) + "";
}
/**
* 生成随机数
* @param ranger
* @return
*/
private Integer chooseBall(int ranger) {
Random random = new Random();
return random.nextInt(ranger);
}
/**
* 获取随机双色球
* @return
*/
private String getRandomBall() {
List<String> balls = this.chooseRedBall();
balls.add(this.chooseBlueBall());
return String.join(",", balls);
}
public static void main(String[] args) {
BallCase ballCase = new BallCase();
Long statTime = Instant.now().getEpochSecond();
//初始化所有双色球选项
for (int ball1 = 1; ball1 < 29; ball1++) {
for (int ball2 = ball1+1; ball2 < 30; ball2++) {
for (int ball3 = ball2+1; ball3 < 31; ball3++) {
for (int ball4 = ball3+1; ball4 < 32; ball4++) {
for (int ball5 = ball4+1; ball5 < 33; ball5++) {
for (int ball6 = ball5+1; ball6 < 34; ball6++) {
for (int ball7 =1; ball7<=16; ball7++) {
ballCase.sendKafka(ball1+","+ball2+","+ball3+","+ball4+","+ball5+","+ball6+","+ball7 );
}
}
}
}
}
}
}
Long endTime = Instant.now().getEpochSecond();
System.out.println(endTime - statTime);
System.out.println(statTime);
System.out.println(endTime);
System.out.println("开始时间:"+new Date());
//随机生成双色球数目
for (int ball1 = 1; ball1 < 186864001; ball1++) {
ballCase.sendKafka(ballCase.getRandomBall());
}
System.out.println("结束时间:"+new Date());
}
/**
* 发送到kafka
* @param balls
*/
public void sendKafka(String balls) {
System.out.println(balls);
Producer<String, String> producer = new KafkaProducer<>(kafkaProps);
producer.send(new ProducerRecord<>("test", balls), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.out.println("Failed to send message with exception " + exception);
}
}
});
producer.close();
}
}
流式计算统计最小复奖 flink写这个很简单,代码如下:
public class KafkaDeme {
public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 告诉系统按照 EventTime 处理
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 为了打印到控制台的结果不乱序,我们配置全局的并发为1,改变并发对结果正确性没有影响
env.setParallelism(1);
//默认情况下,检查点被禁用。要启用检查点,请在StreamExecutionEnvironment上调用enableCheckpointing(n)方法,
// 其中n是以毫秒为单位的检查点间隔。每隔5000 ms进行启动一个检查点,则下一个检查点将在上一个检查点完成后5秒钟内启动
env.enableCheckpointing(5000);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");//kafka的节点的IP或者hostName,多个使用逗号分隔
properties.setProperty("zookeeper.connect", "127.0.0.1:2181");//zookeeper的节点的IP或者hostName,多个使用逗号进行分隔
properties.setProperty("group.id", "test-consumer-group");//flink consumer flink的消费者的group.id
FlinkKafkaConsumer09<String> myConsumer = new FlinkKafkaConsumer09<String>("test", new SimpleStringSchema(),
properties);//test0是kafka中开启的topic
DataStream<String> keyedStream = env.addSource(myConsumer);//将kafka生产者发来的数据进行处理,本例子我进任何处理
//计数
DataStream<Tuple2<String, Integer>> ds= keyedStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
System.out.println(s);
collector.collect(new Tuple2<String, Integer>(s, 1));
}
});
DataStream<Tuple2<String, Integer>> wcount = ds
.keyBy(0) //按照Tuple2<String, Integer>的第一个元素为key,也就是单词
.window(SlidingProcessingTimeWindows.of(Time.minutes(2),Time.minutes(1)))
//key之后的元素进入一个总时间长度为600s,每20s向后滑动一次的滑动窗口
.sum(1);;
DataStream<Tuple2<String, Integer>> ret = wcount
.windowAll(TumblingProcessingTimeWindows.of(Time.minutes(1)))
//所有key元素进入一个20s长的窗口(选20秒是因为上游窗口每20s计算一轮数据,topN窗口一次计算只统计一个窗口时间内的变化)
.process(new TopNAllFunction(1));//计算该窗口TopN
ret.writeAsText("D:\\logs\\log.txt",FileSystem.WriteMode.OVERWRITE);
// execute program
env.execute("Flink Streaming Java API Skeleton");
}
private static class TopNAllFunction
extends ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> {
private int topSize = 10;
public TopNAllFunction(int topSize) {
this.topSize = topSize;
}
@Override
public void process(
ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>.Context arg0,
Iterable<Tuple2<String, Integer>> input,
Collector<Tuple2<String, Integer>> out) throws Exception {
TreeMap<Integer, Tuple2<String, Integer>> treemap = new TreeMap<Integer, Tuple2<String, Integer>>(
new Comparator<Integer>() {
@Override
public int compare(Integer y, Integer x) {
return (x < y) ? 1 : -1;
}
}); //treemap按照key降序排列,相同count值不覆盖
for (Tuple2<String, Integer> element : input) {
treemap.put(element.f1, element);
if (treemap.size() > topSize) { //只保留前面TopN个元素
treemap.pollLastEntry();
}
}
for (Map.Entry<Integer, Tuple2<String, Integer>> entry : treemap
.entrySet()) {
out.collect(entry.getValue());
}
}
}
}
验证
- 当数据很小时,很快就算出结果,基本是秒出。
- 当以大数据处理时,并且时间窗口调成一个小时,发现kafka的数据处理不过来了。当然这个可能是我本机即是服务端,客户端有关系.如果有更好的机子,估计会更快. flink是刚学习的,不一定正确,只是个玩票的,如果有发现问题,请留言。 文章写完了,赶紧再去买几注彩票压压惊。