非线程安全版

  1. import com.amazonaws.annotation.NotThreadSafe;
  2. import com.amazonaws.services.s3.AmazonS3;
  3. import com.amazonaws.services.s3.model.*;
  4. import java.util.*;
  5. @NotThreadSafe
  6. public class S3ObjectsScanner implements Iterable<String> {
  7. private final static int DEFAULT_MAX_KEYS = 1000;
  8. /**
  9. * S3客户端对象
  10. */
  11. private AmazonS3 client;
  12. /**
  13. * 桶名
  14. */
  15. private String bucketName;
  16. /**
  17. * 起始索引
  18. */
  19. private String startPrefix;
  20. /**
  21. * 终止索引
  22. */
  23. private String endPrefix;
  24. /**
  25. * key的最大缓存个数
  26. */
  27. private int maxKeys;
  28. public S3ObjectsScanner() {
  29. this.maxKeys = DEFAULT_MAX_KEYS;
  30. }
  31. public S3ObjectsScanner(AmazonS3 client, String bucketName) {
  32. this();
  33. this.client = client;
  34. this.bucketName = bucketName;
  35. }
  36. @NotThreadSafe
  37. private class Itr implements Iterator<String> {
  38. private final ListObjectsV2Request request;
  39. private final Queue<String> queue;
  40. private final String endPrefix;
  41. private boolean lastPage = false;
  42. Itr() {
  43. this.endPrefix = S3ObjectsScanner.this.endPrefix;
  44. request = new ListObjectsV2Request()
  45. .withBucketName(bucketName)
  46. .withStartAfter(startPrefix)
  47. .withMaxKeys(maxKeys);
  48. queue = new ArrayDeque<>(maxKeys);
  49. }
  50. @Override
  51. public synchronized boolean hasNext() {
  52. if (queue.isEmpty() && !lastPage) {
  53. loadMore();
  54. }
  55. return !queue.isEmpty();
  56. }
  57. @Override
  58. public synchronized String next() {
  59. if (!hasNext()) {
  60. throw new NoSuchElementException();
  61. }
  62. return queue.poll();
  63. }
  64. private void loadMore() {
  65. // 发送请求
  66. ListObjectsV2Result result = client.listObjectsV2(request);
  67. // 准备下一次请求
  68. prepareNextReq(result);
  69. // 提取Keys
  70. extraKeys(result.getObjectSummaries());
  71. }
  72. private void prepareNextReq(ListObjectsV2Result result) {
  73. // 检查是否还有剩余数据
  74. if (result.isTruncated()) {
  75. request.setContinuationToken(result.getNextContinuationToken());
  76. } else {
  77. // 没有剩余数据,置位
  78. lastPage = true;
  79. }
  80. }
  81. private void extraKeys(List<S3ObjectSummary> summaries) {
  82. for (S3ObjectSummary summary : summaries) {
  83. String key = summary.getKey();
  84. // 检查是否到达结束的位置
  85. if (isOver(key)) {
  86. // 结束,标志位置位,结束循环
  87. lastPage = true;
  88. return;
  89. }
  90. // 未结束,入队
  91. queue.offer(key);
  92. }
  93. }
  94. private boolean isOver(String key) {
  95. // 如果没有指定“终止前缀”,无论什么key都不算结束
  96. if (endPrefix == null) {
  97. return false;
  98. }
  99. // 如果key小于“终止前缀”,也不算结束
  100. if (endPrefix.compareTo(key) > 0) {
  101. return false;
  102. }
  103. return true;
  104. }
  105. }
  106. @Override
  107. public Iterator<String> iterator() {
  108. return new Itr();
  109. }
  110. public AmazonS3 getClient() {
  111. return client;
  112. }
  113. public S3ObjectsScanner withClient(AmazonS3 client) {
  114. this.client = client;
  115. return this;
  116. }
  117. public String getBucketName() {
  118. return bucketName;
  119. }
  120. public S3ObjectsScanner withBucketName(String bucketName) {
  121. this.bucketName = bucketName;
  122. return this;
  123. }
  124. public String getStartPrefix() {
  125. return startPrefix;
  126. }
  127. public S3ObjectsScanner withStartPrefix(String startPrefix) {
  128. this.startPrefix = startPrefix;
  129. return this;
  130. }
  131. public String getEndPrefix() {
  132. return endPrefix;
  133. }
  134. public S3ObjectsScanner withEndPrefix(String endPrefix) {
  135. this.endPrefix = endPrefix;
  136. return this;
  137. }
  138. public int getMaxKeys() {
  139. return maxKeys;
  140. }
  141. public S3ObjectsScanner withMaxKeys(int maxKeys) {
  142. this.maxKeys = maxKeys;
  143. return this;
  144. }
  145. }