// Code
import com.mongodb.MongoClient;import com.mongodb.MongoClientURI;import com.mongodb.client.MongoCollection;import com.mongodb.client.MongoCursor;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Writable;import org.apache.hadoop.mapreduce.InputSplit;import org.apache.hadoop.mapreduce.JobContext;import org.apache.hadoop.mapreduce.RecordReader;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.bson.Document;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import java.util.ArrayList;import java.util.List;public class MongoInputFormat extends FileInputFormat<LongWritable, Document> { @Override public RecordReader<LongWritable, Document> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { MongoRecordReader mongoRecordReader = new MongoRecordReader(); mongoRecordReader.initialize(inputSplit, taskAttemptContext); return mongoRecordReader; } @Override public List<InputSplit> getSplits(JobContext job) throws IOException { Configuration configuration = job.getConfiguration(); MongoClient mongoClient = new MongoClient(new MongoClientURI(configuration.get("mongo_uri"))); MongoCollection<Document> collection = mongoClient.getDatabase(configuration.get("dbName")) .getCollection(configuration.get("collectionName")); long countDocuments = collection.countDocuments(); long chunk = 1000; long chunkSize = (long) Math.ceil((countDocuments / (double)chunk)); System.out.println(chunkSize); ArrayList<InputSplit> inputSplits = new ArrayList<>(); for (int i = 0; i < chunkSize; i++) { MongoInputSplit mongoInputSplit = null; if (i + 1 == chunkSize) { mongoInputSplit = new MongoInputSplit(i * chunk, countDocuments); } else { mongoInputSplit = new MongoInputSplit(i * chunk, i * chunk + chunk); } inputSplits.add(mongoInputSplit); } return inputSplits; } public 
static class MongoInputSplit extends InputSplit implements Writable { private long start; private long end; public MongoInputSplit() {} public MongoInputSplit(long start, long end) { this.start = start; this.end = end; } @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeLong(this.start); dataOutput.writeLong(this.end); } @Override public void readFields(DataInput dataInput) throws IOException { this.start = dataInput.readLong(); this.end = dataInput.readLong(); } @Override public long getLength() throws IOException, InterruptedException { return this.end - this.start; } @Override public String[] getLocations() throws IOException, InterruptedException { return new String[0]; } public long getStart() { return start; } public void setStart(long start) { this.start = start; } public long getEnd() { return end; } public void setEnd(long end) { this.end = end; } } public static class MongoRecordReader extends RecordReader<LongWritable, Document> { private LongWritable key = new LongWritable(); private Document value = new Document(); private MongoInputSplit split; private MongoCursor<Document> dbCursor; private int index; private Configuration configuration; private MongoClient mongoClient; private MongoCollection<Document> collection; @Override public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { this.split = (MongoInputSplit)inputSplit; this.configuration = taskAttemptContext.getConfiguration(); mongoClient = new MongoClient(new MongoClientURI(this.configuration.get("mongo_uri"))); this.collection = mongoClient.getDatabase(this.configuration.get("dbName")) .getCollection(this.configuration.get("collectionName")); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (this.dbCursor == null) { this.dbCursor = this.collection.find() .skip((int) this.split.start).limit((int) this.split.getLength()).iterator(); } if 
(!this.dbCursor.hasNext()) { return false; } this.key.set(this.split.start + this.index); this.value = this.dbCursor.next(); System.out.println(this.value); return true; } @Override public LongWritable getCurrentKey() throws IOException, InterruptedException { return key; } @Override public Document getCurrentValue() throws IOException, InterruptedException { return value; } @Override public float getProgress() throws IOException, InterruptedException { return 0; } @Override public void close() throws IOException { this.mongoClient.close(); } }}