在.NET中使用OpenTelemetry实现链路追踪(二)之Activity和DiagnosticSource发挥的作用
使用.Net原生对象ActivitySource和Activity实现链路追踪
在上一篇文章中,我们演示了如何添加自定义的Span到链路中,首先我们创建一个
Tracer
对象,然后在此基础上创建一个TelemetrySpan
,用来代表一个链路块。其实在其内部,Tracer持有了一个System.Diagnostics.ActivitySource对象,而TelemetrySpan则持有了一个System.Diagnostics.Activity对象,分别代表Tracer和Span,接下来我们直接使用这2个对象来实现链路追踪。添加一段如下的代码到Controller中
1
2
3
4
5
6
7
8
9
10
11
12
13
14private static readonly ActivitySource activitySource = new ActivitySource("MySourceName");
[HttpGet("trace")]
public string GetTrace()
{
using (var activity = activitySource.StartActivity("GetTraceActivity"))
{
if (activity != null)
{
activity.SetTag("custom.tag", "value");
activity.AddEvent(new ActivityEvent("Custom event"));
}
return "value";
}
}注意代码中,我们定义了一个全局静态变量activitySource,构造它的时候传递了一个名称MySourceName。因为对于ActivitySource对象,我们需要保证一个名称全局只有一个。
然后在配置OpenTelemetry的地方,加上这段代码
.AddSource("MySourceName")
,这一点其实是和我们使用OpenTelemetry的类库是一样的,都相当于注册这个名称到OpenTelemetry系统中如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16public static void ConfigureServices(IServiceCollection services)
{
services.AddOpenTelemetry()
.ConfigureResource(resource => resource.AddService("ServiceA", serviceVersion: "1.0.0"))
.WithTracing(builder => builder
.AddAspNetCoreInstrumentation()
.AddHttpClientInstrumentation()
.AddSource("ServiceA")
.AddSource("MySourceName")
.AddOtlpExporter());
services.AddHttpClient("ServiceB", client =>
{
client.BaseAddress = new Uri("http://localhost:5062");
});
}运行起来,并调用API就会得到如下的调用链路:
为什么要调用AddSource方法?
从上面的代码我们可以看出,OpenTelemetry类库内部是最终是通过Activity这个类来收集链路数据的。那么这个Activity对象又是如何变成OpenTelemetry支持的Trace数据格式的呢?有人会想,一定有一个地方会做这个转化。没错,的确是的,而做这个转化就需要一个入口,而入口就由ActivityListener 这个类提供,我们重点关注它的3个Property:
名称 | 描述 |
---|---|
ShouldListenTo | Gets or sets the callback that allows deciding if activity object events that were created using the activity source object should be listened or not. |
ActivityStarted | Gets or sets the callback used to listen to the activity start event. |
ActivityStopped | Gets or sets the callback used to listen to the activity stop event. |
当有Activity这个对象产生时,系统就会通过ShouldListenTo来判断是否有listener监听了这个Activity,如果有,那么当这个Activity对象调用Start和Stop这2个方法时,分别触发listener的ActivityStarted和ActivityStopped这2个回调。基于这个设计,OpenTelemetry会创建ActivityListener来监听Activity的创建,而我们调用AddSource方法,就是为了告诉OpenTelemetry哪些Activity是需要监听的,会被应用在ShouldListenTo的委托上面,最终Activity被转化成OpenTelemetry支持的Trace数据格式的。相关的代码,可以在这个类 TracerProviderSdk 的构造方法里找到。
如何优雅的创建Activity
看到这里我们不禁会想,这些内置的类库,比如:OpenTelemetry.Instrumentation.SqlClient,他们是如何创建Activity的,难道直接在类库中直接创建Activity吗?很显然不是的!接下来我们就介绍另外两个类:DiagnosticSource 和 DiagnosticListener。
DiagnosticSource和DiagnosticListener
下面是关于DiagnosticSource的一段官方的原话:
System.Diagnostics.DiagnosticSource is a module that allows code to be instrumented for production-time logging of rich data payloads for consumption within the process that was instrumented. At run time, consumers can dynamically discover data sources and subscribe to the ones of interest. System.Diagnostics.DiagnosticSource was designed to allow in-process tools to access rich data. When using System.Diagnostics.DiagnosticSource, the consumer is assumed to be within the same process and as a result, non-serializable types (for example,
HttpResponseMessage
orHttpContext
) can be passed, giving customers plenty of data to work with.简而言之,DiagnosticSource是一个进程内通信的订阅机制,传递不可序列化的数据。另外配合DiagnosticListener实现进程内的发布订阅机制,但是需要注意的是这种机制是同步的,所以一定要小心,并且在订阅时不能抛出异常。下面是一段关于这个机制的入门代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72using System.Diagnostics;
MyListener TheListener = new MyListener();
TheListener.Listening();
HTTPClient Client = new HTTPClient();
Client.SendWebRequest("https://learn.microsoft.com/dotnet/core/diagnostics/");
class HTTPClient
{
private static DiagnosticSource httpLogger = new DiagnosticListener("System.Net.Http");
public byte[] SendWebRequest(string url)
{
if (httpLogger.IsEnabled("RequestStart"))
{
httpLogger.Write("RequestStart", new { Url = url });
}
//Pretend this sends an HTTP request to the url and gets back a reply.
byte[] reply = new byte[] { };
return reply;
}
}
class Observer<T> : IObserver<T>
{
public Observer(Action<T> onNext, Action onCompleted)
{
_onNext = onNext ?? new Action<T>(_ => { });
_onCompleted = onCompleted ?? new Action(() => { });
}
public void OnCompleted() { _onCompleted(); }
public void OnError(Exception error) { }
public void OnNext(T value) { _onNext(value); }
private Action<T> _onNext;
private Action _onCompleted;
}
class MyListener
{
IDisposable networkSubscription;
IDisposable listenerSubscription;
private readonly object allListeners = new();
public void Listening()
{
Action<KeyValuePair<string, object>> whenHeard = delegate (KeyValuePair<string, object> data)
{
Console.WriteLine($"Data received: {data.Key}: {data.Value}");
};
Action<DiagnosticListener> onNewListener = delegate (DiagnosticListener listener)
{
Console.WriteLine($"New Listener discovered: {listener.Name}");
//Subscribe to the specific DiagnosticListener of interest.
if (listener.Name == "System.Net.Http")
{
//Use lock to ensure the callback code is thread safe.
lock (allListeners)
{
if (networkSubscription != null)
{
networkSubscription.Dispose();
}
IObserver<KeyValuePair<string, object>> iobserver = new Observer<KeyValuePair<string, object>>(whenHeard, null);
networkSubscription = listener.Subscribe(iobserver);
}
}
};
//Subscribe to discover all DiagnosticListeners
IObserver<DiagnosticListener> observer = new Observer<DiagnosticListener>(onNewListener, null);
//When a listener is created, invoke the onNext function which calls the delegate.
listenerSubscription = DiagnosticListener.AllListeners.Subscribe(observer);
}
// Typically you leave the listenerSubscription subscription active forever.
// However when you no longer want your callback to be called, you can
// call listenerSubscription.Dispose() to cancel your subscription to the IObservable.
}最终他会有如下输出:
1
2New Listener discovered: System.Net.Http
Data received: RequestStart: { Url = https://learn.microsoft.com/dotnet/core/diagnostics/ }关于这个机制就不在这里详细,可以参考这篇入门文章学习:DiagnosticSource 和 DiagnosticListener - .NET | Microsoft Learn
DiagnosticSource机制与Activity的结合
在.Net 框架的源代码中有许多内置的DiagnosticSource对象,他们会在发布很多消息,所以如果我们使用DiagnosticListener对它进行订阅的话,我们就可以创建Activity,来实现无侵入式的链路追踪。需要了解的一个点是DiagnosticListener同时也是DiagnosticSource的一个实现类。接下面我们就以Microsoft.Data.SqlClient为例解析一下原理:
在Microsoft.Data.SqlClient中,它会发布创建SourceName为:
SqlClientDiagnosticListener
的DiagnosticListener对象,通过调用父类的Write方法发布特定的消息,比如:Microsoft.Data.SqlClient.WriteCommandBefore,源代码:SqlDiagnosticListener,截图:在OpenTelemetry.Instrumentation.SqlClient中,就会订阅这个消息,并且创建Activity,源代码:SqlClientDiagnosticListener,截图:
这就是.Net框架内部实现链路追踪的原理。