练习题背景

  • 目标
    • 过滤出TaxiRide流数据行程中开始坐标和结束坐标都在纽约的行程,并输出到控制台。
    • 工具方法可以帮助判断是否在纽约,GeoUtils.isInNYC(float lon, float lat)
  • 期望输出
    • DataStream输出到标准输出
  • 涉及到的类
    • RideCleansingIntegrationTest
    • RideCleansingUnitTest
    • RideCleansingExercise
    • RideCleansingSolution
  • 其他说明
    • 所有的exercise都包括3个目录,main、solution、test。其中main是主要练习方法,其中有部分函数需要用户自己实现后才能运行,solution是已经完整实现好的代码,test用于案例测试。

代码解析

exercise

先整体看一下RideCleansingExercise.java这个主类。

  1. private final SourceFunction<TaxiRide> source;
  2. private final SinkFunction<TaxiRide> sink;
  3. /** Creates a job using the source and sink provided. */
  4. public RideCleansingExercise(SourceFunction<TaxiRide> source, SinkFunction<TaxiRide> sink) {
  5. this.source = source;
  6. this.sink = sink;
  7. }
  8. /**
  9. * Main method.
  10. *
  11. * @throws Exception which occurs during job execution.
  12. */
  13. public static void main(String[] args) throws Exception {
  14. RideCleansingExercise job =
  15. new RideCleansingExercise(new TaxiRideGenerator(), new PrintSinkFunction<>());
  16. job.execute();
  17. }

RideCleansingExercise的构造函数,接受一个source,接收一个sink。在main方法中实例化了一个对象,传入TaxiRideGenerator作为source,PrintSinkFunction作为sink。

  1. /**
  2. * Creates and executes the long rides pipeline.
  3. *
  4. * @return {JobExecutionResult}
  5. * @throws Exception which occurs during job execution.
  6. */
  7. public JobExecutionResult execute() throws Exception {
  8. // set up streaming execution environment
  9. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  10. // set up the pipeline
  11. env.addSource(source).filter(new NYCFilter()).addSink(sink);
  12. // run the pipeline and return the result
  13. return env.execute("Taxi Ride Cleansing");
  14. }

execute执行方法中,对TaxiRide event进行接收、过滤、输出结果。注意execute()方法返回的结果是JobExecutionResult,这个结果后面在test中有用。上面代码中filter里面接收了一个NYCFilter对象作为参数,是需要自己实现的。下面仅仅是扔出了一个MissingSolutionException异常,需要填入真实的业务逻辑。

  1. /** Keep only those rides and both start and end in NYC. */
  2. public static class NYCFilter implements FilterFunction<TaxiRide> {
  3. @Override
  4. public boolean filter(TaxiRide taxiRide) throws Exception {
  5. throw new MissingSolutionException();
  6. }
  7. }

tests

  • 大部分练习都有一些单元测试加端到端的集成测试,例如
    • RideCleansingUnitTest:主要测试filter函数
    • RideCleansingIntegrationTest:测试整个完整的输入、操作、到输出完整的pipeline
  • 所有的project中的tests都会测试Exercise类,如果exercise抛出MissingSolutionException异常,那么程序会吞掉异常,将tests应用到对应的solution类上。也就是说,工程拿过来,即使不填代码也是可以直接test跑起来的。
  • 我们重点分析Java测试,scala测试暂时不做过多分析,有兴趣的自行查看