首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何用开放遥测跟踪两个异步go例程

如何用开放遥测跟踪两个异步go例程
EN

Stack Overflow用户
提问于 2021-12-21 16:28:36
回答 1查看 749关注 0票数 1

我试图跟踪一个方法,它有两个使用开放遥测的Go例程。第一个围棋例行公事从卡夫卡开始,创造了一份持久的工作(从1秒到1分不等)。然后,第二个Go例程侦听已完成的作业。

什么是正确的跟踪方式,以便我们知道哪个作业结果(在第二个例程中)对应于哪个kafka消息(来自第一个例程)?

我的猜测是,在Go例程中创建的两个跨度必须通过同一个traceId链接。

代码语言:javascript
复制
func startListening(ctx context.Context) {
  // initialise kafka client

  go kafkaConsumeMessages(ctx)
  go waitForJob(ctx)
}

func kafkaConsumeMessages(ctx) {
  for message := range kafkaEvents {
    // process message, create long job
    // create span here with traceID?
  }  

func waitForJobs(ctx) {
  for results := range finishedJobs
    // process result
    // create span here with traceID?
  }
}

任何建议都是非常感谢的!

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-01-19 17:35:37

事实上,答案比我想象的要容易。您需要进一步传递附加到该长作业的跟踪信息,然后在处理完成的作业时对其进行解码。

在我的示例中,由于我使用的是文本traceparent头,因此是propagation.TextMapPropagatorpropagation.TraceContext{}实现,所以我决定发送整个traceparent头(尽管我可能需要对tracestate做同样的操作),然后在处理完成的作业时使用Extract方法来解码头部。但是为了使用Extract方法,您需要实现propagation.TextMapCarrier接口。

代码语言:javascript
复制
func startListening(ctx context.Context) {
  // initialise kafka client

  go kafkaConsumeMessages(ctx)
  go waitForJob(ctx)
}

func kafkaConsumeMessages(ctx) {
  for msg := range kafkaEvents {
    // extract incoming tracing info from traceparent header. Example at https://github.com/open-telemetry/opentelemetry-go-contrib/blob/main/instrumentation/github.com/Shopify/sarama/otelsarama/example/consumer/consumer.go#L84
    ctx := otel.GetTextMapPropagator().Extract(context.Background(), otelsarama.NewConsumerMessageCarrier(msg))

    // create span 
    tr := otel.Tracer("consumer")
    _, span := tr.Start(ctx, "consume message", trace.WithAttributes(
        semconv.MessagingOperationProcess,
    ))
    defer span.End()
   
    // get just the traceparent header
    carrier := otelsarama.NewConsumerMessageCarrier(&msg)
    traceparentHeader := carrier.Get("traceparent")

    // process message, create long job and attach the header
    jobs.enqueue{TraceparentHeader: traceparentHeader}
  }  

func waitForJobs(ctx) {
  for result := range finishedJobs {
    ctx = otel.GetTextMapPropagator().Extract(ctx, models.PseudoCarrier{S: result.TraceparentHeader})
    ctx, span := tr.Start(ctx, "process result", trace.WithAttributes(
        attribute.String("jobName", result.JobName),
    ))
    defer span.End()
 
    // do more work 
  }
}
代码语言:javascript
复制
// PseudoCarrier implements the propagation.TextMapCarrier interface so we can use the propagation.Extract method when parsing the traceparent header
type PseudoCarrier struct {
    S string
}

func (c PseudoCarrier) Get(_ string) string {
    return c.S
}

func (c PseudoCarrier) Set(string, string) {}

func (c PseudoCarrier) Keys() []string {
    return []string{"traceparent"}
}
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70438619

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档