我有多个数据源,它们是可观察到的(BehaviorSubject)
const data1$ = new BehaviorSubject({key: 'a', value: 1})
const data2$ = new BehaviorSubject({key: 'b', value: 2})
const data3$ = new BehaviorSubject({key: 'c', value: 3})我有一个聚合其他数据源的根目录
const root$ = new BehaviorSubject({ field: {/* Stores the values of other data sources*/ } })这样做的想法是,当我订阅root$时,我得到了他们的数据的聚合,当dataX$被更新(下一步)时,我得到了更新的聚合。
此外,我希望在添加/删除数据源时接收更新。
到目前为止,这是我的代码,我想不出实现它的方法。
const a1$ = new BehaviorSubject({key: 'a', value: 1})
const a2$ = new BehaviorSubject({key: 'b', value: 2})
const trunk$ = new BehaviorSubject([a1$, a2$])
function add(dataSourch) {
trunk$.next([...trunk$.getValue(), dataSourch])
}
function remove(key) {
const dataArr = trunk$.getValue()
const newArr = dataArr.filter((sourch) => sourch.key !== key)
trunk$.next([...newArr])
}?
当前的:我的问题是在下面的代码中添加一个新的数据源,最终的数据源将触发多个更新
我添加了有关此问题的其他信息和代码。
我有一些基本的数据源,它们由不同的角色订阅,由不同的角色进行修改。同时,我有一个聚合数据源,它将基本数据源的数据聚合到一个字段(聚合数据源本身有其他数据),并由某些角色订阅。基本数据源管理模块支持数据源的添加和删除。

const data1$ = new BehaviorSubject({key: 'a', value: 1})
const data2$ = new BehaviorSubject({key: 'b', value: 1})
const data3$ = new BehaviorSubject({key: 'c', value: 1})
const data4$ = new BehaviorSubject({key: 'd', value: 1})
const addSourch = (curr: any) => (prev: any) => ([curr, ...prev])
const addSourch$ = new Subject()
const deleteSourchByOb = (curr: any) => (arr: any) => {
const newArr = arr.filter((ele: any) => ele !== curr)
return newArr
}
const deleteSourchByOb$ = new Subject()
const deleteAll = () => (prev: any) => ([])
const deleteAll$ = new Subject()
const sourch$ = new BehaviorSubject([data1$, data2$, data3$])
const root$ = merge(
sourch$.pipe(map(arr => (curr: any) => [...arr, ...curr])),
addSourch$.pipe(map(addSourch)),
deleteSourchByOb$.pipe(map(deleteSourchByOb)),
deleteAll$.pipe(map(deleteAll))
).pipe(
scan((state, fn: Function) => fn(state), [])
)
const thunk$ = new BehaviorSubject({ field: { }})
const root2$= root$.pipe(
mergeMap(arr => merge(...arr)),
map((v: any) => (curr: any) => {
const obj = cloneDeep(curr)
obj[v.key] = v
return obj
})
).pipe(
scan((state, fn) => fn(state), {})
)
const final$ = combineLatest([root2$, thunk$]).pipe(
map((arr) => {
arr[1].field = arr[0]
return arr[1]
})
)
final$.subscribe(console.log)发布于 2021-01-14 17:59:20
您可以通过使用扫描运算符来实现此行为。
const { BehaviorSubject, Subject, merge } = rxjs;
const { map, scan } = rxjs.operators;
// Your functions that mutate your state/data aggregation
const add = (data) => (state) => ([...state, data]);
const deleteAll = () => (state) => ([]);
const data1$ = new BehaviorSubject({key: 'a', value: 1})
const data2$ = new BehaviorSubject({key: 'b', value: 2})
const data3$ = new BehaviorSubject({key: 'c', value: 3})
// Observable that represents all add events
const add$ = merge(data1$, data2$, data3$)
// Observable that represents all deleteAll events
const deleteAll$ = new Subject();
const data$ = merge(
// Applies the first (outer) mutate function to your event observables
add$.pipe(map(add)),
deleteAll$.pipe(map(deleteAll))
).pipe(
// Applies the second (inner) mutate function to finally mutate and return your updated state
scan((state, fn) => fn(state), [])
)
data$.subscribe(console.log)
// Sample further events
data1$.next({key: 'd', value: 4})
deleteAll$.next();
data1$.next({key: 'e', value: 5})<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.3/rxjs.umd.min.js"></script>
如果您需要更多的细节,请在评论中要求,我尝试将它们添加到答案中。FYI:考虑到提出问题的主要原因(在rxjs中随时间变化状态),我没有像您在问题中那样实现删除函数。您可以轻松地将此函数调整为给定的add和deleteAll函数。
https://stackoverflow.com/questions/65723432
复制相似问题