我试图在 applyInPandas 中使用自定义函数。当我把它改写成某个类中的方法时,会引发如下错误:pickle.PicklingError: 无法序列化对象: Exception: 看起来您正试图从广播变量、action 或 transformation 中引用 SparkContext。SparkContext 只能在驱动程序(driver)上使用,不能在工作节点(worker)上运行的代码中使用。更多信息请参见 SPARK-5063。
我的脚本以普通函数的写法编写时运行良好:
from scipy.stats import kendalltau
import numpy as np
import pandas as pd
def kendall(dat, a, b):
    """Summarise the Kendall tau correlation between column(s) `a` and column `b`.

    Args:
        dat: pandas DataFrame containing the columns named by `a` and `b`.
        a: a single column name, or a list of column names.
        b: name of the column each `a` column is correlated against.

    Returns:
        pd.Series with two entries.
        * `a` is a list: index ['category', 'corr'] -- the column with the
          largest tau (largest absolute tau when any tau is negative) and
          that column's tau.
        * otherwise: index ['corr', 'N'] -- the tau and the number of rows.
        Both entries are NaN when `dat` has too few rows (3 or fewer for a
        list, fewer than 10 otherwise) or, for a list, when every tau is NaN.
    """
    ken = [np.nan, np.nan]
    if isinstance(a, list):
        index = ['category', 'corr']
        if dat.shape[0] > 3:
            taus = [kendalltau(dat[item], dat[b])[0] for item in a]
            tmp = pd.Series(taus, index=a).dropna()
            if tmp.shape[0] > 0:
                # When any tau is negative, rank by magnitude rather than
                # by signed value.
                if (tmp < 0).any():
                    cato = tmp.abs().idxmax()
                else:
                    cato = tmp.idxmax()
                ken = [cato, tmp[cato]]
    else:
        index = ['corr', 'N']
        if dat.shape[0] >= 10:
            ken = [kendalltau(dat[a], dat[b])[0], dat.shape[0]]
    return pd.Series(ken, index=index)
def kendall_process(pdf):
    """Compute per-group Kendall statistics and the mean label.

    Groups `pdf` by ('step_id', 'unit_id'), applies `kendall` to the
    'process' and 'label' columns of each group, and joins the per-group
    mean of 'label' (cast to int) onto the result.

    Args:
        pdf: pandas DataFrame with 'step_id', 'unit_id', 'process' and
            'label' columns; 'label' must be castable to int.

    Returns:
        pandas DataFrame with columns
        ['step_id', 'unit_id', 'corr', 'N', 'ratio'], one row per group.
    """
    keys = ['step_id', 'unit_id']
    # Hand apply() only the columns kendall reads; this keeps the grouping
    # columns out of the per-group frames regardless of pandas version.
    result = pdf.groupby(keys)[['process', 'label']].apply(kendall, 'process', 'label')
    result = pd.DataFrame(result).reset_index()
    # Cast on a copy so the caller's frame is not modified.
    casted = pdf.assign(label=pdf['label'].astype('int'))
    ratio = casted.groupby(keys)['label'].mean().reset_index()
    result = pd.merge(result, ratio, on=keys, how='left')
    result.columns = ['step_id', 'unit_id', 'corr', 'N', 'ratio']
    return result
# Run kendall_process once per (step_id, unit_id) group and declare the
# schema of the frames it returns.
result = datInOut.groupBy('step_id', 'unit_id').applyInPandas(
    kendall_process,
    schema=(
        'step_id string,'
        'unit_id string,'
        'corr float,'
        'N long,'
        'ratio float'
    ),
)
# Print the first five rows of the result.
result.show(5)
+--------------+--------+-----------+----+-----+
| step_id| unit_id| corr| N|ratio|
+--------------+--------+-----------+----+-----+
|10303_A2AOI300|A2AOI300| null|null| 0.0|
|17613_A2AOI500|A2AOI500|-0.13477948| 14| 0.5|
|1B304_A2MAC100|A2MAC100| null|null| 1.0|
|1A106_A2SPR100|A2SPR100| null|null| 1.0|
|19103_A2AOI800|A2AOI800| null|null| 0.5|
+--------------+--------+-----------+----+-----+
only showing top 5 rows

