首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何编写异步接收数据的hazelcast jet源程序

如何编写异步接收数据的hazelcast jet源程序
EN

Stack Overflow用户
提问于 2018-09-24 09:31:22
回答 1查看 131关注 0票数 1

我一直在浏览hazelcast-jet文档,寻找一些外部进程异步提供的源的引用--在我的例子中是http post。

我确实查看了Kafka code,因为这似乎是最接近的,但我不知道新到达的事件如何触发任何事情。我假设,这里不会涉及阻塞线程。

我非常感谢任何能更好地理解我如何在一个“stream”元素被点滴输入的环境中使用hazelcast jet的建议。

EN

回答 1

Stack Overflow用户

发布于 2018-09-24 16:45:52

即将发布的Hazelcast Jet 0.7版本引入了source Builder对象,这使得构建自定义源变得更简单。您可以使用它来编写类似以下内容的代码:

代码语言:javascript
复制
public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();
    StreamSource<String> source = SourceBuilder
            .timestampedStream("http-trickle", x -> new HttpSource())
            .fillBufferFn(HttpSource::addToBuffer)
            .destroyFn(HttpSource::destroy)
            .build();
    StreamStage<String> srcStage = pipeline.drawFrom(source);
}

private static class HttpSource {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(10000);
    private final ArrayList<String> buffer = new ArrayList<>();
    private final AsyncClient<String> client = 
        new AsyncClient<String>().addReceiveListener(queue::add);

    void addToBuffer(TimestampedSourceBuffer<String> sourceBuffer) {
        queue.drainTo(buffer);
        for (String line : buffer) {
            sourceBuffer.add(line, extractTimestamp(line));
        }
        buffer.clear();
    }

    void destroy() {
        client.close();
    }
}

在这里,我使用了一个模拟的AsyncClient,它应该代表您实际的异步HTTP客户端。它希望您提供一个回调,以便在传入数据到达时对其进行处理。Jet的源代码构建器要求您提供另一个回调fillBufferFn,该回调将数据发送到处理管道。

您对AsyncClient的回调应该将数据推送到并发队列,并且您的fillBufferFn应该将队列排空到Jet的源缓冲区。

您可能会尝试简化我给出的代码:

代码语言:javascript
复制
void addToBufferDirect(TimestampedSourceBuffer<String> sourceBuffer) {
    for (String line; (line = queue.poll()) != null;) {
        sourceBuffer.add(line, extractTimestamp(line));
    }
}

这避免了中间缓冲区位于并发队列和Jet的源缓冲区之间。实际上,它在大多数情况下都是有效的,但如果您遇到了一个强大的流量峰值,addToBufferDirect可能永远不会完成。这将违反与Jet的合同,该合同要求您在一秒左右的时间内从fillBufferFn返回。

我们已经认识到,这种使用源构建器和异步客户端API的模式是非常常见的,我们计划提供更多的便利来处理它。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52471721

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档