首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >RxJS调度器

RxJS调度器
EN

Stack Overflow用户
提问于 2021-02-01 18:07:36
回答 1查看 116关注 0票数 1

我在一个简单的RxJS快速服务器上做了一些简单的NodeJS实验,在这里我比较了处理和处理请求的不同方法(基于本文的https://snyk.io/blog/nodejs-how-even-quick-async-functions-can-block-the-event-loop-starve-io/)。这是基本设置:

代码语言:javascript
复制
const express = require('express')
const crypto = require('crypto')
const { asyncScheduler, asapScheduler, range } = require('rxjs')
const { promisify } = require('util')

const setImmediatePromise = promisify(setImmediate)

const PID = process.pid

function log(msg) {
    console.log(`[${PID}]`, new Date(), msg)
}

const app = express()

function randomString() {
    return crypto.randomBytes(100).toString('hex')
}

app.get('/compute-sync', function computeSync(req, res) {
    log('computing sync!')
    const hash = crypto.createHash('sha256')
    for (let i = 0; i < 1e6; i++) {
        hash.update(randomString())
    }
    res.send(hash.digest('hex') + '\n')
})

app.get('/compute-immediate', function computeImmediate(req, res) {
    log('computing immediate!')

    const hash = crypto.createHash('sha256')

    for (let i = 0; i < 1e6; i++) {
        await setImmediatePromise(hash.update, randomString())
    }

    res.send(hash.digest('hex') + '\n')
})

app.get('/compute-rxjs', async function computeRxjs(req, res) {
    log('computing Rxjs!')

    const hash = crypto.createHash('sha256')

    range(0, 1e6, asapScheduler).subscribe({
        next() {
            hash.update(randomString())
        },
        complete() {
            res.send(hash.digest('hex') + '\n')
        },
    })
})

app.get('/healthcheck', function healthcheck(req, res) {
    log('they check my health')
    res.send('all good!\n')
})

const PORT = process.env.PORT || 1337
let server = app.listen(PORT, () => log('server listening on :' + PORT))

我的理解是,asapScheduler将在幕后使用setImmediate,那么为什么/compute-immediate端点而不是阻塞事件循环(使服务器响应新的请求),但/compute-rxjs确实阻塞并导致服务器在运行状态端点上超时?

我也尝试过asyncScheduler -这不阻塞,但它确实需要一个数量级的时间来完成比/compute-immediate端点。

我真的很想使用RxJS来处理更复杂的传入请求,但是我觉得这个问题使得这个选择不受欢迎。我遗漏了什么吗?是否有办法使RxJS解决方案以与setImmediate解决方案相同的方式工作?

EN

回答 1

Stack Overflow用户

发布于 2021-02-02 09:50:59

谢谢你的回应!

我现在有了一个基于这个要点的解决方案:https://gist.github.com/neilk/5380684

/compute-rxjs现在看起来如下所示:

代码语言:javascript
复制
app.get('/compute-rxjs', function computeRxjs(req, res) {
    log('computing Rxjs!')

    const hash = crypto.createHash('sha256')

    new Observable(subscriber => {
        ;(iter = (i = 0, max = 1e6) => {
            if (i === max) return subscriber.complete()

            subscriber.next(i)
            return setImmediate(iter, i + 1, max)
        })()
    }).subscribe({
        next() {
            hash.update(randomString())
        },
        complete() {
            res.send(hash.digest('hex') + '\n')
        },
    })
})

它的行为似乎完全符合我的要求(递归--谁知道呢?)--它不阻塞事件循环,运行时间与/compute-immediate端点相同,但为我提供了使用RxJS管道功能的灵活性。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/65997902

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档