练习题背景
- 目标
- 过滤出TaxiRide流数据行程中开始坐标和结束坐标都在纽约的行程,并输出到控制台。
- 工具方法可以帮助判断是否在纽约,GeoUtils.isInNYC(float lon, float lat)
- 期望输出
- DataStream
输出到标准输出
- DataStream
- 涉及到的类
- RideCleansingIntegrationTest
- RideCleansingUnitTest
- RideCleansingExercise
- RideCleansingSolution
- 其他说明
- 所有的exercise都包括3个目录,main、solution、test。其中main是主要练习方法,其中有部分函数需要用户自己实现后才能运行,solution是已经完整实现好的代码,test用于案例测试。
代码解析
exercise
先整体看一下RideCleansingExercise.java这个主类。
private final SourceFunction<TaxiRide> source;
private final SinkFunction<TaxiRide> sink;
/** Creates a job using the source and sink provided. */
public RideCleansingExercise(SourceFunction<TaxiRide> source, SinkFunction<TaxiRide> sink) {
this.source = source;
this.sink = sink;
}
/**
* Main method.
*
* @throws Exception which occurs during job execution.
*/
public static void main(String[] args) throws Exception {
RideCleansingExercise job =
new RideCleansingExercise(new TaxiRideGenerator(), new PrintSinkFunction<>());
job.execute();
}
RideCleansingExercise的构造函数,接受一个source,接收一个sink。在main方法中实例化了一个对象,传入TaxiRideGenerator作为source,PrintSinkFunction作为sink。
/**
* Creates and executes the long rides pipeline.
*
* @return {JobExecutionResult}
* @throws Exception which occurs during job execution.
*/
public JobExecutionResult execute() throws Exception {
// set up streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// set up the pipeline
env.addSource(source).filter(new NYCFilter()).addSink(sink);
// run the pipeline and return the result
return env.execute("Taxi Ride Cleansing");
}
execute执行方法中,对TaxiRide event进行接收、过滤、输出结果。注意execute()方法返回的结果是JobExecutionResult,这个结果后面在test中有用。上面代码中filter里面接收了一个NYCFilter对象作为参数,是需要自己实现的。下面仅仅是扔出了一个MissingSolutionException异常,需要填入真实的业务逻辑。
/** Keep only those rides and both start and end in NYC. */
public static class NYCFilter implements FilterFunction<TaxiRide> {
@Override
public boolean filter(TaxiRide taxiRide) throws Exception {
throw new MissingSolutionException();
}
}
tests
- 大部分练习都有一些单元测试加端到端的集成测试,例如
- RideCleansingUnitTest:主要测试filter函数
- RideCleansingIntegrationTest:测试整个完整的输入、操作、到输出完整的pipeline
- 所有的project中的tests都会测试Exercise类,如果exercise抛出MissingSolutionException异常,那么程序会吞掉异常,将tests应用到对应的solution类上。也就是说,工程拿过来,即使不填代码也是可以直接test跑起来的。
- 我们重点分析Java测试,scala测试暂时不做过多分析,有兴趣的自行查看