首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >NodeJs Streams、pipelines和https post

NodeJs Streams、pipelines和https post
EN

Stack Overflow用户
提问于 2020-03-18 12:26:53
回答 2查看 323关注 0票数 0

我需要一点理智的检查。

我正在运行节点11.10.1

我有一个使用nodejs oracledb库从oracle db读取数据的进程。有一个流函数,我执行select *并将结果以10k个对象为一批进行流式处理。然后,我通过https将此数据发布到索引器。对象流被注入到管道函数中。

我使用下面的代码已经有一段时间了。我正在尝试调试吞吐量。有时,我可以看到通过此管道每秒处理大约2k个文档。大多数时候,我看到的是<150。在我开始调试我的索引服务器之前。我想确保这些函数编码正确。

代码语言:javascript
复制
  async function streamReindex(databaseStream) {
    let pipeline = util.promisify(stream.pipeline)
    await pipeline(
      selectStream,//  "oracledb": "^4.0.0", stream function
      camelize.camelizeStream(), //"camelize2": "^1.0.0", library wrapped in ,"through2": "^3.0.1" library to make it an object stream
      JSONStream.stringify(), //"JSONStream": "^1.3.5"
      reindexClient.streamReindex(core)
    )
  }

// reindexClient code.

  function streamReindex(core) {
    const updateUrl = baseUrl + core + '/update'
    const options = {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json'
      },
      'auth': `${user.username}:${user.password}`,
    }
    let postStream = https.request(updateUrl, options, (res) => {
      let response = {
        status: {
          code: res.statusCode,
          message: res.statusMessage
        },
        headers: res.headers,
      }
      if (res.statusCode !== 200) {
        postStream.destroy(new Error(JSON.stringify(response)))
      }
    })
    postStream.on('error', (err)=>{
      throw new Error(err)
    })
    postStream.on('socket', (socket) => {
      socket.setKeepAlive(true, 240000)
    })
    return postStream
  }


  async function selectStream(sql, bindings = [], fetchSize = 
     fetchArraySize) {
     let connection = await knex.client.acquireConnection()

    log.info(`Fetch size is set to ${fetchSize}`)
    let select = connection.queryStream(sql, bindings, {
      fetchArraySize: fetchSize,
      outFormat: outFormat
    })

    select.on('error', (err) => {
      log.error('Oracle Error Event', err)
      knex.client.releaseConnection(connection)
    })

    select.on('close', () => {
      log.info('Oracle Close Event')
      knex.client.releaseConnection(connection)
      select = null
      connection = null
    })

    return select
  }

如果我从管道中删除reindexClient.streamReindex(核心)函数。我看到每秒大约5k个对象的吞吐量。我正在研究streams的高水位线功能,但似乎不知道如何在postStream上应用它。如果我console.log post流,它也不会说它处于对象模式。这意味着它的高水位线是以字节为单位的,我相信它的阈值很低。

如果你需要更多的信息,我会尽可能多的提供。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2020-04-17 08:39:31

因此,我通过自己实现JSONStream.stringify()函数,每秒增加了近500个/docs的吞吐量。因为它不允许我设置水印。一旦我这样做了,我就能够真正地提高它,但我可以使用我所有的内存。使用下面的代码进行设置,我得到了稳定的内存占用,并且具有更好的吞吐量。我还去掉了through2库,并在stringify转换中加入了camelize功能。大部分代码和解释都可以在这里找到:https://blog.dmcquay.com/2017/09/06/node-stream-db-results-with-transform.html

代码如下:

代码语言:javascript
复制
  function camelizeAndStringify() {
    let first = true
    const serialize = new Transform({
      objectMode: true,
      highWaterMark: 1000,
      transform(chunk, encoding, callback) {
        if (first) {
          this.push('[' + JSON.stringify(camelize(chunk)))
          first = false
        } else {
          this.push(',' + JSON.stringify(camelize(chunk)))
        }
        callback()
        chunk = null
      },
      flush(callback) {
        this.push(']')
        callback()
      }
    })
    return serialize
  }
票数 1
EN

Stack Overflow用户

发布于 2020-03-20 09:15:50

虽然您的问题似乎不是oracledb的问题,但我将它放在这里,以便我可以格式化代码。您可能会从调整oracledb流中获得一些性能优势,例如:

代码语言:javascript
复制
   diff --git a/lib/queryStream.js b/lib/queryStream.js
   index 08ddc720..11953e4b 100644
   --- a/lib/queryStream.js
   +++ b/lib/queryStream.js
   @@ -24,7 +24,7 @@ const { Readable } = require('stream');
    class QueryStream extends Readable {

      constructor(rs) {
   -    super({ objectMode: true });
   +    super({ objectMode: true, highWaterMark: 64 });  // choose your own value
    this._fetching = false;
    this._numRows = 0;

允许将高水位线设置为queryStream()选项的PR将是受欢迎的。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/60733550

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档