row_number is implemented by the class GenericUDAFRowNumber, which is essentially a UDAF (user-defined aggregate function).
Source code:
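    package org.apache.hadoop.hive.ql.udf.generic;

    import java.util.ArrayList;

    import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.parse.SemanticException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    import org.apache.hadoop.io.IntWritable;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class GenericUDAFRowNumber extends AbstractGenericUDAFResolver {

      static final Logger LOG = LoggerFactory.getLogger(GenericUDAFRowNumber.class.getName());

      @Override
      public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException {
        // row_number() takes no arguments
        if (parameters.length != 0) {
          throw new UDFArgumentTypeException(parameters.length - 1, "No argument is expected.");
        }
        return new GenericUDAFRowNumberEvaluator();
      }

      // Aggregation state: the row numbers assigned so far and the next number to assign
      static class RowNumberBuffer implements AggregationBuffer {
        ArrayList<IntWritable> rowNums;
        int nextRow;

        // Recreates the list only; nextRow is set once, in the constructor
        void init() {
          rowNums = new ArrayList<IntWritable>();
        }

        RowNumberBuffer() {
          init();
          nextRow = 1;
        }

        // Record the current row number, then advance to the next one
        void incr() {
          rowNums.add(new IntWritable(nextRow++));
        }
      }

      public static class GenericUDAFRowNumberEvaluator extends GenericUDAFEvaluator {

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
          super.init(m, parameters);
          // No partial aggregation: only COMPLETE mode is accepted
          if (m != Mode.COMPLETE) {
            throw new HiveException("Only COMPLETE mode supported for row_number function");
          }
          // The result is a list of ints, not a single value
          return ObjectInspectorFactory.getStandardListObjectInspector(
              PrimitiveObjectInspectorFactory.writableIntObjectInspector);
        }

        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
          return new RowNumberBuffer();
        }

        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
          ((RowNumberBuffer) agg).init();
        }

        // Called once per row; the row's values are not used
        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
          ((RowNumberBuffer) agg).incr();
        }

        // Partial aggregation is not supported
        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
          throw new HiveException("terminatePartial not supported");
        }

        @Override
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
          throw new HiveException("merge not supported");
        }

        // Returns the whole list of row numbers
        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
          return ((RowNumberBuffer) agg).rowNums;
        }
      }
    }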
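To make the call order concrete, here is a minimal Hive-free sketch of how a COMPLETE-mode caller could drive this evaluator for one partition: one new buffer, one iterate() per row, and a single terminate() at the end. The class MiniRowNumber and its methods are hypothetical stand-ins, Integer replaces IntWritable, and the driver loop is an assumption about the caller, not Hive's actual windowing code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Hive-free mimic of how a COMPLETE-mode caller might drive
// GenericUDAFRowNumberEvaluator for a single partition.
public class MiniRowNumber {
    // Stand-in for RowNumberBuffer (Integer instead of IntWritable).
    static class Buffer {
        final List<Integer> rowNums = new ArrayList<>();
        int nextRow = 1;
    }

    // Stand-in for getNewAggregationBuffer().
    static Buffer newBuffer() {
        return new Buffer();
    }

    // Stand-in for iterate(): row_number() takes no arguments,
    // so the parameter array is empty and only the number of calls matters.
    static void iterate(Buffer agg, Object[] parameters) {
        agg.rowNums.add(agg.nextRow++);
    }

    // Stand-in for terminate(): one list for the whole partition.
    static List<Integer> terminate(Buffer agg) {
        return agg.rowNums;
    }

    // One iterate() call per row of the partition, then a single terminate().
    static List<Integer> run(List<String> partitionRows) {
        Buffer agg = newBuffer();
        for (String ignored : partitionRows) {
            iterate(agg, new Object[0]);
        }
        return terminate(agg);
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "b", "c"))); // [1, 2, 3]
    }
}
```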
The aggregation buffer of row_number is a list: the ArrayList<IntWritable> held in the rowNums field of RowNumberBuffer.
The key logic for assigning row numbers is in iterate():
    @Override
    public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
      ((RowNumberBuffer) agg).incr();
    }
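All it does is call RowNumberBuffer's increment method, incr(), which appends the current row number to the list and advances the counter for the next row: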
    void incr() {
      rowNums.add(new IntWritable(nextRow++));
    }
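To trace incr() in isolation, the sketch below copies RowNumberBuffer with IntWritable swapped for Integer so it runs without Hadoop (that swap and the wrapper class RowNumberBufferTrace are assumptions). It also shows a detail of the code as quoted: init(), which reset() calls, recreates rowNums but leaves nextRow alone, so a reused buffer continues counting rather than starting again at 1.

```java
import java.util.ArrayList;

// Copy of RowNumberBuffer with IntWritable replaced by Integer (assumption: avoids a Hadoop dependency).
public class RowNumberBufferTrace {
    static class RowNumberBuffer {
        ArrayList<Integer> rowNums;
        int nextRow;

        void init() {
            rowNums = new ArrayList<Integer>();   // only the list is recreated
        }

        RowNumberBuffer() {
            init();
            nextRow = 1;                          // the counter starts at 1 once, here
        }

        void incr() {
            rowNums.add(nextRow++);               // record the current number, then advance
        }
    }

    public static void main(String[] args) {
        RowNumberBuffer buf = new RowNumberBuffer();
        buf.incr();
        buf.incr();
        buf.incr();
        System.out.println(buf.rowNums + " next=" + buf.nextRow); // [1, 2, 3] next=4

        buf.init();                               // what reset() does
        buf.incr();
        System.out.println(buf.rowNums + " next=" + buf.nextRow); // [4] next=5
    }
}
```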
Finally, terminate() returns the whole list:
    @Override
    public Object terminate(AggregationBuffer agg) throws HiveException {
      return ((RowNumberBuffer) agg).rowNums;
    }
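Because terminate() returns one list for the whole partition rather than one value per row, something downstream has to pair element i of the list with row i of the partition. The sketch below assumes that pairing follows iteration order; the helper attach and the sample rows are hypothetical, and the Hive windowing code that does this in practice is not part of the excerpt above.

```java
import java.util.ArrayList;
import java.util.List;

public class PivotRowNumbers {
    // Hypothetical helper: zip the per-partition list returned by terminate()
    // with the partition's rows, in the order they were passed to iterate().
    static List<String> attach(List<String> partitionRows, List<Integer> rowNums) {
        if (partitionRows.size() != rowNums.size()) {
            throw new IllegalArgumentException("one row number per row expected");
        }
        List<String> out = new ArrayList<>();
        for (int i = 0; i < partitionRows.size(); i++) {
            out.add(partitionRows.get(i) + "\t" + rowNums.get(i));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> rows = List.of("alice", "bob", "carol");
        List<Integer> rowNums = List.of(1, 2, 3); // what terminate() would return for 3 rows
        attach(rows, rowNums).forEach(System.out::println);
    }
}
```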
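As the source shows, terminatePartial and merge are not implemented (both simply throw a HiveException), and init() rejects every mode other than Mode.COMPLETE. With no partial aggregation on the map side, row_number runs only on the reduce side.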
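Since there is no partial result to merge, every row of a partition has to be in one place, in order, before numbering begins. The in-memory sketch below imitates that reduce-side flow for something like row_number() over (partition by dept order by salary desc): group rows by dept (standing in for the shuffle), sort each group by salary descending, then number each group from 1 with a fresh counter (standing in for a new RowNumberBuffer). The Emp class, its fields, and the sample data are hypothetical; this is not how Hive executes the query internally.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ReduceSideRowNumber {
    // Hypothetical input row: (dept, name, salary).
    static final class Emp {
        final String dept;
        final String name;
        final int salary;

        Emp(String dept, String name, int salary) {
            this.dept = dept;
            this.name = name;
            this.salary = salary;
        }
    }

    // Roughly: row_number() over (partition by dept order by salary desc)
    static List<String> rowNumber(List<Emp> rows) {
        // "Shuffle": collect every row of a partition in one place (keys in sorted order).
        Map<String, List<Emp>> partitions = new TreeMap<>();
        for (Emp e : rows) {
            partitions.computeIfAbsent(e.dept, k -> new ArrayList<>()).add(e);
        }
        List<String> out = new ArrayList<>();
        for (List<Emp> part : partitions.values()) {
            // "Sort": order rows within the partition by salary, highest first.
            part.sort(Comparator.comparingInt((Emp e) -> e.salary).reversed());
            // "Reduce": a fresh counter per partition, like a new RowNumberBuffer.
            int next = 1;
            for (Emp e : part) {
                out.add(e.dept + "," + e.name + "," + next++);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Emp> rows = List.of(
            new Emp("eng", "ann", 300), new Emp("ops", "bo", 100),
            new Emp("eng", "cy", 500), new Emp("ops", "di", 200));
        rowNumber(rows).forEach(System.out::println);
        // eng,cy,1
        // eng,ann,2
        // ops,di,1
        // ops,bo,2
    }
}
```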
