首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >KafkaConsumer对于多线程访问火花并不安全。

KafkaConsumer对于多线程访问火花并不安全。
EN

Stack Overflow用户
提问于 2018-04-25 13:39:07
回答 1查看 1.9K关注 0票数 2

我使用的是带有Kafka的星火结构流,但是当我试图将流写到控制台时,我得到了错误:

代码语言:javascript
复制
Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access

这是我的代码:

代码语言:javascript
复制
def group_obs(obs_df):
        obs = obs_df.select(f.col("obs.payload.after").alias("obs"))

        filtered_obs_with_value = obs \
            .union(obs.filter("obs.value_datetime is not null")
                   .withColumn("value", f.col("obs.value_datetime"))
                   .withColumn("value_type", f.lit("datetime")))


        grouped_by_obsgroup = filtered_obs_with_value\
                             .groupBy("obs.obs_group_id", "obs.encounter_id")
                             .agg(f.struct(f.col("obs.obs_group_id"),f.collect_list("tempObs").alias("obs")).alias("obs"))

        query = grouped_by_obsgroup \
                .writeStream \
                .outputMode("update") \
                .format("console") \
                .start()

        query.awaitTermination()

raw_obs = kafka_stream.select(from_json(col("value").cast("string"),mySchema)
transformed_obs = group_obs(raw_obs)
EN

回答 1

Stack Overflow用户

发布于 2018-04-28 12:02:06

您的代码没有什么特别的错误。

这是火花-23636跟踪的一个已知错误。对于由DStream跟踪的直接火花-19185也存在类似的问题。

根据JIRA的票:

唯一的解决办法是使用executor-core= 1启动我们的应用程序,启用动态资源分配。

在你的情况下这可能是可以接受的,也可能是不可接受的。

票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/50023843

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档