首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >一次发出一个项目,与其交互直到满足条件,然后继续下一个项目

一次发出一个项目,与其交互直到满足条件,然后继续下一个项目
EN

Stack Overflow用户
提问于 2016-11-18 07:41:53
回答 2查看 590关注 0票数 1

我有一个BLE设备的列表,并且正在使用RxJava与它们交互。我需要从列表中发出一个项,重复地向它写入一个特征,直到X发生,然后继续到列表中的下一个项。

当前代码:

代码语言:javascript
复制
Observable.from(mDevices)
                .flatMap(new Func1<Device, Observable<?>>() {
                    @Override
                    public Observable<?> call(Device device) {
                        Log.d(TAG, "connecting for policing");
                        return device.connectForPolicing();
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Object>() {
                    @Override
                    public void call(Object o) {
                        Log.d(TAG, "subscribing... ");
                    }
                });

其中.connectForPolicing()看起来像:

代码语言:javascript
复制
public Observable<byte[]> connectForPolice() {

        ....

        return device.establishConnection(mContext, false)
                .flatMap(new Func1<RxBleConnection, Observable<byte[]>>() {
                    @Override
                    public Observable<byte[]> call(RxBleConnection rxBleConnection) {
                        byte[] value = new byte[1];
                        value[0] = (byte) (3 & 0xFF);
                        //Buzz the device
                        return rxBleConnection.writeCharacteristic(Constants.BUZZER_SELECT, value);
                    }
                })
                .repeat(3)//ignore
                .takeUntil(device.observeConnectionStateChanges().filter(new Func1<RxBleConnection.RxBleConnectionState, Boolean>() {
                    @Override
                    public Boolean call(RxBleConnection.RxBleConnectionState rxBleConnectionState) {

                        return rxBleConnectionState == RxBleConnection.RxBleConnectionState.DISCONNECTING;
                    }
                }));
    }

这段代码似乎会立即发出列表中的所有项,因此将同时连接所有项并发出嗡嗡声。我如何一次发出一个项目,以便与它们进行交互?

伪代码应该是这样的:

代码语言:javascript
复制
for(Device device : devices) {
    device.connect();
    while(device.isConnected()) {
        device.beep();
    }
}
EN

回答 2

Stack Overflow用户

发布于 2016-11-18 08:00:02

concatMap替换flatMap

代码语言:javascript
复制
.concatMap(device -> device.connectForPolicing())

flatMap使用merge运算符。它会立即发出所有的项。虽然concatMap使用concat,但它会按顺序发送项目。Good article about it.

票数 1
EN

Stack Overflow用户

发布于 2016-11-18 15:48:11

您可以使用.flatMap(Observable, int)运算符。

代码语言:javascript
复制
Observable.from(mDevices)
            .flatMap(
                new Func1<Device, Observable<?>>() {
                    @Override
                    public Observable<?> call(Device device) {
                        Log.d(TAG, "connecting for policing");
                        return device.connectForPolicing();
                    }
                },
                1
            )
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<Object>() {
                @Override
                public void call(Object o) {
                    Log.d(TAG, "subscribing... ");
                }
            });

int参数限制最大并发操作数。在这种情况下,它将被顺序处理。

如果你想反复使设备嗡嗡作响,直到它断开连接,那么还需要更改connectForPolice()函数:

代码语言:javascript
复制
public Observable<byte[]> connectForPolice(RxBleDevice device) {

    ....

    return device.establishConnection(mContext, false)
            .flatMap(new Func1<RxBleConnection, Observable<byte[]>>() { // once the connection is established ...
                @Override
                public Observable<byte[]> call(RxBleConnection rxBleConnection) {
                    byte[] value = new byte[1];
                    value[0] = (byte) (3 & 0xFF);
                    //Buzz the device
                    return Observable // ... we return an observable ...
                            .defer(new Func0<Observable<byte[]>>() {
                                @Override
                                public Observable<byte[]> call() {
                                    return rxBleConnection.writeCharacteristic(Constants.BUZZER_SELECT, value); // ... (that on each subscription will emit a fresh write characteristic observable) ...
                                }
                            })
                            .repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() { // ... which we will subscribe (the Observable.defer()) again ...
                                @Override
                                public Observable<?> call(Observable<? extends Void> observable) {
                                    return observable.delay(10, TimeUnit.SECONDS); // ... after 10 seconds from the previous complete
                                }
                            });
                }
            })
            .onErrorResumeNext(new Func1<Throwable, Observable<? extends byte[]>>() { // if the device will trigger disconnect then a BleDisconnectedException will be thrown ...
                @Override
                public Observable<? extends byte[]> call(Throwable throwable) {
                    return Observable.empty(); // ... in which situation we will just finish the Observable
                }
            });
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/40667038

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档