我有以下数据:
df = sc.parallelize([(1, 2, 3, '2','1','1'), (4, 5, 6, '3','2','1')]).toDF(['ID1', 'ID2', 'ID3','Impressions','Clicks','ImpressionsMinusClicks'])
df.show()

我想把它转换成这样(但是不知道如何以及是否应用split()和explode()来实现这一点):

这里的关键是基本上复制每个实例以匹配#的注释(例如,10个印象实例变为10行),然后这些行将它们标记为# Click作为一个正示例,其余的行被标记为# Impressions -单击时间作为一个负示例。总结一下:一个实例有10种印象,3次点击。我想把它转换成10行,3个阳性样本("1“表示单击)和7个阴性样本("0”表示“印象深刻/没有单击”)。其目的是将其用于分类模型的输入,如朴素Bayes或Logistic回归。这是起源于Kaggle杯2012数据集。
发布于 2017-04-29 17:52:38
基于@TzachZohar提供的答案和帮助,我找到了一个实用的、尽管“丑陋”的解决方案,它适用于PySpark。如果你有一种更优雅的方式,请让我知道或发一个回复或评论!
# The first UDF we create is to map the number of impressions into a sequence
impsIndexed = udf(lambda Impressions: range(0,Impressions), ArrayType(IntegerType()))
# The second UDF is to map each impression into a 1 or 0 depending if that event had a matching click. So far this is equivalent to Tzach's solution, with the exception that I was not able to package both operations into a single UDF.
IndicesToClicks = udf(lambda Impressed, Clicks: 1 if Impressed < Clicks else 0, IntegerType())
# Next, I take the input data frame and apply the first UDF to it, creating a new data frame.
df_new = df.withColumn('Impressed', explode(impsInd('Impressions')))
df_new.show()
# Lastly, I take this new data frame and apply the second UDF to it, achieving the final data frame.
df_fin = df_new.withColumn('Clicked', toClicks('Impressed','Clicks')).select('ID1','ID2','ID3','Clicked')
df_fin.show()发布于 2017-04-28 17:33:49
您确实可以对UDF的结果使用explode,该结果将生成一系列“事件”-1用于单击事件,0用于非单击印象事件:
// We create a UDF which expects two columns (imps and clicks) as input,
// and returns an array of "is clicked" (0 or 1) integers
val toClickedEvents = udf[Array[Int], Int, Int] {
case (imps, clicks) => {
// First, we map the number of imps (e.g. 3) into a sequence
// of "imps" indices starting from zero; Each one would later
// represent a single impression "event"
val impsIndices = (0 until imps)
// we map each impression "event", represented by its index,
// into a 1 or a 0: depending if that event had a matching click;
// we do that by assigning "1" to indices lower than the number of clicks
// and "0" for the rest
val clickIndicatorPerImp = impsIndices.map(index => if (clicks > index) 1 else 0)
// finally we just convert into an array, to comply with the UDF signature
clickIndicatorPerImp.toArray
}
}
// explode the result of the UDF and calculate ImpressedNotClicked
df.withColumn("Clicked", explode(toClickedEvents($"Impressions", $"Clicks")))
.select($"ID1", $"ID2", $"ID3", $"Clicked", abs($"Clicked" - lit(1)) as "ImpressedNotClicked")注意事项:最初的文章是用scala标记的;如果您可以将其转换为python,请随时编辑
https://stackoverflow.com/questions/43685580
复制相似问题