我正在努力从Rx Java迁移到Java 8 lambdas。我找不到的一个例子是缓冲请求的方法。例如,在Rx Java中,我可以这样说。
Observable.create(getIterator()).buffer(20, 1000, TimeUnit. MILLISECONDS).doOnNext(list -> doWrite(list));我们将20个元素缓冲到一个列表中,或者在1000毫秒时超时,这是最先发生的。
RX中的observable是一种"push“风格的observable,而as Streams使用的是java pull。这是否有可能在流中实现我自己的映射操作,或者无法发出会导致问题,因为doOnNext必须轮询前一个元素?
发布于 2015-02-25 13:32:51
一种方法是使用BlockingQueue和芭乐。使用Queues.drain,您可以创建一个Collection,然后可以对其调用stream()并执行转换。这里有一个链接:Guava Queues.drain
下面是一个简单的例子:
public void transform(BlockingQueue<Something> input)
{
List<Something> buffer = new ArrayList<>(20);
Queues.drain(input, buffer, 20, 1000, TimeUnit.MILLISECONDS);
doWrite(buffer);
}发布于 2015-06-12 07:33:01
simple-react有类似的运算符,但不是这个运算符。但是它是非常可扩展的,所以应该可以编写自己的代码。需要注意的是,我没有在IDE中编写或测试过它,使用简单反应的超时操作符按大小划分的缓冲区大致如下所示
import com.aol.simple.react.async.Queue;
import com.aol.simple.react.stream.traits.LazyFutureStream;
import com.aol.simple.react.async.Queue.ClosedQueueException;
import com.aol.simple.react.util.SimpleTimer;
import java.util.concurrent.TimeUnit;
static LazyFutureStream batchBySizeAndTime(LazyFutureStream stream,int size,long time, TimeUnit unit) {
Queue queue = stream.toQueue();
Function<Supplier<U>, Supplier<Collection<U>>> fn = s -> {
return () -> {
SimpleTimer timer = new SimpleTimer();
List<U> list = new ArrayList<>();
try {
do {
if(list.size()==size())
return list;
list.add(s.get());
} while (timer.getElapsedNanoseconds()<unit.toNanos(time));
} catch (ClosedQueueException e) {
throw new ClosedQueueException(list);
}
return list;
};
};
return stream.fromStream(queue.streamBatch(stream.getSubscription(), fn));
}https://stackoverflow.com/questions/28649767
复制相似问题