但是,当我把它改写成类的写法时,它会引发 PicklingError:
@staticmethod
def kendall(dat,a,b):
kentmp=[]
ken=[np.nan,np.nan]
if type(a) is list:
if dat.shape[0]>3:
for item in a:
kentmp.append(kendalltau(dat[item],dat[b])[0])
tmp=pd.Series(kentmp,index=a).dropna()
if tmp.shape[0]>0:
cato=tmp.idxmax()
if (tmp<0).any():
cato=tmp.abs().idxmax()
ken=[cato,tmp[cato]]
index=['category','corr']
else:
if dat.shape[0]>=10:
ken=[kendalltau(dat[a],dat[b])[0],dat.shape[0]]
index=['corr','N']
return pd.Series(ken,index=index)
@staticmethod
def kendall_delay(pdf):
    """Compute per-group Kendall statistics for 'delay' and the mean label.

    Groups `pdf` by ('step_id', 'equip_id'), applies `QTWorker.kendall` to
    the 'delay' and 'label' columns of each group, and joins the per-group
    mean of 'label' (cast to int) onto the result.

    Args:
        pdf: pandas DataFrame with 'step_id', 'equip_id', 'delay' and
            'label' columns; 'label' must be castable to int.

    Returns:
        pandas DataFrame with columns
        ['step_id', 'equip_id', 'corr', 'N', 'ratio'], one row per group.
    """
    keys = ['step_id', 'equip_id']
    # NOTE(review): this body refers to the QTWorker class by name. When the
    # function is shipped to Spark workers, that reference presumably drags
    # the whole class (and anything Spark-related it holds) into the pickle
    # -- confirm whether this is what triggers the PicklingError.
    # Hand apply() only the columns kendall reads; this keeps the grouping
    # columns out of the per-group frames regardless of pandas version.
    result = pdf.groupby(keys)[['delay', 'label']].apply(QTWorker.kendall, 'delay', 'label')
    result = pd.DataFrame(result).reset_index()
    # Cast on a copy so the caller's frame is not modified.
    casted = pdf.assign(label=pdf['label'].astype('int'))
    ratio = casted.groupby(keys)['label'].mean().reset_index()
    result = pd.merge(result, ratio, on=keys, how='left')
    result.columns = ['step_id', 'equip_id', 'corr', 'N', 'ratio']
    return result
ret = datQ.groupBy(self.step, self.equip).applyInPandas(self.kendall_delay, schema='step_id string,equip_id string,corr float,N long,ratio float')

正如您所看到的,我已经用 @staticmethod 修饰了所用到的函数,但它仍然不起作用。我真的很想解决这个问题!
发布于 2020-11-20 03:00:11
虽然我还不清楚原因,但我把 kendall 函数下移到调用它的方法内部(作为嵌套函数,见下面的 kendall_process,对应问题中的 kendall_delay)之后,问题就解决了。我真的很想弄清楚原因!
@staticmethod
def kendall_process(pdf):
def kendall(dat, a, b):
kentmp = []
ken = [np.nan, np.nan]
if type(a) is list:
if dat.shape[0] > 3:
for item in a:
kentmp.append(kendalltau(dat[item], dat[b])[0])
tmp = pd.Series(kentmp, index=a).dropna()
if tmp.shape[0] > 0:
cato = tmp.idxmax()
if (tmp < 0).any():
cato = tmp.abs().idxmax()
ken = [cato, tmp[cato]]
index = ['category', 'corr']
else:
if dat.shape[0] >= 10:
ken = [kendalltau(dat[a], dat[b])[0], dat.shape[0]]
index = ['corr', 'N']
return pd.Series(ken, index=index)
result = pdf.groupby(['step_id','equip_id']).apply(kendall,'process','label')
result = pd.DataFrame(result).reset_index()
pdf['label'] = pdf.label.astype('int')
result_ = pdf.groupby(['step_id', 'equip_id'])['label'].mean().reset_index()
result = pd.merge(result, result_, on=['step_id', 'equip_id'], how='left')
result.columns = ['step_id', 'equip_id', 'corr', 'N', 'ratio']
return resulthttps://stackoverflow.com/questions/64904866
复制相似问题