我正在开发一个监听消息队列(ActiveMQ)并将接收到的消息批量添加到redis的node.js组件(每个批必须为20个)。
当从ActiveMQ接收的消息数为每秒10条或更少时,没有问题。
我的问题是,消息每4毫秒就会添加到队列中。这使得添加到批次中的记录的数量有时多于每批次20条。
const stompit = require('stompit');
var uuid = require('node-uuid');
var Redis = require('ioredis');
var redis = new Redis();
var pipeline = redis.pipeline();
var batchCounter = 0;
stompit.connect({ host: 'localhost', port: 61613 }, function(err1, client) {
client.subscribe({ destination: 'MyQueue' }, function(err2, msg) {
msg.readString('UTF-8', function(err3, body) {
if (batchCounter >= 20){
pipeline.exec(function(err4, results) {
pipeline = redis.pipeline();
batchCounter = 0;
console.log(results);
});
}
batchCounter++;
pipeline.set(uuid.v1(), JSON.stringify(body));
//client.disconnect();
});
});
});如何解决此问题?谢谢
发布于 2017-06-04 03:09:24
我最终使用标志来控制每批记录的数量。我相信有另一种可能的,可能更有效的解决方法,那就是控制从ActiveMQ读取数据的过程。但是,在这种情况下,flags为我做了这件事。使用标志的完整代码在以下链接中:
https://github.com/TamerB/ToActiveMQToRedis/blob/master/consumer/app.js
发布于 2017-02-14 17:42:05
尝试在调用.exec方法之前重置管道,我假设它是一个异步方法。因为.exec在将来的某个时候运行,所以增量和pipeline.set可以在它之前运行。
下面的代码保存当前管道,并在.exec之前同步创建一个新管道
if (batchCounter >= 20){
let fullpipeline = pipeline;
pipeline = redis.pipeline();
batchCounter = 0;
fullpipeline.exec(function(err4, results) {
console.log(err4, results);
});
}然后,应该只将新消息附加到新管道。
https://stackoverflow.com/questions/42221874
复制相似问题