1、将本地代码打包上传S3并使用创建的lambda函数从S3中加载代码:
打包代码:
python程序运行需要一个独立的运行环境,在本地将所需要的所有依赖包下载打包,一块上传:
# 安装依赖root@kali-orderplus:~# apt-get -y install python-pip ziproot@kali-orderplus:~# pip install virtualenv# 配置一个虚拟环境root@kali-orderplus:~# mkdir -p /var/s3-to-es && cd /var/s3-to-esroot@kali-orderplus:/var/s3-to-es# virtualenv /var/s3-to-esNew python executable in /var/s3-to-es/bin/pythonInstalling setuptools, pip, wheel...done.root@kali-orderplus:/var/s3-to-es# cd /var/s3-to-es && source bin/activate(s3-to-es) root@kali-orderplus:/var/s3-to-es# pip install requests_aws4auth -t .(s3-to-es) root@kali-orderplus:/var/s3-to-es# pip freeze > requirements.txt# python脚本添加执行权限(s3-to-es) root@kali-orderplus:/var/s3-to-es# chmod 754 s3-to-es.py# 打包lambda运行环境(s3-to-es) root@kali-orderplus:/var/s3-to-es# zip -r /var/s3-to-es.zip *# 通过aws s3工具上传代码压缩包到S3桶中(s3-to-es) root@kali-orderplus:/var/s3-to-es# aws s3 cp /var/s3-to-es.zip s3://cf.us.xxxx.com/test-cflog/
python脚本详情:
#!/usr/bin/python3# -*- coding: utf-8 -*-from __future__ import print_functionimport boto3import jsonimport datetimeimport gzipimport urllibimport urllib3import loggingfrom pprint import pprintfrom requests_aws4auth import AWS4Authfrom botocore.vendored import requestsfrom io import BytesIO"""Can Override the global variables using Lambda Environment Parameters"""globalVars = {}globalVars['Owner'] = "Mystique"globalVars['Environment'] = "Prod"globalVars['awsRegion'] = "us-west-1"globalVars['tagName'] = "serverless-s3-to-es-log-ingester"globalVars['service'] = "es"globalVars['esIndexPrefix'] = "s3-to-es-"globalVars['esIndexDocType'] = "s3_to_es_docs"globalVars['esHosts'] = {'test': '' ,'prod': 'https://search-cloudflare-log-t7duyuq4dgxjw4x4fn7jj7wina.us-west-1.es.amazonaws.com'}# Initialize Loggerlogger = logging.getLogger()logger.setLevel(logging.INFO)def indexDocElement(es_Url, awsauth, docData):try:#headers = { "Content-Type": "application/json" }headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}resp = requests.post(es_Url, auth=awsauth, headers=headers, json=docData)if resp.status_code == 201:logger.info('INFO: Successfully inserted element into ES')else:logger.error('FAILURE: Unable to index element')except Exception as e:logger.error('ERROR: {0}'.format( str(e) ) )logger.error('ERROR: Unable to index line:"{0}"'.format(docData['content']) )print (e)def lambda_handler(event, context):s3 = boto3.client('s3')credentials = boto3.Session().get_credentials()awsauth = AWS4Auth( credentials.access_key,credentials.secret_key,globalVars['awsRegion'],globalVars['service'],session_token=credentials.token)logger.info("Received event: " + json.dumps(event, indent=2))try:bucket = event['Records'][0]['s3']['bucket']['name']key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'])# Get documet (obj) form S3obj = s3.get_object(Bucket=bucket, Key=key)except Exception as e:logger.error('ERROR: {0}'.format( str(e) ) )logger.error('ERROR: Unable able to GET object:{0} from S3 Bucket:{1}. Verify object exists.'.format(key, bucket) )if (key.endswith('.gz')) or (key.endswith('.tar.gz')):mycontentzip = gzip.GzipFile(fileobj=BytesIO(obj['Body'].read())).read()lines = mycontentzip.decode("utf-8").replace("'", '"')# print('unziped file')else:lines = obj['Body'].read().decode("utf-8").replace("'", '"')logger.info('SUCCESS: Retreived object from S3')# Split (S3 object/Log File) by lineslines = lines.splitlines()if (isinstance(lines, str)):lines = [lines]# Index each line to ES DomainindexName = globalVars['esIndexPrefix'] + str( datetime.date.today().year ) + '-' + str( datetime.date.today().month )es_Url = globalVars['esHosts'].get('prod') + '/' + indexName + '/' + globalVars['esIndexDocType']docData = {}docData['objectKey'] = str(key)docData['createdDate'] = str(obj['LastModified'])docData['content_type'] = str(obj['ContentType'])docData['content_length'] = str(obj['ContentLength'])for line in lines:# docData['content'] = str(line)docData['content'] = json.loads(line)indexDocElement(es_Url, awsauth, docData )logger.info('SUCCESS: Successfully indexed the entire doc into ES')if __name__ == '__main__':lambda_handler(None, None)
