通过订阅getOrderBook,我将通过Netty和Websocket获取数据。
当一个完整的OrderBook第一次到达时,然后是增量更新。因此,我得到了包含所有更改的完整OrderBook。
请告诉我如何创建另一个可观察的,以便我可以通过订阅单独接收更新(incrementalUpdateData)中的数据?
private final Map<Instrument, OrderBook> orderBookMap = new HashMap<>();
public Observable<OrderBook> getOrderBook(Instrument instrument) {
return service.subscribeChannel(instrument).flatMap(jsonNode -> {
if (jsonNode.get("action").asText().equalsIgnoreCase("snapshot")) { //first update - full snapshot
OrderBook orderBook = mapper.treeToValue(jsonNode.get("data"),
mapper.getTypeFactory().constructCollectionType(List.class, Orderbook.class));
orderBookMap.put(instrument, orderBook);
return Observable.just(orderBook);
} else { //second update and later - incremental update
OrderBook orderBook = orderBookMap.getOrDefault(instrument, null);
PublicOrder incrementalUpdateData = mapper.treeToValue(jsonNode.get("data").get(0).get("asks"),
mapper.getTypeFactory().constructCollectionType(List.class, PublicOrder.class));
orderBook.update(incrementalUpdateData);
return Observable.just(orderBook);
}
});
}我还指望什么呢。
getOrderBook.subscribe(some instrument) -获得完整的orderBook
getOrderBookUpdate.subscribe(some instrument) -只获取增量数据
发布于 2022-11-12 05:57:38
通过出版主体完成
结果代码如下:
private final Map<Instrument, PublishSubject<OrderBookUpdate>> orderBookUpdatesSubscriptions;<br/>
private final Map<Instrument, OrderBook> orderBookMap = new HashMap<>();
public Observable<OrderBook> getOrderBook(Instrument instrument) {
return service.subscribeChannel(instrument).flatMap(jsonNode -> {
if (jsonNode.get("action").asText().equalsIgnoreCase("snapshot")) { //first update - full snapshot
OrderBook orderBook = mapper.treeToValue(jsonNode.get("data"),
mapper.getTypeFactory().constructCollectionType(List.class, Orderbook.class));
orderBookMap.put(instrument, orderBook);
return Observable.just(orderBook);
} else { //second update and later - incremental update
OrderBook orderBook = orderBookMap.getOrDefault(instrument, null);
PublicOrder incrementalUpdateData = mapper.treeToValue(jsonNode.get("data").get(0).get("asks"),
mapper.getTypeFactory().constructCollectionType(List.class, PublicOrder.class));
orderBookUpdatesSubscriptions.computeIfPresent(instrument, (a,b) -> b.onNext(new OrderBookUpdate(incrementalUpdateData)));
orderBook.update(incrementalUpdateData);
return Observable.just(orderBook);
}
}); }
public Observable<OrderBookUpdate> getOrderBookUpdates(Instrument instrument) {
return orderBookUpdatesSubscriptions.computeIfAbsent(instrument, v -> PublishSubject.create());
}https://stackoverflow.com/questions/74380223
复制相似问题