我了解如何在Node的新Streams2库中使用可写流,但我不知道如何使用可读的流。
以dgram模块周围的流包装器为例:
var dgram = require('dgram');
var thumbs = {
twiddle: function() {}
};
var defaults = {
address: '0.0.0.0',
type: 'udp4',
port: 12345,
broadcast: null,
multicast: null,
multicastTTL: 1
};
var UDPStream = function(options) {
if (!(this instanceof UDPStream))
return new UDPStream(options);
Duplex.call(this);
options = options || {};
this.address = options.address || defaults.address;
this.type = options.type || defaults.type;
this.port = options.port || defaults.port;
this.broadcast = options.broadcast || defaults.broadcast;
this.multicast = options.multicast || defaults.multicast;
this.multicastTTL = options.multicastTTL || defaults.multicastTTL;
this._socket = dgram.createSocket(this.type, setup.bind(this));
this._socket.on('message', this.push.bind(this));
};
util.inherits(UDPStream, Duplex);
var setup = function() {
if (this.multicast) {
this._socket.addMembership(this.multicast);
this._socket.setMulticastTTL(this.multicastTTL);
this.destination = this.multicast;
} else {
// default to using broadcast if multicast address is not specified.
this._socket.setBroadcast(true);
// TODO: get the default broadcast address from os.networkInterfaces() (not currently returned)
this.destination = this.broadcast || '255.255.255.255';
}
};
UDPStream.prototype._read = function(size) {
thumbs.twiddle();
};
UDPStream.prototype._write = function(chunk, encoding, callback) {
this._socket.send(chunk, 0, chunk.length, this.port, this.destination);
callback();
};
module.exports = UDPStream;除了_read实现之外,一切都是有意义的。因为我不明白我应该在那里做什么。当udp套接字发出新消息时,我的数据会被推送,但我无法暂停或恢复基础资源。这看起来应该是什么样子?
发布于 2013-10-23 17:56:54
答案很简单:如果真的没有办法对底层资源施加背压,那么_read实现就是空的。在到达highWaterMark之前,流将负责排队等待被推送的数据,但在这一点之后没有任何保证。医生们说,您应该“只要数据变得可用就提供它”。
发布于 2013-07-11 20:47:08
_read是暂停恢复机制的一部分。来自NodeJS API文档
当数据可用时,通过调用readable.push(块)将其放入读队列中。如果push返回false,那么您应该停止阅读。当再次调用_read时,您应该开始推送更多的数据。
因此,在_write函数中,如果socket.send调用失败,返回false或调用回调时出错,则应该暂停流。那么_read就可以简单地做this._paused = false了
可能长得像这样。
UDPStream.prototype._read = function() {
this._paused = false;
}
UDPStream.prototype._write = function(chunk, encoding, callback) {
if(!this._paused)
this._socket.send(chunk, 0, chunk.length, this.port, this.destination);
};https://stackoverflow.com/questions/17600705
复制相似问题