1、将本地代码打包上传S3并使用创建的lambda函数从S3中加载代码:
    打包代码:
    python程序运行需要一个独立的运行环境,需在本地将所需的所有依赖包下载并打包,一并上传:

    1. # 安装依赖
    2. root@kali-orderplus:~# apt-get -y install python3-pip zip
    3. root@kali-orderplus:~# pip install virtualenv
    4. # 配置一个虚拟环境
    5. root@kali-orderplus:~# mkdir -p /var/s3-to-es && cd /var/s3-to-es
    6. root@kali-orderplus:/var/s3-to-es# virtualenv /var/s3-to-es
    7. New python executable in /var/s3-to-es/bin/python
    8. Installing setuptools, pip, wheel...
    9. done.
    10. root@kali-orderplus:/var/s3-to-es# cd /var/s3-to-es && source bin/activate
    11. (s3-to-es) root@kali-orderplus:/var/s3-to-es# pip install requests_aws4auth -t .
    12. (s3-to-es) root@kali-orderplus:/var/s3-to-es# pip freeze > requirements.txt
    13. # python脚本添加执行权限
    14. (s3-to-es) root@kali-orderplus:/var/s3-to-es# chmod 754 s3-to-es.py
    15. # 打包lambda运行环境
    16. (s3-to-es) root@kali-orderplus:/var/s3-to-es# zip -r /var/s3-to-es.zip *
    17. # 通过aws s3工具上传代码压缩包到S3桶中
    18. (s3-to-es) root@kali-orderplus:/var/s3-to-es# aws s3 cp /var/s3-to-es.zip s3://cf.us.xxxx.com/test-cflog/

    python脚本详情:
    carbon (1).png

    1. #!/usr/bin/python3
    2. # -*- coding: utf-8 -*-
    3. from __future__ import print_function
    4. import boto3
    5. import json
    6. import datetime
    7. import gzip
    8. import urllib
    9. import urllib3
    10. import logging
    11. from pprint import pprint
    12. from requests_aws4auth import AWS4Auth
    13. from botocore.vendored import requests
    14. from io import BytesIO
    15. """
    16. Can Override the global variables using Lambda Environment Parameters
    17. """
    18. globalVars = {}
    19. globalVars['Owner'] = "Mystique"
    20. globalVars['Environment'] = "Prod"
    21. globalVars['awsRegion'] = "us-west-1"
    22. globalVars['tagName'] = "serverless-s3-to-es-log-ingester"
    23. globalVars['service'] = "es"
    24. globalVars['esIndexPrefix'] = "s3-to-es-"
    25. globalVars['esIndexDocType'] = "s3_to_es_docs"
    26. globalVars['esHosts'] = {
    27. 'test': '' ,
    28. 'prod': 'https://search-cloudflare-log-t7duyuq4dgxjw4x4fn7jj7wina.us-west-1.es.amazonaws.com'
    29. }
    30. # Initialize Logger
    31. logger = logging.getLogger()
    32. logger.setLevel(logging.INFO)
    33. def indexDocElement(es_Url, awsauth, docData):
    34. try:
    35. #headers = { "Content-Type": "application/json" }
    36. headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
    37. resp = requests.post(es_Url, auth=awsauth, headers=headers, json=docData)
    38. if resp.status_code == 201:
    39. logger.info('INFO: Successfully inserted element into ES')
    40. else:
    41. logger.error('FAILURE: Unable to index element')
    42. except Exception as e:
    43. logger.error('ERROR: {0}'.format( str(e) ) )
    44. logger.error('ERROR: Unable to index line:"{0}"'.format(docData['content']) )
    45. print (e)
    46. def lambda_handler(event, context):
    47. s3 = boto3.client('s3')
    48. credentials = boto3.Session().get_credentials()
    49. awsauth = AWS4Auth( credentials.access_key,
    50. credentials.secret_key,
    51. globalVars['awsRegion'],
    52. globalVars['service'],
    53. session_token=credentials.token
    54. )
    55. logger.info("Received event: " + json.dumps(event, indent=2))
    56. try:
    57. bucket = event['Records'][0]['s3']['bucket']['name']
    58. key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'])
    59. # Get documet (obj) form S3
    60. obj = s3.get_object(Bucket=bucket, Key=key)
    61. except Exception as e:
    62. logger.error('ERROR: {0}'.format( str(e) ) )
    63. logger.error('ERROR: Unable able to GET object:{0} from S3 Bucket:{1}. Verify object exists.'.format(key, bucket) )
    64. if (key.endswith('.gz')) or (key.endswith('.tar.gz')):
    65. mycontentzip = gzip.GzipFile(fileobj=BytesIO(obj['Body'].read())).read()
    66. lines = mycontentzip.decode("utf-8").replace("'", '"')
    67. # print('unziped file')
    68. else:
    69. lines = obj['Body'].read().decode("utf-8").replace("'", '"')
    70. logger.info('SUCCESS: Retreived object from S3')
    71. # Split (S3 object/Log File) by lines
    72. lines = lines.splitlines()
    73. if (isinstance(lines, str)):
    74. lines = [lines]
    75. # Index each line to ES Domain
    76. indexName = globalVars['esIndexPrefix'] + str( datetime.date.today().year ) + '-' + str( datetime.date.today().month )
    77. es_Url = globalVars['esHosts'].get('prod') + '/' + indexName + '/' + globalVars['esIndexDocType']
    78. docData = {}
    79. docData['objectKey'] = str(key)
    80. docData['createdDate'] = str(obj['LastModified'])
    81. docData['content_type'] = str(obj['ContentType'])
    82. docData['content_length'] = str(obj['ContentLength'])
    83. for line in lines:
    84. # docData['content'] = str(line)
    85. docData['content'] = json.loads(line)
    86. indexDocElement(es_Url, awsauth, docData )
    87. logger.info('SUCCESS: Successfully indexed the entire doc into ES')
    88. if __name__ == '__main__':
    89. lambda_handler(None, None)