首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >从卡夫卡到哈迪的电火花流

从卡夫卡到哈迪的电火花流
EN

Stack Overflow用户
提问于 2022-09-28 05:49:45
回答 1查看 81关注 0票数 2

我刚开始使用hudi我有个问题。我正在使用一个在AWS中使用pyspark,Kafka的EMR,我想要做的是阅读一个主题,然后将它移到hudi格式的S3中。老实说,从几个星期前开始,我已经试过很多次了,我不知道这是不是不可能。有人能告诉我帮帮我吗?我正在处理的代码是:

代码语言:javascript
复制
    #Reading
    df_T = spark.readStream \
        .format("kafka") \
        .options(**options_read) \
        .option("subscribe", topic) \
        .load() 

……

代码语言:javascript
复制
    hudi_options = {
        'hoodie.table.name': MyTable,
        'hoodie.datasource.write.table.name': MyTable,
        'hoodie.datasource.write.recordkey.field': MyKeyInTable,
        'hoodie.datasource.write.partitionpath.field': MyPartitionKey,
        'hoodie.datasource.write.hive_style_partitioning': "true",
        'hoodie.datasource.write.row.writer.enable': "false",
        'hoodie.datasource.write.operation': 'bulk_insert',
        'hoodie.datasource.write.precombine.field': MyTimeStamp,
        'hoodie.insert.shuffle.parallelism': 1,
        'hoodie.consistency.check.enabled': "true",
        'hoodie.cleaner.policy': "KEEP_LATEST_COMMITS",
        'hoodie.datasource.write.storage.type': 'MERGE_ON_READ',
        'hoodie.compact.inline': "false",
        'hoodie.datasource.hive_sync.table': MyTable,
        'hoodie.datasource.hive_sync.partition_fields': MyPartitionKey,
        'hoodie.datasource.hive_sync.database' : Mydatabase,
        'hoodie.datasource.hive_sync.auto_create_database': "true",
        'hoodie.datasource.write.keygenerator.class': "org.apache.hudi.keygen.ComplexKeyGenerator",
        'hoodie.datasource.hive_sync.partition_extractor_class': "org.apache.hudi.hive.MultiPartKeysValueExtractor",
        'hoodie.datasource.hive_sync.enable': 'true',
        'hoodie.datasource.hive_sync.skip_ro_suffix': 'true'
    }

……

代码语言:javascript
复制
    ds = df_T \
        .writeStream \
        .outputMode('append') \
        .format("org.apache.hudi") \
        .options(**hudi_options)\
        .option('checkpointLocation', MyCheckpointLocation) \
        .start(MyPathLocation) \
        .awaitTermination(300)

……

EMR中的这段代码显示工作正常,但是当我要查找hudi文件时,它不会创建任何文件。我知道kafka配置可以工作,因为当我在输出模式下设置“控制台”时,它可以正常工作,有人能帮我吗?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-09-28 17:14:07

你好,伙计们,我可以修复这个错误,首先,您必须清除dataframe,不是所有东西,但是至少表中的主键中的所有字段都是空的。作为第二点,在hoodie.datasource.write.precombine.field中,您可以设置

..。

代码语言:javascript
复制
import datetime

currentDate = datetime.datetime.now() 

#As for example:

    hudi_options = {
...
        'hoodie.datasource.write.precombine.field': currentDate,
...
    }

最后,如果您的dataframe中没有时间戳,您可以设置如下:

代码语言:javascript
复制
.withColumn('Loaded_Date', F.lit(currentDate).cast('timestamp')) 
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/73876704

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档