首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Java 8 lambda api

Java 8 lambda api
EN

Stack Overflow用户
提问于 2015-02-22 02:41:40
回答 2查看 838关注 0票数 10

我正在努力从Rx Java迁移到Java 8 lambdas。我找不到的一个例子是缓冲请求的方法。例如,在Rx Java中,我可以这样说。

代码语言:javascript
复制
Observable.create(getIterator()).buffer(20, 1000, TimeUnit. MILLISECONDS).doOnNext(list -> doWrite(list));

我们将20个元素缓冲到一个列表中,或者在1000毫秒时超时,这是最先发生的。

RX中的observable是一种"push“风格的observable,而as Streams使用的是java pull。这是否有可能在流中实现我自己的映射操作,或者无法发出会导致问题,因为doOnNext必须轮询前一个元素?

EN

回答 2

Stack Overflow用户

发布于 2015-02-25 13:32:51

一种方法是使用BlockingQueue和芭乐。使用Queues.drain,您可以创建一个Collection,然后可以对其调用stream()并执行转换。这里有一个链接:Guava Queues.drain

下面是一个简单的例子:

代码语言:javascript
复制
public void transform(BlockingQueue<Something> input) 
{
    List<Something> buffer = new ArrayList<>(20);
    Queues.drain(input, buffer, 20, 1000, TimeUnit.MILLISECONDS);
    doWrite(buffer);
}
票数 2
EN

Stack Overflow用户

发布于 2015-06-12 07:33:01

simple-react有类似的运算符,但不是这个运算符。但是它是非常可扩展的,所以应该可以编写自己的代码。需要注意的是,我没有在IDE中编写或测试过它,使用简单反应的超时操作符按大小划分的缓冲区大致如下所示

代码语言:javascript
复制
  import com.aol.simple.react.async.Queue;
  import com.aol.simple.react.stream.traits.LazyFutureStream;
  import com.aol.simple.react.async.Queue.ClosedQueueException;
  import com.aol.simple.react.util.SimpleTimer;
  import java.util.concurrent.TimeUnit;

  static LazyFutureStream batchBySizeAndTime(LazyFutureStream stream,int size,long time, TimeUnit unit) { 
    Queue queue = stream.toQueue();
    Function<Supplier<U>, Supplier<Collection<U>>> fn = s -> {
        return () -> {
            SimpleTimer timer = new SimpleTimer();
            List<U> list = new ArrayList<>();
            try {
                do {
                    if(list.size()==size())
                        return list;
                    list.add(s.get());
                } while (timer.getElapsedNanoseconds()<unit.toNanos(time));
            } catch (ClosedQueueException e) {

                throw new ClosedQueueException(list);
            }
            return list;
        };
    };
    return stream.fromStream(queue.streamBatch(stream.getSubscription(), fn));
}
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/28649767

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档