首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >气流迁移时火花的酸洗误差

气流迁移时火花的酸洗误差
EN

Stack Overflow用户
提问于 2022-05-22 15:37:48
回答 1查看 71关注 0票数 0

我从旧的气流迁移到新的气流环境。在迁移到新的气流env时,我遇到了酸洗误差

我理解由于RDD操作关闭,需要对对象进行筛选。

我还了解到,由于redis集群库中的一些问题,我声明的对象是不能被腌制的。

代码语言:javascript
复制
rc = RedisCluster(
    startup_nodes=config["redis"]["mydashboard"]["nodes"],
    password=redis_pwd,
    decode_responses=True,
)


def write_to_redis(row) -> None:
    rc.set(
        name=row.mid, value=row.messages, ex=config["redis"]["ttl"]
    )

result_df.rdd.foreach(write_to_redis)

但我不明白为什么这段代码在旧的气流环境下执行得很好。

两种环境的不同之处

  1. 在执行python代码时,旧环境使用python命令,而不是pyspark或submit。而新环境则使用pyspark命令。
  2. 旧的气流环境使用快速执行器,而新的env使用k8s执行器。

这两种环境都使用主选项作为纱线。

如果你需要更多的信息,问我。谢谢

EN

回答 1

Stack Overflow用户

发布于 2022-05-25 14:30:50

以下是我认为会发生采摘错误的一个简单解释:

当您执行foreach方法时,可调用函数(在您的例子中是write_to_redis)将在Spark中的executor节点上执行。这意味着您的类(rc)在驱动节点上被初始化,它被用于执行器节点(通常是其他服务器/实例等)。

Spark试图做的是对初始化的类进行筛选,然后将其“复制”到要由函数使用的所有executor节点中。不幸的是,某些初始化类(例如boto3连接)不能被腌制。

要解决这个问题,您可以尝试在函数中调用的方法中初始化类,如下所示:

代码语言:javascript
复制
def write_to_redis(row) -> None:
    rc = RedisCluster(
        startup_nodes=config["redis"]["mydashboard"]["nodes"],
        password=redis_pwd,
        decode_responses=True,
    )
    rc.set(
        name=row.mid, value=row.messages, ex=config["redis"]["ttl"]
    )

result_df.rdd.foreach(write_to_redis)

如果要使用此方法,请注意您将打开到Redis集群的连接数量,因为调用此函数的每个executor节点都将打开一个连接。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72339000

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档