The row_number function is implemented by the class GenericUDAFRowNumber, which is essentially a UDAF (user-defined aggregate function).
Source code:

    package org.apache.hadoop.hive.ql.udf.generic;

    import java.util.ArrayList;

    import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.parse.SemanticException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    import org.apache.hadoop.io.IntWritable;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class GenericUDAFRowNumber extends AbstractGenericUDAFResolver {

        static final Logger LOG = LoggerFactory.getLogger(GenericUDAFRowNumber.class.getName());

        // row_number() takes no arguments; reject any call that passes one.
        @Override
        public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException {
            if (parameters.length != 0) {
                throw new UDFArgumentTypeException(parameters.length - 1, "No argument is expected.");
            }
            return new GenericUDAFRowNumberEvaluator();
        }

        // Aggregation buffer: the row numbers handed out so far, plus the next one to hand out.
        static class RowNumberBuffer implements AggregationBuffer {
            ArrayList<IntWritable> rowNums;
            int nextRow;

            void init() {
                rowNums = new ArrayList<IntWritable>();
            }

            RowNumberBuffer() {
                init();
                nextRow = 1;
            }

            // Record the current row number, then advance the counter.
            void incr() {
                rowNums.add(new IntWritable(nextRow++));
            }
        }

        public static class GenericUDAFRowNumberEvaluator extends GenericUDAFEvaluator {

            // Only COMPLETE mode is accepted; the result is a list of writable ints.
            @Override
            public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
                super.init(m, parameters);
                if (m != Mode.COMPLETE) {
                    throw new HiveException("Only COMPLETE mode supported for row_number function");
                }
                return ObjectInspectorFactory.getStandardListObjectInspector(
                    PrimitiveObjectInspectorFactory.writableIntObjectInspector);
            }

            @Override
            public AggregationBuffer getNewAggregationBuffer() throws HiveException {
                return new RowNumberBuffer();
            }

            @Override
            public void reset(AggregationBuffer agg) throws HiveException {
                ((RowNumberBuffer) agg).init();
            }

            // Called once per input row; the row's values are not used.
            @Override
            public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
                ((RowNumberBuffer) agg).incr();
            }

            // No partial aggregation: map-side modes are not supported.
            @Override
            public Object terminatePartial(AggregationBuffer agg) throws HiveException {
                throw new HiveException("terminatePartial not supported");
            }

            @Override
            public void merge(AggregationBuffer agg, Object partial) throws HiveException {
                throw new HiveException("merge not supported");
            }

            // Return every row number collected for the partition.
            @Override
            public Object terminate(AggregationBuffer agg) throws HiveException {
                return ((RowNumberBuffer) agg).rowNums;
            }
        }
    }
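To make the lifecycle concrete without Hive on the classpath, here is a minimal plain-Java sketch of how an evaluator like this is driven in COMPLETE mode: one fresh buffer per partition, iterate() once per row, then a single terminate(). The names RowNumberLifecycleSketch, Buffer, and evaluatePartition are hypothetical, Integer replaces IntWritable, and the driver loop is an assumed simplification, not Hive's actual windowing code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical plain-Java mirror of GenericUDAFRowNumberEvaluator; not a Hive API.
class RowNumberLifecycleSketch {

    // Stand-in for RowNumberBuffer, using Integer instead of IntWritable.
    static class Buffer {
        List<Integer> rowNums = new ArrayList<>();
        int nextRow = 1;

        void incr() {
            rowNums.add(nextRow++); // store the current number, then advance
        }
    }

    static Buffer getNewAggregationBuffer() {
        return new Buffer();
    }

    // Like the real iterate(), the row's contents are ignored.
    static void iterate(Buffer agg, Object row) {
        agg.incr();
    }

    static List<Integer> terminate(Buffer agg) {
        return agg.rowNums;
    }

    // Assumed driver: fresh buffer per partition, iterate once per row, terminate once.
    static List<Integer> evaluatePartition(List<?> rows) {
        Buffer agg = getNewAggregationBuffer();
        for (Object row : rows) {
            iterate(agg, row);
        }
        return terminate(agg);
    }

    public static void main(String[] args) {
        System.out.println(evaluatePartition(Arrays.asList("a", "b", "c"))); // [1, 2, 3]
        System.out.println(evaluatePartition(Arrays.asList("x", "y")));      // [1, 2]
    }
}
```

Because each partition gets its own buffer in this sketch, numbering restarts at 1 for every partition, which is the behavior row_number() OVER (PARTITION BY ...) is expected to have.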

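The aggregation buffer of row_number is a list, ArrayList<IntWritable> rowNums. Alongside it, the buffer keeps a counter, int nextRow, which holds the number that will be assigned to the next row.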
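Why a list rather than just a counter? A minimal sketch, assuming terminate() is called only once per partition (which the COMPLETE-only restriction suggests): a counter-only buffer can report nothing but the final total, like count(*), while a list buffer keeps one number per row. Both classes below are hypothetical stand-ins that use Integer instead of IntWritable.

```java
import java.util.ArrayList;
import java.util.List;

class BufferShapeSketch {

    // Hypothetical counter-only buffer: terminate() can only report the total.
    static class CounterBuffer {
        int count = 0;

        void incr() { count++; }

        int terminate() { return count; }
    }

    // Hypothetical mirror of RowNumberBuffer: keeps every number it has handed out.
    static class ListBuffer {
        List<Integer> rowNums = new ArrayList<>();
        int nextRow = 1;

        void incr() { rowNums.add(nextRow++); }

        List<Integer> terminate() { return rowNums; }
    }

    static int countRows(int n) {
        CounterBuffer b = new CounterBuffer();
        for (int i = 0; i < n; i++) b.incr();
        return b.terminate();
    }

    static List<Integer> numberRows(int n) {
        ListBuffer b = new ListBuffer();
        for (int i = 0; i < n; i++) b.incr();
        return b.terminate();
    }

    public static void main(String[] args) {
        System.out.println(countRows(3));  // 3
        System.out.println(numberRows(3)); // [1, 2, 3]
    }
}
```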
The core numbering logic lives in iterate():

    @Override
    public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
        ((RowNumberBuffer) agg).incr();
    }

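All iterate() does is call RowNumberBuffer's increment method incr(), which appends the current row number to the list and then advances nextRow to the following number.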

    void incr() {
        rowNums.add(new IntWritable(nextRow++));
    }
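The single expression rowNums.add(new IntWritable(nextRow++)) relies on Java's post-increment: the value passed to add() is nextRow before it is incremented. A minimal sketch (hypothetical class, Integer instead of IntWritable) showing that it behaves the same as the explicit two-step version:

```java
import java.util.ArrayList;
import java.util.List;

class PostIncrementSketch {
    List<Integer> rowNums = new ArrayList<>();
    int nextRow = 1;

    // Same shape as RowNumberBuffer.incr(): add(nextRow++) stores the old value,
    // then increments nextRow.
    void incr() {
        rowNums.add(nextRow++);
    }

    // Equivalent explicit version of the same step.
    void incrExplicit() {
        int current = nextRow;   // the number for the row being processed
        rowNums.add(current);
        nextRow = current + 1;   // the number the next row will get
    }

    public static void main(String[] args) {
        PostIncrementSketch a = new PostIncrementSketch();
        PostIncrementSketch b = new PostIncrementSketch();
        for (int i = 0; i < 3; i++) {
            a.incr();
            b.incrExplicit();
        }
        System.out.println(a.rowNums + " nextRow=" + a.nextRow); // [1, 2, 3] nextRow=4
        System.out.println(b.rowNums + " nextRow=" + b.nextRow); // [1, 2, 3] nextRow=4
    }
}
```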


Finally, terminate() returns the whole list:

    @Override
    public Object terminate(AggregationBuffer agg) throws HiveException {
        return ((RowNumberBuffer) agg).rowNums;
    }
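The list returned by terminate() holds one number per input row, so something downstream has to line it up with the rows of the partition. The sketch below assumes that pairing is positional: the i-th number goes with the i-th row in the partition's sort order. As far as we can tell this matches Hive, where row_number's @WindowFunctionDescription annotation (omitted from the excerpt above) sets pivotResult = true; the helper attachRowNumbers is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PivotResultSketch {

    // Hypothetical helper: pair each (already sorted) row with its row number by position.
    static List<String> attachRowNumbers(List<String> sortedRows, List<Integer> rowNums) {
        if (sortedRows.size() != rowNums.size()) {
            throw new IllegalArgumentException("expected one row number per row");
        }
        List<String> out = new ArrayList<>();
        for (int i = 0; i < sortedRows.size(); i++) {
            out.add(sortedRows.get(i) + "\t" + rowNums.get(i));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("alice", "bob", "carol");
        List<Integer> rowNums = Arrays.asList(1, 2, 3); // what terminate() would return
        attachRowNumbers(rows, rowNums).forEach(System.out::println);
    }
}
```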

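As the source shows, terminatePartial and merge are not implemented: both simply throw a HiveException, and init() rejects every mode other than COMPLETE. In other words, row_number cannot do map-side partial aggregation; all rows of a partition are numbered in one place, on the reduce side.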
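Why not implement merge the usual way? A minimal sketch, assuming (hypothetically) that two map tasks each numbered their own slice of one partition: simply concatenating the partial lists yields duplicate numbers, and shifting the second list by the size of the first is only correct if every row of the first slice sorts before every row of the second in the ORDER BY. Requiring COMPLETE mode avoids the problem by numbering the whole partition in one pass. The method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RowNumberMergeSketch {

    // Naive merge: concatenate the partial lists (what a sum-style merge would do).
    static List<Integer> naiveMerge(List<Integer> left, List<Integer> right) {
        List<Integer> out = new ArrayList<>(left);
        out.addAll(right);
        return out;
    }

    // Offset merge: renumber the right-hand slice after the left one.
    // Only correct if every row in `left` sorts before every row in `right`.
    static List<Integer> offsetMerge(List<Integer> left, List<Integer> right) {
        List<Integer> out = new ArrayList<>(left);
        for (int n : right) {
            out.add(n + left.size());
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> task1 = Arrays.asList(1, 2); // partial result from one task
        List<Integer> task2 = Arrays.asList(1, 2); // partial result from another task
        System.out.println(naiveMerge(task1, task2));  // [1, 2, 1, 2]
        System.out.println(offsetMerge(task1, task2)); // [1, 2, 3, 4]
    }
}
```

Even the offset version silently depends on the global sort order of the slices, which a generic merge() call does not receive; that is one plausible reason the evaluator refuses partial modes altogether.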