I need to bulk-edit HBase data, changing the contents of a specific cell in every row. Going through the HBase PUT/GET API is not an option, because that would be extremely slow. I would like to set up a Spark job that loads the HBase HFiles into properly defined DataFrames, lets me edit the data in particular columns, and then saves the data back to HDFS, keeping the HFile format.
I have found several guides on how to bulk-write HFiles from Spark to HDFS; however, I am not sure how to get the data out of HDFS in the first place. Which kind of DataFrame/RDD is best suited for this task?
Thanks
Posted on 2019-02-04 09:28:32
Answering my own question in case someone else needs this.
You can load HBase data from a snapshot of its HFiles. Follow these steps (in the HBase shell):
1. disable 'namespace:table'
2. snapshot 'namespace:table', 'your_snapshot'
This creates an accessible snapshot that you can reach under /[HBase_path]/.snapshot/your_snapshot
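Spelled out, the two shell steps above might look like the following HBase shell session (the table and snapshot names are placeholders):

```
$ hbase shell
hbase> disable 'namespace:table'
hbase> snapshot 'namespace:table', 'your_snapshot'
hbase> list_snapshots
```

`list_snapshots` is just a sanity check that the snapshot was created.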
To load the snapshot as an RDD[(ImmutableBytesWritable, Result)]:
def loadFromSnapshot(sc: SparkContext): RDD[(ImmutableBytesWritable, Result)] = {
  val restorePath =
    new Path(s"hdfs://$storageDirectory/$restoreDirectory/$snapshotName")
  val restorePathString = restorePath.toString

  // create an HBase conf starting from Spark's Hadoop conf
  val hConf = HBaseConfiguration.create()
  val hadoopConf = sc.hadoopConfiguration
  HBaseConfiguration.merge(hConf, hadoopConf)

  // point the HBase root dir to the snapshot dir
  hConf.set("hbase.rootdir",
    s"hdfs://$storageDirectory/$snapshotDirectory/$snapshotName/")
  // point Hadoop to the bucket as the default fs
  hConf.set("fs.default.name", s"hdfs://$storageDirectory/")

  // configure serializations
  hConf.setStrings("io.serializations",
    hadoopConf.get("io.serializations"),
    classOf[MutationSerialization].getName,
    classOf[ResultSerialization].getName,
    classOf[KeyValueSerialization].getName)

  // disable caches
  hConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT)
  hConf.setFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 0f)
  hConf.unset(HConstants.BUCKET_CACHE_IOENGINE_KEY)

  // configure TableSnapshotInputFormat
  hConf.set("hbase.TableSnapshotInputFormat.snapshot.name", settingsAccessor.settings.snapshotName)
  hConf.set("hbase.TableSnapshotInputFormat.restore.dir", restorePathString)

  // "fake" scan which Spark applies directly to the HFiles, bypassing RPC
  val scan = new Scan()
  val scanString = {
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }
  hConf.set(TableInputFormat.SCAN, scanString)

  val job = Job.getInstance(hConf)
  TableSnapshotInputFormat.setInput(job, settingsAccessor.settings.snapshotName, restorePath)

  // create the RDD
  sc.newAPIHadoopRDD(job.getConfiguration,
    classOf[TableSnapshotInputFormat],
    classOf[ImmutableBytesWritable],
    classOf[Result])
}
This loads the HFiles from the snapshot directory and applies a "fake" full scan to them, which avoids slow remote procedure calls while producing the same output a real scan would.
Once done, you can re-enable your table.
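To close the loop on the original question (edit a cell in every row, then write HFiles back), here is a minimal sketch of how the loaded RDD could be transformed and bulk-written with `HFileOutputFormat2`. The column family `cf`, column `myColumn`, the `editValue` transform, and the output path are all hypothetical placeholders, not part of the answer above:

```scala
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// hypothetical column coordinates -- adjust to your schema
val cf  = Bytes.toBytes("cf")
val col = Bytes.toBytes("myColumn")

// example edit: uppercase the cell's string value
def editValue(old: Array[Byte]): Array[Byte] =
  Bytes.toBytes(Bytes.toString(old).toUpperCase)

// HFileOutputFormat2 needs the output sorted by row key
implicit val ibwOrdering: Ordering[ImmutableBytesWritable] =
  (a: ImmutableBytesWritable, b: ImmutableBytesWritable) => a.compareTo(b)

def editAndWriteBack(sc: SparkContext, outputPath: String): Unit = {
  val rdd: RDD[(ImmutableBytesWritable, Result)] = loadFromSnapshot(sc)

  // rewrite one cell per row, keeping the row key; skip rows without the cell
  val edited: RDD[(ImmutableBytesWritable, KeyValue)] = rdd.flatMap {
    case (key, result) =>
      Option(result.getValue(cf, col)).map { old =>
        (key, new KeyValue(result.getRow, cf, col, editValue(old)))
      }
  }

  edited
    .sortByKey()
    .saveAsNewAPIHadoopFile(
      outputPath,
      classOf[ImmutableBytesWritable],
      classOf[KeyValue],
      classOf[HFileOutputFormat2],
      sc.hadoopConfiguration)
}
```

The generated HFiles would then still need to be loaded into the (re-enabled) table, e.g. with HBase's bulk-load tooling (`completebulkload`); this sketch only covers the edit-and-write step.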
https://stackoverflow.com/questions/54348392