我试图跟踪一个方法,它有两个使用开放遥测的Go例程。第一个围棋例行公事从卡夫卡开始,创造了一份持久的工作(从1秒到1分不等)。然后,第二个Go例程侦听已完成的作业。
什么是正确的跟踪方式,以便我们知道哪个作业结果(在第二个例程中)对应于哪个kafka消息(来自第一个例程)?
我的猜测是,在Go例程中创建的两个跨度必须通过同一个traceId链接。
func startListening(ctx context.Context) {
// initialise kafka client
go kafkaConsumeMessages(ctx)
go waitForJob(ctx)
}
func kafkaConsumeMessages(ctx) {
for message := range kafkaEvents {
// process message, create long job
// create span here with traceID?
}
func waitForJobs(ctx) {
for results := range finishedJobs
// process result
// create span here with traceID?
}
}任何建议都是非常感谢的!
发布于 2022-01-19 17:35:37
事实上,答案比我想象的要容易。您需要进一步传递附加到该长作业的跟踪信息,然后在处理完成的作业时对其进行解码。
在我的示例中,由于我使用的是文本traceparent头,因此是propagation.TextMapPropagator的propagation.TraceContext{}实现,所以我决定发送整个traceparent头(尽管我可能需要对tracestate做同样的操作),然后在处理完成的作业时使用Extract方法来解码头部。但是为了使用Extract方法,您需要实现propagation.TextMapCarrier接口。
func startListening(ctx context.Context) {
// initialise kafka client
go kafkaConsumeMessages(ctx)
go waitForJob(ctx)
}
func kafkaConsumeMessages(ctx) {
for msg := range kafkaEvents {
// extract incoming tracing info from traceparent header. Example at https://github.com/open-telemetry/opentelemetry-go-contrib/blob/main/instrumentation/github.com/Shopify/sarama/otelsarama/example/consumer/consumer.go#L84
ctx := otel.GetTextMapPropagator().Extract(context.Background(), otelsarama.NewConsumerMessageCarrier(msg))
// create span
tr := otel.Tracer("consumer")
_, span := tr.Start(ctx, "consume message", trace.WithAttributes(
semconv.MessagingOperationProcess,
))
defer span.End()
// get just the traceparent header
carrier := otelsarama.NewConsumerMessageCarrier(&msg)
traceparentHeader := carrier.Get("traceparent")
// process message, create long job and attach the header
jobs.enqueue{TraceparentHeader: traceparentHeader}
}
func waitForJobs(ctx) {
for result := range finishedJobs {
ctx = otel.GetTextMapPropagator().Extract(ctx, models.PseudoCarrier{S: result.TraceparentHeader})
ctx, span := tr.Start(ctx, "process result", trace.WithAttributes(
attribute.String("jobName", result.JobName),
))
defer span.End()
// do more work
}
}// PseudoCarrier implements the propagation.TextMapCarrier interface so we can use the propagation.Extract method when parsing the traceparent header
type PseudoCarrier struct {
S string
}
func (c PseudoCarrier) Get(_ string) string {
return c.S
}
func (c PseudoCarrier) Set(string, string) {}
func (c PseudoCarrier) Keys() []string {
return []string{"traceparent"}
}https://stackoverflow.com/questions/70438619
复制相似问题