首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何读取csv并使用dask处理行?

如何读取csv并使用dask处理行?
EN

Stack Overflow用户
提问于 2019-01-11 14:25:24
回答 1查看 1.9K关注 0票数 1

我想读取一个28 to的csv文件,并打印内容。但是,我的代码:

代码语言:javascript
复制
import json
import sys
from datetime import datetime
from hashlib import md5

import dask.dataframe as dd
import dask.multiprocessing
import pandas as pd

from kyotocabinet import *


class IndexInKyoto:

    def hash_string(self, string):
        return md5(string.encode('utf-8')).hexdigest()

    def dbproc(self, db):
        db[self.hash_string(self.row)] = self.row

    def index_row(self, row):
        self.row = row
        DB.process(self.dbproc, "index.kch")

start_time = datetime.utcnow()
row_counter = 0
ob = IndexInKyoto()
df = dd.read_csv("/Users/aviralsrivastava/dev/levelsdb-learning/10gb.csv", blocksize=1000000)
df = df.compute(scheduler='processes')     # convert to pandas
df = df.to_dict(orient='records')
for row in df:
    ob.index_row(row)
print("Total time:")
print(datetime.utcnow-start_time)

不起作用。当我运行命令htop时,我可以看到dask正在运行,但是没有任何输出。也没有创建任何index.kch文件。我在没有使用dask的情况下发出同样的声音,它运行得很好;我使用的是Pandas (chunksize),但是它太慢了,因此我想使用dask。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-01-11 15:04:14

代码语言:javascript
复制
df = df.compute(scheduler='processes')     # convert to pandas

别这么做!

您将在单独的进程中加载各个部分,然后将所有要拼接到主进程中的单个数据帧中的数据进行传输。这只会增加处理的开销,并在内存中创建数据的副本。

如果您只想(出于某种原因)将每行打印到控制台,那么使用(pd.read_csv(chunksize=..))就非常好了。您可以使用Dask的分块来运行它,并且可能会得到一个加速,就是在读取数据的工作人员中进行打印:

代码语言:javascript
复制
df = dd.read_csv(..)

# function to apply to each sub-dataframe
@dask.delayed
def print_a_block(d):
    for row in df:
        print(row)

dask.compute(*[print_a_block(d) for d in df.to_delayed()])

请注意,for row in df实际上为您获取了列,可能您需要迭代,或者您实际上希望以某种方式处理数据。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54148429

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档