我在Palantir Foundry中有一个80 of的日期分区数据集,它每3小时在增量附加事务中摄取300-450 of的数据。我想要创建一个增量转换,使用它作为输入。
但是,数据集太大,无法一次读取初始快照。附加到数据集的数据将足够小,足以在初始快照后处理每个增量构建。如何解析输入数据集中的待办事项并达到可以在增量模式下运行转换的地步?
发布于 2022-07-29 23:38:46
当读取已逐步构造的大型输入数据集时,Foundry无法从输入事务的某些子集读取。您必须同时读取整个输入数据集(snapshot模式),或者只读取自上次生成输出(incremental模式)以来已写入的输入事务。
为了解决这个问题,我们必须聪明地解析输入。这是一个转变:
from transforms.api import transform, Input, Output, incremental
from pyspark.sql import Row
from pyspark.sql import functions as F, types as T, SparkSession as S
import datetime
# set this value for the type of build:
# "first" for a snapshot run on a single date (sets placeholder_date, runs snapshot)
# "catchup" for subsequent runs on subsequent dates (reads from placeholder_date to decide what date to run, then runs update from full read)
# "continuing" for ongoing incremental runs
PHASE = 'first'
# Where data begins
START_DATE = datetime.date(2022, 7, 1)
# Where we want the automated rebuild process to stop.
# Set this value to less than the most recent date for reasons discussed in the accompanying post
END_DATE = datetime.date(2022, 7, 22)
DAYS_PER_RUN = 4 # How many days worth of data do we want each 'catchup' run to read
placeholder_date_schema = T.StructType([
T.StructField("date", T.DateType(), True)
])
@incremental(semantic_version=3)
@transform(
output=Output("output"),
placeholder_date=Output("placeholder_date"),
source=Input("input"),
)
def compute(source, output, placeholder_date):
# First and Catchup Builds
if((PHASE == 'first') | (PHASE == 'catchup')):
df = source.dataframe('current') # read the entire input dataset
# Continuing Builds
if(PHASE == 'continuing'):
df = source.dataframe() # read the latest incremental appends
# First Build: Build placeholder_date initially
if(PHASE == 'first'):
spark = S.builder.getOrCreate()
next_output_last_date = START_DATE + datetime.timedelta(days=(DAYS_PER_RUN-1))
most_recent_output_date = START_DATE - datetime.timedelta(days=1)
placeholder_date_df = spark.createDataFrame(data=[Row(next_output_last_date)], schema=placeholder_date_schema)
# Catchup Builds: Use placeholder_date to get the previous starting time
if(PHASE == 'catchup'):
placeholder_date_df = placeholder_date.dataframe('previous', placeholder_date_schema)
most_recent_output_date = placeholder_date_df.collect()[0][0] # noqa
next_output_last_date = most_recent_output_date + datetime.timedelta(days=DAYS_PER_RUN)
# Ensure that the time window doesn't go past the end date by curtailing the period if necessary
if next_output_last_date >= END_DATE:
next_output_last_date == END_DATE
# Ensure we don't run once we pass the end point
if most_recent_output_date >= END_DATE:
return True # this will result in the build completing without writing or reading any further data
placeholder_date_df = placeholder_date_df.withColumn("date", F.lit(next_output_last_date))
# First and Catchup Builds: Write the placeholder
# It's safe to write the placeholder because if the build fails the placeholder transaction will also be aborted
if((PHASE == 'first') | (PHASE == 'catchup')):
placeholder_date.set_mode('replace')
placeholder_date.write_dataframe(placeholder_date_df, output_format='csv')
# Filter the whole input dataset
df = df.where((F.col("date") > F.lit(most_recent_output_date)) & (F.col("date") <= F.lit(next_output_last_date)))
# Transform the data as required
df = transform_data(df)
# Write the output
output.write_dataframe(df, partition_cols=["date"])
# Define whatever transformations you want to perform here
def transform_data(df):
return df转换有三个“阶段”- first、catchup和continuing。在first阶段运行一次转换,然后在catchup阶段按需要运行多次,直到解析了整个现有的输入数据集。最后,一旦完成,您将其切换到continuing阶段,并在每次输入更新时将其调度为运行(递增)。
生成将状态存储在placeholder_date dataset中,该数据集是在first构建中创建的,并在catchup构建期间读取以确定catchup进程的运行位置。catchup模式有一个额外的故障安全,如果构建继续超过END_DATE,它将不会写出空事务。这允许您在catchup阶段设置一个(部队建设)计划(例如每10分钟一次),然后简单地离开它,定期回来检查,而不必仔细地计时catchup阶段的端点。一旦您完成了catchup阶段,您可以将转换设置为continuing模式,它将切换到完全增量的行为。
Notes
在上面的示例代码中,使用由date划分的输入数据集非常有用。这将使过滤变得更便宜和更容易。然而,这将工作(虽然要慢得多)没有蜂巢分区的输入。尽管如此,用date对增量数据集进行分区是一个很好的实践,本例中的输出将date的输出划分为便于将来使用。
注意:此过程假定所摄取的数据是按时间顺序的,即来自后续增量追加的数据将具有与上一个增量追加中的最新日期相同或更晚的日期值。如果您的输入数据集在时间上没有单调增长,如果不小心管理,此技术可能导致数据丢失。例如,假设您对前4天的数据运行catchup模式(一次运行一天数据)。当您运行第三天的构建时,数据将被输入到包含第一天数据的输入中。catchup模式将不会解析这些数据,因为它已经接收了第一天的数据,随后的catchup构建将从第一天起过滤掉数据。此外,在最终成功的catchup构建之前出现的任何附加输入都不会被转换的continuing阶段看到,因为它们自上次成功构建以来就不再是新的数据了。如果这种情况发生在您身上,您可以通过标识行为并对其进行解释来确保数据的完整性:假设每个后续附加到输入数据集的数据可以包含最多3天前的数据,并假设您希望从第1天赶上第30天(今天)。因此,您知道在运行catchup构建时,将在1-27天内不发布任何新的数据。如果将END_DATE设置为第27天,则第一个continuing构建将有相当大的增量构建,但不会出现数据丢失。
注意:I选择将first、catchup和continuing阶段之间的切换作为手动过程,原因有两个:
首先,您可以将first和catchup阶段合并为一个单一阶段,方法是将从placeholder_date数据集中读取的过程包装在try/catch中,但这使您处于依赖错误处理控制流的位置,这通常是不明智的。
其次,一旦catchup阶段完成,continuing阶段将放弃不再适合使用的placeholder_date数据集(因为continuing阶段正在读取可能是日内或其他混合日期的事务)。因此,不可能安全地确定下一个构建应该是来自现有已知状态的catchup还是continuing。
https://stackoverflow.com/questions/73171849
复制相似问题