首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何使rxjs暂停/恢复?

如何使rxjs暂停/恢复?
EN

Stack Overflow用户
提问于 2020-02-20 13:00:02
回答 3查看 2K关注 0票数 1

现在有一个数组,数组值是图像链接,例如:

代码语言:javascript
复制
const imageList = [
  'https://cdn.pixabay.com/photo/2020/02/16/20/29/new-york-4854718_960_720.jpg',
  'https://cdn.pixabay.com/photo/2020/02/14/16/04/mallorca-4848741_960_720.jpg',
  'https://cdn.pixabay.com/photo/2020/02/14/04/20/old-city-4847469_960_720.jpg',
  // more...
];

我想使用rxjs按顺序下载(我是一个电子应用程序,所以我可以下载)

下载完第一张图片后,再下载第二张图片。下载第三张图片时,用户单击“暂停”按钮,等待第三张图片的下载完成。那就不能再下载了。当用户单击“继续”按钮时,从第四张图片开始下载。

我参考了本文:Buffering (lossless)一节( https://medium.com/@kddsky/pauseable-observables-in-rxjs-58ce2b8c7dfd )。本文的代码是:

代码语言:javascript
复制
merge(
  source$.pipe( bufferToggle(off$, ()=>on$)  ),
  source$.pipe( windowToggle(on$, ()=>off$) )
).pipe(
  // then flatten buffer arrays and window Observables
  flatMap(x => x)
)

演示网址是:https://thinkrx.io/gist/cef1572743cbf3f46105ec2ba56228cd

但是在这个代码中有两个问题,它们不能满足我的需要。我不知道怎么修改它。

  1. 我使用redux-observable,所以我想用两个操作来触发它们:(this.props.start() / this.props.pause())
  2. 恢复后,数据仍然一个接一个地传输,而不是一次性发布。

当前的代码如下所示:

代码语言:javascript
复制
export const epicDownloadResources = (
  action$: ActionsObservable<AnyAction>,
  store$: StateObservable<RootState>,
) => {
  return action$.pipe(
    ofType(appActions.other.start()),
    of([
      'https://cdn.pixabay.com/photo/2020/02/16/20/29/new-york-4854718_960_720.jpg',
      'https://cdn.pixabay.com/photo/2020/02/14/16/04/mallorca-4848741_960_720.jpg',
      'https://cdn.pixabay.com/photo/2020/02/14/04/20/old-city-4847469_960_720.jpg',
    ]),
    mergeMap(() => {
      // code
    }),
    mergeMap((url: string) => {
      // downloading
    })
}

在真正的产品中,它将下载公司的内部图片资源,而不是其他非版权图片。

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2020-02-21 09:44:29

我采取了完全不同的方法。

如果我正确理解,您希望在用户恢复后继续进行。这实际上意味着做窗口或缓冲是没有意义的。

在我看来,简单地使用concatMap嵌套就足够了。

代码语言:javascript
复制
const pause$ = fromEvent(pauseButton, "click").pipe(
 mapTo(false),
);

const resume$ = fromEvent(resumeButton, "click").pipe(
 mapTo(true),
);

const pauseResume$ = merge(pause$,resume$).pipe(
 startWith(true),
 shareReplay(1),
)

const source = of(...imageList).pipe(
 concatMap((url, i) =>
   pauseResume$.pipe(
     tap(v => console.log(`should resume ${v}`)),
     filter(v => v), // Only resume if true
     take(1),
     concatMap(() =>
       from(fetch(url)).pipe(
         delay(1000), // Simulate slow request
         mapTo(i) // just for logging which request we just completed
       )
     )
   )
 )
);
source.subscribe(x => console.log(x));

这将暂停启动新请求,直到简历$发出新值为止。我相信这是你根据你的情况想要的。

我不确定您是否希望在用户暂停的情况下完成第三个请求。我假设您会这样做,但如果没有,则可以在请求后使用另一个concatMap来暂停使用过滤器的pauseResume$ with filter。

stackblitz

票数 3
EN

Stack Overflow用户

发布于 2020-02-20 19:36:59

以下是我的尝试:

代码语言:javascript
复制
const urlArr = Array.from({ length: 10 }, (_, idx) => 'url/' + idx);
let idx = 0;

const urlEmitter = new Subject();
const url$ = urlEmitter.asObservable();
const stopEmitter = new Subject();
const stopValues$ = stopEmitter.asObservable();

const start$ = fromEvent(start, 'click');
start$.pipe(take(1)).subscribe(() => (stopEmitter.next(false), urlEmitter.next(urlArr[idx++]))); // Start emitting valeus

const stopSubscription = fromEvent(stop, 'click').pipe(mapTo(true)).subscribe(stopEmitter);

const shouldContinue$ = stopValues$.pipe(map(shouldStop => !shouldStop));

const subsequentStartClicks$ = start$.pipe(
  skip(1), // Skip the first time the `start` button is clicked
  observeOn(asyncScheduler), // Make sure it emits after the buffer has been initialized
  tap(() => stopEmitter.next(false)), // shouldContinue$ will emit `true`
);

const downloadedUrls$ = url$.pipe(
  mergeMap(url => of(url).pipe(delay(idx * 500))), // Simulate a file downloading
  combineLatest(shouldContinue$), // Make sure it acts according to `shouldContinue$`
  filter(([_, shouldContinue]) => shouldContinue),
  map(([v]) => v),
  tap((v) => console.warn(v)), // Debugging purposes...

  // Because of `combineLatest`
  // If you click `start` and wait some time, then you click `stop`
  // then you click again `start`, you might get the last value added to the array
  // this is because `shouldContinue$` emitted a new value
  // So you want to make sure you won't get the same value multiple times
  distinctUntilChanged(), 

  tap(() => urlEmitter.next(urlArr[idx++])),

  bufferToggle(
    start$,
    () => stopValues$.pipe(filter(v => !!v)),
  )
);

merge(
  subsequentStartClicks$.pipe(mapTo(false)), // Might not be interested in click events 
  downloadedUrls$
)
  .pipe(filter(v => !!v))
  .subscribe(console.log);

我受到了罗氏图表的启发。

我的想法是遵循同样的方法,但只有在start$流发出时才发出值,stop$发出时才应该停止。

代码语言:javascript
复制
----X--X----------------------------------> urls$

-Y----------------------------------------> start$

-----------Z------------------------------> end$


-----------[X, X]-------------------------------> urls$

每次按stop按钮时,都会将一个true值推入stopValues$流中。shouldContinue$确定url$流是否应该继续推送值,这取决于stopValues$

StackBlitz

票数 1
EN

Stack Overflow用户

发布于 2020-10-06 12:41:46

暂停并恢复一条可观测的溪流,请提出更好的选择。

delayWhen是一个非常强大的运算符。我的解决方案使用mergeMapdelayWhen。 特点:重试,节气,暂停,恢复

  1. 创建和订阅可观察到的
代码语言:javascript
复制
const concurrentLimit = 5
const retryLimit = 10
const source$ = from(new Array(100).fill(0).map((_, i) => i))
// remove <boolean> if not typescript
const pause$ = new BehaviorSubject<boolean>(false);
const pass$ = pause$.pipe(filter((v) => !v));

const throttledTask$ = source$.pipe(
  mergeMap((item) => {
    return of(item).pipe(
      delayWhen(() => pass$),
      mergeMap(async (item) => {
         // you can also throw some errors
         return await new Promise((resolve)=>
             setTimeout(resolve(item), Math.random()*1000))
      }),
      retryWhen((errors$) => errors$.pipe(delay(1000), take(retryLimit)))
    );
  }, concurrentLimit)

const subscription = throttledTask$.subscribe(x => console.log(x))

  1. 添加暂停/恢复事件处理程序
代码语言:javascript
复制
const pause = () => { pause$.next(true) }
const resume = () => { pause$.next(false) }

解释:

  1. delayWhen将暂停流,直到pass$信号发出。
  2. BehaviorSubject用于合成pass$信号,在订阅时将发出最后一个值。
  3. mergeMap可以处理异步任务,并具有并发线程计数限制参数。当delayWhen暂停一个流时,该流将留在mergeMap中,并占用一个并发的“线程”。
  4. retryWhen将重新订阅,直到errors$.pipe(delay(1000), take(retryLimit))发出完整或错误。
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/60320538

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档