在我的apache-beam作业中,我调用了一个外部源,GCP存储,这可以被认为是一个通用的http调用,重要的是它是一个外部调用来丰富作业。
我在处理每一条数据时,都会调用此API来获取一些信息来丰富数据。在API上有大量对相同数据的重复调用。
有没有一种好的方法来缓存或存储结果,以便对处理的每个数据块重复使用,以限制所需的网络流量。这是一个巨大的处理瓶颈。
发布于 2019-06-14 00:20:25
您可以考虑将此值作为实例状态保存在DoFn上。例如
class MyDoFn(beam.DoFn):
def __init__(self):
# This will be called during construction and pickled to the workers.
self.value1 = some_api_call()
def setup(self):
# This will be called once for each DoFn instance (generally
# once per worker), good for non-pickleable stuff that won't change.
self.value2 = some_api_call()
def start_bundle(self):
# This will be called per-bundle, possibly many times on a worker.
self.value3 = some_api_call()
def process(self, element):
# This is called on each element.
key = ...
if key not in self.some_lru_cache:
self.some_lru_cache[key] = some_api_call()
value4 = self.some_lru_cache[key]
# Use self.value1, self.value2, self.value3 and/or value4 here.发布于 2019-06-06 04:57:00
在Beam中没有内部持久层。您必须下载要处理的数据。这可能会发生在所有都必须访问数据的工作人员的车队上。
但是,您可能希望考虑将数据作为辅助输入进行访问。您将必须预加载所有数据,并且不需要查询每个元素的外部源:https://beam.apache.org/documentation/programming-guide/#side-inputs
特别是对于https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java,您可能希望尝试使用现有的IO,例如TextIO: GCS
https://stackoverflow.com/questions/56466949
复制相似问题