Can't pickle <class 'Mutation'>

Stack Overflow user
Asked 2021-12-07 23:16:18
Answers: 1 · Views: 168 · Followers: 0 · Votes: 1

I'm writing a Dataflow pipeline that uses Apache Beam to add large batches of rows to Bigtable.

  • apache-beam==2.24.0
  • google-cloud-bigtable==2.4.0

Before writing to Bigtable, the pipeline creates the Bigtable rows with the following DoFn:

import apache_beam as beam


class CreateBigtableRow(beam.DoFn):
    """Turns one usage dict into a Bigtable DirectRow."""

    def __init__(self, settings):
        super().__init__()
        self.column_family = settings["bigtable_column_family"]

    def process(self, usage_data, *args, **kwargs):
        # BigTable and key_order are defined elsewhere in the project (not shown).
        row_key = BigTable.generate_row_key(usage_data, key_order)
        return [
            BigTable.create_row_and_assign_values(
                row_key, usage_data, self.column_family
            )
        ]

where `create_row_and_assign_values` is defined as:

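import struct

from google.cloud.bigtable.row import DirectRow


# Presumably a classmethod on the BigTable helper class (it takes cls).
def create_row_and_assign_values(
    cls, key: str, row: dict, column_family: str
) -> DirectRow:
    table_row = DirectRow(key.encode())
    # Use a separate loop variable so the row key argument is not shadowed.
    for column, val in row.items():
        if isinstance(val, float):
            # Store floats as 8-byte big-endian doubles.
            val = struct.pack(">d", val)
        table_row.set_cell(column_family, column.encode(), val)
    return table_row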
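
As a side note on the `struct.pack(">d", val)` call above: it serializes a float as an 8-byte big-endian IEEE 754 double, and a reader of the table has to unpack it the same way. A minimal round-trip sketch, where the helper names `encode_float` and `decode_float` are hypothetical and not part of the question's code:

```python
import struct


def encode_float(value: float) -> bytes:
    """Pack a float as an 8-byte big-endian IEEE 754 double."""
    return struct.pack(">d", value)


def decode_float(raw: bytes) -> float:
    """Inverse of encode_float; expects exactly 8 bytes."""
    (value,) = struct.unpack(">d", raw)
    return value


encoded = encode_float(1.5)
decoded = decode_float(encoded)
```

Anything that reads these cells back needs to know which columns hold packed doubles, since the stored bytes carry no type information.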

My pipeline is as follows:

import csv

import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.io.gcp.bigtableio import WriteToBigTable

with beam.Pipeline(options=pipeline_options) as pipe:
    (
        pipe
        | beam.Create(["/sample_files/*combined*"])  # glob for the sample csv file
        | fileio.MatchAll()
        | fileio.ReadMatches()
        | beam.FlatMap(
            lambda file: csv.DictReader(open(file.metadata.path))
        )
        | "Transform to Usage dict" >> beam.ParDo(TransformToBigtableData())
        | "Create Bigtable Row" >> beam.ParDo(CreateBigtableRow(bigtable_settings))
        | WriteToBigTable(
            project_id=bigtable_settings["bigtable_project"],
            instance_id=bigtable_settings["bigtable_instance"],
            table_id=bigtable_settings["bigtable_table"])
    )

The problem is that I get the following error

_pickle.PicklingError: Can't pickle <class 'Mutation'>: attribute lookup Mutation on __main__ failed [while running 'Create Bigtable Row']

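when running the pipeline. I have added a step that processes the records manually with the Bigtable client from the google-cloud-bigtable library, but I would prefer to use the built-in WriteToBigTable transform, since it handles everything for me.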

Full stack trace:

Traceback (most recent call last):
  File "/app/src/ingest/main.py", line 226, in <module>
    run(
  File "/app/src/ingest/main.py", line 149, in run
    (
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/pipeline.py", line 596, in __exit__
    self.result = self.run()
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/pipeline.py", line 546, in run
    return Pipeline.from_runner_api(
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/pipeline.py", line 573, in run
    return self.runner.run_pipeline(self, self._options)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 195, in run_pipeline
    self._latest_run_result = self.run_via_runner_api(
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 206, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 384, in run_stages
    stage_results = self._run_stage(
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 646, in _run_stage
    self._run_bundle(
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 769, in _run_bundle
    result, splits = bundle_manager.process_bundle(
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1080, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 378, in push
    response = self.worker.do_instruction(request)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 597, in do_instruction
    return getattr(self, request_type)(
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 635, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py", line 995, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py", line 221, in process_encoded
    self.output(decoded_value)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/operations.py", line 354, in output
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/operations.py", line 216, in receive
    self.consumer.process(windowed_value)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/operations.py", line 714, in process
    delayed_applications = self.dofn_runner.process(o)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1235, in process
    self._reraise_augmented(exn)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1233, in process
    return self.do_fn_invoker.invoke_process(windowed_value)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 571, in invoke_process
    self.output_processor.process_outputs(
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1396, in process_outputs
    self.main_receivers.receive(windowed_value)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/operations.py", line 216, in receive
    self.consumer.process(windowed_value)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/operations.py", line 714, in process
    delayed_applications = self.dofn_runner.process(o)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1235, in process
    self._reraise_augmented(exn)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1233, in process
    return self.do_fn_invoker.invoke_process(windowed_value)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 571, in invoke_process
    self.output_processor.process_outputs(
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1396, in process_outputs
    self.main_receivers.receive(windowed_value)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/operations.py", line 216, in receive
    self.consumer.process(windowed_value)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/operations.py", line 714, in process
    delayed_applications = self.dofn_runner.process(o)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1235, in process
    self._reraise_augmented(exn)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1233, in process
    return self.do_fn_invoker.invoke_process(windowed_value)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 571, in invoke_process
    self.output_processor.process_outputs(
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1396, in process_outputs
    self.main_receivers.receive(windowed_value)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/operations.py", line 216, in receive
    self.consumer.process(windowed_value)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/operations.py", line 714, in process
    delayed_applications = self.dofn_runner.process(o)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1235, in process
    self._reraise_augmented(exn)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1233, in process
    return self.do_fn_invoker.invoke_process(windowed_value)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 571, in invoke_process
    self.output_processor.process_outputs(
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1396, in process_outputs
    self.main_receivers.receive(windowed_value)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/operations.py", line 216, in receive
    self.consumer.process(windowed_value)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/operations.py", line 714, in process
    delayed_applications = self.dofn_runner.process(o)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1235, in process
    self._reraise_augmented(exn)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1233, in process
    return self.do_fn_invoker.invoke_process(windowed_value)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 571, in invoke_process
    self.output_processor.process_outputs(
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1396, in process_outputs
    self.main_receivers.receive(windowed_value)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/operations.py", line 216, in receive
    self.consumer.process(windowed_value)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/operations.py", line 714, in process
    delayed_applications = self.dofn_runner.process(o)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1235, in process
    self._reraise_augmented(exn)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1233, in process
    return self.do_fn_invoker.invoke_process(windowed_value)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 571, in invoke_process
    self.output_processor.process_outputs(
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1396, in process_outputs
    self.main_receivers.receive(windowed_value)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/operations.py", line 216, in receive
    self.consumer.process(windowed_value)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/operations.py", line 714, in process
    delayed_applications = self.dofn_runner.process(o)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1235, in process
    self._reraise_augmented(exn)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1316, in _reraise_augmented
    raise new_exn.with_traceback(tb)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1233, in process
    return self.do_fn_invoker.invoke_process(windowed_value)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 571, in invoke_process
    self.output_processor.process_outputs(
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1396, in process_outputs
    self.main_receivers.receive(windowed_value)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/operations.py", line 215, in receive
    self.update_counters_start(windowed_value)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/operations.py", line 179, in update_counters_start
    self.opcounter.update_from(windowed_value)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/opcounters.py", line 211, in update_from
    self.do_sample(windowed_value)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/opcounters.py", line 250, in do_sample
    self.coder_impl.get_estimated_size_and_observables(windowed_value))
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/coders/coder_impl.py", line 1371, in get_estimated_size_and_observables
    self._value_coder.get_estimated_size_and_observables(
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/coders/coder_impl.py", line 358, in get_estimated_size_and_observables
    self.encode_to_stream(value, out, nested)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/coders/coder_impl.py", line 422, in encode_to_stream
    self.fallback_coder_impl.encode_to_stream(value, stream, nested)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/coders/coder_impl.py", line 262, in encode_to_stream
    return stream.write(self._encoder(value), nested)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/coders/coders.py", line 800, in <lambda>
    lambda x: dumps(x, protocol), pickle.loads)
_pickle.PicklingError: Can't pickle <class 'Mutation'>: attribute lookup Mutation on __main__ failed [while running 'Create Bigtable Row']
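
The last frames of the trace show Beam's fallback coder calling pickle's `dumps` on the element emitted by 'Create Bigtable Row'. Pickle stores a class by reference, as a module name plus an attribute name, so it fails when that lookup does not resolve. The sketch below reproduces only that general mechanism with a hypothetical stand-in class. It does not use the Bigtable library, and it is an assumption that the same mechanism is what trips up the real `Mutation` class here.

```python
import pickle
from fractions import Fraction


def make_unimportable_class():
    """Build a class that claims to live in a module that has no such attribute."""
    cls = type("Mutation", (), {})  # hypothetical stand-in, not the Bigtable class
    cls.__module__ = "json"         # the json module has no attribute named Mutation
    return cls


def try_pickle(obj):
    """Return None if obj pickles, otherwise the error message."""
    try:
        pickle.dumps(obj)
        return None
    except (pickle.PicklingError, AttributeError) as exc:
        return str(exc)


# Fraction is importable as fractions.Fraction, so the lookup succeeds.
error_for_fraction = try_pickle(Fraction(1, 2))

# The stand-in class cannot be found under json.Mutation, so pickling fails.
error_for_hidden = try_pickle(make_unimportable_class()())
```

The exact wording of the error message differs between Python versions, which is why the sketch returns the message instead of comparing it to a fixed string.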

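1 Answer

Stack Overflow user

Accepted answer

Posted 2021-12-09 05:18:20

Your google-cloud-bigtable version is too new.

There is some ongoing work on updating the Apache Beam dependencies (linked in the original answer).

They ran into the same problem. Can you roll your Bigtable version back to something before 2? If you run this: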

pip install apache-beam[gcp]

it will install the recommended versions.
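
To confirm which release ended up in the environment after reinstalling, something like the following could be used. It is a sketch only: the helper names `major_version` and `installed_major` are hypothetical, and the "before 2" threshold comes from the suggestion above rather than from any official compatibility table.

```python
from importlib import metadata


def major_version(version: str) -> int:
    """Return the leading numeric component of a version string like '2.4.0'."""
    return int(version.split(".")[0])


def installed_major(package: str):
    """Return the installed major version of a package, or None if it is absent."""
    try:
        return major_version(metadata.version(package))
    except metadata.PackageNotFoundError:
        return None


bigtable_major = installed_major("google-cloud-bigtable")
if bigtable_major is not None and bigtable_major >= 2:
    print("google-cloud-bigtable 2.x or newer is installed; try a pre-2 release")
```

Running `pip list` gives the same information interactively; the function form is only convenient if the check should live in a startup script or a test.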

Votes: 1
Original page content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/70268107
