首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >非协调mergeMap行为

非协调mergeMap行为
EN

Stack Overflow用户
提问于 2018-09-12 08:47:44
回答 1查看 79关注 0票数 0

我目前正在研究一种文件上传方法,它要求我限制并发请求的数量。

我已经开始写一个如何处理它的原型

代码语言:javascript
复制
const items = Array.from({ length: 50 }).map((_, n) => n);
from(items)
  .pipe(
    mergeMap(n => {
      return of(n).pipe(delay(2000));
    }, 5)
  )
  .subscribe(n => {
    console.log(n);
  });

但是,当我用实际调用换掉of时,它确实起了作用。它只处理一个块,所以假设20个文件中有5个

代码语言:javascript
复制
from(files)
  .pipe(mergeMap(handleFile, 5))
  .subscribe(console.log);

handleFile函数返回对我的自定义ajax实现的调用

代码语言:javascript
复制
import { Observable, Subscriber } from 'rxjs';
import axios from 'axios';

const { CancelToken } = axios;

class AjaxSubscriber extends Subscriber {
  constructor(destination, settings) {
    super(destination);
    this.send(settings);
  }

  send(settings) {
    const cancelToken = new CancelToken(cancel => {
      // An executor function receives a cancel function as a parameter
      this.cancel = cancel;
    });
    axios(Object.assign({ cancelToken }, settings))
      .then(resp => this.next([null, resp.data]))
      .catch(e => this.next([e, null]));
  }

  next(config) {
    this.done = true;
    const { destination } = this;
    destination.next(config);
  }

  unsubscribe() {
    if (this.cancel) {
      this.cancel();
    }
    super.unsubscribe();
  }
}

export class AjaxObservable extends Observable {
  static create(settings) {
    return new AjaxObservable(settings);
  }

  constructor(settings) {
    super();
    this.settings = settings;
  }

  _subscribe(subscriber) {
    return new AjaxSubscriber(subscriber, this.settings);
  }
}

所以看起来就像这样

代码语言:javascript
复制
function handleFile() {
  return AjaxObservable.create({
    url: "https://jsonplaceholder.typicode.com/todos/1"
  });
}

CodeSandbox

如果我从合并映射函数中删除并发参数,一切都可以正常工作,但它同时上传所有文件。有办法解决这个问题吗?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-09-13 17:17:43

问题是我没有在complete()中调用AjaxSubscriber方法,所以我修改了代码如下:

代码语言:javascript
复制
pass(response) {
  this.next(response);
  this.complete();
}

并从axios调用:

代码语言:javascript
复制
axios(Object.assign({ cancelToken }, settings))
  .then(resp => this.pass([null, resp.data]))
  .catch(e => this.pass([e, null]));
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52291095

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档