首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >带有自定义钩子的热流订阅

带有自定义钩子的热流订阅
EN

Stack Overflow用户
提问于 2022-01-04 12:10:07
回答 1查看 156关注 0票数 1

我需要订阅一个热流在一个反应自定义钩子。这意味着Ajax请求尚未完成。它不断地从服务器接收数据块(格式良好的JSON)。

我的源代码如下所示:

代码语言:javascript
复制
export const useQData = (
  resourceId: string,
): {
  qDataResult: Result<QData>;
} => {
  const [qData, setQData] = useState<Result<QData>>({ status: "initial" });

  function handleQData(qData: Result<QData>) {
    setQData(qData);
  }

  // get qData initially
  useEffect(() => {
    debugger;
    const observable = ajax
      .getJSON<QData>(`${process.env.REACT_APP_API_BASE}/subscription/msnOutput/${resourceId}`)
      .pipe(startWith(<QData>{ tileStatus: "NOTILE" }));
    const c = connectable<QData>(observable);
    c.connect();
    const subscription = c.subscribe({
      next: (response) => handleQData({ status: "update", payload: response }),
      error: (error) => handleQData({ status: "error", responseErrorCode: 404, messageId: error.message }),
      complete: () => handleQData({ status: "offline" }), // stream completed
    });
    return () => subscription.unsubscribe();
  }, []);

  return {
    qDataResult: qData,
  };
};

由于某种原因,流没有启动。即使是startWith()设置的初始值也不被处理。我是不是错过了开始处理的东西?

更新:

通过为Ajax请求设置includeDownloadProgress: true,我已经实现了运行处理。以下是源代码:

代码语言:javascript
复制
useEffect(() => {
    observable = ajax<string>({
        url: `${process.env.REACT_APP_API_BASE}/subscription/msnOutput/${resourceId}`,
        includeDownloadProgress: true,
        responseType: "text",
      }).pipe(
        // Extract response body from response object
        map((response) => response.response),
        filter((responseString) => responseString !== ""),
        // Extract response objects
        map((responseString) => responseString.split("\n")),
        filter((responseArray) => responseArray.length > 0),
        // Find last valid qdata object
        map((responseArray) => extractValidQDataString(responseArray)),
        filter((qDataString) => qDataString !== ""),
        map((qDataString) => JSON.parse(qDataString) as QData),
        startWith(<QData>{ tileStatus: "NOTILE" }),
      );
    const c = connectable(observable);
    const s = c.subscribe({
      next: (response) => setQData({ status: "update", payload: response }),
      error: (error) => handleError(error),
      complete: () => setQData({ status: "offline" }), // stream completed
    });
    c.connect();
    setSubscription(s);
    return () => s.unsubscribe();
  }, []);

不幸的是,下载进度响应的结果有几个缺点:

  • 它包含越来越多的响应。
  • 由于JSON不能直接检索响应,是否有人知道如何解决这些缺陷?
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-04-03 19:37:31

它包含越来越多的响应。

核心问题是ajax()发出的ajax()(每次都是相同的对象)发出的response属性是随每一块生长。我猜这么长的轮询是为了维护对旧浏览器的支持吧?

如果不是,我建议使用,而不是使用--我刚才链接的那篇文章将详细解释。

代码语言:javascript
复制
const streamResponseBody = (url: string): Observable<string> =>
  from(fetch(url)).pipe(
    op.concatMap((resp) => {
      const reader = resp.body?.getReader();
      if (!reader) {
        return throwError(() => new Error('response had no body to read'));
      }
      const encoder = new TextDecoder();
      return defer(() => reader.read()).pipe(
        op.repeat(),
        op.takeWhile(({ done }) => !done),
        op.map(({ value }) => encoder.decode(value)),
        op.filter((value) => value !== '')
      );
    })
  );

现在我们有了一个块流,但是我们需要一个行流,这样我们就可以很容易地将它们转换为QData。阅读您的代码,它看起来是安全的假设,每一个换行符分隔的字符串是潜在的解析-作为QData。当运算符到达时,这个操作符会缓冲块并发出完整的行。

代码语言:javascript
复制
const chunksToLines: MonoTypeOperatorFunction<string> = pipe(
  op.scan(
    (acc, chunk) => {
      const lines = acc.buffer.concat(chunk).split('\n');
      return {
        lines: lines.slice(0, lines.length - 1),
        buffer: lines[lines.length - 1],
      };
    },
    { buffer: '', lines: [] as readonly string[] }
  ),
  op.concatMap(({ lines }) => lines)
);

响应不能作为JSON直接检索

是啊,那是不可避免的。没有任何rxjs导出将直接与您特定的长轮询实现集成。但是您的代码仍然可以到达一个非常干净的地方:

代码语言:javascript
复制
// This function is a combination of extractValidQDataString and
// JSON.parse(qDataString), except it doesn't have to handle arrays.
declare const stringToQData: (input: string) => QData | undefined;

useEffect(() => {
 const s = streamResponseBody(
   `${process.env.REACT_APP_API_BASE}/subscription/msnOutput/${resourceId}`
 )
   .pipe(
     chunksToLines,
     op.map(stringToQData),
     op.filter((value): value is QData => !!value),
     op.startWith(<QData>{ tileStatus: 'NOTILE' })
   )
   .subscribe({
     next: (response) => setQData({ status: 'update', payload: response }),
     error: (error) => handleError(error),
     complete: () => setQData({ status: 'offline' }), // stream completed
   });
 setSubscription(s);
 return () => s.unsubscribe();
}, []);

另一件值得指出的事情是:书面的示例没有利用“热”可观察到的优点。功能将是相同的,而不需要使用connect并直接订阅。这是因为连接的可观测性在useEffect回调之外是不可访问的,因此不能接收其他订户。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70578580

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档