概述
在本文中,我们会从基础开始讲解,讲解针对一个最普通的 Python 程序而言,如何实现接入 Jaeger 和 OpenTracing,从而能够进行信息追踪。
依赖库说明
在本文的讲解中,我们主要依赖两个 Python 第三方库:
- opentracing:基于 Python 语言的 OpenTracing 官方库,它是所有 Python OpenTracing 第三方库的基础库。
- jaeger_client:OpenTracing 中的 Jaeger 实现库,用于将 OpenTracing 中得到的 Trace 信息发送给 Jaeger 后端服务。
前置条件
为了能够更好的了解每一步后得到的结果和效果,建议在开始本文的学习之前,首先先搭建一台 Jaeger 后端环境,从而能够更好的体会每一步的作用。
Jaeger Client 介绍
安装
和其他大部分的 Python 第三方库一样,Jaeger Client 的安装非常简单,直接用 Python 的 pip 包管理工具安装即可:
pip install jaeger-client
快速上手
import logging
import time
from jaeger_client import Config
if __name__ == "__main__":
log_level = logging.DEBUG
logging.getLogger('').handlers = []
logging.basicConfig(format='%(asctime)s %(message)s', level=log_level)
config = Config(
config={ # Jaeger 配置
'sampler': {
'type': 'const',
'param': 1,
},
'logging': True,
},
service_name='debug',
validate=True,
)
# 同时会创建一个 opentracing.tracer 对象
tracer = config.initialize_tracer()
with tracer.start_span('TestSpan') as span:
span.log_kv({'event': 'test message', 'life': 42})
with tracer.start_span('ChildSpan', child_of=span) as child_span:
child_span.log_kv({'event': 'down below'})
time.sleep(2) # 等待数据接入 jaeger 后端
tracer.close() # 将 buffer 中的 span 数据刷入存储
下面,我们来了解一下这段代码:
- 首先,我们将 log 的级别设置为 DEBUG 级别,从而能够看到 jaeger-agent 打印的相关日志。
- 接下来,我们创建一个 jaeger_client.Config 的实例,这个是使用 JaegerAgent 的第一步,就是根据指定配置来创建一个 JaegerClient Config 对象。
- 然后,我们根据 config 对象的 initialize_tracer 方法,可以得到一个 Tracer 对象,而这个 Tracer 对象其实是 Jaeger 基于 OpenTracing 库中的 Tracer 对象继承扩展得到的。
- 得到 tracer 对象之后,我们就可以用 start_span 方法等来创建 span 并针对得到的 span 进行相关的操作了。
注意事项
一. 在 import 过程中不要初始化tracer,这样可能会导致死锁。
推荐方法:定义一个返回 Tracer 的函数,并在所有 import 完成之后,显示调用该函数。
Ps: 需要注意的是,为了防止该函数在多次调用时返回为空的问题,可以在返回前判断 tracer 是否已经被初始化了,如果是,则直接返回当前的 tracer 即可,示例如下:
import opentracing
from jaeger_client import Config
def get_tracer():
config = Config(
config={
'sampler': {
'type': 'const',
'param': 1,
},
'logging': True,
},
service_name="debug",
validate=True,
)
# this call also sets opentracing.tracer
if not config.initialized():
tracer = config.initialize_tracer()
else:
tracer = opentracing.global_tracer()
return tracer
二. 尽量在一个程序中只定义一个Tracer,如果必须需要多个 Tracer ,需要使用 new_tracer 方法。
示例如下:
def new_tracer(service_name):
"""
# 创建一个新的 Tracer
:param service_name:
:return:
"""
config = Config(
config={
'sampler': {
'type': 'const',
'param': 1,
},
'logging': True,
},
service_name=service_name,
validate=True,
)
return config.new_tracer()
if __name__ == "__main__":
tracer = get_tracer()
tracer1 = get_tracer() # tracer1 和 tracer 是同一个 tracer
tracer2 = new_tracer("debug_new") # tracer2 是一个新的 tracer
with tracer2.start_span("new_tracer") as span:
span.set_tag("tag_new", "value_new")
with tracer.start_span('TestSpan') as span:
span.log_kv({'event': 'test message', 'life': 42})
time.sleep(2) # 等待数据接入 jaeger 后端
tracer.close() # 将 buffer 中的 span 数据刷入存储
说明:
可以看到,在上述代码中,我们首先连续调用了两次 get_tracer 函数,由于我们在 get_tracer 函数内部做了判断,因此,我们可以知道,tracer 和 tracer1 本质上是同一个对象。
另外,我们用 new_tracer 的方法又额外创建了一个新的 tracer 对象 tracer2,tracer2 和之前的 tracer 对象就不是同一个对象了。
OpenTracing 介绍
安装
OpenTracing 的 Python 库安装也非常简单,直接用 pip 安装即可。
事实上,如果你已经安装了 jaeger-client 的话,其实 opentracing 的 Python 库其实也已经安装好了,因为 jaeger-client 其实是依赖 opentracing Python 库的。
pip install opentracing
快速上手
OpenTracing 的注入库其实主要会在三个场景中进行调用:
- 当服务接收到一个新的请求时(无论是HTTP协议还是其他类型),使用 OpenTracing 的 extract API 来从请求信息中提取对应的 span 信息,并创建一个新的 span 来继续向之前的 trace 中增加信息。如果是一个新的 trace 的开始,则创建一个新的 trace 和根 span。
- 当前服务将要调用另外一个其他服务时,需要从当前内存中读取当前 span 信息,并创建一个子 span ,同时将子 span 通过 inject API 将 span 信息注入到出站请求(如HTTP Header)中带出去。
- 当需要创建一个子 span 用于某个事件场景中时。
下面,我们依次对三个场景来进行演示。
接收新请求时
def before_request(request, tracer):
"""
# 使用 extract API 来从请求信息中提取对应的 span 信息
# 创建一个新的 span 并返回该 span
"""
span_context = tracer.extract(
format=Format.HTTP_HEADERS,
carrier=request.headers,
)
span = tracer.start_span(
operation_name=request.operation,
child_of=span_context)
span.set_tag('http.url', request.full_url)
remote_ip = request.remote_ip
if remote_ip:
span.set_tag(tags.PEER_HOST_IPV4, remote_ip)
caller_name = request.caller_name
if caller_name:
span.set_tag(tags.PEER_SERVICE, caller_name)
remote_port = request.remote_port
if remote_port:
span.set_tag(tags.PEER_PORT, remote_port)
return span
def handle_request(request):
# 得到当前作用域的 span
span = before_request(request, opentracing.global_tracer())
# 使用 scope_manager 将 span 信息存储在本地存储中,返回的 scope 可以作为上下文管理器
# 当跳出 scope 范围时,其对应的 span 会自动调用 finish() 方法
with tracer.scope_manager.activate(span, True) as scope:
# 具体的业务逻辑
handle_request_for_real(request)
调用其他服务时
from opentracing import tags
from opentracing.propagation import Format
from opentracing_instrumentation import request_context
def before_http_request(request, current_span_extractor):
op = request.operation
# 获取当前的 span
parent_span = current_span_extractor()
# 基于当前的 span 创建一个子 span
outbound_span = opentracing.global_tracer().start_span(
operation_name=op,
child_of=parent_span
)
# 设置子 span 的属性
outbound_span.set_tag('http.url', request.full_url)
service_name = request.service_name
host, port = request.host_port
if service_name:
outbound_span.set_tag(tags.PEER_SERVICE, service_name)
if host:
outbound_span.set_tag(tags.PEER_HOST_IPV4, host)
if port:
outbound_span.set_tag(tags.PEER_PORT, port)
# 将子 span 信息序列化后写入 request 的 header 中
http_header_carrier = {}
opentracing.global_tracer().inject(
span_context=outbound_span.context,
format=Format.HTTP_HEADERS,
carrier=http_header_carrier)
for key, value in http_header_carrier.iteritems():
request.add_header(key, value)
return outbound_span
# 创建一个子 span 并将其序列化,同时作为上下文环境执行该请求
with before_http_request(request=out_request, current_span_extractor=request_context.get_current_span):
# 真实调用其他服务
return urllib2.urlopen(request)
其中:
- request_context.get_current_span() 可以获取当前上下文所在的 span。
- opentracing.global_tracer() 可以获取当前程序中的 tracer。
scope介绍
为了能够保证程序可以随时获取到当前所在的 span ,OpenTracing 给每个 Tracer 都分配了一个 ScopeManager 的对象,它通过对 Scope 的管理来实现随时获取当前所在的 span。
对于一个 Span 而言,它所属的任务或者线程后可能会发生变化,但是所属的 Scope 一定不会变化。
例如,我们可以通过如下方式直接获取当前所在的 span:
scope = tracer.scope_manager.active
if scope is not None:
scope.span.set_tag('...', '...')
Scope 最常用的用法是启动一个 Scope 作为上下文,从而自动在进程内进行传播(只有scope作为上下文后,才能自动在当前线程内自动传播,跨线程时,需要在通过的线程之间传递scope变量):
# 方式1: 手动开启一个 span ,针对该 span 得到 scope 上下文,并在上下文内执行
span = tracer.start_span(operation_name='someWork')
with tracer.scope_manager.activate(span, True) as scope:
# Do things.
# 方式2: 通过 start_active_span 函数来自动开启一个 span 并得到 scope 上下文
# 其中: finish_on_close is 是一个必填参数
with tracer.start_active_span('someWork', finish_on_close=True) as scope:
# Do things.
# scope 可以在 try-except-finally 中使用
span = tracer.start_span(operation_name='someWork')
scope = tracer.scope_manager.activate(span, True)
try:
# Do things.
except Exception as e:
span.set_tag('error', '...')
finally:
scope.close()
此外,需要说明的是,在 start_span 时,如果当前已经处于某个 scope 时,会自动将当前 scope 对应的 span 作为其 parent span,除非显示指定 ignore_active_span=True 参数,例如:
scope = tracer.start_active_span('someWork', ignore_active_span=True)
每个不同的服务框架其实都应该提供一种对应的 ScopeManager 。例如线程内部存储和基于协程的异步存储等方式。
默认情况下使用的是 ThreadLocalScopeManager 线程内部的存储。此外,还提供了 gevent, tornado, asyncio 以及 contextvars 等多种不同的 ScopeManager。
跨线程传递 span
很多时候,我们都会在 Fork 出一些线程来并发的执行一些任务,示例如下:
import logging
import time
from threading import Thread
from jaeger_utils import get_tracer
def print_hello(name):
"""
# print hello
:param name:
:return:
"""
with tracer.start_span('print_hello') as thread_span:
thread_span.set_tag('task', "print_hello")
print("hello %s" % name)
if __name__ == "__main__":
log_level = logging.DEBUG
logging.getLogger('').handlers = []
logging.basicConfig(format='%(asctime)s %(message)s', level=log_level)
# 同时会创建一个 opentracing.tracer 对象
tracer = get_tracer()
with tracer.start_span('main') as span:
span.log_kv({'event': 'test message', 'life': 42})
# fork 一个线程执行子任务
Thread(target=print_hello, args=("wangzhe", )).start()
time.sleep(2) # 等待数据接入 jaeger 后端
tracer.close() # 将 buffer 中的 span 数据刷入存储
可以看到,我们是在 main span 的上下文中 fork 出来了一个 thread 来执行 print hello 任务。
那么,我们是不是期望 print_hello span 应该是 main span 的子 span 呢?
然而,从 jaeger UI 页面上查询一下数据,可以看到:
诶!不对,两个 span 成为了完全独立的 span 了。
说明 Fork 出来了 thread 其实并没有默认继承原始线程的 context 上下文,这种情况应该怎么办呢?
也能简单,只需要把父 span 作为 thread 函数作为参数传递到线程函数内部,并在线程中创建 span 时,显示指定对应的父 span 即可,修改后的代码如下:
import logging
import time
from threading import Thread
from jaeger_utils import get_tracer
def print_hello(name, parent_span):
"""
# print hello
:param name:
:param parent_span:
:return:
"""
# 显示指定对应的父 span
with tracer.start_span('print_hello', child_of=parent_span) as thread_span:
thread_span.set_tag('task', "print_hello")
print("hello %s" % name)
if __name__ == "__main__":
log_level = logging.DEBUG
logging.getLogger('').handlers = []
logging.basicConfig(format='%(asctime)s %(message)s', level=log_level)
# 同时会创建一个 opentracing.tracer 对象
tracer = get_tracer()
with tracer.start_span('main') as span:
span.log_kv({'event': 'test message', 'life': 42})
# fork 一个线程执行子任务
Thread(target=print_hello, args=("wangzhe", span)).start()
time.sleep(2) # 等待数据接入 jaeger 后端
tracer.close() # 将 buffer 中的 span 数据刷入存储
重新运行一下看看呢?
棒!现在已经达到我们想要的效果了!