我是新来的达斯克,并正在寻找一种方法,以扁平一个字典栏在熊猫数据。下面是1600万行dataframe的第一行的屏幕截图:

下面是三行的文本示例:
{{u'F9_07_PZ_COMP_DIRECT': u'0', u'F9_07_PZ_DIRTRSTKEY_NAME': u'DEBRA MEALY', u'F9_07_PZ_COMP_OTHER': u'0', u'F9_07_PZ_COMP_RELATED': u'0', u'F9_07_PZ_TITLE': u'CHAIR PERSON', u'F9_07_PZ_AVE_HOURS_WEEK': u'1.00', u'F9_07_PC_TRUSTEE_INDIVIDUAL': u'X'}, {u'F9_07_PZ_COMP_DIRECT': u'0', u'F9_07_PZ_DIRTRSTKEY_NAME': u'HELEN GORDON', u'F9_07_PZ_COMP_OTHER': u'0', u'F9_07_PZ_COMP_RELATED': u'0', u'F9_07_PZ_TITLE': u'VICE CHAIR', u'F9_07_PZ_AVE_HOURS_WEEK': u'1.00', u'F9_07_PC_TRUSTEE_INDIVIDUAL': u'X'}, {'F9_07_PC_HIGH_COMP_EMPLOYEE': 'X', 'F9_07_PZ_DIRTRSTKEY_NAME': 'ROB S KHANUJA', 'F9_07_PZ_COMP_OTHER': '14902', 'F9_07_PZ_COMP_RELATED': '0', 'F9_07_PZ_TITLE': 'EXEC. DIR. OPERATIONS', 'F9_07_PZ_AVE_HOURS_WEEK': '40.00', 'F9_07_PZ_COMP_DIRECT': '133173'}}我通常会用以下代码平铺Form990PartVIISectionAGrp列:
df = pd.concat([df.drop(['Form990PartVIISectionAGrp'], axis=1), df['Form990PartVIISectionAGrp'].swifter.apply(pd.Series)], axis=1)我希望在Dask中这样做,但得到了以下错误:"ValueError:计算数据中的列与提供的元数据中的列不匹配。“
我正在使用Python2.7。我进口相关的包裹
from dask import dataframe as dd
from dask.multiprocessing import get
from multiprocessing import cpu_count
nCores = cpu_count()为了测试代码,我创建了数据的随机示例:
dfs = df.sample(1000)然后生成dataframe:
ddf = dd.from_pandas(dfs, npartitions=nCores)该列当前为字符串格式,因此我将其转换为字典。通常,我只编写一行代码:
dfs['Form990PartVIISectionAGrp'] = dfs['Form990PartVIISectionAGrp'].apply(literal_eval) 但是,我尝试在这里用一个更类似于“达斯克”的形式来实现它,所以我编写了以下函数,然后应用它:
def make_dict(dfs):
dfs['Form990PartVIISectionAGrp'] = dfs['Form990PartVIISectionAGrp'].apply(literal_eval)
return dfs ddf_out = ddf.map_partitions(make_dict, meta=dfs[:0]).compute()这是可行的--它返回了Form990PartVIISectionAGrp列以字典格式显示的熊猫数据(不过,它并不比非Dask应用更快)。

然后我重新创建了DF:
ddf = dd.from_pandas(ddf_out, npartitions=nCores)并编写一个函数来平平列:
def flatten(ddf_out):
ddf_out = pd.concat([ddf_out.drop(['Form990PartVIISectionAGrp'], axis=1), ddf_out['Form990PartVIISectionAGrp'].apply(pd.Series)], axis=1)
#ddf_out = ddf_out['Form990PartVIISectionAGrp'].apply(pd.Series)
return ddf_out如果我运行以下代码:
result = ddf.map_partitions(flatten)我得到了以下输出,其中该列尚未扁平化:

我还收到了有关缺少元数据的错误,并且考虑到上面的内容无助于解析字典列,所以我创建了一个列的列表,这些列是由普通Python扁平化列生成的,并使用该列表创建了列和数据类型的字典:
metadir = {u'BusinessName': 'O', u'F9_07_PC_FORMER': 'O', u'F9_07_PC_HIGH_COMP_EMPLOYEE': 'O',
u'F9_07_PC_KEY_EMPLOYEE': 'O', u'F9_07_PC_OFFICER': 'O',
u'F9_07_PC_TRUSTEE_INDIVIDUAL': 'O', u'F9_07_PC_TRUSTEE_INSTITUTIONAL': 'O',
u'F9_07_PZ_AVE_HOURS_WEEK': 'O', u'F9_07_PZ_AVE_HOURS_WEEK_RELATED': 'O',
u'F9_07_PZ_COMP_DIRECT': 'O', u'F9_07_PZ_COMP_OTHER': 'O',
u'F9_07_PZ_COMP_RELATED': 'O', u'F9_07_PZ_DIRTRSTKEY_NAME': 'O',
u'F9_07_PZ_TITLE': 'O', u'NameBusiness': 'O', u'URL': 'O'}然后,我使用这个元数据应用平面函数:
result = ddf.map_partitions(flatten, meta=metadir)因此,我得到了以下输出:

