多线程批量执行任务

  1. package com.gitee.kooder.utils;
  2. import java.util.List;
  3. import java.util.concurrent.ForkJoinTask;
  4. import java.util.concurrent.RecursiveAction;
  5. import java.util.function.Consumer;
  /**
   * Batch task action: runs {@code action} over a task list in parallel batches using the
   * fork/join framework.
   *
   * <p>The list is split in half recursively until a piece holds at most {@code threshold}
   * tasks; {@code action} is then called once per piece. Pieces are non-overlapping
   * {@link List#subList(int, int) sub-list views} of the original list and may be processed
   * concurrently, so {@code action} must be thread-safe and must not structurally modify the
   * list it receives.
   *
   * @param <T> task element type
   * @author Winter Lau<javayou@gmail.com>
   */
  public final class BatchTaskRunner<T> extends RecursiveAction {

      protected int threshold;
      protected List<T> taskList;
      Consumer<List<T>> action;

      /**
       * @param taskList  tasks handled by this (sub-)runner
       * @param threshold maximum number of tasks handled by one call of {@code action};
       *                  values below 1 are treated as 1
       * @param action    callback invoked with each batch
       */
      private BatchTaskRunner(List<T> taskList, int threshold, Consumer<List<T>> action) {
          this.taskList = taskList;
          // With a threshold < 1 a one-element list would be split into itself forever
          // (ceil(1 / 2) == 1), ending in a StackOverflowError.
          this.threshold = Math.max(threshold, 1);
          this.action = action;
      }

      /**
       * Executes {@code action} over {@code taskList} in parallel batches and blocks until all
       * batches have finished. An empty list results in a single call with that empty list.
       *
       * @param taskList  tasks to process
       * @param threshold maximum number of tasks per batch (values below 1 are treated as 1)
       * @param action    callback invoked with each batch, possibly from several threads at once
       * @throws NullPointerException if {@code taskList} or {@code action} is null
       */
      public static <T> void execute(List<T> taskList, int threshold, Consumer<List<T>> action) {
          java.util.Objects.requireNonNull(taskList, "taskList");
          java.util.Objects.requireNonNull(action, "action");
          new BatchTaskRunner<>(taskList, threshold, action).invoke();
      }

      @Override
      protected void compute() {
          if (taskList.size() <= threshold) {
              action.accept(taskList);
          } else {
              splitFromMiddle(taskList);
          }
      }

      /**
       * Splits the list in the middle (the left half gets the extra element for odd sizes)
       * and processes both halves as forked subtasks.
       *
       * @param list list to split
       */
      private void splitFromMiddle(List<T> list) {
          int size = list.size();
          int middle = size - size / 2; // == ceil(size / 2.0), without floating point
          BatchTaskRunner<T> left = newInstance(list.subList(0, middle));
          BatchTaskRunner<T> right = newInstance(list.subList(middle, size));
          ForkJoinTask.invokeAll(left, right);
      }

      /** Creates a subtask for {@code subList} sharing this runner's threshold and action. */
      private BatchTaskRunner<T> newInstance(List<T> subList) {
          return new BatchTaskRunner<>(subList, threshold, action);
      }
  }
  1. /**
  2. * 读取指定目录下所有 json 文件并写入索引
  3. * @param type
  4. * @param action
  5. * @param path
  6. * @param thread_count
  7. * @return file count
  8. */
  9. private static int importJsonInPath(String type, String action, Path path, int thread_count) throws IOException {
  10. final AtomicInteger fc = new AtomicInteger(0);
  11. thread_count = Math.min(MAX_THREAD_COUNT, Math.max(thread_count, 1));
  12. try (
  13. IndexWriter writer = StorageFactory.getIndexWriter(type);
  14. TaxonomyWriter taxonomyWriter = StorageFactory.getTaxonomyWriter(type);
  15. Stream<Path> pathStream = Files.list(path);
  16. ) {
  17. List<Path> allFiles = pathStream.filter(p -> p.toString().endsWith(".json") && !Files.isDirectory(p)).collect(Collectors.toList());
  18. int threshold = Math.max(allFiles.size()/thread_count, 1);
  19. BatchTaskRunner.execute(allFiles, threshold, files -> {
  20. files.forEach( jsonFile -> {
  21. importJsonFile(type, action, jsonFile, writer, taxonomyWriter);
  22. fc.addAndGet(1);
  23. });
  24. });
  25. }
  26. return fc.get();
  27. }

