我是spark和分布式系统的新手。这里我有50个患者的数据,每个患者的数据存储在一个tsv文件中。
每个tsv文件都有一个名为“突变”的列,我想计算每个突变的突变率。突变率被定义为突变患者的数量/患者总数(这里是50 )。如果一个患者的突变发生了多次,它只会计入一次。你知道怎么用scala/python写东西吗?
输入:同一目录下50个tsv文件
输出:频率字典,格式为:{突变:突变率}
我知道在python中这是可以工作的:但是我应该如何在spark中编写呢?
def CalculateMutationRate(data:tsv_files):
mutation_list=dict()
for tsv_table in data:
for mutation in set(tsv_table['mutation']):
if mutation in mutation_list:
mutation_list['mutation']+=1
else:
mutation_list['mutation']=1
return mutation_list数据看起来是这样的,目前第一列表示突变,我只关心第一列。总共有50张桌子。

发布于 2019-12-28 21:52:02
首先将所有文件加载到DataFrame中:
df = spark.read.csv(path, sep='\t', header='true')然后像这样计算费率:
df = df.groupBy('Hugo_Symbol').agg(count(col('*')).alias('ct'))\
.withColumn('mutation_rate', col('ct')/sum('ct').over(Window.partitionBy()))\
.drop('ct')最后将结果转换为字典:
freq_dict = df.rdd.collectAsMap()https://stackoverflow.com/questions/59495342
复制相似问题