首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >为什么每次调用Readable._read()时Readable.push()都返回false

为什么每次调用Readable._read()时Readable.push()都返回false
EN

Stack Overflow用户
提问于 2016-08-08 06:39:13
回答 2查看 707关注 0票数 0

我在typescript中有以下可读流:

代码语言:javascript
复制
import {Readable} from "stream";

enum InputState {
    NOT_READABLE,
    READABLE,
    ENDED
}

export class Aggregator extends Readable {

    private inputs: Array<NodeJS.ReadableStream>;
    private states: Array<InputState>;
    private records: Array<any>;

    constructor(options, inputs: Array<NodeJS.ReadableStream>) {
        // force object mode
        options.objectMode = true;

        super(options);

        this.inputs = inputs;

        // set initial state
        this.states = this.inputs.map(() => InputState.NOT_READABLE);
        this.records = this.inputs.map(() => null);

        // register event handlers for input streams
        this.inputs.forEach((input, i) => {
            input.on("readable", () => {
                console.log("input", i, "readable event fired");

                this.states[i] = InputState.READABLE;

                if (this._readable) { this.emit("_readable"); }
            });

            input.on("end", () => {
                console.log("input", i, "end event fired");

                this.states[i] = InputState.ENDED;

                // if (this._end) { this.push(null); return; }

                if (this._readable) { this.emit("_readable"); }
            });
        });
    }

    get _readable () {
        return this.states.every(
            state => state === InputState.READABLE ||
            state === InputState.ENDED);
    }

    get _end () {
        return this.states.every(state => state === InputState.ENDED);
    }

    _aggregate () {
        console.log("calling _aggregate");

        let timestamp = Infinity,
            indexes = [];

        console.log("initial record state", JSON.stringify(this.records));

        this.records.forEach((record, i) => {
            // try to read missing records
            if (!this.records[i] && this.states[i] !== InputState.ENDED) {
                this.records[i] = this.inputs[i].read();

                if (!this.records[i]) {
                    this.states[i] = InputState.NOT_READABLE;
                    return;
                }
            }

            // update timestamp if a better one is found
            if (this.records[i] && timestamp > this.records[i].t) {
                timestamp = this.records[i].t;

                // clean the indexes array
                indexes.length = 0;
            }

            // include the record index if has the required timestamp
            if (this.records[i] && this.records[i].t === timestamp) {
                indexes.push(i);
            }
        });

        console.log("final record state", JSON.stringify(this.records), indexes, timestamp);

        // end prematurely if after trying to read inputs the aggregator is
        // not ready
        if (!this._readable) {
            console.log("end prematurely trying to read inputs", this.states);
            this.push(null);
            return;
        }

        // end prematurely if all inputs are ended and there is no remaining
        // record values
        if (this._end && indexes.length === 0) {
            console.log("end on empty indexes", this.states);
            this.push(null);
            return;
        }

        // create the aggregated record
        let record = {
            t: timestamp,
            v: this.records.map(
                (r, i) => indexes.indexOf(i) !== -1 ? r.v : null
            )
        };

        console.log("aggregated record", JSON.stringify(record));

        if (this.push(record)) {
            console.log("record pushed downstream");
            // remove records already aggregated and pushed
            indexes.forEach(i => { this.records[i] = null; });

            this.records.forEach((record, i) => {
                // try to read missing records
                if (!this.records[i] && this.states[i] !== InputState.ENDED) {
                    this.records[i] = this.inputs[i].read();

                    if (!this.records[i]) {
                        this.states[i] = InputState.NOT_READABLE;
                    }
                }
            });
        } else {
            console.log("record failed to push downstream");
        }
    }

    _read () {
        console.log("calling _read", this._readable);
        if (this._readable) { this._aggregate(); }
        else {
            this.once("_readable", this._aggregate.bind(this));
        }
    }
}

它旨在以对象模式聚合多个输入流。最后,它将多个时间序列数据流聚合成一个单独的数据流。我面临的问题是,当我测试这个特性时,我反复看到消息record failed to push downstream,然后立即看到消息calling _read true,并且只在与聚合算法相关的3条消息之间。因此,可读的流机制会调用_read,并且每次调用push()都会失败。你知道为什么会发生这种事吗?你知道有实现这种算法的库吗?或者有更好的方法来实现这个特性吗?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2016-08-09 06:39:46

我会回答我自己的问题。

问题是我误解了this.push()返回值调用的含义。我认为返回值为false表示当前推送操作失败,但真正的含义是下一次推送操作将失败。

对上面显示的代码的一个简单修复方法是替换以下代码:

代码语言:javascript
复制
if (this.push(record)) {
    console.log("record pushed downstream");
    // remove records already aggregated and pushed
    indexes.forEach(i => { this.records[i] = null; });

    this.records.forEach((record, i) => {
        // try to read missing records
        if (!this.records[i] && this.states[i] !== InputState.ENDED) {
            this.records[i] = this.inputs[i].read();

            if (!this.records[i]) {
                this.states[i] = InputState.NOT_READABLE;
            }
        }
    });
} else {
    console.log("record failed to push downstream");
}

通过以下方式:

代码语言:javascript
复制
this.push(record);
console.log("record pushed downstream");
// remove records already aggregated and pushed
indexes.forEach(i => { this.records[i] = null; });

this.records.forEach((record, i) => {
    // try to read missing records
    if (!this.records[i] && this.states[i] !== InputState.ENDED) {
        this.records[i] = this.inputs[i].read();

        if (!this.records[i]) {
            this.states[i] = InputState.NOT_READABLE;
        }
    }
});

您可以注意到,唯一的区别是避免对this.push()调用的返回值执行条件操作。假设当前的实现每个_read()调用只调用一次this.push(),那么这个简单的更改就解决了这个问题。

票数 0
EN

Stack Overflow用户

发布于 2020-10-20 14:38:25

这意味着进食比消耗更快。官方的方法是扩大其highWaterMark,默认值: 16384 (16KB),或者objectMode为16。只要它的内部缓冲区足够大,push函数总是会返回true。它不必在单个_read()中是单个推送()。您可以在单个_read()中推送与highWaterMark指示的一样多的内容。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/38819196

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档