代码

  1. import com.mongodb.MongoClient;
  2. import com.mongodb.MongoClientURI;
  3. import com.mongodb.client.MongoCollection;
  4. import com.mongodb.client.MongoCursor;
  5. import org.apache.hadoop.conf.Configuration;
  6. import org.apache.hadoop.io.LongWritable;
  7. import org.apache.hadoop.io.Writable;
  8. import org.apache.hadoop.mapreduce.InputSplit;
  9. import org.apache.hadoop.mapreduce.JobContext;
  10. import org.apache.hadoop.mapreduce.RecordReader;
  11. import org.apache.hadoop.mapreduce.TaskAttemptContext;
  12. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  13. import org.bson.Document;
  14. import java.io.DataInput;
  15. import java.io.DataOutput;
  16. import java.io.IOException;
  17. import java.util.ArrayList;
  18. import java.util.List;
  19. public class MongoInputFormat extends FileInputFormat<LongWritable, Document> {
  20. @Override
  21. public RecordReader<LongWritable, Document> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
  22. MongoRecordReader mongoRecordReader = new MongoRecordReader();
  23. mongoRecordReader.initialize(inputSplit, taskAttemptContext);
  24. return mongoRecordReader;
  25. }
  26. @Override
  27. public List<InputSplit> getSplits(JobContext job) throws IOException {
  28. Configuration configuration = job.getConfiguration();
  29. MongoClient mongoClient = new MongoClient(new MongoClientURI(configuration.get("mongo_uri")));
  30. MongoCollection<Document> collection = mongoClient.getDatabase(configuration.get("dbName"))
  31. .getCollection(configuration.get("collectionName"));
  32. long countDocuments = collection.countDocuments();
  33. long chunk = 1000;
  34. long chunkSize = (long) Math.ceil((countDocuments / (double)chunk));
  35. System.out.println(chunkSize);
  36. ArrayList<InputSplit> inputSplits = new ArrayList<>();
  37. for (int i = 0; i < chunkSize; i++) {
  38. MongoInputSplit mongoInputSplit = null;
  39. if (i + 1 == chunkSize) {
  40. mongoInputSplit = new MongoInputSplit(i * chunk, countDocuments);
  41. } else {
  42. mongoInputSplit = new MongoInputSplit(i * chunk, i * chunk + chunk);
  43. }
  44. inputSplits.add(mongoInputSplit);
  45. }
  46. return inputSplits;
  47. }
  48. public static class MongoInputSplit extends InputSplit implements Writable {
  49. private long start;
  50. private long end;
  51. public MongoInputSplit() {}
  52. public MongoInputSplit(long start, long end) {
  53. this.start = start;
  54. this.end = end;
  55. }
  56. @Override
  57. public void write(DataOutput dataOutput) throws IOException {
  58. dataOutput.writeLong(this.start);
  59. dataOutput.writeLong(this.end);
  60. }
  61. @Override
  62. public void readFields(DataInput dataInput) throws IOException {
  63. this.start = dataInput.readLong();
  64. this.end = dataInput.readLong();
  65. }
  66. @Override
  67. public long getLength() throws IOException, InterruptedException {
  68. return this.end - this.start;
  69. }
  70. @Override
  71. public String[] getLocations() throws IOException, InterruptedException {
  72. return new String[0];
  73. }
  74. public long getStart() {
  75. return start;
  76. }
  77. public void setStart(long start) {
  78. this.start = start;
  79. }
  80. public long getEnd() {
  81. return end;
  82. }
  83. public void setEnd(long end) {
  84. this.end = end;
  85. }
  86. }
  87. public static class MongoRecordReader extends RecordReader<LongWritable, Document> {
  88. private LongWritable key = new LongWritable();
  89. private Document value = new Document();
  90. private MongoInputSplit split;
  91. private MongoCursor<Document> dbCursor;
  92. private int index;
  93. private Configuration configuration;
  94. private MongoClient mongoClient;
  95. private MongoCollection<Document> collection;
  96. @Override
  97. public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
  98. this.split = (MongoInputSplit)inputSplit;
  99. this.configuration = taskAttemptContext.getConfiguration();
  100. mongoClient = new MongoClient(new MongoClientURI(this.configuration.get("mongo_uri")));
  101. this.collection = mongoClient.getDatabase(this.configuration.get("dbName"))
  102. .getCollection(this.configuration.get("collectionName"));
  103. }
  104. @Override
  105. public boolean nextKeyValue() throws IOException, InterruptedException {
  106. if (this.dbCursor == null) {
  107. this.dbCursor = this.collection.find()
  108. .skip((int) this.split.start).limit((int) this.split.getLength()).iterator();
  109. }
  110. if (!this.dbCursor.hasNext()) {
  111. return false;
  112. }
  113. this.key.set(this.split.start + this.index);
  114. this.value = this.dbCursor.next();
  115. System.out.println(this.value);
  116. return true;
  117. }
  118. @Override
  119. public LongWritable getCurrentKey() throws IOException, InterruptedException {
  120. return key;
  121. }
  122. @Override
  123. public Document getCurrentValue() throws IOException, InterruptedException {
  124. return value;
  125. }
  126. @Override
  127. public float getProgress() throws IOException, InterruptedException {
  128. return 0;
  129. }
  130. @Override
  131. public void close() throws IOException {
  132. this.mongoClient.close();
  133. }
  134. }
  135. }