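I want to outer join several (typically 2-10) Kafka topics by key, ideally using the Streams API. All topics will have the same key and partitioning. One way to perform this join is to create a KStream for each topic and chain calls to KStream.outerJoin: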
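    stream1
        .outerJoin(stream2, ...)
        .outerJoin(stream3, ...)
        .outerJoin(stream4, ...)

However, the documentation for KStream.outerJoin indicates that each call to outerJoin materializes both of its input streams. The example above would therefore materialize not only streams 1 to 4, but also stream1.outerJoin(stream2, ...) and stream1.outerJoin(stream2, ...).outerJoin(stream3, ...). Compared with joining the 4 streams directly, that is a lot of unnecessary serialization, deserialization, and I/O.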
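Another problem with the above approach is that the JoinWindow is not consistent across all 4 input streams: one JoinWindow is used to join streams 1 and 2, but a separate join window is then used to join that result with stream 3, and so on. For example, suppose I specify a join window of 10 seconds for each join, and entries with a given key appear in stream 1 at 0 seconds, stream 2 at 6 seconds, stream 3 at 12 seconds, and stream 4 at 18 seconds. The joined item would be output after 18 seconds, which is excessive latency for a 10-second window. The result also depends on the order of the joins, which seems unnatural.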
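The window arithmetic in this example can be checked with a small, library-free sketch. It is not Kafka Streams code: the names `WindowChainDemo`, `chainedJoinMatches`, and `singleWindowMatches` are hypothetical, and it assumes (as the example implies) that each intermediate join result carries the timestamp of the later of its two inputs.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class WindowChainDemo {
    /**
     * Models chained pairwise joins: each new record only has to fall within
     * windowSec of the previous intermediate result, not of the first record.
     */
    static boolean chainedJoinMatches(List<Long> arrivals, long windowSec) {
        long resultTs = arrivals.get(0);
        for (int i = 1; i < arrivals.size(); i++) {
            long ts = arrivals.get(i);
            if (Math.abs(ts - resultTs) > windowSec) {
                return false;
            }
            // Assumption: the joined record carries the later timestamp.
            resultTs = Math.max(resultTs, ts);
        }
        return true;
    }

    /** Models one shared window: every record must fall within windowSec of the earliest one. */
    static boolean singleWindowMatches(List<Long> arrivals, long windowSec) {
        return Collections.max(arrivals) - Collections.min(arrivals) <= windowSec;
    }

    public static void main(String[] args) {
        // Arrival times (seconds) in streams 1..4, as in the example above.
        List<Long> arrivals = Arrays.asList(0L, 6L, 12L, 18L);
        System.out.println(chainedJoinMatches(arrivals, 10));  // prints "true"
        System.out.println(singleWindowMatches(arrivals, 10)); // prints "false"
    }
}
```

Under these assumptions the chained joins accept records spread over 18 seconds even though every individual window is 10 seconds, which is the inconsistency described above.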
Is there a better approach to multi-way joins using Kafka?
Answered 2017-07-02 00:39:26
I don't know of a better approach in Kafka Streams at the moment, but one is in the making:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
Answered 2017-06-29 05:33:28
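Ultimately, I decided to create a custom lightweight joiner that avoids materialization and strictly adheres to the expiration time. It should be O(1) on average. It fits the Consumer API better than the Streams API: for each consumer, repeatedly poll and update the joiner with any data received; if the joiner returns a complete set of values, forward it. Here's the code: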
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Iterator;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.Optional;

    /**
     * Inner joins multiple streams of data by key into one stream. It is assumed
     * that a key will appear in a stream exactly once. The values associated with
     * each key are collected and if all values are received within a certain
     * maximum wait time, the joiner returns all values corresponding to that key.
     * If not all values are received in time, the joiner never returns any values
     * corresponding to that key.
     * <p>
     * This class is not thread safe: all calls to
     * {@link #update(Object, Object, long)} must be synchronized.
     * @param <K> The type of key.
     * @param <V> The type of value.
     */
    class StreamInnerJoiner<K, V> {

        private final Map<K, Vals<V>> idToVals = new LinkedHashMap<>();
        private final int joinCount;
        private final long maxWait;

        /**
         * Creates a stream inner joiner.
         * @param joinCount The number of streams being joined.
         * @param maxWait The maximum amount of time after an item has been seen in
         * one stream to wait for it to be seen in the remaining streams.
         */
        StreamInnerJoiner(final int joinCount, final long maxWait) {
            this.joinCount = joinCount;
            this.maxWait = maxWait;
        }

        private static class Vals<A> {
            final long firstSeen;
            final Collection<A> vals = new ArrayList<>();

            private Vals(final long firstSeen) {
                this.firstSeen = firstSeen;
            }
        }

        /**
         * Updates this joiner with a value corresponding to a key.
         * @param key The key.
         * @param val The value.
         * @param now The current time.
         * @return If all values for the specified key have been received, the
         * complete collection of values for that key; otherwise
         * {@link Optional#empty()}.
         */
        Optional<Collection<V>> update(final K key, final V val, final long now) {
            expireOld(now - maxWait);
            final Vals<V> curVals = getOrCreate(key, now);
            curVals.vals.add(val);
            return expireAndGetIffFull(key, curVals);
        }

        private Vals<V> getOrCreate(final K key, final long now) {
            final Vals<V> existingVals = idToVals.get(key);
            if (existingVals != null)
                return existingVals;
            else {
                /*
                Note: we assume that the item with the specified ID has not already
                been seen and timed out, and therefore that its first seen time is
                now. If the item has in fact already timed out, it is doomed and
                will time out again with no ill effect.
                */
                final Vals<V> curVals = new Vals<>(now);
                idToVals.put(key, curVals);
                return curVals;
            }
        }

        private void expireOld(final long expireBefore) {
            // The LinkedHashMap iterates in insertion order, so the oldest entries
            // come first and the scan can stop at the first unexpired entry.
            final Iterator<Vals<V>> i = idToVals.values().iterator();
            while (i.hasNext() && i.next().firstSeen < expireBefore)
                i.remove();
        }

        private Optional<Collection<V>> expireAndGetIffFull(final K key, final Vals<V> vals) {
            if (vals.vals.size() == joinCount) {
                // as all expired entries were already removed, this entry is valid
                idToVals.remove(key);
                return Optional.of(vals.vals);
            } else
                return Optional.empty();
        }
    }

https://stackoverflow.com/questions/44811384
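The poll-and-update loop described in the second answer might look roughly like the sketch below. To keep it compilable on its own, it includes `MiniJoiner`, a condensed restatement of the joiner's logic, and replaces the Kafka consumer with a plain list of records. `MiniJoiner`, `JoinerDemo`, `Rec`, `run`, and `demo` are hypothetical names introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Condensed restatement of the joiner logic, so this sketch is self-contained. */
class MiniJoiner<K, V> {
    private static final class Vals<A> {
        final long firstSeen;
        final List<A> vals = new ArrayList<>();
        Vals(long firstSeen) { this.firstSeen = firstSeen; }
    }

    private final Map<K, Vals<V>> idToVals = new LinkedHashMap<>();
    private final int joinCount;
    private final long maxWait;

    MiniJoiner(int joinCount, long maxWait) {
        this.joinCount = joinCount;
        this.maxWait = maxWait;
    }

    Optional<List<V>> update(K key, V val, long now) {
        // Drop entries first seen more than maxWait ago (oldest entries come first).
        Iterator<Vals<V>> it = idToVals.values().iterator();
        while (it.hasNext() && it.next().firstSeen < now - maxWait) {
            it.remove();
        }
        Vals<V> cur = idToVals.computeIfAbsent(key, k -> new Vals<>(now));
        cur.vals.add(val);
        if (cur.vals.size() < joinCount) {
            return Optional.empty();
        }
        idToVals.remove(key);
        return Optional.of(cur.vals);
    }
}

class JoinerDemo {
    /** Hypothetical stand-in for a record returned by a consumer poll. */
    static final class Rec {
        final String key;
        final String value;
        final long ts;
        Rec(String key, String value, long ts) { this.key = key; this.value = value; this.ts = ts; }
    }

    /** Feeds records to the joiner in arrival order and collects every completed join. */
    static List<List<String>> run(List<Rec> polled, int joinCount, long maxWait) {
        MiniJoiner<String, String> joiner = new MiniJoiner<>(joinCount, maxWait);
        List<List<String>> forwarded = new ArrayList<>();
        for (Rec r : polled) {
            joiner.update(r.key, r.value, r.ts).ifPresent(forwarded::add);
        }
        return forwarded;
    }

    /** Three streams, 10-second wait: key "a" completes in time, key "b" does not. */
    static List<List<String>> demo() {
        return run(Arrays.asList(
                new Rec("a", "s1-a", 0), new Rec("b", "s1-b", 1_000),
                new Rec("a", "s2-a", 4_000), new Rec("a", "s3-a", 9_000),
                new Rec("b", "s2-b", 12_000), new Rec("b", "s3-b", 13_000)), 3, 10_000);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [[s1-a, s2-a, s3-a]]
    }
}
```

Key "b" is first seen at 1 second, so its entry expires before the value at 12 seconds arrives; the later values start a fresh entry that never reaches three values. In a real deployment the list of records would be replaced by the results of polling each consumer, and the timestamps would have to be non-decreasing for the expiry scan to be correct.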