
Is there an efficient way to join several (more than 2) Kafka topics?

Stack Overflow user
Asked 2017-06-29 03:49:36
2 answers, 704 views, 0 followers, score 2

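I want to outer join several (typically 2 to 10) Kafka topics by key, ideally using the Streams API. All topics will have the same keys and the same partitioning. One way to do this join is to create a KStream for each topic and chain calls to KStream.outerJoin: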

stream1
    .outerJoin(stream2, ...)
    .outerJoin(stream3, ...)
    .outerJoin(stream4, ...)

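However, the KStream.outerJoin documentation indicates that each call to outerJoin materializes both of its input streams, so the example above would materialize not only streams 1 through 4 but also stream1.outerJoin(stream2, ...) and stream1.outerJoin(stream2, ...).outerJoin(stream3, ...). Compared with joining the 4 streams directly, that is a lot of unnecessary serialization, deserialization, and I/O.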
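Taking the documentation's statement at face value, the overhead grows with the number of streams. Chaining n streams takes n - 1 outerJoin calls, each materializing its two inputs; n of those inputs are the source streams and the remaining n - 2 are intermediate join results. Writing S(n) for the number of materialized streams:

```latex
S(n) = \underbrace{n}_{\text{source streams}} + \underbrace{(n-2)}_{\text{intermediate results}} = 2(n-1),
\qquad S(4) = 6, \qquad S(10) = 18
```

For the 4-stream example this gives the 4 inputs plus the 2 intermediate results listed above.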

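Another problem with this approach is that the join windows across the 4 input streams may be inconsistent: one join window is used to join streams 1 and 2, but a separate join window is then used to join that result with stream 3, and so on. For example, suppose I specify a 10-second join window for each join, and records with a particular key appear in stream 1 at 0 s, stream 2 at 6 s, stream 3 at 12 s, and stream 4 at 18 s. The joined item would then be output after 18 seconds, even though the four records span more than any single 10-second window, which means excessively high latency. The result also depends on the order in which the joins are chained, which seems unnatural.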
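To make the order dependence concrete, here is a small plain-Java model. It is not Kafka Streams code: the class JoinWindowChain and both methods are hypothetical. It assumes that a pairwise join accepts two records when their timestamps differ by at most the window, and that each joined result carries the later of its two input timestamps; singleWindowMatches stands in for the direct 4-way join the question is asking for.

```java
import java.util.Arrays;

/**
 * Hypothetical model (not Kafka Streams code) comparing a chain of pairwise
 * windowed joins with one window over all streams. Assumes each intermediate
 * join result carries the later of its two input timestamps.
 */
public class JoinWindowChain {

    /** Chained pairwise joins: each step only checks the gap to the previous result. */
    static boolean chainedJoinMatches(long[] timestamps, long window) {
        long resultTs = timestamps[0];
        for (int i = 1; i < timestamps.length; i++) {
            if (Math.abs(timestamps[i] - resultTs) > window) {
                return false; // this pairwise join finds no partner
            }
            resultTs = Math.max(resultTs, timestamps[i]);
        }
        return true;
    }

    /** Single multi-way window: all records must fall inside one window. */
    static boolean singleWindowMatches(long[] timestamps, long window) {
        long min = Arrays.stream(timestamps).min().getAsLong();
        long max = Arrays.stream(timestamps).max().getAsLong();
        return max - min <= window;
    }

    public static void main(String[] args) {
        long window = 10;                  // seconds, as in the question
        long[] inOrder = {0, 6, 12, 18};   // stream1, stream2, stream3, stream4
        long[] reordered = {0, 12, 6, 18}; // stream1, stream3, stream2, stream4
        System.out.println(chainedJoinMatches(inOrder, window));   // true
        System.out.println(chainedJoinMatches(reordered, window)); // false
        System.out.println(singleWindowMatches(inOrder, window));  // false
    }
}
```

Under these assumptions, the chain accepts the question's records in the original order but rejects them when stream 3 is joined before stream 2, while a single 10-second window rejects them outright.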

Is there a better way to do a multi-way join with Kafka?


2 Answers

Stack Overflow user

Answered 2017-07-02 00:39:26

I'm not aware of a better way in Kafka Streams at the moment, but one is in the works:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
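KIP-150 proposes a cogroup operation: several grouped streams each contribute their own aggregator to one shared aggregate per key, instead of a chain of pairwise joins that each materialize their inputs. (The KIP was later released as KGroupedStream#cogroup in Apache Kafka 2.5.) The plain-Java model below only illustrates the idea and is not the Kafka Streams API; CogroupModel and its update method are hypothetical. A key whose aggregate is still missing some streams' fields plays the role of an outer-join row with null sides.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Supplier;

/**
 * Hypothetical in-memory model of the cogroup idea from KIP-150; NOT the
 * Kafka Streams API. Every input stream updates the same per-key aggregate,
 * so there is one store instead of one per pairwise join.
 */
public class CogroupModel<K, A> {

    private final Map<K, A> store = new HashMap<>();
    private final Supplier<A> initializer;

    CogroupModel(Supplier<A> initializer) {
        this.initializer = initializer;
    }

    /** Folds one record from any input stream into the shared aggregate for its key. */
    <V> A update(K key, V value, BiFunction<A, V, A> aggregator) {
        A current = store.computeIfAbsent(key, k -> initializer.get());
        A updated = aggregator.apply(current, value);
        store.put(key, updated);
        return updated;
    }

    public static void main(String[] args) {
        // Aggregate = stream name -> value; a missing entry is the "null side".
        CogroupModel<String, Map<String, Object>> joined =
                new CogroupModel<String, Map<String, Object>>(LinkedHashMap::new);

        // Each (hypothetical) stream has its own value type and aggregator.
        joined.update("user-1", 42, (agg, v) -> { agg.put("clicks", v); return agg; });
        joined.update("user-1", "home", (agg, v) -> { agg.put("page", v); return agg; });
        Map<String, Object> row =
                joined.update("user-1", 3.5, (agg, v) -> { agg.put("score", v); return agg; });

        System.out.println(row); // {clicks=42, page=home, score=3.5}
    }
}
```

The design point is that adding an eleventh stream adds one more aggregator, not another intermediate join result.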

Score 1

Stack Overflow user

Answered 2017-06-29 05:33:28

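In the end, I decided to write a custom lightweight joiner that avoids materialization and strictly respects the expiration time. Each update should take O(1) time on average. It fits the Consumer API better than the Streams API: for each consumer, poll repeatedly and update the joiner with any data received; if the joiner returns a complete set of values, forward it. Here is the code: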

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Inner joins multiple streams of data by key into one stream. It is assumed
 * that a key will appear in a stream exactly once. The values associated with
 * each key are collected and if all values are received within a certain
 * maximum wait time, the joiner returns all values corresponding to that key.
 * If not all values are received in time, the joiner never returns any values
 * corresponding to that key.
 * <p>
 * This class is not thread safe: all calls to
 * {@link #update(Object, Object, long)} must be synchronized.
 * @param <K> The type of key.
 * @param <V> The type of value.
 */
class StreamInnerJoiner<K, V> {

    private final Map<K, Vals<V>> idToVals = new LinkedHashMap<>();
    private final int joinCount;
    private final long maxWait;

    /**
     * Creates a stream inner joiner.
     * @param joinCount The number of streams being joined.
     * @param maxWait The maximum amount of time after an item has been seen in
     * one stream to wait for it to be seen in the remaining streams.
     */
    StreamInnerJoiner(final int joinCount, final long maxWait) {
        this.joinCount = joinCount;
        this.maxWait = maxWait;
    }

    private static class Vals<A> {
        final long firstSeen;
        final Collection<A> vals = new ArrayList<>();
        private Vals(final long firstSeen) {
            this.firstSeen = firstSeen;
        }
    }

    /**
     * Updates this joiner with a value corresponding to a key.
     * @param key The key.
     * @param val The value.
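     * @param now The current time. Must not decrease between calls: expiry
     * scans entries in insertion order and stops at the first unexpired one.
     * @return If all values for the specified key have been received, the
     * complete collection of values for that key; otherwise
     * {@link Optional#empty()}.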
     */
    Optional<Collection<V>> update(final K key, final V val, final long now) {
        expireOld(now - maxWait);
        final Vals<V> curVals = getOrCreate(key, now);
        curVals.vals.add(val);
        return expireAndGetIffFull(key, curVals);
    }

    private Vals<V> getOrCreate(final K key, final long now) {
        final Vals<V> existingVals = idToVals.get(key);
        if (existingVals != null)
            return existingVals;
        else {
            /*
            Note: we assume that the item with the specified ID has not already
            been seen and timed out, and therefore that its first seen time is
            now. If the item has in fact already timed out, it is doomed and
            will time out again with no ill effect.
             */
            final Vals<V> curVals = new Vals<>(now);
            idToVals.put(key, curVals);
            return curVals;
        }
    }

    private void expireOld(final long expireBefore) {
        final Iterator<Vals<V>> i = idToVals.values().iterator();
        while (i.hasNext() && i.next().firstSeen < expireBefore)
            i.remove();
    }

    private Optional<Collection<V>> expireAndGetIffFull(final K key, final Vals<V> vals) {
        if (vals.vals.size() == joinCount) {
            // as all expired entries were already removed, this entry is valid
            idToVals.remove(key);
            return Optional.of(vals.vals);
        } else
            return Optional.empty();
    }
}
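The joiner above performs an inner join: a key that does not collect all joinCount values within maxWait is silently dropped, whereas the question asks for an outer join. The sketch below adapts the same design (an insertion-ordered LinkedHashMap scanned from the oldest entry) so that timed-out groups are returned as partial results instead. StreamOuterJoiner, its expire method, and the List<Collection<V>> return shape are hypothetical choices for illustration, under the same assumptions as the original: non-decreasing timestamps and at most one value per key per stream.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical outer-join variant of StreamInnerJoiner (illustration only,
 * not part of the answer). Complete groups are emitted as soon as the last
 * value arrives; groups that time out are emitted with whatever values did
 * arrive instead of being dropped. Assumes {@code now} never decreases and
 * each key appears at most once per stream. Not thread safe.
 */
class StreamOuterJoiner<K, V> {

    private static final class Pending<T> {
        final long firstSeen;
        final List<T> vals = new ArrayList<>();

        Pending(final long firstSeen) {
            this.firstSeen = firstSeen;
        }
    }

    // Insertion order equals first-seen order, so expiry can stop early.
    private final Map<K, Pending<V>> pending = new LinkedHashMap<>();
    private final int joinCount;
    private final long maxWait;

    StreamOuterJoiner(final int joinCount, final long maxWait) {
        this.joinCount = joinCount;
        this.maxWait = maxWait;
    }

    /**
     * Adds one value and returns every group that is ready: partial groups
     * that have just timed out, then this key's group if it is now complete.
     */
    List<Collection<V>> update(final K key, final V val, final long now) {
        final List<Collection<V>> out = expire(now);
        final Pending<V> p = pending.computeIfAbsent(key, k -> new Pending<>(now));
        p.vals.add(val);
        if (p.vals.size() == joinCount) {
            pending.remove(key);
            out.add(p.vals);
        }
        return out;
    }

    /**
     * Removes and returns (instead of dropping) the partial groups first seen
     * more than maxWait before {@code now}. Safe to call after an empty poll.
     */
    List<Collection<V>> expire(final long now) {
        final List<Collection<V>> expired = new ArrayList<>();
        final Iterator<Pending<V>> it = pending.values().iterator();
        while (it.hasNext()) {
            final Pending<V> p = it.next();
            if (p.firstSeen >= now - maxWait) {
                break; // all later entries were first seen at this time or after
            }
            expired.add(p.vals);
            it.remove();
        }
        return expired;
    }
}
```

In a consumer loop this would be driven as the answer describes: call update for each polled record and forward whatever it returns, and also call expire(now) after every poll, including empty ones, so partial groups are flushed on time. The inner joiner only expires entries inside update, which is fine when incomplete keys are simply discarded but would delay an outer join's output.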
Score 0
Page content originally from Stack Overflow.
Original link:

https://stackoverflow.com/questions/44811384
