1. Package the local code, upload it to S3, and have the Lambda function you created load the code from S3:
Package the code:
A Python program needs a self-contained runtime environment, so download and bundle all required dependency packages locally and upload them together with the code:
# Install dependencies
root@kali-orderplus:~# apt-get -y install python-pip zip
root@kali-orderplus:~# pip install virtualenv
# Set up a virtual environment
root@kali-orderplus:~# mkdir -p /var/s3-to-es && cd /var/s3-to-es
root@kali-orderplus:/var/s3-to-es# virtualenv /var/s3-to-es
New python executable in /var/s3-to-es/bin/python
Installing setuptools, pip, wheel...
done.
root@kali-orderplus:/var/s3-to-es# cd /var/s3-to-es && source bin/activate
(s3-to-es) root@kali-orderplus:/var/s3-to-es# pip install requests_aws4auth -t .
(s3-to-es) root@kali-orderplus:/var/s3-to-es# pip freeze > requirements.txt
# Make the Python script executable
(s3-to-es) root@kali-orderplus:/var/s3-to-es# chmod 754 s3-to-es.py
# Package the Lambda runtime environment (code plus dependencies)
(s3-to-es) root@kali-orderplus:/var/s3-to-es# zip -r /var/s3-to-es.zip *
# Upload the code archive to the S3 bucket with the aws s3 CLI
(s3-to-es) root@kali-orderplus:/var/s3-to-es# aws s3 cp /var/s3-to-es.zip s3://cf.us.xxxx.com/test-cflog/
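Once the archive is in the bucket, point the Lambda function at it. A minimal sketch with the AWS CLI is shown below; the function name s3-to-es, the python3.7 runtime, and the IAM role ARN are placeholders for this illustration, while the handler s3-to-es.lambda_handler follows from the script name and bucket/key used above:
# Refresh an existing function's code from the archive in S3
aws lambda update-function-code \
    --function-name s3-to-es \
    --s3-bucket cf.us.xxxx.com \
    --s3-key test-cflog/s3-to-es.zip
# Or create the function from the archive in the first place
aws lambda create-function \
    --function-name s3-to-es \
    --runtime python3.7 \
    --handler s3-to-es.lambda_handler \
    --role arn:aws:iam::123456789012:role/lambda-s3-to-es-role \
    --code S3Bucket=cf.us.xxxx.com,S3Key=test-cflog/s3-to-es.zip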
The Python script (s3-to-es.py):
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import boto3
import json
import datetime
import gzip
import urllib.parse
import logging
import requests  # installed alongside requests_aws4auth; botocore no longer vendors requests
from requests_aws4auth import AWS4Auth
from io import BytesIO

"""
These globals can be overridden with Lambda environment parameters.
"""
globalVars = {}
globalVars['Owner'] = "Mystique"
globalVars['Environment'] = "Prod"
globalVars['awsRegion'] = "us-west-1"
globalVars['tagName'] = "serverless-s3-to-es-log-ingester"
globalVars['service'] = "es"
globalVars['esIndexPrefix'] = "s3-to-es-"
globalVars['esIndexDocType'] = "s3_to_es_docs"
globalVars['esHosts'] = {
    'test': '',
    'prod': 'https://search-cloudflare-log-t7duyuq4dgxjw4x4fn7jj7wina.us-west-1.es.amazonaws.com'
}

# Initialize logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)


def indexDocElement(es_Url, awsauth, docData):
    """Index a single document into the Elasticsearch domain."""
    try:
        headers = {'Content-Type': 'application/json', 'Accept': 'text/plain'}
        resp = requests.post(es_Url, auth=awsauth, headers=headers, json=docData)
        if resp.status_code in (200, 201):
            logger.info('INFO: Successfully inserted element into ES')
        else:
            logger.error('FAILURE: Unable to index element, status {0}: {1}'.format(resp.status_code, resp.text))
    except Exception as e:
        logger.error('ERROR: {0}'.format(str(e)))
        logger.error('ERROR: Unable to index line: "{0}"'.format(docData['content']))


def lambda_handler(event, context):
    s3 = boto3.client('s3')
    credentials = boto3.Session().get_credentials()
    awsauth = AWS4Auth(credentials.access_key,
                       credentials.secret_key,
                       globalVars['awsRegion'],
                       globalVars['service'],
                       session_token=credentials.token)
    logger.info("Received event: " + json.dumps(event, indent=2))
    try:
        bucket = event['Records'][0]['s3']['bucket']['name']
        key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'])
        # Get the document (object) from S3
        obj = s3.get_object(Bucket=bucket, Key=key)
    except Exception as e:
        logger.error('ERROR: {0}'.format(str(e)))
        logger.error('ERROR: Unable to GET the object from S3. Verify the object exists and the event is an S3 notification.')
        return

    if key.endswith('.gz') or key.endswith('.tar.gz'):
        # Decompress gzipped log files before decoding
        mycontentzip = gzip.GzipFile(fileobj=BytesIO(obj['Body'].read())).read()
        lines = mycontentzip.decode("utf-8").replace("'", '"')
    else:
        lines = obj['Body'].read().decode("utf-8").replace("'", '"')
    logger.info('SUCCESS: Retrieved object from S3')

    # Split the S3 object (log file) into lines
    lines = lines.splitlines()
    if isinstance(lines, str):
        lines = [lines]

    # Index each line into the ES domain, using one index per month
    indexName = globalVars['esIndexPrefix'] + str(datetime.date.today().year) + '-' + str(datetime.date.today().month)
    es_Url = globalVars['esHosts'].get('prod') + '/' + indexName + '/' + globalVars['esIndexDocType']
    docData = {}
    docData['objectKey'] = str(key)
    docData['createdDate'] = str(obj['LastModified'])
    docData['content_type'] = str(obj['ContentType'])
    docData['content_length'] = str(obj['ContentLength'])
    for line in lines:
        docData['content'] = json.loads(line)  # each log line is expected to be a JSON document
        indexDocElement(es_Url, awsauth, docData)
    logger.info('SUCCESS: Successfully indexed the entire doc into ES')


if __name__ == '__main__':
    lambda_handler(None, None)
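To exercise the handler locally before wiring up the S3 trigger (the __main__ stub above just passes None), you can hand it a minimal S3 event. This is only a sketch: the object key test-cflog/sample-log.gz is a placeholder, and valid AWS credentials with access to the bucket and the ES domain are still required.
# local_test.py - invoke the handler with a hand-built S3 event record
import importlib

# the handler file is named s3-to-es.py (hyphenated), so load it via importlib
s3_to_es = importlib.import_module("s3-to-es")

test_event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "cf.us.xxxx.com"},
                "object": {"key": "test-cflog/sample-log.gz"}  # placeholder object key
            }
        }
    ]
}

if __name__ == "__main__":
    s3_to_es.lambda_handler(test_event, None)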