我从Castra创建了以下dask dataframe:
import dask.dataframe as dd
df = dd.from_castra('data.castra', columns=['user_id','ts','text'])屈服:
user_id / ts / text
ts
2015-08-08 01:10:00 9235 2015-08-08 01:10:00 a
2015-08-08 02:20:00 2353 2015-08-08 02:20:00 b
2015-08-08 02:20:00 9235 2015-08-08 02:20:00 c
2015-08-08 04:10:00 9235 2015-08-08 04:10:00 d
2015-08-08 08:10:00 2353 2015-08-08 08:10:00 e我想做的是:
user_id和ts组示例输出:
text
user_id ts
9235 2015-08-08 00:00:00 ac
2015-08-08 03:00:00 d
2353 2015-08-08 00:00:00 b
2015-08-08 06:00:00 e我尝试了以下几点:
df.groupby(['user_id','ts'])['text'].sum().resample('3H', how='sum').compute()并得到以下错误:
TypeError: Only valid with DatetimeIndex, TimedeltaIndex or PeriodIndex我尝试在管道中传递set_index('ts'),但它似乎不是Series的属性。
对于如何实现这一点,有什么想法吗?
TL;博士
如果这使问题变得更容易,我也可以更改我创建的Castra的格式。我目前的实现主要是从这的优秀文章中摘取的。
我将索引(在to_df()函数中)设置为:
df.set_index('ts',drop=False,inplace=True)并拥有:
with BZ2File(os.path.join(S.DATA_DIR,filename)) as f:
batches = partition_all(batch_size, f)
df, frames = peek(map(self.to_df, batches))
castra = Castra(S.CASTRA, template=df, categories=categories)
castra.extend_sequence(frames, freq='3h')以下是生成的d类型:
ts datetime64[ns]
text object
user_id float64发布于 2015-11-26 22:07:22
如果我们可以假设每个user-id组都可以容纳记忆,那么我建议使用dask.dataframe来完成外部组,然后使用熊猫在每个组中执行操作,如下所示。
def per_group(blk):
return blk.groupby('ts').text.resample('3H', how='sum')
df.groupby('user_id').apply(per_group, columns=['ts', 'text']).compute()这将两个困难的东西解耦到两个不同的项目中。
理想情况下,dask.dataframe会自动为您编写每组函数。目前,dask.dataframe没有智能地处理多个索引,也没有在多列组的基础上进行重采样,因此目前还没有自动解决方案可用。尽管如此,仍然有可能回到熊猫的每块计算,同时仍然使用dask.dataframe来准备相应的小组。
发布于 2015-11-26 18:55:47
尝试将索引转换为如下所示的DatetimeIndex:
import datetime
# ...
df.index = dd.DatetimeIndex(df.index.map(lambda x: datetime.datetime.strptime(x, '%Y-%m-%d %H:%M:%S')))
# ...https://stackoverflow.com/questions/33945086
复制相似问题