首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >持久化火花流输出

持久化火花流输出
EN

Stack Overflow用户
提问于 2015-10-01 10:56:35
回答 2查看 8.6K关注 0票数 8

我正在从一个消息应用程序中收集数据,我现在使用的是Flume,它每天发送大约5000万条记录

我希望使用Kafka,使用星火流从Kafka消费,并将其持久化到hadoop并使用黑斑羚进行查询。

我对我尝试过的每一种方法都有异议。

方法1-将RDD保存为拼花,将外部单元格拼板表指向parquet目录。

代码语言:javascript
复制
// scala
val ssc =  new StreamingContext(sparkConf, Seconds(bucketsize.toInt))
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
lines.foreachRDD(rdd => {

    // 1 - Create a SchemaRDD object from the rdd and specify the schema
    val SchemaRDD1 = sqlContext.jsonRDD(rdd, schema)

    // 2 - register it as a spark sql table
    SchemaRDD1.registerTempTable("sparktable")

    // 3 - qry sparktable to produce another SchemaRDD object of the data needed 'finalParquet'. and persist this as parquet files
    val finalParquet = sqlContext.sql(sql)
    finalParquet.saveAsParquetFile(dir)

问题是finalParquet.saveAsParquetFile输出了大量的文件,从Kafka接收到的Dstream输出了200多个文件,批量大小为1分钟。它输出许多文件的原因是因为计算是分布式的,正如另一个how to make saveAsTextFile NOT split output into multiple file?后解释的那样。

但是,对于我来说,所提供的解决方案似乎并不理想,例如,作为一个用户状态--只有在数据很少的情况下,拥有单个输出文件才是个好主意。

方法2-使用HiveContext。将RDD数据直接插入到单元表中。

代码语言:javascript
复制
# python
sqlContext = HiveContext(sc)
ssc = StreamingContext(sc, int(batch_interval))
kvs = KafkaUtils.createStream(ssc, zkQuorum, group, {topics: 1})
lines = kvs.map(lambda x: x[1]).persist(StorageLevel.MEMORY_AND_DISK_SER)
lines.foreachRDD(sendRecord)

def sendRecord(rdd):

  sql = "INSERT INTO TABLE table select * from beacon_sparktable"

  # 1 - Apply the schema to the RDD creating a data frame 'beaconDF'
  beaconDF = sqlContext.jsonRDD(rdd,schema)

  # 2- Register the DataFrame as a spark sql table.
  beaconDF.registerTempTable("beacon_sparktable")

  # 3 - insert to hive directly from a qry on the spark sql table
  sqlContext.sql(sql);

这很好,它直接插入到一个拼花表,但是当处理时间超过批处理间隔时间时,批处理会出现调度延迟。消费者跟不上正在生产的产品,需要加工的批次开始排队。

似乎写到蜂箱的速度太慢了。我尝试过调整批处理间隔大小,运行更多的使用者实例。

总而言之

考虑到存在多个文件的问题和写入蜂窝的潜在延迟,从Spark流中持久化大数据的最佳方法是什么?其他人在做什么?

这里也提出了一个类似的问题,但是他对目录有一个问题,因为目录被附加到太多的How to make Spark Streaming write its output so that Impala can read it?文件中。

非常感谢你的帮助

EN

回答 2

Stack Overflow用户

发布于 2016-03-11 21:43:59

在解决方案2中,可以通过每个RDD的分区数来控制创建的文件数量。

参见此示例:

代码语言:javascript
复制
// create a Hive table (assume it's already existing)
sqlContext.sql("CREATE TABLE test (id int, txt string) STORED AS PARQUET")

// create a RDD with 2 records and only 1 partition
val rdd = sc.parallelize(List( List(1, "hello"), List(2, "world") ), 1)

// create a DataFrame from the RDD
val schema = StructType(Seq(
 StructField("id", IntegerType, nullable = false),
 StructField("txt", StringType, nullable = false)
))
val df = sqlContext.createDataFrame(rdd.map( Row(_:_*) ), schema)

// this creates a single file, because the RDD has 1 partition
df.write.mode("append").saveAsTable("test")

现在,我想您可以使用从Kafka提取数据的频率,以及每个RDD的分区数(默认值,您的Kafka主题的分区,您可以通过重新分区来减少这些分区)。

我使用CDH5.5.1中的Spark1.5,使用df.write.mode("append").saveAsTable("test")或string获得相同的结果。

票数 0
EN

Stack Overflow用户

发布于 2020-04-16 18:38:33

我认为这个小文件问题可以有所解决。您可能会得到大量基于kafka分区的文件。对我来说,我有12个分区Kafka主题,我用spark.write.mode("append").parquet("/location/on/hdfs")编写。

现在,根据您的需求,您可以添加coalesce(1)或更多来减少文件数量。另外一种选择是增加微批次的持续时间。例如,如果你可以接受5分钟的延迟写一天,你可以有300秒的小批。

对于第二个问题,批排队只是因为您没有启用背压。首先,您应该验证在单个批处理中可以处理的最大值是什么。一旦您能够绕过这个数字,您就可以设置spark.streaming.kafka.maxRatePerPartition值和spark.streaming.backpressure.enabled=true,以启用每个微批处理的有限记录数。如果您仍然不能满足需求,那么唯一的选择是增加主题上的分区,或者增加星火应用程序上的资源。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/32885825

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档