运行result.columns将产生以下结果:

运行compute()失败的地方,我得到以下错误消息:"ValueError:计算数据中的列与提供的元数据中的列不匹配。“无论我写什么,我都会得到相同的错误:
result.compute()或
result.compute(meta=metadir)我不知道我在这里做错了什么。结果中的列似乎与元数据中的列相匹配。如有任何建议,将不胜感激。
更新:这是我在更新平面函数方面的尝试。
meta = pd.DataFrame(columns=['URL', 'F9_07_PC_TRUSTEE_INDIVIDUAL',
'F9_07_PZ_DIRTRSTKEY_NAME',
'F9_07_PZ_COMP_OTHER',
'F9_07_PZ_COMP_RELATED',
'F9_07_PZ_TITLE',
'F9_07_PZ_AVE_HOURS_WEEK',
'F9_07_PZ_COMP_DIRECT',
'F9_07_PZ_AVE_HOURS_WEEK_RELATED',
'F9_07_PC_OFFICER',
'F9_07_PC_HIGH_COMP_EMPLOYEE',
'BusinessName',
'F9_07_PC_KEY_EMPLOYEE',
'F9_07_PC_TRUSTEE_INSTITUTIONAL',
'NameBusiness',
'F9_07_PC_FORMER'], dtype="O")
def flatten(ddf_out):
ddf_out = pd.concat([df.drop(['Form990PartVIISectionAGrp'], axis=1), df['Form990PartVIISectionAGrp'].apply(pd.Series)], axis=1)
for m in meta:
if m not in ddf_out:
df[m] = ''
return ddf_out然后我跑:
result = ddf.map_partitions(flatten, meta=meta).compute()发布于 2019-05-17 13:20:06
下面是几个开始的注意事项:
.apply(literal_eval)
作为map,这不是更好吗?
然后我重新创建了DF: ddf = dd.from_pandas(ddf_out,npartitions=nCores)
ddf_out已经是一个dask数据仓库了,我不知道你为什么要这么做。
结果中的列似乎与元数据中的列相匹配。
result.columns的值是从您提供的元数据中提取的,除非您请求它,否则不会进行任何计算(在大多数操作中,dask是懒惰的)。ValueError异常没有提供更多信息吗?
下面是一个完整的例子
x = ({'F9_07_PZ_COMP_DIRECT': '0',
'F9_07_PZ_DIRTRSTKEY_NAME': 'DEBRA MEALY',
'F9_07_PZ_COMP_OTHER': '0',
'F9_07_PZ_COMP_RELATED': '0',
'F9_07_PZ_TITLE': 'CHAIR PERSON',
'F9_07_PZ_AVE_HOURS_WEEK': '1.00',
'F9_07_PC_TRUSTEE_INDIVIDUAL': 'X'},
{'F9_07_PZ_COMP_DIRECT': '0',
'F9_07_PZ_DIRTRSTKEY_NAME': 'HELEN GORDON',
'F9_07_PZ_COMP_OTHER': '0',
'F9_07_PZ_COMP_RELATED': '0',
'F9_07_PZ_TITLE': 'VICE CHAIR',
'F9_07_PZ_AVE_HOURS_WEEK': '1.00',
'F9_07_PC_TRUSTEE_INDIVIDUAL': 'X'})
df = pd.DataFrame({'a': x})
d = dd.from_pandas(df, 1)
meta = pd.DataFrame(columns=['F9_07_PZ_COMP_DIRECT',
'F9_07_PZ_DIRTRSTKEY_NAME',
'F9_07_PZ_COMP_OTHER', 'F9_07_PZ_COMP_RELATED', 'F9_07_PZ_TITLE',
'F9_07_PZ_AVE_HOURS_WEEK', 'F9_07_PC_TRUSTEE_INDIVIDUAL'], dtype="O")
d.map_partitions(lambda df: df.a.apply(pd.Series), meta=meta).compute()我怎么知道要使用什么meta?我把这个函数应用到了熊猫的dataframe上--你可以用一小块数据文件来完成这个任务。
一些补充说明:
dd.read_csv之类的东西加载,也可以用dask函数进行聚合或编写。只有将是小的或不返回任何东西的compute() (因为它涉及到写入输出)。官方示例不使用from_pandas。发布于 2019-05-28 17:15:51
假设有一个中小型数据集,普通熊猫解决方案将有效:
df = pd.concat([df.drop(['Form990PartVIISectionAGrp'], axis=1), df['Form990PartVIISectionAGrp'].apply(pd.Series)], axis=1)然而,熊猫解决方案有1600万行,既不会运行在16 of内存的Macbook上,也不会在96 of的Windows机器上运行。因为这个原因,我向达斯克求助。然而,正如在上面的答案和注释中所看到的,Dask解决方案不能工作,因为我的数据集中的每个观察都不一定都有所有的字典键。Form990PartVIISectionAGrp的1,600万个观测值总共有15个键在下面的列表中:
newkeys = ['F9_07_PC_TRUSTEE_INDIVIDUAL',
'F9_07_PZ_DIRTRSTKEY_NAME',
'F9_07_PZ_COMP_OTHER',
'F9_07_PZ_COMP_RELATED',
'F9_07_PZ_TITLE',
'F9_07_PZ_AVE_HOURS_WEEK',
'F9_07_PZ_COMP_DIRECT',
'F9_07_PZ_AVE_HOURS_WEEK_RELATED',
'F9_07_PC_OFFICER',
'F9_07_PC_HIGH_COMP_EMPLOYEE',
'BusinessName',
'F9_07_PC_KEY_EMPLOYEE',
'F9_07_PC_TRUSTEE_INSTITUTIONAL',
'NameBusiness',
'F9_07_PC_FORMER']因此,我的解决方案包括接受上面@mdurant提供的一些提示,并首先向每一行添加缺少的键:
for index, row in df[:].iterrows():
for k in newkeys:
row['Form990PartVIISectionAGrp'].setdefault(k, np.nan)在我的Macbook上花了100分钟。根据mdurant的评论,我将数据存储为JSON格式:
df.to_json('df.json', orient='records', lines=True)并将该文件以文本形式读入Dask:
import json
import dask.bag as db
b = db.read_text('df.json').map(json.loads)然后创建一个函数来平平列:
def flatten(record):
return {
'F9_07_PZ_COMP_OTHER': record['Form990PartVIISectionAGrp']['F9_07_PZ_COMP_OTHER'],
'F9_07_PZ_COMP_RELATED': record['Form990PartVIISectionAGrp']['F9_07_PZ_COMP_RELATED'],
'F9_07_PC_TRUSTEE_INDIVIDUAL': record['Form990PartVIISectionAGrp']['F9_07_PC_TRUSTEE_INDIVIDUAL'],
'F9_07_PZ_DIRTRSTKEY_NAME': record['Form990PartVIISectionAGrp']['F9_07_PZ_DIRTRSTKEY_NAME'],
'F9_07_PZ_COMP_DIRECT': record['Form990PartVIISectionAGrp']['F9_07_PZ_COMP_DIRECT'],
'F9_07_PZ_COMP_OTHER': record['Form990PartVIISectionAGrp']['F9_07_PZ_COMP_OTHER'],
'BusinessName': record['Form990PartVIISectionAGrp']['BusinessName'],
'F9_07_PC_FORMER': record['Form990PartVIISectionAGrp']['F9_07_PC_FORMER'],
'F9_07_PC_HIGH_COMP_EMPLOYEE': record['Form990PartVIISectionAGrp']['F9_07_PC_HIGH_COMP_EMPLOYEE'],
'F9_07_PC_KEY_EMPLOYEE': record['Form990PartVIISectionAGrp']['F9_07_PC_KEY_EMPLOYEE'],
'F9_07_PC_OFFICER': record['Form990PartVIISectionAGrp']['F9_07_PC_OFFICER'],
'F9_07_PC_TRUSTEE_INSTITUTIONAL': record['Form990PartVIISectionAGrp']['F9_07_PC_TRUSTEE_INSTITUTIONAL'],
'F9_07_PZ_AVE_HOURS_WEEK': record['Form990PartVIISectionAGrp']['F9_07_PZ_AVE_HOURS_WEEK'],
'F9_07_PZ_AVE_HOURS_WEEK_RELATED': record['Form990PartVIISectionAGrp']['F9_07_PZ_AVE_HOURS_WEEK_RELATED'],
'F9_07_PZ_TITLE': record['Form990PartVIISectionAGrp']['F9_07_PZ_TITLE'],
'NameBusiness': record['Form990PartVIISectionAGrp']['NameBusiness'],
'URL': record['URL'],
}然后,我可以应用该函数:
df = b.map(flatten).to_dataframe()并将数据导出到CSV:
df.to_csv('compensation*.csv')这就像一种魅力!简而言之,根据mdurant上面的有用评论,关键是:( 1)在所有观察中添加缺少的键;( 2)不要将数据读入熊猫的Dask (使用文本或CSV )。解决这两个问题后,这个问题得到了很好的解决。
https://stackoverflow.com/questions/56118647
复制相似问题