5.1. Sniffer Principles and Driver Customization

By default, the sniffer uses the data-source driver based on Bro traffic analysis, but it also ships demo drivers for other data sources such as redis, kafka, rabbitmq, logstash, UDPServer, syslog, and file. These demos cannot be used as-is; they must be adapted to the format of your source data.

How It Works


(Figure: 工作方式.png)

Generating Messages


**This part must be modified and customized for your own business.**

Take Nginx + Logstash as an example (the Nginx log-format configuration below is for reference only).

1) Line format:

```
log_format log_line '"$remote_addr" "$remote_port" "$server_addr" "$server_port" "$request_length" \
"$content_length" "$body_bytes_sent" "$request_uri" "$host" "$http_user_agent" "$status" "$http_cookie" \
"$request_method" "$http_referer" "$http_x_forwarded_for" "$request_time" "$sent_http_set_cookie" \
"$content_type" "$upstream_http_content_type" "$request_data"';
```

Example:

```
<14>Jun 26 17:13:21 10-10-92-198 NGINX[26113]: "114.242.250.233" "65033" "10.10.92.198" "80" "726" "134" \
"9443" "/gateway/shop/getStroeForDistance" "m.lechebang.com" "Mozilla/5.0 (iPhone; CPU iPhone OS 8_1_3 like Mac OS X) AppleWebKit/600.1.4 (KHTML, like Gecko) Mobile/12B466 MicroMessenger/6.1.4 NetType/3G+" "200" \
"-" "POST" "http://m.lechebang.com/webapp/shop/list?cityId=10101&locationId=0&brandTypeId=6454&maintenancePlanId=227223&oilInfoId=3906" "0.114"
```

2) JSON format:

```
log_format log_json '{ "@timestamp": "$time_local", '
                    '"remote_addr": "$remote_addr", '
                    '"referer": "$http_referer", '
                    '"request": "$request", '
                    '"status": $status, '
                    '"bytes": $body_bytes_sent, '
                    '"agent": "$http_user_agent", '
                    '"x_forwarded": "$http_x_forwarded_for", '
                    '"up_addr": "$upstream_addr",'
                    '"up_host": "$upstream_http_host",'
                    '"up_resp_time": "$upstream_response_time",'
                    '"request_time": "$request_time"'
                    ' }';
```

Example:

```
{ "@timestamp": "12/Dec/2017:14:30:40 +0800", "remote_addr": "10.88.122.108", "referer": "-", "request": "GET / HTTP/1.1", "status": 304, "bytes": 0, "agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.84 Safari/537.36", "x_forwarded": "-", "up_addr": "-", "up_host": "-", "up_resp_time": "-", "request_time": "0.000" }
```

For details of the Nginx log configuration, see:
http://nginx.org/en/docs/http/ngx_http_log_module.html#log_format

Note: the fields written to the log, their order, and their names must be consistent with the message-formatting logic of the corresponding driver in the sniffer.
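To make that mapping concrete, here is a minimal sketch (not part of the sniffer source; `parse_log_line` and `LINE_FIELDS` are illustrative names) of turning one `log_line`-formatted entry into named fields. The field list simply mirrors the `log_format log_line` definition above, so any change to the format must also be reflected in the driver's parsing logic:

```python
# Illustrative only: map one quoted log_line entry onto named fields.
import re

LINE_FIELDS = [
    "remote_addr", "remote_port", "server_addr", "server_port", "request_length",
    "content_length", "body_bytes_sent", "request_uri", "host", "http_user_agent",
    "status", "http_cookie", "request_method", "http_referer",
    "http_x_forwarded_for", "request_time", "sent_http_set_cookie",
    "content_type", "upstream_http_content_type", "request_data",
]

def parse_log_line(line):
    # every field is wrapped in double quotes, so extract them in order
    values = re.findall(r'"([^"]*)"', line)
    return dict(zip(LINE_FIELDS, values))
```

The corresponding driver has to perform an equivalent mapping before it can build an HttpMsg (see the LogstashDriver example later in this section).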

Deploy a Logstash client to ship the Nginx log output:

```
# Logstash configuration example:
input {
  file {
    path => ['/data/nginx/logs/access.log']
    start_position => "beginning"
    codec => "json"
    tags => ['user']
    type => "nginx"
  }
}
output {
  if [type] == "nginx" {
    # Sniffer logstash driver mode
    tcp {
      host => "127.0.0.1"
      port => "5044"
      codec => "line"
    }
    # Sniffer redislist driver mode
    redis {
      host => "127.0.0.1"
      port => "6379"
      key => "nginx_access_log"
      db => "0"
      data_type => "list"
      codec => "line"
    }
  }
}
```
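Before wiring the output into the sniffer, it can help to confirm that events actually arrive on the TCP port the `tcp` output points at (5044 above). A throwaway listener such as the following sketch (run it in place of the sniffer for a quick check; it only uses the Python standard library) prints whatever Logstash sends, one event per line as produced by `codec => "line"`:

```python
# Throwaway listener (not part of the sniffer) for checking the Logstash tcp output.
import socket

srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
srv.bind(("127.0.0.1", 5044))
srv.listen(1)
conn, addr = srv.accept()
buf = b""
while True:
    data = conn.recv(4096)
    if not data:
        break
    buf += data
    while b"\n" in buf:            # codec => "line" makes events newline-delimited
        line, buf = buf.split(b"\n", 1)
        print(line.decode("utf-8", "replace"))
conn.close()
srv.close()
```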

Reading Data


**This part must be modified and customized for your own business.**

Modify sniffer.conf:

```
# multiple sources can be enabled at the same time
sources: [logstash, redislist]

# corresponds to redislistdriver.py
redislist:
  driver: redislist
  host: 127.0.0.1
  port: 6379
  interface: any
  instances: 1
  parser:
    name: test
    module: testparser

# corresponds to logstashdriver.py
# (it is actually a TCPServer)
logstash:
  driver: logstash
  port: 5044
  instances: 1
  interface: any
  parser:
    name: test
    module: testparser

# corresponds to rabbitmqdriver.py
rabbitmq:
  driver: rabbitmq
  amqp_url: amqp://guest:guest@localhost:5672/
  queue_name: test_queue
  exchange_name: test_queue
  exchange_type: direct
  durable: true
  routing_key: test
  instances: 1
  interface: any
  parser:
    name: test
    module: testparser

# corresponds to brohttpdriver.py
default:
  driver: bro
  interface: eth0
  ports: [80, 81, 1080, 3128, 8000, 8080, 8888, 9001, 8081]
  start_port: 48880
  instances: 1
  parser:
    name: test
    module: testparser

# corresponds to syslogdriver.py
syslog:
  driver: syslog
  interface: eth0
  port: 9514
  parser:
    name: test
    module: testparser

# corresponds to kafkadriver.py
kafka:
  driver: kafka
  interface: any
  bootstrap_servers: 127.0.0.1:9092
  # the kafka driver also requires a "topics" key (see get_driver below)
  parser:
    name: test
    module: testparser

# ... other sections omitted ...
```
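A small self-check can catch typos before starting the sniffer. The sketch below assumes sniffer.conf is plain YAML (which matches the layout above, but is an assumption, not something stated by the project); it only verifies that every name listed in `sources` has a top-level section of its own and that the section names a driver:

```python
# Assumption: sniffer.conf parses as plain YAML.
import yaml

with open("sniffer.conf") as f:
    conf = yaml.safe_load(f)

for src in conf.get("sources", []):
    section = conf.get(src)
    assert section is not None, "missing config section for source: %s" % src
    assert "driver" in section, "source %s does not define a driver" % src
```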

Driver Customization (Message Formatting)


All drivers are located in:

```
Directory: sniffer/nebula_sniffer/nebula_sniffer/drivers/

brohttpdriver.py       # Bro driver
filedriver.py          # file-based driver
kafkadriver.py         # Kafka driver
logstashdriver.py      # Logstash driver
pktdriver.py           # UDPServer (packetbeat) driver
rabbitmqdriver.py      # RabbitMQ driver
redislistdriver.py     # Redis driver
syslogdriver.py        # syslog drivers
syslogtextdriver.py
tsharkhttpsdriver.py   # tshark driver
```
The sniffer loads the corresponding driver according to the sniffer configuration:

```python
def get_driver(config, interface, parser, idx):
    """ global c """
    from complexconfig.configcontainer import configcontainer
    # different drivers are initialized in different ways
    name = config['driver']
    if name == "bro":
        from nebula_sniffer.drivers.brohttpdriver import BroHttpDriver
        embedded = config.get("embedded", True)
        ports = config['ports']
        from nebula_sniffer.utils import expand_ports
        ports = expand_ports(ports)  # extend it
        start_port = int(config['start_port'])
        bpf_filter = config.get("bpf_filter", "")
        home = configcontainer.get_config("sniffer").get_string("sniffer.bro.home")
        if ports and home:
            driver = BroHttpDriver(interface=interface, embedded_bro=embedded, idx=idx, ports=ports, bro_home=home,
                                   start_port=start_port, bpf_filter=bpf_filter)
        elif ports:
            driver = BroHttpDriver(interface=interface, embedded_bro=embedded, idx=idx, ports=ports,
                                   start_port=start_port, bpf_filter=bpf_filter)
        elif home:
            driver = BroHttpDriver(interface=interface, embedded_bro=embedded, idx=idx, bro_home=home,
                                   start_port=start_port, bpf_filter=bpf_filter)
        else:
            driver = BroHttpDriver(interface=interface, embedded_bro=embedded, idx=idx,
                                   start_port=start_port, bpf_filter=bpf_filter)
        return driver
    if name == "tshark":
        from nebula_sniffer.drivers.tsharkhttpsdriver import TsharkHttpsDriver
        interface = interface
        ports = config["ports"]
        bpf_filter = config.get("bpf_filter", "")
        if ports:
            driver = TsharkHttpsDriver(interface=interface, ports=ports, bpf_filter=bpf_filter)
        else:
            driver = TsharkHttpsDriver(interface=interface, bpf_filter=bpf_filter)
        return driver
    if name == "syslog":
        from nebula_sniffer.drivers.syslogdriver import SyslogDriver
        port = int(config["port"])
        driver = SyslogDriver(port)
        return driver
    if name == "packetbeat":
        from nebula_sniffer.drivers.pktdriver import PacketbeatDriver
        port = int(config["port"])
        driver = PacketbeatDriver(port)
        return driver
    if name == "redislist":
        from nebula_sniffer.drivers.redislistdriver import RedisListDriver
        host = config["host"]
        port = int(config['port'])
        password = config.get('password', '')
        driver = RedisListDriver(host, port, password)
        return driver
    if name == "logstash":
        from nebula_sniffer.drivers.logstashdriver import LogstashDriver
        port = int(config['port'])
        driver = LogstashDriver(port)
        return driver
    if name == "rabbitmq":
        from nebula_sniffer.drivers.rabbitmqdriver import RabbitmqDriver
        amqp_url = config['amqp_url']
        queue_name = config['queue_name']
        exchange_name = config['exchange_name']
        exchange_type = config['exchange_type']
        durable = bool(config['durable'])
        routing_key = config['routing_key']
        driver = RabbitmqDriver(amqp_url, queue_name, exchange_name, exchange_type, durable, routing_key)
        return driver
    if name == "kafka":
        from nebula_sniffer.drivers.kafkadriver import KafkaDriver
        topics = config['topics']
        # config['bootstrap_servers']
        # for the configuration parameters supported by kafka,
        # see the documentation of the Python kafka library
        driver = KafkaDriver(topics, **config)
        return driver
    return None
```
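Adding a completely new data source therefore means adding one more branch here that instantiates your driver from its config section. A hedged sketch, where the `mycustom` name, the module path, and `MyCustomDriver` are placeholders rather than existing project code:

```python
# Placed inside get_driver(), before the final `return None`.
# "mycustom", the module path and MyCustomDriver are placeholders.
if name == "mycustom":
    from nebula_sniffer.drivers.mycustomdriver import MyCustomDriver
    port = int(config["port"])
    driver = MyCustomDriver(port)
    return driver
```

The LogstashDriver example that follows shows what the driver's message-formatting callback itself needs to do.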

Below is an example of the logstash driver:

```python
# other parts of the code are skipped; only the core code is shown
# this example handles Nginx logs in JSON format
class LogstashDriver(Driver):
    # ... other parts omitted ...

    # format the messages sent by the logstash client
    def _recv_msg_fn_in(self, msg, addr):
        """
        log-line:
        36.7.130.69 - [16/Jul/2017:23:58:42 +0800] ffp.hnair.com 1 "GET /FFPClub/upload/index/e9b1bb4a-e1dd-47e1-8699-9828685004b4.jpg HTTP/1.1" 200 487752 - "http://ffp.hnair.com/FFPClub/cn/index.html" "Mozilla/5.0 (Windows NT 6.1; Trident/7.0; rv:11.0; NetworkBench/8.0.1.309-4992258-2148837) like Gecko" "-"
        log-json:
        {"@timestamp_scs": "2017-09-21T12:18:17+08:00", "scs_request_uri": "/site-wap/my/transferin.htm",
        "scs_status": "200", "scs_bytes_sent": "26829", "scs_upstream_cache_status": "-", "scs_request_time": "0.570",
        "scs_upstream_response_time": "0.570", "scs_host": "pay.autohome.com.cn", "scs_remote_addr": "10.20.2.23",
        "scs_server_addr": "10.20.252.33", "scs_upstream_addr": "10.20.252.20:8253", "scs_upstream_status": "200",
        "scs_http_referer": "https://pay.autohome.com.cn/site-wap/activity/upin.htm?__sub_from=A2002027782510100",
        "scs_http_user_agent": "Mozilla/5.0 (Linux; Android 5.1; OPPO A59m Build/LMY47I; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/43.0.2357.121 Mobile Safari/537.36 autohomeapp/1.0+%28auto_android%3B8.4.0%3BOi-CR_rnyywoODk5jv0ve5luKLNxl7AfnEsGsBBPNdWdDtP8ZDdRHA3ePFiDlWOr%3B5.1%3BOPPO%2BA59m%29 auto_android/8.4.0 nettype/wifi",
        "scs_http_X_Forwarded_For": "220.166.199.167"}
        """
        try:
            if not msg:
                return
            msg = msg.strip()
            if not msg:
                return
            self.logger.debug("get log msg %s from address %s", msg, addr)
            # once a message is received, parse it according to its format
            try:
                msg = json.loads(msg)
            except Exception as e:
                return
            # extract field data from the message
            c_ip = msg.get('scs_http_X_Forwarded_For', '')
            if c_ip:
                c_ip_group = c_ip.split(',')
                if c_ip_group:
                    c_ip = c_ip_group[-1]
            c_port = 0
            s_port = 80
            c_bytes = 0
            s_bytes = msg.get('scs_bytes_sent', 0)
            if s_bytes == '-':
                s_bytes = 0
            else:
                s_bytes = int(s_bytes)
            status = int(msg.get('scs_status', 0))
            req_body = ''
            args = dict()
            args["method"] = 'GET'
            args["host"] = msg.get('scs_host', '').lower()
            args["uri"] = msg.get('scs_request_uri', '').lower()
            args["referer"] = msg.get('scs_http_referer', '').lower()
            args["user_agent"] = msg.get('scs_http_user_agent', '').lower()
            args["status_code"] = status
            args["status_msg"] = ""
            args["source_ip"] = c_ip
            args["source_port"] = c_port
            args["dest_ip"] = ''
            args["dest_port"] = s_port
            request_time = 0.0
            try:
                ctime = msg['@timestamp_scs']
                ctime = ctime.replace('T', ' ').replace('+08:00', '')
                time_array = time.strptime(ctime, "%Y-%m-%d %H:%M:%S")
                # convert to a timestamp
                request_time = time.mktime(time_array)
            except Exception as e:
                pass
            args["req_time"] = int(request_time * 1000)
            # headers
            args["req_headers"] = {}
            args["resp_headers"] = {}
            # no body for logstash
            args["log_body"] = False
            args["req_body"] = ""
            args["resp_body"] = ""
            args["req_body_len"] = c_bytes
            args["resp_body_len"] = s_bytes
            args["req_content_type"] = ''
            args["resp_content_type"] = ''
            args["req_body"] = req_body
            args["debug_processing"] = False
            self.logger.debug("get http data from logstash: %s", args)
            try:
                # finally normalize into the HttpMsg format
                new_msg = HttpMsg(**args)
            except BeFilteredException as bfe:
                return
            except Exception as err:
                self.logger.debug("fail to parse: %s", args)
                return
            self.logger.debug("get http msg from logstash: %s", new_msg)
            # put it on the queue for event extraction
            self.put_msg(new_msg)
            self.count += 1
            if self.count % 1000 == 0:
                print "has put {}".format(self.count)
            return new_msg
        except Exception as ex:
            self.logger.error("fail to parse logstash data: %s", ex)

    # ... other parts omitted ...
```
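One detail in the code above worth isolating is the request-time handling: the log's local-time string is converted to epoch milliseconds, and `time.mktime()` interprets the value in the host's local timezone, so stripping the hard-coded `+08:00` only yields a correct timestamp on a host that itself runs in UTC+8. A standalone illustration:

```python
# Standalone illustration of the timestamp conversion used by the driver above.
import time

ctime = "2017-09-21T12:18:17+08:00".replace('T', ' ').replace('+08:00', '')
time_array = time.strptime(ctime, "%Y-%m-%d %H:%M:%S")
req_time_ms = int(time.mktime(time_array) * 1000)  # what the driver stores in args["req_time"]
```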

Event Extraction


**Common module; no modification or customization is required.**
```python
# Source:
# sniffer/nebula_sniffer/nebula_sniffer/main.py
class Main(object):
    def event_processor(self):
        # ... other parts omitted ...
        events = []
        if isinstance(msg, HttpMsg):
            # process the HTTP message and return events (a list of events)
            events = self.parser.get_events_from_http_msg(msg)
        # ... other parts omitted ...
```

Outputting Events


**Common module; no modification or customization is required.**