最近有需求要把存放在S3里的csv文件通过Lambda导入到DynamoDB中。

AWS Lamda限制

  • 最多运行15分钟
  • 最大内存3G
  • 最大临时存储512M

因为该对象是大容量的压缩文件,解压缩之后肯定超过512M,所以不考虑把文件下载之后再操作。

另外由于DynamoDB的批量请求每次最多25个Request,所以决定读取对像输入流,每25行提交一次。

但是在单线程的情况下,使用on demand模式不考虑Write Capacity,写入50万记录肯定会超过15分钟。(实测从Lambda读取S3输入流再持续批量写入15分钟总共30万条记录)

解决方案

伪多线程方式,把对象文件解压缩后拆分成若干个小文件,每个小文件触发Lambda,写入DynamoDB。

  1. def process_zip_file(bucket_name, folder_key, object_key):
  2. file = s3res.Object(bucket_name, folder_key + '/' + object_key)
  3. buffer = io.BytesIO(file.get()["Body"].read())
  4. with zipfile.ZipFile(buffer) as z:
  5. for filename in z.namelist():
  6. with z.open(filename) as result_lines:
  7. generate_partial_files(bucket_name, folder_key, object_key, result_lines, '.txt.zip')
  8. def generate_partial_files(bucket_name, folder_key, object_key, result_lines, file_extension):
  9. line_number = 0
  10. partial_number = 0
  11. partial_lines = ''
  12. for line_bytes in result_lines:
  13. line_number = line_number + 1
  14. new_folder_key = folder_key + '/processing/'
  15. partial_lines += line_bytes.decode()
  16. partial_object_key = new_folder_key + object_key.replace(file_extension,
  17. '_' + str(partial_number) + '.txt')
  18. # print(line_number)
  19. if line_number % 30000 == 0:
  20. # print(partial_lines)
  21. s3.upload_fileobj(io.BytesIO(partial_lines.encode()), bucket_name, partial_object_key)
  22. partial_lines = ''
  23. partial_number += 1
  24. if partial_lines:
  25. s3.upload_fileobj(io.BytesIO(partial_lines.encode()), bucket_name,
  26. partial_object_key)

备用方案

  • 使用CloudFormation启动EMR跑Hive Script
  • 启动一台EC2跑程序可以无视Lambda运行时间和内存限制

DynamoDB相关考虑

  • 分成多个文件后并行运行Lambda程序会增加DynamoDB的吞吐量,所以需要根据对象大小调整Write Capacity。
  • 使用on demand模式
  • 使用provision model临时调大Write Capacity,写完一定要记得调回来,否则看到月底的账单时候就要哭了。