
How to pivot a DataFrame on multiple columns in PySpark?

Stack Overflow user
Asked on 2018-03-27 09:21:30
2 answers · 4.5K views · 0 followers · 1 vote

I have a data set in the following format:

import numpy as np
import pandas as pd

# Create the data set

np.random.seed(42)

records = list()
for i in range(2):
    for j in range(2):        
        for k in range(500):
            t = np.random.randint(pd.Timestamp('2000-01-01').value, pd.Timestamp('2018-01-01').value)
            if np.random.rand() > .95: continue                
            ts = pd.Timestamp(t).strftime('%Y-%m-%d %H:%M:%S.%f')
            records.append( (i, j, np.random.rand(), ts) )

df = pd.DataFrame.from_records(records)
df.columns =['a_id', 'b_id', 'value', 'time']

It looks like this:

      a_id  b_id     value                        time
0        0     0  0.156019  2007-09-28 15:12:24.260596
1        0     0  0.601115  2015-09-08 01:59:18.043399
2        0     0  0.969910  2012-01-10 07:51:29.662492
3        0     0  0.181825  2011-08-28 19:58:33.281289
4        0     0  0.524756  2015-11-15 14:18:17.398715
5        0     0  0.611853  2015-01-07 23:44:37.034322
6        0     0  0.366362  2008-06-21 11:56:10.529679
7        0     0  0.199674  2010-11-08 18:24:18.794838
8        0     0  0.046450  2008-04-27 02:36:46.026876

Here, a_id and b_id together form the key of a sensor. This means the DataFrame has to be transformed to:

df_ = pd.pivot_table(df, index='time', columns=['a_id', 'b_id'], values='value')
df_.index = [pd.to_datetime(v) for v in df_.index]
df_ = df_.resample('1W').mean().ffill().bfill()

After resampling and filling the gaps, the data is in the desired format:

a_id               0                   1          
b_id               0         1         0         1
2000-01-09  0.565028  0.560434  0.920740  0.458825
2000-01-16  0.565028  0.146963  0.920740  0.217588
2000-01-23  0.565028  0.840872  0.920740  0.209690
2000-01-30  0.565028  0.046852  0.920740  0.209690
2000-02-06  0.565028  0.046852  0.704871  0.209690

Each column now holds the data of one sensor.

The problem is that I have no idea how to do the same thing in PySpark.

from pyspark.sql import functions as F

df_test = spark.createDataFrame(df) \
    .withColumn('time', F.to_utc_timestamp('time', '%Y-%m-%d %H:%M:%S.%f'))
df_test.printSchema()

which gives:

root
 |-- a_id: long (nullable = true)
 |-- b_id: long (nullable = true)
 |-- value: double (nullable = true)
 |-- time: timestamp (nullable = true)

How can I transform df_test so that it has the same form as df_?


2 Answers

Stack Overflow user

Accepted answer

Posted on 2018-03-27 11:59:40

As mentioned in the comments, here is one way to pivot your data:

You should concatenate your columns a_id and b_id into a new column c_id, then group by date, pivot on c_id, and aggregate the values however you see fit.
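A minimal sketch of that approach, assuming the df_test DataFrame from the question (the column names c_id and date, the daily truncation via to_date, and the avg aggregation are illustrative choices, not prescribed by the answer):

from pyspark.sql import functions as F

pivoted = (df_test
           .withColumn("c_id", F.concat(F.col("a_id").cast("string"),
                                        F.col("b_id").cast("string")))  # combine the sensor keys
           .withColumn("date", F.to_date("time"))                       # truncate to daily granularity
           .groupBy("date")
           .pivot("c_id")
           .agg(F.avg("value")))
pivoted.show(5)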

As for the resampling, I would point to the solution provided by @zero323 here.

Votes: 1

Stack Overflow user

Posted on 2018-06-22 19:39:04

You can use pyspark.ml.feature.Bucketizer to "resample" the data.

# Imports used below (not shown in the original answer)
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.ml.feature import Bucketizer

# The "id" column shown in the output was created by concatenating "a_id" and
# "b_id"; one way to do that (an assumption, the original snippet omits this step):
df = df.withColumn("id", F.concat(col("a_id").cast("string"), col("b_id").cast("string")))

# Truncate timestamp to day precision, and convert to unixtime
df = df.withColumn("tt",
                   F.unix_timestamp(F.date_trunc("day", "time")))
df.show(5)

# +----+----+--------------------+--------------------+---+----------+
# |a_id|b_id|               value|                time| id|        tt|
# +----+----+--------------------+--------------------+---+----------+
# |   0|   0| 0.15601864044243652|2007-09-28 15:12:...| 00|1190962800|
# |   0|   0|  0.6011150117432088|2015-09-08 01:59:...| 00|1441695600|
# |   0|   0|  0.9699098521619943|2012-01-10 07:51:...| 00|1326182400|
# |   0|   0| 0.18182496720710062|2011-08-28 19:58:...| 00|1314514800|
# |   0|   0|  0.5247564316322378|2015-11-15 14:18:...| 00|1447574400|
# +----+----+--------------------+--------------------+---+----------+

# Get the minimum and maximum dates
tmin = df.select(F.min("tt")).collect()[0][0]
tmax = df.select(F.max("tt")).collect()[0][0]

# Get the number of seconds in a week
week = 60 * 60 * 24 * 7

# Get a list of bucket splits (add infinity for last split if weeks don't evenly divide)
splits = list(range(tmin, tmax, week)) + [float("inf")]

# Create the bucketizer and bucket your data
bucketizer = Bucketizer(inputCol="tt", outputCol="weeks", splits=splits)
bucketed_df = bucketizer.transform(df)
bucketed_df.show(5)

# +----+----+-------------------+--------------------+---+----------+-----+
# |a_id|b_id|              value|                time| id|        tt|weeks|
# +----+----+-------------------+--------------------+---+----------+-----+
# |   0|   0|0.15601864044243652|2007-09-28 15:12:...| 00|1190962800|403.0|
# |   0|   0| 0.6011150117432088|2015-09-08 01:59:...| 00|1441695600|818.0|
# |   0|   0| 0.9699098521619943|2012-01-10 07:51:...| 00|1326182400|627.0|
# |   0|   0|0.18182496720710062|2011-08-28 19:58:...| 00|1314514800|607.0|
# |   0|   0| 0.5247564316322378|2015-11-15 14:18:...| 00|1447574400|827.0|
# +----+----+-------------------+--------------------+---+----------+-----+

# Convert the buckets to a timestamp (seconds in week * bucket value + min_date)
bucketed_df = bucketed_df.withColumn(
    "time",
    F.from_unixtime(col("weeks") * week + tmin).cast("date"))
bucketed_df.show(5)

# +----+----+-------------------+----------+---+----------+-----+
# |a_id|b_id|              value|      time| id|        tt|weeks|
# +----+----+-------------------+----------+---+----------+-----+
# |   0|   0|0.15601864044243652|2007-09-24| 00|1190962800|403.0|
# |   0|   0| 0.6011150117432088|2015-09-07| 00|1441695600|818.0|
# |   0|   0| 0.9699098521619943|2012-01-09| 00|1326182400|627.0|
# |   0|   0|0.18182496720710062|2011-08-22| 00|1314514800|607.0|
# |   0|   0| 0.5247564316322378|2015-11-09| 00|1447574400|827.0|
# +----+----+-------------------+----------+---+----------+-----+

# Finally, do the groupBy and pivot as already explained
# ("a_id" and "b_id" were already concatenated into the column "id" above)
final_df = bucketed_df.groupBy("time").pivot("id").agg(F.avg("value"))
final_df.show(10)


#    +----------+--------------------+--------------------+-------------------+-------------------+
#    |      time|                  00|                  01|                 10|                 11|
#    +----------+--------------------+--------------------+-------------------+-------------------+
#    |2015-03-09|0.045227288910538066|  0.8633336495718252| 0.8229838050417675|               null|
#    |2000-07-03|                null|                null| 0.7855315583735368|               null|
#    |2013-09-09|  0.6334037565104235|                null|0.14284196481433187|               null|
#    |2005-06-06|                null|  0.9095933818175037|               null|               null|
#    |2017-09-11|                null|  0.9684887775943838|               null|               null|
#    |2004-02-23|                null|  0.3782888656818202|               null|0.26674411859262276|
#    |2004-07-12|                null|                null| 0.2528581182501112| 0.4189697737795244|
#    |2000-12-25|                null|                null| 0.5473347601436167|               null|
#    |2016-04-25|                null|  0.9918099513493635|               null|               null|
#    |2016-10-03|                null|0.057844449447160606| 0.2770125243259788|               null|
#    +----------+--------------------+--------------------+-------------------+-------------------+

This gets you what you need. Unfortunately, implementing the equivalents of pandas.DataFrame.ffill() and pandas.DataFrame.bfill() is not as easy as it is in pandas, because the data is distributed. For suggestions, see here and here.
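One common workaround for the forward fill, sketched here as an assumption rather than as part of the original answer, is a window function that carries the last non-null value forward (shown for the pivoted column "00"):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Note: a window without partitionBy collects all rows into a single partition,
# which is only acceptable because the weekly-aggregated data is small.
w = Window.orderBy("time").rowsBetween(Window.unboundedPreceding, Window.currentRow)
ffilled = final_df.withColumn("00", F.last("00", ignorenulls=True).over(w))
ffilled.show(5)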

Votes: 0
The original content of this page is provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/49508983
