我已经成功地重构了下面的脚本,以便从服务器1上的表中选择记录,然后连接到服务器2,并将丢失的记录插入/忽略到那里的克隆表中。
这是可行的,但运行大约需要1.5分钟。我希望有人能用一种更快、更有效的方式来做这件事,因为它是成功的,但是很昂贵。
我没有选择执行联邦存储或复制,所以这必须用脚本来完成。我以前是通过使用源表的最大ID来做到这一点的,但是在插入之后,我每天丢失最多15条记录。
下面是剧本:
$source_data = mysqli_query($conn,
"select * from `cdrdb`.`session` where ts >= now() - INTERVAL 1 DAY");
while($source = $source_data->fetch_assoc()) {
//Insert new rows into ambition.session
$stmt = $conn2->prepare(
"INSERT IGNORE INTO ambition.session (SESSIONID,
SESSIONTYPE,CALLINGPARTYNO,FINALLYCALLEDPARTYNO,
DIALPLANNAME,TERMINATIONREASONCODE,ISCLEARINGLEGORIGINATING,
CREATIONTIMESTAMP,ALERTINGTIMESTAMP,CONNECTTIMESTAMP,DISCONNECTTIMESTAMP,
HOLDTIMESECS,LEGTYPE1,LEGTYPE2,INTERNALPARTYTYPE1,INTERNALPARTYTYPE2
,SERVICETYPEID1,SERVICETYPEID2,EXTENSIONID1,EXTENSIONID2,
LOCATION1,LOCATION2,TRUNKGROUPNAME1,TRUNKGROUPNAME2,SESSIONIDTRANSFEREDFROM
,SESSIONIDTRANSFEREDTO,ISTRANSFERINITIATEDBYLEG1,
SERVICEEXTENSION1,SERVICEEXTENSION2,SERVICENAME1,
SERVICENAME2,MISSEDUSERID2,ISEMERGENCYCALL,NOTABLECALLID,
RESPONSIBLEUSEREXTENSIONID,
ORIGINALLYCALLEDPARTYNO,ACCOUNTCODE,ACCOUNTCLIENT,ORIGINATINGLEGID
,SYSTEMRESTARTNO,PATTERN,HOLDCOUNT,AUXSESSIONTYPE,
DEVICEID1,DEVICEID2,ISLEG1ORIGINATING,ISLEG2ORIGINATING,
GLOBALCALLID,CADTEMPLATEID,CADTEMPLATEID2,ts,INITIATOR,
ACCOUNTNAME,APPNAME,CALLID,CHRTYPE,CALLERNAME,serviceid1,serviceid2)
VALUES ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
or die(mysqli_error($conn2)) ;
mysqli_stmt_bind_param($stmt,
"iisssiissssiiiiiiiiissssiiissssiiiisssiisiiiiiiiiisisssisii"
,$source['SESSIONID']
,$source['SESSIONTYPE']
,$source['CALLINGPARTYNO']
//omitting other columns for sake of space
);
$stmt->execute() or die(mysqli_error($conn2));
}发布于 2017-10-18 19:21:29
一个简单的改进是将对prepare()的调用移到循环之前。由于准备好的语句每次通过循环都是相同的,所以不需要每次都与DB服务器联系。
您还可以将对bind_param()的调用移到循环之外,因为每次变量都是相同的。bind_param绑定到引用,因此更新变量将改变调用execute()时插入的内容。
然而,这些可能只会产生很小的影响。提高INSERT查询速度的最有效方法之一是一次插入多个行。这比mysqli更容易使用PDO,因为您可以在调用$stmt->execute()时提供一个参数数组。代码看起来应该是:
$params = array();
$count = 0;
$batch_size = 100;
$placeholders = implode(", ", array_fill(0, $batch_size, "(SESSIONID,
SESSIONTYPE,CALLINGPARTYNO,FINALLYCALLEDPARTYNO,
DIALPLANNAME,TERMINATIONREASONCODE,ISCLEARINGLEGORIGINATING,
CREATIONTIMESTAMP,ALERTINGTIMESTAMP,CONNECTTIMESTAMP,DISCONNECTTIMESTAMP,
HOLDTIMESECS,LEGTYPE1,LEGTYPE2,INTERNALPARTYTYPE1,INTERNALPARTYTYPE2
,SERVICETYPEID1,SERVICETYPEID2,EXTENSIONID1,EXTENSIONID2,
LOCATION1,LOCATION2,TRUNKGROUPNAME1,TRUNKGROUPNAME2,SESSIONIDTRANSFEREDFROM
,SESSIONIDTRANSFEREDTO,ISTRANSFERINITIATEDBYLEG1,
SERVICEEXTENSION1,SERVICEEXTENSION2,SERVICENAME1,
SERVICENAME2,MISSEDUSERID2,ISEMERGENCYCALL,NOTABLECALLID,
RESPONSIBLEUSEREXTENSIONID,
ORIGINALLYCALLEDPARTYNO,ACCOUNTCODE,ACCOUNTCLIENT,ORIGINATINGLEGID
,SYSTEMRESTARTNO,PATTERN,HOLDCOUNT,AUXSESSIONTYPE,
DEVICEID1,DEVICEID2,ISLEG1ORIGINATING,ISLEG2ORIGINATING,
GLOBALCALLID,CADTEMPLATEID,CADTEMPLATEID2,ts,INITIATOR,
ACCOUNTNAME,APPNAME,CALLID,CHRTYPE,CALLERNAME,serviceid1,serviceid2)"));
$stmt = $pdo->prepare("INSERT INTO ambition.sentence (<columns>) VALUES $placeholders");
while ($row = $source_data->fetch(PDO::FETCH_NUM)) {
$params += $row; // Append this row to $params
$count++;
if ($count != $batch_size) {
continue;
}
$stmt->execute($params);
// Reset variables for next batch
$params = array();
$count = 0;
}
if ($count) { // Handle the last batch that isn't the full size
$placeholders = implode(", ", array_fill(0, $count, "( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"));
$stmt = $pdo->prepare("INSERT INTO ambition.sentence (SESSIONID,
SESSIONTYPE,CALLINGPARTYNO,FINALLYCALLEDPARTYNO,
DIALPLANNAME,TERMINATIONREASONCODE,ISCLEARINGLEGORIGINATING,
CREATIONTIMESTAMP,ALERTINGTIMESTAMP,CONNECTTIMESTAMP,DISCONNECTTIMESTAMP,
HOLDTIMESECS,LEGTYPE1,LEGTYPE2,INTERNALPARTYTYPE1,INTERNALPARTYTYPE2
,SERVICETYPEID1,SERVICETYPEID2,EXTENSIONID1,EXTENSIONID2,
LOCATION1,LOCATION2,TRUNKGROUPNAME1,TRUNKGROUPNAME2,SESSIONIDTRANSFEREDFROM
,SESSIONIDTRANSFEREDTO,ISTRANSFERINITIATEDBYLEG1,
SERVICEEXTENSION1,SERVICEEXTENSION2,SERVICENAME1,
SERVICENAME2,MISSEDUSERID2,ISEMERGENCYCALL,NOTABLECALLID,
RESPONSIBLEUSEREXTENSIONID,
ORIGINALLYCALLEDPARTYNO,ACCOUNTCODE,ACCOUNTCLIENT,ORIGINATINGLEGID
,SYSTEMRESTARTNO,PATTERN,HOLDCOUNT,AUXSESSIONTYPE,
DEVICEID1,DEVICEID2,ISLEG1ORIGINATING,ISLEG2ORIGINATING,
GLOBALCALLID,CADTEMPLATEID,CADTEMPLATEID2,ts,INITIATOR,
ACCOUNTNAME,APPNAME,CALLID,CHRTYPE,CALLERNAME,serviceid1,serviceid2) VALUES $placeholders");
$stmt->execute($params);
}要像我所写的那样工作,您需要确保SELECT查询返回的列与插入的列表的顺序相同。避免在执行此操作时使用SELECT *,因此,如果源表的架构发生更改,则不会有任何意外。
发布于 2017-10-18 17:21:55
现在您可能已经知道,每次插入一行,每一行一个事务,是加载数据的最慢的方式。
我测试了不同的数据加载方式,并在2017年Percona上做了一个演讲:
TL;DR:使用负载数据信息,即使必须首先将源数据转储到CSV文件中。
这里有一个例子,尽管我还没有测试它:
$all_columns = "
`SESSIONID`, `SESSIONTYPE`, `CALLINGPARTYNO`, `FINALLYCALLEDPARTYNO`,
`DIALPLANNAME`, `TERMINATIONREASONCODE`, `ISCLEARINGLEGORIGINATING`,
`CREATIONTIMESTAMP`, `ALERTINGTIMESTAMP`, `CONNECTTIMESTAMP`,
`DISCONNECTTIMESTAMP`, `HOLDTIMESECS`, `LEGTYPE1`, `LEGTYPE2`,
`INTERNALPARTYTYPE1`, `INTERNALPARTYTYPE2`, `SERVICETYPEID1`,
`SERVICETYPEID2`, `EXTENSIONID1`, `EXTENSIONID2`, `LOCATION1`, `LOCATION2`,
`TRUNKGROUPNAME1`, `TRUNKGROUPNAME2`, `SESSIONIDTRANSFEREDFROM`,
`SESSIONIDTRANSFEREDTO`, `ISTRANSFERINITIATEDBYLEG1`, `SERVICEEXTENSION1`,
`SERVICEEXTENSION2`, `SERVICENAME1`, `SERVICENAME2`, `MISSEDUSERID2`,
`ISEMERGENCYCALL`, `NOTABLECALLID`, `RESPONSIBLEUSEREXTENSIONID`,
`ORIGINALLYCALLEDPARTYNO`, `ACCOUNTCODE`, `ACCOUNTCLIENT`, `ORIGINATINGLEGID`,
`SYSTEMRESTARTNO`, `PATTERN`, `HOLDCOUNT`, `AUXSESSIONTYPE`, `DEVICEID1`,
`DEVICEID2`, `ISLEG1ORIGINATING`, `ISLEG2ORIGINATING`, `GLOBALCALLID`,
`CADTEMPLATEID`, `CADTEMPLATEID2`, `ts`, `INITIATOR`, `ACCOUNTNAME`, `APPNAME`,
`CALLID`, `CHRTYPE`, `CALLERNAME`, `serviceid1`, `serviceid2`"
$select_sql = "
SELECT $all_columns FROM `cdrdb`.`session`
WHERE ts >= NOW() - INTERVAL 1 DAY";
$source_data = mysqli_query($conn, $select_sql);
$tmpfilename = tempnam('/tmp', 'data');
fp = fopen($tmpfilename, 'w'); // do proper error handling and use
while ($source = $source_data->fetch_assoc()) {
fputcsv($fp, array_values($source));
}
fclose($fp);
$tmpfilename = mysqli_real_escape_string($conn2, $tmpfilename);
$load_sql = "
LOAD DATA LOCAL INFILE '$tmpfilename'
IGNORE INTO TABLE `ambition`.`session`
FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '"'
( $all_columns )";
$conn2->query($load_sql);https://stackoverflow.com/questions/46814441
复制相似问题