首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >配置单元动态分区-来自Spark的并发写入会损坏数据

配置单元动态分区-来自Spark的并发写入会损坏数据
EN

Stack Overflow用户
提问于 2016-12-28 00:58:19
回答 1查看 1.3K关注 0票数 3

我们已经设置了一个spark作业来插入到Hive中(使用数据帧)。设置hive表用于创建动态分区。只要我们运行一个spark作业将数据插入到Hive中,一切都会完美地工作。

我们遇到的问题是,我们预计要运行并发的spark作业来将数据加载到Hive中。这似乎不起作用。我读到过动态分区不提供排它锁,而是提供共享锁。在我们的例子中,我们可以看到,如果我们同时运行4-5个spark作业,数据会损坏,一些记录会丢失。非常容易重现,几乎每次都会发生。

有没有人能解决这个问题?即使用具有并发作业的动态分区插入到hive表中,并且仍然确保不发生数据损坏。非常感谢您提供的任何意见!

spark代码的代码片段:

代码语言:javascript
复制
// Set hive conf to allow dynamic partitions to be created
sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

//Create temp table to load data into Hive
parsedDataDF.registerTempTable("parsedDatatempTable")

//Insert data into Hive, dynamic partitioning   
sqlContext.sql("insert into table " + hiveDBToLoad + "." + hiveTableToLoad + " partition (partition_1, partition_2, partition_3) " +
    "select * from parsedDatatempTable")
EN

回答 1

Stack Overflow用户

发布于 2020-07-14 15:47:58

代码语言:javascript
复制
    $partition_tbl=array();
$partition_tbl[]=array(
    'tbl'=>$wpdb->prefix.'postmeta',
    'field'=>'meta_id',
    'limit'=>2,
    'part_key'=>'p',
);
$partition_tbl[]=array(
    'tbl'=>$wpdb->prefix.'posts',
    'field'=>'ID',
    'limit'=>2,
    'part_key'=>'pos',
);
        
if(!empty($partition_tbl)){
    foreach($partition_tbl as $ppptt){
        $field=$ppptt['field'];
        $tbl=$ppptt['tbl'];
        $limit=$ppptt['limit'];
        $part_key=$ppptt['part_key'];
        $get_max = $wpdb->get_results("SELECT * FROM ".$wpdb->prefix."postmeta ORDER BY $field DESC LIMIT 0, 1");
        if(!empty($get_max)){
            $max=$get_max[0]->$field;
            if($max > $limit){                              
                $partcc=ceil($max/$limit);          
                $has_partitions = $wpdb->get_results("EXPLAIN partitions SELECT * FROM ".$tbl);     
                $haspartarr=array('none');
                if(!empty($has_partitions)){
                    if(!empty($has_partitions[0]->partitions)){
                        $haspartarr=explode(",",$has_partitions[0]->partitions);
                        if(count($haspartarr) > 1){
                             unset($haspartarr[count($haspartarr)-1]);
                        }
                    }
                }
                $part_sql='';
                $part_arr=array();
                $first=true;
                for($i=0;$i<$partcc;$i++){
                    $cpart=$part_key.$i;
                    $reclim=$limit*($i+1);
                    if(!in_array($cpart,$haspartarr)){
                        if(empty($has_partitions[0]->partitions)){
                            $part_arr[]="PARTITION $cpart VALUES LESS THAN ($reclim)";
                        }else{
                            $nwlmn=($reclim-$limit);                        
                            if($first){
                                $part_sql.="ALTER TABLE $tbl REORGANIZE PARTITION $cpart INTO (";
                                $first=false;
                                $morel=$nwlmn-$limit;                                       
                            }
                            $part_arr[]="PARTITION $cpart VALUES LESS THAN ($reclim)";                                          
                        }
                    }
                }
                if(empty($has_partitions[0]->partitions) && !empty($part_arr)){
                    $cpart=$part_key.$i;                
                    $part_arr[]="PARTITION $cpart VALUES LESS THAN MAXVALUE";               
                    $part_sql.="ALTER TABLE $tbl PARTITION BY RANGE($field)(";
                    $part_sql.=implode(",",$part_arr);
                    $part_sql.=")";
                    $wpdb->get_results($part_sql);
                }else{
                    if(!empty($part_arr)){
                        $cpart=$part_key.$i;
                        $part_arr[]="PARTITION $cpart VALUES LESS THAN MAXVALUE";
                        $part_sql.=implode(",",$part_arr);      
                        $part_sql.=");";
                        $wpdb->get_results($part_sql);
                    }           
                }
            }
        }

    }
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/41349118

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档