首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何将同一表的多个pyspark sql查询合并为一个查询

如何将同一表的多个pyspark sql查询合并为一个查询
EN

Stack Overflow用户
提问于 2021-04-22 17:05:34
回答 1查看 134关注 0票数 1

我目前对一个问题有一个糟糕的解决方案,但它是有效的。我想就如何改进我现有的解决方案提出建议。我有一个spark数据框架,它由来自许多不同机器的传感器数据组成,目前格式很长。下面是一个示例:

代码语言:javascript
复制
machine | run | timestep            | sensor1 | sensor2 |
A       |  1  | 2020-10-11 00:00:10 | 10      | 200     |
A       |  1  | 2020-10-11 00:00:20 | 11      | 200     |
A       |  1  | 2020-10-11 00:00:30 | 1       | 200     |
B       |  1  | 2020-10-11 01:10:10 | 10      | 10      |
B       |  1  | 2020-10-11 01:10:20 | 1000    | 5       |
A       |  1  | 2020-10-11 00:00:40 | 10      | 200     |
A       |  2  | 2020-20-11 00:00:10 | 10      | 200     |
...

在代码中,我有一个机器(键)字典,其中包含相关的时间范围(值)列表。我只想在提供的时间范围内提取每台指定机器的所有信息。例如

代码语言:javascript
复制
{"A": [("2020-10-1 00:00:00", "2020-12-30"), ("2021-1-15", "2021-3-30"))], ...}

是字典中的示例条目。因此,在这种情况下,我想为一台设备提取给定时间范围内的两组数据。我目前遍历字典,每个时间范围运行一次查询,每个结果都保存到一个文件中。然后,我遍历保存的文件,并将所有单独的数据帧合并为一个数据帧。

以下是在代码中创建的过程的示例

代码语言:javascript
复制
    for machine, machine_parts in lifetimes.items():
        for machine_part in machine_parts:
            query = f"""
            select `timestamp`, sensor1, run, machine
            from database.table
            where machine = '{machine}'
            and start >= '{machine_part.install}'
            and end <= '{machine_part.removal}'
            order by start, `timestamp` asc
            """

            print(f"Executing query: {query}")
            spark = get_spark_context()
            df = spark.sql(query).toPandas()

            filename = f"{machine}_{machine_part.install}_{machine_part.removal}.csv".replace(
                " ", "_"
            )

            MACHINE_PART_LIFETIME_DIR.mkdir(parents=True, exist_ok=True)

            filepath = os.path.join(HEATER_LIFETIME_DIR, filename)
            print(f"Saving to: {filepath}")
            df.to_csv(filepath, index=False)
            print("-" * 20)

理想情况下,我认为应该有一个查询能够一次完成所有这些,而不是运行多个查询,保存输出,重新打开,合并到一个数据帧中,然后保存结果。这应该允许我不必将每个spark数据帧转换为pandas数据帧,保存到磁盘,然后重新打开每个数据帧并合并为一个。有没有办法用pyspark动态地做到这一点呢?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-04-23 21:29:35

因此,正如@mck建议的那样,我能够通过使用join来极大地改进这一点。对于那些感兴趣的人,这里是我使用的相关代码。

要从字典转到spark dataframe:

代码语言:javascript
复制
values = []
for machine, machine_parts in lifetimes.items():
    for machine_part in machine_parts:
        values.append((machine, machine_part.install, machine_part.removal))
columns = ["machine", "install_date", "removal_date"]
df = spark.createDataFrame(values, columns)

要进行联接,请执行以下操作:

代码语言:javascript
复制
df_joined = df1.join(df).where((df1.machine == df.machine) & (df1.start >= df.install_date) & (df1.end<= df.removal_date))
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/67210160

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档