我正在尝试将Jaeger跟踪集成到K流中。我计划在我的几条最重要的管道上添加追踪,并想知道什么是从一条线到另一条线的好方法?
以下是我到目前为止所拥有的--在流处理管道的开始时,我启动了一个服务器跨,并将traceid保存到一个状态存储中。稍后,在transform管道中,我访问statestore并从transform()方法中捕获跟踪。这是处理流处理中的跟踪的好方法吗?
input
.mapValues(toSomethingThatMyAppUnderstands)
.mapValues(this::startStreamTrace)
.filter((k, v) -> v.isPresent())
.mapValues(Optional::get)
.mapValues(doSomethingHereWith)
.flatMapValues(doSomethingElse)
.filter((k, v) -> isInterestingEvent(v))
.transform(() -> new TransformerWithTracing<SomeObjectA, SomeObjectB>(IN_MEM_STORE_NAME, someFunction), IN_MEM_STORE_NAME)
.flatMapValues(c -> c)
.to(outTopic, Produced.with(Serdes.String(), new EventSerde()));
public class TransformerWithTracing<V, VR> implements Transformer<String, V, KeyValue<String, VR>> {
final Function valueAction;
final String storeId;
private KeyValueStore<String, String> traceIdStore;
public TransformerWithTracing(String storeId, Function valueAction) {
this.storeId = storeId;
this.valueAction = valueAction;
}
@Override
public void init(ProcessorContext context) {
// KeyValueStore store = ((KeyValueStore<String, String>) context.getStateStore(storeId));
InMemoryKeyValueStore inMemoryKeyValueStore = (InMemoryKeyValueStore) store;
this.traceIdStore = store;
}
@Override
public KeyValue<String, VR> transform(String key, V value) {
System.out.println(traceIdStore.get(key));
// BuildTraceHeader
try(Scope scope = serviceTracer.startServerSpan(traceHeader, "Converting to Enterprise Event")) {
return KeyValue.pair(key, (VR) valueAction.apply(value));
}
}
@Override
public KeyValue<String, VR> punctuate(long timestamp) {
return null;
}
@Override
public void close() {
// if (streamId != null) traceIdStore.delete(streamId);
}
}发布于 2018-10-20 18:37:17
在这个@jeqo的zipkin/勇敢的回购中也有类似的想法。
https://github.com/jeqo/brave/tree/kafka-streams-processor/instrumentation/kafka-streams
也有一些东西在开放赛车-控制回购,但它似乎只在跟踪生产者/消费者水平。
https://github.com/opentracing-contrib/java-kafka-client/tree/master/opentracing-kafka-streams
https://stackoverflow.com/questions/52875462
复制相似问题