我目前对一个问题有一个糟糕的解决方案,但它是有效的。我想就如何改进我现有的解决方案提出建议。我有一个spark数据框架,它由来自许多不同机器的传感器数据组成,目前格式很长。下面是一个示例:
machine | run | timestep | sensor1 | sensor2 |
A | 1 | 2020-10-11 00:00:10 | 10 | 200 |
A | 1 | 2020-10-11 00:00:20 | 11 | 200 |
A | 1 | 2020-10-11 00:00:30 | 1 | 200 |
B | 1 | 2020-10-11 01:10:10 | 10 | 10 |
B | 1 | 2020-10-11 01:10:20 | 1000 | 5 |
A | 1 | 2020-10-11 00:00:40 | 10 | 200 |
A | 2 | 2020-20-11 00:00:10 | 10 | 200 |
...在代码中,我有一个机器(键)字典,其中包含相关的时间范围(值)列表。我只想在提供的时间范围内提取每台指定机器的所有信息。例如
{"A": [("2020-10-1 00:00:00", "2020-12-30"), ("2021-1-15", "2021-3-30"))], ...}是字典中的示例条目。因此,在这种情况下,我想为一台设备提取给定时间范围内的两组数据。我目前遍历字典,每个时间范围运行一次查询,每个结果都保存到一个文件中。然后,我遍历保存的文件,并将所有单独的数据帧合并为一个数据帧。
以下是在代码中创建的过程的示例
for machine, machine_parts in lifetimes.items():
for machine_part in machine_parts:
query = f"""
select `timestamp`, sensor1, run, machine
from database.table
where machine = '{machine}'
and start >= '{machine_part.install}'
and end <= '{machine_part.removal}'
order by start, `timestamp` asc
"""
print(f"Executing query: {query}")
spark = get_spark_context()
df = spark.sql(query).toPandas()
filename = f"{machine}_{machine_part.install}_{machine_part.removal}.csv".replace(
" ", "_"
)
MACHINE_PART_LIFETIME_DIR.mkdir(parents=True, exist_ok=True)
filepath = os.path.join(HEATER_LIFETIME_DIR, filename)
print(f"Saving to: {filepath}")
df.to_csv(filepath, index=False)
print("-" * 20)理想情况下,我认为应该有一个查询能够一次完成所有这些,而不是运行多个查询,保存输出,重新打开,合并到一个数据帧中,然后保存结果。这应该允许我不必将每个spark数据帧转换为pandas数据帧,保存到磁盘,然后重新打开每个数据帧并合并为一个。有没有办法用pyspark动态地做到这一点呢?
发布于 2021-04-23 21:29:35
因此,正如@mck建议的那样,我能够通过使用join来极大地改进这一点。对于那些感兴趣的人,这里是我使用的相关代码。
要从字典转到spark dataframe:
values = []
for machine, machine_parts in lifetimes.items():
for machine_part in machine_parts:
values.append((machine, machine_part.install, machine_part.removal))
columns = ["machine", "install_date", "removal_date"]
df = spark.createDataFrame(values, columns)要进行联接,请执行以下操作:
df_joined = df1.join(df).where((df1.machine == df.machine) & (df1.start >= df.install_date) & (df1.end<= df.removal_date))https://stackoverflow.com/questions/67210160
复制相似问题