最近有需求要把存放在S3里的csv文件通过Lambda导入到DynamoDB中。
AWS Lamda限制
- 最多运行15分钟
- 最大内存3G
- 最大临时存储512M
因为该对象是大容量的压缩文件,解压缩之后肯定超过512M,所以不考虑把文件下载之后再操作。
另外由于DynamoDB的批量请求每次最多25个Request,所以决定读取对像输入流,每25行提交一次。
但是在单线程的情况下,使用on demand模式不考虑Write Capacity,写入50万记录肯定会超过15分钟。(实测从Lambda读取S3输入流再持续批量写入15分钟总共30万条记录)
解决方案
伪多线程方式,把对象文件解压缩后拆分成若干个小文件,每个小文件触发Lambda,写入DynamoDB。
def process_zip_file(bucket_name, folder_key, object_key):file = s3res.Object(bucket_name, folder_key + '/' + object_key)buffer = io.BytesIO(file.get()["Body"].read())with zipfile.ZipFile(buffer) as z:for filename in z.namelist():with z.open(filename) as result_lines:generate_partial_files(bucket_name, folder_key, object_key, result_lines, '.txt.zip')def generate_partial_files(bucket_name, folder_key, object_key, result_lines, file_extension):line_number = 0partial_number = 0partial_lines = ''for line_bytes in result_lines:line_number = line_number + 1new_folder_key = folder_key + '/processing/'partial_lines += line_bytes.decode()partial_object_key = new_folder_key + object_key.replace(file_extension,'_' + str(partial_number) + '.txt')# print(line_number)if line_number % 30000 == 0:# print(partial_lines)s3.upload_fileobj(io.BytesIO(partial_lines.encode()), bucket_name, partial_object_key)partial_lines = ''partial_number += 1if partial_lines:s3.upload_fileobj(io.BytesIO(partial_lines.encode()), bucket_name,partial_object_key)
备用方案
- 使用CloudFormation启动EMR跑Hive Script
- 启动一台EC2跑程序可以无视Lambda运行时间和内存限制
DynamoDB相关考虑
- 分成多个文件后并行运行Lambda程序会增加DynamoDB的吞吐量,所以需要根据对象大小调整Write Capacity。
- 使用on demand模式
- 使用provision model临时调大Write Capacity,写完一定要记得调回来,否则看到月底的账单时候就要哭了。
