我从旧的气流迁移到新的气流环境。在迁移到新的气流env时,我遇到了酸洗误差。
我理解由于RDD操作关闭,需要对对象进行筛选。
我还了解到,由于redis集群库中的一些问题,我声明的对象是不能被腌制的。
rc = RedisCluster(
startup_nodes=config["redis"]["mydashboard"]["nodes"],
password=redis_pwd,
decode_responses=True,
)
def write_to_redis(row) -> None:
rc.set(
name=row.mid, value=row.messages, ex=config["redis"]["ttl"]
)
result_df.rdd.foreach(write_to_redis)但我不明白为什么这段代码在旧的气流环境下执行得很好。
两种环境的不同之处
这两种环境都使用主选项作为纱线。
如果你需要更多的信息,问我。谢谢
发布于 2022-05-25 14:30:50
以下是我认为会发生采摘错误的一个简单解释:
当您执行foreach方法时,可调用函数(在您的例子中是write_to_redis)将在Spark中的executor节点上执行。这意味着您的类(rc)在驱动节点上被初始化,它被用于执行器节点(通常是其他服务器/实例等)。
Spark试图做的是对初始化的类进行筛选,然后将其“复制”到要由函数使用的所有executor节点中。不幸的是,某些初始化类(例如boto3连接)不能被腌制。
要解决这个问题,您可以尝试在函数中调用的方法中初始化类,如下所示:
def write_to_redis(row) -> None:
rc = RedisCluster(
startup_nodes=config["redis"]["mydashboard"]["nodes"],
password=redis_pwd,
decode_responses=True,
)
rc.set(
name=row.mid, value=row.messages, ex=config["redis"]["ttl"]
)
result_df.rdd.foreach(write_to_redis)如果要使用此方法,请注意您将打开到Redis集群的连接数量,因为调用此函数的每个executor节点都将打开一个连接。
https://stackoverflow.com/questions/72339000
复制相似问题