我一直在浏览hazelcast-jet文档,寻找一些外部进程异步提供的源的引用--在我的例子中是http post。
我确实查看了Kafka code,因为这似乎是最接近的,但我不知道新到达的事件如何触发任何事情。我假设,这里不会涉及阻塞线程。
我非常感谢任何能更好地理解我如何在一个“stream”元素被点滴输入的环境中使用hazelcast jet的建议。
发布于 2018-09-24 16:45:52
即将发布的Hazelcast Jet 0.7版本引入了source Builder对象,这使得构建自定义源变得更简单。您可以使用它来编写类似以下内容的代码:
public static void main(String[] args) {
Pipeline pipeline = Pipeline.create();
StreamSource<String> source = SourceBuilder
.timestampedStream("http-trickle", x -> new HttpSource())
.fillBufferFn(HttpSource::addToBuffer)
.destroyFn(HttpSource::destroy)
.build();
StreamStage<String> srcStage = pipeline.drawFrom(source);
}
private static class HttpSource {
private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(10000);
private final ArrayList<String> buffer = new ArrayList<>();
private final AsyncClient<String> client =
new AsyncClient<String>().addReceiveListener(queue::add);
void addToBuffer(TimestampedSourceBuffer<String> sourceBuffer) {
queue.drainTo(buffer);
for (String line : buffer) {
sourceBuffer.add(line, extractTimestamp(line));
}
buffer.clear();
}
void destroy() {
client.close();
}
}在这里,我使用了一个模拟的AsyncClient,它应该代表您实际的异步HTTP客户端。它希望您提供一个回调,以便在传入数据到达时对其进行处理。Jet的源代码构建器要求您提供另一个回调fillBufferFn,该回调将数据发送到处理管道。
您对AsyncClient的回调应该将数据推送到并发队列,并且您的fillBufferFn应该将队列排空到Jet的源缓冲区。
您可能会尝试简化我给出的代码:
void addToBufferDirect(TimestampedSourceBuffer<String> sourceBuffer) {
for (String line; (line = queue.poll()) != null;) {
sourceBuffer.add(line, extractTimestamp(line));
}
}这避免了中间缓冲区位于并发队列和Jet的源缓冲区之间。实际上,它在大多数情况下都是有效的,但如果您遇到了一个强大的流量峰值,addToBufferDirect可能永远不会完成。这将违反与Jet的合同,该合同要求您在一秒左右的时间内从fillBufferFn返回。
我们已经认识到,这种使用源构建器和异步客户端API的模式是非常常见的,我们计划提供更多的便利来处理它。
https://stackoverflow.com/questions/52471721
复制相似问题