我刚开始使用hudi我有个问题。我正在使用一个在AWS中使用pyspark,Kafka的EMR,我想要做的是阅读一个主题,然后将它移到hudi格式的S3中。老实说,从几个星期前开始,我已经试过很多次了,我不知道这是不是不可能。有人能告诉我帮帮我吗?我正在处理的代码是:
#Reading
df_T = spark.readStream \
.format("kafka") \
.options(**options_read) \
.option("subscribe", topic) \
.load() ……
hudi_options = {
'hoodie.table.name': MyTable,
'hoodie.datasource.write.table.name': MyTable,
'hoodie.datasource.write.recordkey.field': MyKeyInTable,
'hoodie.datasource.write.partitionpath.field': MyPartitionKey,
'hoodie.datasource.write.hive_style_partitioning': "true",
'hoodie.datasource.write.row.writer.enable': "false",
'hoodie.datasource.write.operation': 'bulk_insert',
'hoodie.datasource.write.precombine.field': MyTimeStamp,
'hoodie.insert.shuffle.parallelism': 1,
'hoodie.consistency.check.enabled': "true",
'hoodie.cleaner.policy': "KEEP_LATEST_COMMITS",
'hoodie.datasource.write.storage.type': 'MERGE_ON_READ',
'hoodie.compact.inline': "false",
'hoodie.datasource.hive_sync.table': MyTable,
'hoodie.datasource.hive_sync.partition_fields': MyPartitionKey,
'hoodie.datasource.hive_sync.database' : Mydatabase,
'hoodie.datasource.hive_sync.auto_create_database': "true",
'hoodie.datasource.write.keygenerator.class': "org.apache.hudi.keygen.ComplexKeyGenerator",
'hoodie.datasource.hive_sync.partition_extractor_class': "org.apache.hudi.hive.MultiPartKeysValueExtractor",
'hoodie.datasource.hive_sync.enable': 'true',
'hoodie.datasource.hive_sync.skip_ro_suffix': 'true'
}……
ds = df_T \
.writeStream \
.outputMode('append') \
.format("org.apache.hudi") \
.options(**hudi_options)\
.option('checkpointLocation', MyCheckpointLocation) \
.start(MyPathLocation) \
.awaitTermination(300)……
EMR中的这段代码显示工作正常,但是当我要查找hudi文件时,它不会创建任何文件。我知道kafka配置可以工作,因为当我在输出模式下设置“控制台”时,它可以正常工作,有人能帮我吗?
发布于 2022-09-28 17:14:07
你好,伙计们,我可以修复这个错误,首先,您必须清除dataframe,不是所有东西,但是至少表中的主键中的所有字段都是空的。作为第二点,在hoodie.datasource.write.precombine.field中,您可以设置
..。
import datetime
currentDate = datetime.datetime.now()
#As for example:
hudi_options = {
...
'hoodie.datasource.write.precombine.field': currentDate,
...
}最后,如果您的dataframe中没有时间戳,您可以设置如下:
.withColumn('Loaded_Date', F.lit(currentDate).cast('timestamp')) https://stackoverflow.com/questions/73876704
复制相似问题