我刚刚开始使用node.js和postgres,使用node-postgres。我尝试做的一件事是编写一个简短的js来填充我的数据库,使用一个包含大约200,000个条目的文件。
我注意到一段时间(不到10秒)后,我开始得到“错误:连接终止”。我不确定这是因为我使用node-postgres的方式有问题,还是因为我在向postgres发送垃圾邮件。
无论如何,这里有一个简单的代码来说明这种行为:
var pg = require('pg');
var connectionString = "postgres://xxxx:xxxx@localhost/xxxx";
pg.connect(connectionString, function(err,client,done){
if(err) {
return console.error('could not connect to postgres', err);
}
client.query("DROP TABLE IF EXISTS testDB");
client.query("CREATE TABLE IF NOT EXISTS testDB (id int, first int, second int)");
done();
for (i = 0; i < 1000000; i++){
client.query("INSERT INTO testDB VALUES (" + i.toString() + "," + (1000000-i).toString() + "," + (-i).toString() + ")", function(err,result){
if (err) {
return console.error('Error inserting query', err);
}
done();
});
}
});它在大约18,000-20,000次查询后失败。这是使用client.query的错误方式吗?我尝试更改默认客户端号,但似乎没有帮助。
client.connect()似乎也没有帮助,但那是因为我有太多的客户,所以我绝对认为客户端池是可行的。
谢谢你的帮助!
发布于 2015-03-29 07:26:15
更新
这个答案已经被本文所取代:Data Imports,它代表了最新的方法。
为了复制您的场景,我使用了pg-promise库,我可以确认,正面尝试它永远不会起作用,无论您使用哪个库,重要的是方法。
下面是一种改进的方法,我们将插入分区到块中,然后在事务中执行每个块,这是负载平衡(也称为节流):
function insertRecords(N) {
return db.tx(function (ctx) {
var queries = [];
for (var i = 1; i <= N; i++) {
queries.push(ctx.none('insert into test(name) values($1)', 'name-' + i));
}
return promise.all(queries);
});
}
function insertAll(idx) {
if (!idx) {
idx = 0;
}
return insertRecords(100000)
.then(function () {
if (idx >= 9) {
return promise.resolve('SUCCESS');
} else {
return insertAll(++idx);
}
}, function (reason) {
return promise.reject(reason);
});
}
insertAll()
.then(function (data) {
console.log(data);
}, function (reason) {
console.log(reason);
})
.done(function () {
pgp.end();
});这在大约4分钟内产生了1,000,000条记录,在前3个事务之后显著减慢。我使用的是Node JS 0.10.38 (64位),它消耗了大约340MB的内存。这样,我们插入了100,000条记录,连续插入了10次。
如果我们做同样的事情,只是这次在100个事务中插入10,000条记录,同样的1,000,000条记录在1m25秒内添加,没有减慢,Node JS消耗了大约100MB的内存,这告诉我们像这样对数据进行分区是一个非常好的想法。
使用哪个库并不重要,方法应该是相同的:
在每个事务提交之后,以大约10,000 records;
如果你违反了这些规则中的任何一条,你肯定会有麻烦。例如,如果您违反了规则3,您的Node JS进程很可能很快就会耗尽内存并抛出错误。我的示例中的规则4是由库提供的。
如果您遵循此模式,则不需要为连接池设置而烦恼。
更新1
后续版本的pg-promise完全支持此类场景,如下所示:
function factory(index) {
if (index < 1000000) {
return this.query('insert into test(name) values($1)', 'name-' + index);
}
}
db.tx(function () {
return this.batch([
this.none('drop table if exists test'),
this.none('create table test(id serial, name text)'),
this.sequence(factory), // key method
this.one('select count(*) from test')
]);
})
.then(function (data) {
console.log("COUNT:", data[3].count);
})
.catch(function (error) {
console.log("ERROR:", error);
});如果你不想包含任何额外的东西,比如创建表,那么它看起来就更简单了:
function factory(index) {
if (index < 1000000) {
return this.query('insert into test(name) values($1)', 'name-' + index);
}
}
db.tx(function () {
return this.sequence(factory);
})
.then(function (data) {
// success;
})
.catch(function (error) {
// error;
});详情请参见Synchronous Transactions。
例如,使用Bluebird作为promise库,在我的生产机器上插入1,000,000条记录(没有启用长堆栈跟踪)需要1m43s。
您只需让您的factory方法根据index返回请求,直到您没有剩余的请求,就这么简单。
最好的部分是,这不仅速度快,而且对NodeJS进程的负载也很小。内存测试过程在整个测试过程中保持在60MB以下,仅占用CPU时间的7-8%。
更新2
从1.7.2版本开始,pg-promise可以轻松地支持超大规模事务。请参阅第Synchronous Transactions章。
例如,我可以在使用Windows 8.1 64位的家用PC上,在短短15分钟内在单个事务中插入10,000,000条记录。
为了进行测试,我将我的PC设置为生产模式,并使用Bluebird作为promise库。在测试期间,整个NodeJS 0.12.5 (64位)进程的内存消耗没有超过75MB,而我的i7-4770CPU显示稳定的15%负载。
以同样的方式插入1亿条记录只需要更多的耐心,而不需要更多的计算机资源。
同时,之前针对1m插入的测试从1m43s下降到1m31s。
更新3
以下注意事项可能会产生巨大的不同:Performance Boost。
更新4
相关问题,还有一个更好的实现示例:Massive inserts with pg-promise。
更新5
可以在这里找到一个更好、更新的示例:nodeJS inserting Data into PostgreSQL error
发布于 2015-03-17 22:00:49
我猜你已经达到最大池大小了。因为client.query是异步的,所以所有可用的连接在返回之前都会被使用。
默认池大小为10。选中此处:https://github.com/brianc/node-postgres/blob/master/lib/defaults.js#L27
您可以通过设置pg.defaults.poolSize来增加默认池大小:
pg.defaults.poolSize = 20;更新:释放连接后执行另一个查询。
var pg = require('pg');
var connectionString = "postgres://xxxx:xxxx@localhost/xxxx";
var MAX_POOL_SIZE = 25;
pg.defaults.poolSize = MAX_POOL_SIZE;
pg.connect(connectionString, function(err,client,done){
if(err) {
return console.error('could not connect to postgres', err);
}
var release = function() {
done();
i++;
if(i < 1000000)
insertQ();
};
var insertQ = function() {
client.query("INSERT INTO testDB VALUES (" + i.toString() + "," + (1000000-i).toString() + "," + (-i).toString() + ")", function(err,result){
if (err) {
return console.error('Error inserting query', err);
}
release();
});
};
client.query("DROP TABLE IF EXISTS testDB");
client.query("CREATE TABLE IF NOT EXISTS testDB (id int, first int, second int)");
done();
for (i = 0; i < MAX_POOL_SIZE; i++){
insertQ();
}
});基本思想是,由于您正在使用相对较小的连接池大小对大量查询进行排队,因此您将达到最大池大小。在这里,我们只在一个已有的连接被释放之后进行新的查询。
https://stackoverflow.com/questions/29100807
复制相似问题