我需要订阅一个热流在一个反应自定义钩子。这意味着Ajax请求尚未完成。它不断地从服务器接收数据块(格式良好的JSON)。
我的源代码如下所示:
export const useQData = (
resourceId: string,
): {
qDataResult: Result<QData>;
} => {
const [qData, setQData] = useState<Result<QData>>({ status: "initial" });
function handleQData(qData: Result<QData>) {
setQData(qData);
}
// get qData initially
useEffect(() => {
debugger;
const observable = ajax
.getJSON<QData>(`${process.env.REACT_APP_API_BASE}/subscription/msnOutput/${resourceId}`)
.pipe(startWith(<QData>{ tileStatus: "NOTILE" }));
const c = connectable<QData>(observable);
c.connect();
const subscription = c.subscribe({
next: (response) => handleQData({ status: "update", payload: response }),
error: (error) => handleQData({ status: "error", responseErrorCode: 404, messageId: error.message }),
complete: () => handleQData({ status: "offline" }), // stream completed
});
return () => subscription.unsubscribe();
}, []);
return {
qDataResult: qData,
};
};由于某种原因,流没有启动。即使是startWith()设置的初始值也不被处理。我是不是错过了开始处理的东西?
更新:
通过为Ajax请求设置includeDownloadProgress: true,我已经实现了运行处理。以下是源代码:
useEffect(() => {
observable = ajax<string>({
url: `${process.env.REACT_APP_API_BASE}/subscription/msnOutput/${resourceId}`,
includeDownloadProgress: true,
responseType: "text",
}).pipe(
// Extract response body from response object
map((response) => response.response),
filter((responseString) => responseString !== ""),
// Extract response objects
map((responseString) => responseString.split("\n")),
filter((responseArray) => responseArray.length > 0),
// Find last valid qdata object
map((responseArray) => extractValidQDataString(responseArray)),
filter((qDataString) => qDataString !== ""),
map((qDataString) => JSON.parse(qDataString) as QData),
startWith(<QData>{ tileStatus: "NOTILE" }),
);
const c = connectable(observable);
const s = c.subscribe({
next: (response) => setQData({ status: "update", payload: response }),
error: (error) => handleError(error),
complete: () => setQData({ status: "offline" }), // stream completed
});
c.connect();
setSubscription(s);
return () => s.unsubscribe();
}, []);不幸的是,下载进度响应的结果有几个缺点:
发布于 2022-04-03 19:37:31
它包含越来越多的响应。
核心问题是ajax()发出的ajax()(每次都是相同的对象)发出的response属性是随每一块生长。我猜这么长的轮询是为了维护对旧浏览器的支持吧?
如果不是,我建议使用,而不是使用--我刚才链接的那篇文章将详细解释。
const streamResponseBody = (url: string): Observable<string> =>
from(fetch(url)).pipe(
op.concatMap((resp) => {
const reader = resp.body?.getReader();
if (!reader) {
return throwError(() => new Error('response had no body to read'));
}
const encoder = new TextDecoder();
return defer(() => reader.read()).pipe(
op.repeat(),
op.takeWhile(({ done }) => !done),
op.map(({ value }) => encoder.decode(value)),
op.filter((value) => value !== '')
);
})
);现在我们有了一个块流,但是我们需要一个行流,这样我们就可以很容易地将它们转换为QData。阅读您的代码,它看起来是安全的假设,每一个换行符分隔的字符串是潜在的解析-作为QData。当运算符到达时,这个操作符会缓冲块并发出完整的行。
const chunksToLines: MonoTypeOperatorFunction<string> = pipe(
op.scan(
(acc, chunk) => {
const lines = acc.buffer.concat(chunk).split('\n');
return {
lines: lines.slice(0, lines.length - 1),
buffer: lines[lines.length - 1],
};
},
{ buffer: '', lines: [] as readonly string[] }
),
op.concatMap(({ lines }) => lines)
);响应不能作为JSON直接检索
是啊,那是不可避免的。没有任何rxjs导出将直接与您特定的长轮询实现集成。但是您的代码仍然可以到达一个非常干净的地方:
// This function is a combination of extractValidQDataString and
// JSON.parse(qDataString), except it doesn't have to handle arrays.
declare const stringToQData: (input: string) => QData | undefined;
useEffect(() => {
const s = streamResponseBody(
`${process.env.REACT_APP_API_BASE}/subscription/msnOutput/${resourceId}`
)
.pipe(
chunksToLines,
op.map(stringToQData),
op.filter((value): value is QData => !!value),
op.startWith(<QData>{ tileStatus: 'NOTILE' })
)
.subscribe({
next: (response) => setQData({ status: 'update', payload: response }),
error: (error) => handleError(error),
complete: () => setQData({ status: 'offline' }), // stream completed
});
setSubscription(s);
return () => s.unsubscribe();
}, []);另一件值得指出的事情是:书面的示例没有利用“热”可观察到的优点。功能将是相同的,而不需要使用connect并直接订阅。这是因为连接的可观测性在useEffect回调之外是不可访问的,因此不能接收其他订户。
https://stackoverflow.com/questions/70578580
复制相似问题