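I have a list of BLE devices and I am using RxJava to interact with them. I need to emit one item from the list, repeatedly write a characteristic to it until X happens, and then move on to the next item in the list.
Current code: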
Observable.from(mDevices)
        .flatMap(new Func1<Device, Observable<?>>() {
            @Override
            public Observable<?> call(Device device) {
                Log.d(TAG, "connecting for policing");
                return device.connectForPolicing();
            }
        })
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Action1<Object>() {
            @Override
            public void call(Object o) {
                Log.d(TAG, "subscribing... ");
            }
        });
where .connectForPolicing() looks like:
public Observable<byte[]> connectForPolice() {
    ....
    return device.establishConnection(mContext, false)
            .flatMap(new Func1<RxBleConnection, Observable<byte[]>>() {
                @Override
                public Observable<byte[]> call(RxBleConnection rxBleConnection) {
                    byte[] value = new byte[1];
                    value[0] = (byte) (3 & 0xFF);
                    // Buzz the device
                    return rxBleConnection.writeCharacteristic(Constants.BUZZER_SELECT, value);
                }
            })
            .repeat(3) // ignore
            .takeUntil(device.observeConnectionStateChanges().filter(new Func1<RxBleConnection.RxBleConnectionState, Boolean>() {
                @Override
                public Boolean call(RxBleConnection.RxBleConnectionState rxBleConnectionState) {
                    return rxBleConnectionState == RxBleConnection.RxBleConnectionState.DISCONNECTING;
                }
            }));
}
This code appears to emit every item in the list immediately, so it connects to all of the devices and buzzes them at the same time. How can I emit one item at a time, so that I interact with the devices one after another?
The pseudocode would look like this:
for (Device device : devices) {
    device.connect();
    while (device.isConnected()) {
        device.beep();
    }
}
Posted on 2016-11-18 08:00:02
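Replace flatMap with concatMap:
.concatMap(device -> device.connectForPolicing())
flatMap uses the merge operator, so it subscribes to every inner Observable right away. concatMap uses concat instead, so it subscribes to the inner Observables one at a time, in order. Good article about it.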
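
To make the merge versus concat difference concrete without a BLE device, here is a minimal stdlib-only sketch. It is an analogy, not RxJava: it uses CompletableFuture, and the names MergeVsConcatSketch, connect, runEagerly and runSequentially are made up for illustration. connect is only a hypothetical stand-in for device.connectForPolicing().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class MergeVsConcatSketch {

    // Hypothetical stand-in for device.connectForPolicing(): an async task
    // that records when it starts and when it finishes.
    static CompletableFuture<Void> connect(String device, List<String> log) {
        return CompletableFuture.runAsync(() -> {
            log.add("start " + device);
            try {
                Thread.sleep(50); // simulated BLE work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            log.add("end " + device);
        });
    }

    // merge-like (the flatMap behaviour): every task is started up front,
    // so start/end entries from different devices may interleave.
    static List<String> runEagerly(List<String> devices) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        List<CompletableFuture<Void>> started = new ArrayList<>();
        for (String device : devices) {
            started.add(connect(device, log));
        }
        CompletableFuture.allOf(started.toArray(new CompletableFuture[0])).join();
        return log;
    }

    // concat-like (the concatMap behaviour): the next task is started only
    // after the previous one has completed.
    static List<String> runSequentially(List<String> devices) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (String device : devices) {
            chain = chain.thenCompose(ignored -> connect(device, log));
        }
        chain.join();
        return log;
    }

    public static void main(String[] args) {
        List<String> devices = Arrays.asList("A", "B", "C");
        System.out.println("eager:      " + runEagerly(devices));
        System.out.println("sequential: " + runSequentially(devices));
    }
}
```

The sequential log always reads start A, end A, start B, end B, start C, end C, which matches the for/while pseudocode in the question. The order of the eager log depends on thread scheduling.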
Posted on 2016-11-18 15:48:11
You can use the .flatMap(Func1, int) operator.
Observable.from(mDevices)
        .flatMap(
                new Func1<Device, Observable<?>>() {
                    @Override
                    public Observable<?> call(Device device) {
                        Log.d(TAG, "connecting for policing");
                        return device.connectForPolicing();
                    }
                },
                1
        )
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Action1<Object>() {
            @Override
            public void call(Object o) {
                Log.d(TAG, "subscribing... ");
            }
        });
The int parameter limits the maximum number of inner Observables that are subscribed to concurrently. With a value of 1, the devices are processed sequentially.
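
As a rough illustration of what a concurrency cap does, here is a stdlib-only sketch. It is an analogy rather than RxJava: a fixed-size thread pool plays the role of the maxConcurrent argument, and MaxConcurrentSketch and peakConcurrency are hypothetical names. It measures how many simulated device tasks are ever active at the same time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class MaxConcurrentSketch {

    // Runs deviceCount simulated tasks on at most maxConcurrent threads and
    // returns the highest number of tasks that were active at the same time.
    static int peakConcurrency(int deviceCount, int maxConcurrent) {
        ExecutorService pool = Executors.newFixedThreadPool(maxConcurrent);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            List<CompletableFuture<Void>> pending = new ArrayList<>();
            for (int i = 0; i < deviceCount; i++) {
                pending.add(CompletableFuture.runAsync(() -> {
                    int now = active.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max); // remember the maximum seen so far
                    try {
                        Thread.sleep(20); // simulated BLE work for one device
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        active.decrementAndGet();
                    }
                }, pool));
            }
            // Wait until every simulated device task has finished.
            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak with limit 1: " + peakConcurrency(4, 1));
        System.out.println("peak with limit 3: " + peakConcurrency(4, 3));
    }
}
```

With a limit of 1 the peak is always 1, which is the sequential behaviour described above. With a larger limit the peak never exceeds the limit.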
If you want to buzz the device repeatedly until it disconnects, you also need to change the connectForPolice() function:
public Observable<byte[]> connectForPolice(RxBleDevice device) {
    ....
    return device.establishConnection(mContext, false)
            .flatMap(new Func1<RxBleConnection, Observable<byte[]>>() { // once the connection is established ...
                @Override
                public Observable<byte[]> call(final RxBleConnection rxBleConnection) {
                    // final so that the anonymous class below can capture it
                    final byte[] value = new byte[1];
                    value[0] = (byte) (3 & 0xFF);
                    // Buzz the device
                    return Observable // ... we return an observable ...
                            .defer(new Func0<Observable<byte[]>>() {
                                @Override
                                public Observable<byte[]> call() {
                                    // ... that on each subscription emits a fresh write characteristic observable ...
                                    return rxBleConnection.writeCharacteristic(Constants.BUZZER_SELECT, value);
                                }
                            })
                            .repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() { // ... which (the Observable.defer()) is subscribed to again ...
                                @Override
                                public Observable<?> call(Observable<? extends Void> observable) {
                                    return observable.delay(10, TimeUnit.SECONDS); // ... 10 seconds after the previous completion
                                }
                            });
                }
            })
            .onErrorResumeNext(new Func1<Throwable, Observable<? extends byte[]>>() { // if the device disconnects, a BleDisconnectedException is emitted ...
                @Override
                public Observable<? extends byte[]> call(Throwable throwable) {
                    return Observable.empty(); // ... in which case we simply complete the Observable
                }
            });
}
https://stackoverflow.com/questions/40667038