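As part of a Glue job, I'm trying to change multiple values across multiple columns. However, the function I wrote times out completely. I've added comments in the function to show where the problem occurs.
What causes the Glue job not to assign tasks to the executors?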
from pyspark.sql import functions as F

def map_values_in_columns(self, df):
    for k, v in self.value_mapping.items():
        column_name = self.value_mapping[k]['column_name']
        values = self.value_mapping[k]['values']
        df = df.withColumn(column_name, F.col(column_name).cast("string"))
        for old_value, new_value in values.items():
            old_value = str(old_value)
            new_value = str(new_value)
            df = df.withColumn(column_name, F.when(F.col(column_name) == old_value, new_value).otherwise(F.col(column_name)))
            # this line results in a progress bar
            df.select(column_name).distinct().show(truncate=False)
        # this line here is where my job will hang forever, showing zero sign of life in the logs
        df.select(column_name).distinct().show(truncate=False)
    logger.info("Successfully mapped values in columns")
    return df

Posted on 2021-10-26 10:30:36
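I finally solved it by making the column's data type and the data type of the mapping values the same (casting both to string) and using the df.replace method instead. The code is as follows: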
from pyspark.sql import functions as F

def map_values_in_columns(self, df):
    for entry in self.value_mapping.values():
        column_name = entry['column_name']
        values = entry['values']
        # Cast the column to string so its type matches the mapping keys below
        df = df.withColumn(column_name, F.col(column_name).cast("string"))
        # Convert both the old and the new values to strings
        new_value_mapping = {str(old_value): str(new_value) for old_value, new_value in values.items()}
        # Apply the whole mapping to this column in a single replace call
        df = df.replace(new_value_mapping, subset=[column_name])
    return df

https://stackoverflow.com/questions/69713165
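Apart from fixing the hang, the question's loop and the df.replace version can give different results when one pair's new value is also another pair's old value. The question's loop rewrites the column once per pair, so each when/otherwise sees the output of the previous one. My understanding is that DataFrame.replace builds a single expression from the whole dict, so each cell is matched against its original value only once (worth verifying on your Spark version). The sketch below models both behaviours in plain Python on a list of cells rather than a DataFrame. The helper names and the sample mapping {1: 2, 2: 3} are hypothetical, chosen for illustration, and nulls are not modelled.

```python
def to_string_mapping(values):
    """Stringify keys and values, as the answer does, so they match a column
    that has been cast to string."""
    return {str(old): str(new) for old, new in values.items()}


def chained_when_model(cells, values):
    """Model of the question's inner loop: one when/otherwise per pair,
    each applied to the result of the previous pair."""
    cells = [str(c) for c in cells]  # mirrors .cast("string")
    for old, new in to_string_mapping(values).items():
        cells = [new if c == old else c for c in cells]
    return cells


def single_pass_replace_model(cells, values):
    """Model of a one-pass replace: every cell is looked up once in the
    stringified mapping, so a replaced value is not replaced again."""
    mapping = to_string_mapping(values)
    return [mapping.get(str(c), str(c)) for c in cells]


values = {1: 2, 2: 3}  # hypothetical mapping for one column
print(chained_when_model([1, 2, 4], values))         # ['3', '3', '4']
print(single_pass_replace_model([1, 2, 4], values))  # ['2', '3', '4']
```

If the cascading behaviour of the original loop was actually intended, switching to df.replace changes the output, so it is worth checking the mappings for values that appear both as keys and as targets.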