下面的代码片段是用Pandas编写的。
grouped = df.groupBy('episode_id')
get_entropy = lambda x: entropy(x.value_counts(), base=2)
ops = dict()
ops.update({f:('entropy',get_entropy) for f in categorical_features})
df = grouped.agg(ops)我如何将其转换为与Pyspark兼容的等效代码?
我无法使用PySpark提供的agg函数找到解决方案。
提前谢谢你。
发布于 2022-12-01 21:38:08
既然没有人知道答案,我就公布我的个人解决方案。
def compute_entropy(x):
counter = collections.Counter(x)
probs = [counter[item]/len(x) for item in sorted(counter.keys())]
ent = 0.
for i in probs:
ent -= i * math.log(i, 2)
return ent #entropy(probs,base=2)/1.0
compute_entropy_udf = F.udf(compute_entropy, T.DoubleType())
entropy_cols = [compute_entropy_udf(F.collect_list(x)).alias('entropy_' + x) for x in categorical_features]
df1 = df.groupBy('episode_id').agg(*entropy_cols)https://stackoverflow.com/questions/74632516
复制相似问题