我在一个简单的RxJS快速服务器上做了一些简单的NodeJS实验,在这里我比较了处理和处理请求的不同方法(基于本文的https://snyk.io/blog/nodejs-how-even-quick-async-functions-can-block-the-event-loop-starve-io/)。这是基本设置:
const express = require('express')
const crypto = require('crypto')
const { asyncScheduler, asapScheduler, range } = require('rxjs')
const { promisify } = require('util')
const setImmediatePromise = promisify(setImmediate)
const PID = process.pid
function log(msg) {
console.log(`[${PID}]`, new Date(), msg)
}
const app = express()
function randomString() {
return crypto.randomBytes(100).toString('hex')
}
app.get('/compute-sync', function computeSync(req, res) {
log('computing sync!')
const hash = crypto.createHash('sha256')
for (let i = 0; i < 1e6; i++) {
hash.update(randomString())
}
res.send(hash.digest('hex') + '\n')
})
app.get('/compute-immediate', function computeImmediate(req, res) {
log('computing immediate!')
const hash = crypto.createHash('sha256')
for (let i = 0; i < 1e6; i++) {
await setImmediatePromise(hash.update, randomString())
}
res.send(hash.digest('hex') + '\n')
})
app.get('/compute-rxjs', async function computeRxjs(req, res) {
log('computing Rxjs!')
const hash = crypto.createHash('sha256')
range(0, 1e6, asapScheduler).subscribe({
next() {
hash.update(randomString())
},
complete() {
res.send(hash.digest('hex') + '\n')
},
})
})
app.get('/healthcheck', function healthcheck(req, res) {
log('they check my health')
res.send('all good!\n')
})
const PORT = process.env.PORT || 1337
let server = app.listen(PORT, () => log('server listening on :' + PORT))我的理解是,asapScheduler将在幕后使用setImmediate,那么为什么/compute-immediate端点而不是阻塞事件循环(使服务器响应新的请求),但/compute-rxjs确实阻塞并导致服务器在运行状态端点上超时?
我也尝试过asyncScheduler -这不阻塞,但它确实需要一个数量级的时间来完成比/compute-immediate端点。
我真的很想使用RxJS来处理更复杂的传入请求,但是我觉得这个问题使得这个选择不受欢迎。我遗漏了什么吗?是否有办法使RxJS解决方案以与setImmediate解决方案相同的方式工作?
发布于 2021-02-02 09:50:59
谢谢你的回应!
我现在有了一个基于这个要点的解决方案:https://gist.github.com/neilk/5380684
/compute-rxjs现在看起来如下所示:
app.get('/compute-rxjs', function computeRxjs(req, res) {
log('computing Rxjs!')
const hash = crypto.createHash('sha256')
new Observable(subscriber => {
;(iter = (i = 0, max = 1e6) => {
if (i === max) return subscriber.complete()
subscriber.next(i)
return setImmediate(iter, i + 1, max)
})()
}).subscribe({
next() {
hash.update(randomString())
},
complete() {
res.send(hash.digest('hex') + '\n')
},
})
})它的行为似乎完全符合我的要求(递归--谁知道呢?)--它不阻塞事件循环,运行时间与/compute-immediate端点相同,但为我提供了使用RxJS管道功能的灵活性。
https://stackoverflow.com/questions/65997902
复制相似问题