我们已经设置了一个spark作业来插入到Hive中(使用数据帧)。设置hive表用于创建动态分区。只要我们运行一个spark作业将数据插入到Hive中,一切都会完美地工作。
我们遇到的问题是,我们预计要运行并发的spark作业来将数据加载到Hive中。这似乎不起作用。我读到过动态分区不提供排它锁,而是提供共享锁。在我们的例子中,我们可以看到,如果我们同时运行4-5个spark作业,数据会损坏,一些记录会丢失。非常容易重现,几乎每次都会发生。
有没有人能解决这个问题?即使用具有并发作业的动态分区插入到hive表中,并且仍然确保不发生数据损坏。非常感谢您提供的任何意见!
spark代码的代码片段:
// Set hive conf to allow dynamic partitions to be created
sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
//Create temp table to load data into Hive
parsedDataDF.registerTempTable("parsedDatatempTable")
//Insert data into Hive, dynamic partitioning
sqlContext.sql("insert into table " + hiveDBToLoad + "." + hiveTableToLoad + " partition (partition_1, partition_2, partition_3) " +
"select * from parsedDatatempTable")发布于 2020-07-14 15:47:58
$partition_tbl=array();
$partition_tbl[]=array(
'tbl'=>$wpdb->prefix.'postmeta',
'field'=>'meta_id',
'limit'=>2,
'part_key'=>'p',
);
$partition_tbl[]=array(
'tbl'=>$wpdb->prefix.'posts',
'field'=>'ID',
'limit'=>2,
'part_key'=>'pos',
);
if(!empty($partition_tbl)){
foreach($partition_tbl as $ppptt){
$field=$ppptt['field'];
$tbl=$ppptt['tbl'];
$limit=$ppptt['limit'];
$part_key=$ppptt['part_key'];
$get_max = $wpdb->get_results("SELECT * FROM ".$wpdb->prefix."postmeta ORDER BY $field DESC LIMIT 0, 1");
if(!empty($get_max)){
$max=$get_max[0]->$field;
if($max > $limit){
$partcc=ceil($max/$limit);
$has_partitions = $wpdb->get_results("EXPLAIN partitions SELECT * FROM ".$tbl);
$haspartarr=array('none');
if(!empty($has_partitions)){
if(!empty($has_partitions[0]->partitions)){
$haspartarr=explode(",",$has_partitions[0]->partitions);
if(count($haspartarr) > 1){
unset($haspartarr[count($haspartarr)-1]);
}
}
}
$part_sql='';
$part_arr=array();
$first=true;
for($i=0;$i<$partcc;$i++){
$cpart=$part_key.$i;
$reclim=$limit*($i+1);
if(!in_array($cpart,$haspartarr)){
if(empty($has_partitions[0]->partitions)){
$part_arr[]="PARTITION $cpart VALUES LESS THAN ($reclim)";
}else{
$nwlmn=($reclim-$limit);
if($first){
$part_sql.="ALTER TABLE $tbl REORGANIZE PARTITION $cpart INTO (";
$first=false;
$morel=$nwlmn-$limit;
}
$part_arr[]="PARTITION $cpart VALUES LESS THAN ($reclim)";
}
}
}
if(empty($has_partitions[0]->partitions) && !empty($part_arr)){
$cpart=$part_key.$i;
$part_arr[]="PARTITION $cpart VALUES LESS THAN MAXVALUE";
$part_sql.="ALTER TABLE $tbl PARTITION BY RANGE($field)(";
$part_sql.=implode(",",$part_arr);
$part_sql.=")";
$wpdb->get_results($part_sql);
}else{
if(!empty($part_arr)){
$cpart=$part_key.$i;
$part_arr[]="PARTITION $cpart VALUES LESS THAN MAXVALUE";
$part_sql.=implode(",",$part_arr);
$part_sql.=");";
$wpdb->get_results($part_sql);
}
}
}
}
}
}https://stackoverflow.com/questions/41349118
复制相似问题