Redis 队列

  1. package com.gitee.kooder.queue;
  2. import java.util.Collection;
  3. import java.util.List;
  4. /**
  5. * 定义了获取索引任务的队列接口
  6. * @author Winter Lau<javayou@gmail.com>
  7. */
  8. public interface Queue extends AutoCloseable{
  9. /**
  10. * 队列的唯一名称
  11. * @return
  12. */
  13. String type();
  14. /**
  15. * 添加任务到队列
  16. * @param tasks
  17. */
  18. void push(Collection<QueueTask> tasks) ;
  19. /**
  20. * 从队列获取任务
  21. * @return
  22. */
  23. List<QueueTask> pop(int count) ;
  24. }
  1. package com.gitee.kooder.queue;
  2. import com.gitee.kooder.core.KooderConfig;
  3. import org.apache.commons.lang3.StringUtils;
  4. import java.util.Properties;
  5. /**
  6. * 队列工厂
  7. * @author Winter Lau<javayou@gmail.com>
  8. */
  9. public class QueueFactory {
  10. static QueueProvider provider;
  11. static {
  12. Properties props = KooderConfig.getQueueProperties();
  13. String type = StringUtils.trim(props.getProperty("provider"));
  14. if("redis".equalsIgnoreCase(type))
  15. provider = new RedisQueueProvider(props);
  16. else if("embed".equalsIgnoreCase(type))
  17. provider = new EmbedQueueProvider(props);
  18. }
  19. public final static QueueProvider getProvider() {
  20. return provider;
  21. }
  22. }
  1. package com.gitee.kooder.queue;
  2. import com.gitee.kooder.core.Constants;
  3. import java.util.Arrays;
  4. import java.util.List;
  5. /**
  6. * 定义了获取索引任务的队列接口
  7. * @author Winter Lau<javayou@gmail.com>
  8. */
  9. public interface QueueProvider extends AutoCloseable {
  10. List<String> TYPES = Arrays.asList(Constants.TYPE_REPOSITORY, Constants.TYPE_ISSUE, Constants.TYPE_CODE);
  11. /**
  12. * Provider 唯一标识
  13. * @return
  14. */
  15. String name();
  16. /**
  17. * 获取支持的所有任务类型
  18. * @return
  19. */
  20. default List<String> getAllTypes() {
  21. return TYPES;
  22. }
  23. /**
  24. * 获取某个任务类型的队列
  25. * @param type
  26. * @return
  27. */
  28. Queue queue(String type);
  29. }
  1. package com.gitee.kooder.queue;
  2. import io.lettuce.core.RedisClient;
  3. import io.lettuce.core.RedisURI;
  4. import io.lettuce.core.api.StatefulRedisConnection;
  5. import io.lettuce.core.api.sync.RedisCommands;
  6. import org.apache.commons.lang3.math.NumberUtils;
  7. import org.slf4j.Logger;
  8. import org.slf4j.LoggerFactory;
  9. import java.util.ArrayList;
  10. import java.util.Collection;
  11. import java.util.List;
  12. import java.util.Properties;
  13. /**
  14. * 使用 Redis 队列
  15. * @author Winter Lau<javayou@gmail.com>
  16. */
  17. public class RedisQueueProvider implements QueueProvider {
  18. private final static Logger log = LoggerFactory.getLogger(RedisQueueProvider.class);
  19. private String host;
  20. private int port;
  21. private int database;
  22. private String baseKey;
  23. private String username;
  24. private String password;
  25. private RedisClient client;
  26. /**
  27. * Connect to redis
  28. * @param props
  29. */
  30. public RedisQueueProvider(Properties props) {
  31. this.host = props.getProperty("redis.host", "127.0.0.1");
  32. this.port = NumberUtils.toInt(props.getProperty("redis.port"), 6379);
  33. this.database = NumberUtils.toInt(props.getProperty("redis.database"), 1);
  34. this.baseKey = props.getProperty("redis.key", "gsearch-queue");
  35. this.username = props.getProperty("username");
  36. this.password = props.getProperty("password");
  37. RedisURI uri = RedisURI.create(host,port);
  38. uri.setDatabase(this.database);
  39. if(password != null)
  40. uri.setPassword(password.toCharArray());
  41. if(username != null)
  42. uri.setUsername(username);
  43. this.client = RedisClient.create(uri);
  44. log.info("Connected to {} at {}}:{}}\n", getRedisVersion(), this.host, this.port);
  45. }
  46. private String getRedisVersion() {
  47. try (StatefulRedisConnection<String, String> connection = client.connect()) {
  48. RedisCommands<String, String> cmd = connection.sync();
  49. return cmd.info("redis_version");
  50. }
  51. }
  52. @Override
  53. public String name() {
  54. return "redis";
  55. }
  56. @Override
  57. public Queue queue(String type) {
  58. return new Queue() {
  59. private String key = type + '@' + baseKey;
  60. @Override
  61. public String type() {
  62. return type;
  63. }
  64. @Override
  65. public void push(Collection<QueueTask> tasks) {
  66. try (StatefulRedisConnection<String, String> connection = client.connect()) {
  67. RedisCommands<String, String> cmd = connection.sync();
  68. cmd.rpush(key, tasks.stream().map(t -> t.json()).toArray(String[]::new));
  69. }
  70. }
  71. @Override
  72. public List<QueueTask> pop(int count) {
  73. String json = null;
  74. List<QueueTask> tasks = new ArrayList<>();
  75. try (StatefulRedisConnection<String, String> connection = client.connect()) {
  76. RedisCommands<String, String> cmd = connection.sync();
  77. do{
  78. json = cmd.lpop(key);
  79. if(json == null)
  80. break;
  81. QueueTask task = QueueTask.parse(json);
  82. if(task != null)
  83. tasks.add(task);
  84. }while(tasks.size() < count);
  85. }
  86. return tasks;
  87. }
  88. @Override
  89. public void close() {}
  90. };
  91. }
  92. @Override
  93. public void close() {
  94. client.shutdown();
  95. }
  96. }
  1. package com.gitee.kooder.queue;
  2. import com.gitee.kooder.core.KooderConfig;
  3. import org.apache.commons.lang3.math.NumberUtils;
  4. import org.infobip.lib.popout.FileQueue;
  5. import org.slf4j.Logger;
  6. import org.slf4j.LoggerFactory;
  7. import java.io.IOException;
  8. import java.nio.file.Files;
  9. import java.nio.file.Path;
  10. import java.util.*;
  11. import java.util.concurrent.ConcurrentHashMap;
  12. /**
  13. * 实现 Gitee Search 内嵌式的队列,不依赖第三方服务,通过 HTTP 方式提供对象获取
  14. * @author Winter Lau<javayou@gmail.com>
  15. */
  16. public class EmbedQueueProvider implements QueueProvider {
  17. private final static Logger log = LoggerFactory.getLogger(EmbedQueueProvider.class);
  18. private Map<String, FileQueue<QueueTask>> fileQueues = new ConcurrentHashMap<>();
  19. public EmbedQueueProvider(Properties props) {
  20. int batch_size = NumberUtils.toInt(props.getProperty("embed.batch_size", "10000"), 10000);
  21. Path path = checkoutPath(KooderConfig.getPath(props.getProperty("embed.path")));
  22. for(String type : getAllTypes()) {
  23. Path typePath = checkoutPath(path.resolve(type));
  24. fileQueues.put(type, FileQueue.<QueueTask>batched().name(type)
  25. .folder(typePath)
  26. .restoreFromDisk(true)
  27. .batchSize(batch_size)
  28. .build());
  29. }
  30. }
  31. private static Path checkoutPath(Path path) {
  32. if(!Files.exists(path) || !Files.isDirectory(path)) {
  33. log.warn("Path '{}' for queue storage not exists, created it!", path);
  34. try {
  35. Files.createDirectories(path);
  36. } catch(IOException e) {
  37. log.error("Failed to create directory '{}'", path, e);
  38. }
  39. }
  40. return path;
  41. }
  42. /**
  43. * 队列的唯一名称
  44. *
  45. * @return
  46. */
  47. @Override
  48. public String name() {
  49. return "embed";
  50. }
  51. /**
  52. * 获取某个任务类型的队列
  53. *
  54. * @param type
  55. * @return
  56. */
  57. @Override
  58. public Queue queue(String type) {
  59. return new Queue() {
  60. @Override
  61. public String type() {
  62. return type;
  63. }
  64. @Override
  65. public void push(Collection<QueueTask> tasks) {
  66. fileQueues.get(type).addAll(tasks);
  67. }
  68. @Override
  69. public List<QueueTask> pop(int count) {
  70. List<QueueTask> tasks = new ArrayList<>();
  71. QueueTask task;
  72. while(tasks.size() < count && (task = fileQueues.get(type).poll()) != null)
  73. tasks.add(task);
  74. return tasks;
  75. }
  76. @Override
  77. public void close() {
  78. fileQueues.get(type).close();
  79. }
  80. };
  81. }
  82. @Override
  83. public void close() {
  84. fileQueues.values().forEach(q -> q.close());
  85. }
  86. }
  1. package com.gitee.kooder.queue;
  2. import java.io.IOException;
  3. import java.io.Serializable;
  4. import java.util.ArrayList;
  5. import java.util.Arrays;
  6. import java.util.List;
  7. import java.util.Map;
  8. import com.fasterxml.jackson.annotation.JsonIgnore;
  9. import com.fasterxml.jackson.annotation.JsonProperty;
  10. import com.fasterxml.jackson.core.type.TypeReference;
  11. import com.gitee.kooder.models.CodeRepository;
  12. import com.gitee.kooder.core.Constants;
  13. import com.gitee.kooder.index.IndexManager;
  14. import com.gitee.kooder.models.Issue;
  15. import com.gitee.kooder.models.Repository;
  16. import com.gitee.kooder.models.Searchable;
  17. import com.gitee.kooder.utils.JsonUtils;
  18. import org.apache.commons.beanutils.BeanUtils;
  19. import org.apache.lucene.facet.taxonomy.TaxonomyWriter;
  20. import org.apache.lucene.index.IndexWriter;
  21. import org.slf4j.Logger;
  22. import org.slf4j.LoggerFactory;
  23. /**
  24. * 队列中的任务
  25. * @author Winter Lau<javayou@gmail.com>
  26. */
  27. public class QueueTask implements Serializable {
  28. private transient final static Logger log = LoggerFactory.getLogger(QueueTask.class);
  29. public transient final static List<String> types = Arrays.asList(
  30. Constants.TYPE_CODE,
  31. Constants.TYPE_REPOSITORY,
  32. Constants.TYPE_ISSUE,
  33. Constants.TYPE_PR,
  34. Constants.TYPE_COMMIT,
  35. Constants.TYPE_WIKI,
  36. Constants.TYPE_USER
  37. );
  38. public transient final static String ACTION_ADD = "add"; //添加
  39. public transient final static String ACTION_UPDATE = "update"; //修改
  40. public transient final static String ACTION_DELETE = "delete"; //删除
  41. private String type; //对象类型
  42. private String action; //动作(添加、删除、修改)
  43. private List<Searchable> objects = new ArrayList<>(); //objects list
  44. public QueueTask(){}
  45. public static void push(String type, String action, Searchable...obj){
  46. QueueTask task = new QueueTask();
  47. task.type = type;
  48. task.action = action;
  49. task.objects.addAll(Arrays.asList(obj));
  50. QueueFactory.getProvider().queue(type).push(Arrays.asList(task));
  51. }
  52. public static void add(String type, Searchable...obj) {
  53. push(type, ACTION_ADD, obj);
  54. }
  55. public static void update(String type, Searchable...obj) {
  56. push(type, ACTION_UPDATE, obj);
  57. }
  58. public static void delete(String type, Searchable...obj) {
  59. push(type, ACTION_DELETE, obj);
  60. }
  61. public String getType() {
  62. return type;
  63. }
  64. public void setType(String type) {
  65. this.type = type;
  66. }
  67. public final static boolean isAvailType(String p_type) {
  68. return (p_type!=null) && types.contains(p_type.toLowerCase());
  69. }
  70. public final static boolean isAvailAction(String p_action) {
  71. return ACTION_ADD.equalsIgnoreCase(p_action) || ACTION_DELETE.equalsIgnoreCase(p_action) || ACTION_UPDATE.equalsIgnoreCase(p_action);
  72. }
  73. public boolean isCodeTask() {
  74. return Constants.TYPE_CODE.equals(type);
  75. }
  76. public String getAction() {
  77. return action;
  78. }
  79. public void setAction(String action) {
  80. this.action = action;
  81. }
  82. public List<Searchable> getObjects() {
  83. return objects;
  84. }
  85. @JsonProperty("objects")
  86. public void readObjects(Map<String,Object>[] values) throws Exception {
  87. for(Map<String, Object> value : values) {
  88. Searchable obj = null;
  89. switch(type){
  90. case Constants.TYPE_CODE:
  91. obj = new CodeRepository();
  92. break;
  93. case Constants.TYPE_REPOSITORY:
  94. obj = new Repository();
  95. break;
  96. case Constants.TYPE_ISSUE:
  97. obj = new Issue();
  98. }
  99. BeanUtils.populate(obj, value);
  100. objects.add(obj);
  101. }
  102. }
  103. public void addObject(Searchable obj) {
  104. objects.add(obj);
  105. }
  106. @JsonIgnore
  107. public void setJsonObjects(String json) {
  108. TypeReference typeRefer;
  109. switch(type) {
  110. case Constants.TYPE_CODE:
  111. typeRefer = new TypeReference<List<CodeRepository>>(){};
  112. break;
  113. case Constants.TYPE_REPOSITORY:
  114. typeRefer = new TypeReference<List<Repository>>() {};
  115. break;
  116. case Constants.TYPE_ISSUE:
  117. typeRefer = new TypeReference<List<Issue>>() {};
  118. break;
  119. default:
  120. throw new IllegalArgumentException("Illegal task type: " + type);
  121. }
  122. this.objects = (List<Searchable>)JsonUtils.readValue(json, typeRefer);
  123. }
  124. /**
  125. * 写入索引库
  126. * @return
  127. * @exception
  128. */
  129. public int write() throws IOException {
  130. return IndexManager.write(this);
  131. }
  132. /**
  133. * 用于多线程环境下共享 IndexWriter 写入
  134. * @param i_writer
  135. * @param t_writer
  136. * @return
  137. * @throws IOException
  138. */
  139. public int write(IndexWriter i_writer, TaxonomyWriter t_writer) throws IOException {
  140. return IndexManager.write(this, i_writer, t_writer);
  141. }
  142. /**
  143. * 生成 json
  144. * @return
  145. */
  146. public String json() {
  147. return JsonUtils.toJson(this);
  148. }
  149. public static QueueTask parse(String json) {
  150. return JsonUtils.readValue(json, QueueTask.class);
  151. }
  152. @Override
  153. public String toString() {
  154. return "QueueTask{" +
  155. "type='" + type + '\'' +
  156. ", action='" + action + '\'' +
  157. ", objects=" + objects +
  158. '}';
  159. }
  160. public static void main(String[] args) {
  161. String json = "{\"type\":\"code\",\"action\":\"add\",\"objects\":[{\"id\":379,\"doc_id\":0,\"doc_score\":0.0,\"enterprise\":10,\"scm\":\"git\",\"vender\":\"gitea\",\"name\":\"xxxxx\",\"url\":\"http://git.xxxxxx.com:3000/xxxx/xxxxx\",\"timestamp\":0,\"document\":{\"fields\":[{\"char_sequence_value\":\"379\"},{\"char_sequence_value\":\"gitea\"},{\"char_sequence_value\":\"10\"},{\"char_sequence_value\":\"http://git.xxxxx.com:3000/xxxx/xxxxx\"},{\"char_sequence_value\":\"xxxxx\"},{\"char_sequence_value\":\"git\"},{\"char_sequence_value\":\"1620462113883\"}]},\"relative_path\":\"000/000/000/xxxxx_379\",\"id_as_string\":\"379\"}],\"code_task\":true}";
  162. QueueTask task = parse(json);
  163. System.out.println(task);
  164. }
  165. }
  1. public static void push(String type, String action, Searchable...obj){
  2. QueueTask task = new QueueTask();
  3. task.type = type;
  4. task.action = action;
  5. task.objects.addAll(Arrays.asList(obj));
  6. QueueFactory.getProvider().queue(type).push(Arrays.asList(task));
  7. }
  // Pull the next batch of tasks of this type; pop is asked for at most batch_fetch_count tasks
  // (NOTE(review): presumably `provider` is QueueFactory.getProvider() — confirm in the caller).
  1. List<QueueTask> tasks = provider.queue(type).pop(batch_fetch_count);