
ReadFromKafka cannot consume data (error)?

Stack Overflow user
Asked 2021-09-01 08:45:00
2 answers, 721 views, 0 followers, score 1

Environment:

  • OS: Ubuntu 20.04

  • Kafka version: 2.12-2.0.1

  • apache-beam library version: apache-beam==2.32.0

Steps to reproduce:

Shell 1: run the code below.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.external.kafka import ReadFromKafka

pipeline_options = PipelineOptions(["--runner=DirectRunner"])


def run():
    with beam.Pipeline(options=pipeline_options) as p:
        _ = (
            p
            | 'ReadData' >> ReadFromKafka(
                consumer_config={"bootstrap.servers": "localhost:9092"},
                topics=["my-first-topic"],
            )
            | 'PrintData' >> beam.Map(print)
        )

if __name__ == "__main__":
    run()

Shell 1 output:

WARNING:apache_beam.runners.interactive.interactive_environment:You have limited Interactive Beam features since your ipython kernel is not connected to any notebook frontend.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.8 interpreter.
2.32.0: Pulling from apache/beam_java11_sdk
Digest: sha256:a45f89584071950d371966abf910869c456179ab54c7b5213e3f4e2a54bd2753
Status: Image is up to date for apache/beam_java11_sdk:2.32.0
docker.io/apache/beam_java11_sdk:2.32.0

Shell 2:

$ cd kafka_2.12-2.0.1/bin && ./kafka-console-producer.sh --topic "my-first-topic" --broker-list localhost:9092

>2
>3
>4

Shell 1 output:

WARNING:root:Make sure that locally built Python SDK docker image has Python 3.8 interpreter.
2.32.0: Pulling from apache/beam_java11_sdk
Digest: sha256:a45f89584071950d371966abf910869c456179ab54c7b5213e3f4e2a54bd2753
Status: Image is up to date for apache/beam_java11_sdk:2.32.0
docker.io/apache/beam_java11_sdk:2.32.0
ERROR:root:severity: ERROR
timestamp {
  seconds: 1630485467
  nanos: 764000000
}
message: "Client failed to deque and process the value"
trace: "org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Unable to encode element \'org.apache.beam.sdk.io.kafka.KafkaRecord@4c9edf30\' with coder \'KafkaRecordCoder(ByteArrayCoder,ByteArrayCoder)\'.
...
instruction_id: "bundle_116"
log_location: "org.apache.beam.fn.harness.data.QueueingBeamFnDataClient"
thread: "31"

ERROR:root:severity: ERROR
timestamp {
  seconds: 1630485467
  nanos: 770000000
}
message: "Exception while trying to handle InstructionRequest bundle_116"
trace: "org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Unable to encode element \'org.apache.beam.sdk.io.kafka.KafkaRecord@4c9edf30\' with coder \'KafkaRecordCoder(ByteArrayCoder,ByteArrayCoder)\'.

...
 

/home/newdisk/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in _run_bundle(self, runner_execution_context, bundle_context_manager, data_input, data_output, input_timers, expected_timer_output, bundle_manager)
    767         expected_timer_output)
    768
--> 769     result, splits = bundle_manager.process_bundle(
    770         data_input, data_output, input_timers, expected_timer_output)
    771     # Now we collect all the deferred inputs remaining from bundle execution.

/home/newdisk/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in process_bundle(self, inputs, expected_outputs, fired_timers, expected_output_timers, dry_run)
   1118
   1119     if result.error:
-> 1120       raise RuntimeError(result.error)
   1121
   1122     if result.process_bundle.requires_finalization:

RuntimeError: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Unable to encode element 'org.apache.beam.sdk.io.kafka.KafkaRecord@4c9edf30' with coder 'KafkaRecordCoder(ByteArrayCoder,ByteArrayCoder)'.
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1683)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$2500(FnApiDoFnRunner.java:139)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2205)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2374)
    at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
    at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:142)
    at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:750)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:266)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:218)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1680)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$2500(FnApiDoFnRunner.java:139)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$WindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2092)
    at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:87)
    at org.apache.beam.sdk.io.kafka.ReadFromKafkaDoFn.processElement(ReadFromKafkaDoFn.java:378)
    at org.apache.beam.sdk.io.kafka.ReadFromKafkaDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingSizedElementAndRestriction(FnApiDoFnRunner.java:1048)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1000(FnApiDoFnRunner.java:139)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:637)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:632)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:266)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:218)
    at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:221)
    at org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:43)
    at org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:25)
    at org.apache.beam.fn.harness.data.QueueingBeamFnDataClient$ConsumerAndData.accept(QueueingBeamFnDataClient.java:316)
    at org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:219)
    at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:329)
    at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:140)
    at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:110)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: Unable to encode element 'org.apache.beam.sdk.io.kafka.KafkaRecord@4c9edf30' with coder 'KafkaRecordCoder(ByteArrayCoder,ByteArrayCoder)'.
    at org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:300)
    at org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$SampleByteSizeDistribution.tryUpdate(PCollectionConsumerRegistry.java:385)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:259)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:218)
Caused by: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
    at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:63)
    at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:56)
    at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:41)
    at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:72)
    at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:63)
    at org.apache.beam.sdk.io.kafka.KafkaRecordCoder.encode(KafkaRecordCoder.java:70)
    at org.apache.beam.sdk.io.kafka.KafkaRecordCoder.encode(KafkaRecordCoder.java:40)
    at org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:297)
    at org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$SampleByteSizeDistribution.tryUpdate(PCollectionConsumerRegistry.java:385)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:259)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:218)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1680)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$2500(FnApiDoFnRunner.java:139)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2205)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2374)
    at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
    at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:142)
    at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:750)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:266)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:218)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1680)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$2500(FnApiDoFnRunner.java:139)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$WindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2092)
    at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:87)
    at org.apache.beam.sdk.io.kafka.ReadFromKafkaDoFn.processElement(ReadFromKafkaDoFn.java:378)
    at org.apache.beam.sdk.io.kafka.ReadFromKafkaDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingSizedElementAndRestriction(FnApiDoFnRunner.java:1048)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1000(FnApiDoFnRunner.java:139)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:637)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:632)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:266)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:218)
    at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:221)
    at org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:43)
    at org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:25)
    at org.apache.beam.fn.harness.data.QueueingBeamFnDataClient$ConsumerAndData.accept(QueueingBeamFnDataClient.java:316)
    at org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:219)
    at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:329)
    at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:140)
    at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:110)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

It looks like this is related to the key_deserializer and value_deserializer arguments of ReadFromKafka, so I tried changing them:

# key_deserializer="org.apache.kafka.common.serialization.StringSerializer",
# value_deserializer="org.apache.kafka.common.serialization.StringSerializer",

But this caused another error:

RuntimeError: java.lang.RuntimeException: Failed to build transform beam:external:java:kafkaio:typedwithoutmetadata:v1 from spec urn: "beam:external:java:kafkaio:typedwithoutmetadata:v1"
payload: "\n\213\002\n\035\n\017consumer_config\032\n*\b\n\002\020\a\022\002\020\a\n\020\n\006topics\032\006\032\004\n\002\020\a\n\026\n\020key_deserializer\032\002\020\a\n\030\n\022value_deserializer\032\002\020\a\n\027\n\017start_read_time\032\004\b\001\020\004\n\027\n\017max_num_records\032\004\b\001\020\004\n\025\n\rmax_read_time\032\004\b\001\020\004\n\037\n\031commit_offset_in_finalize\032\002\020\b\n\026\n\020timestamp_policy\032\002\020\a\022$6a700b0b-2839-492d-8629-9b3268d90919\022\272\001\t\002p\000\000\000\000\001\021bootstrap.servers\016localhost:9092\000\000\000\001\016my-first-topic6org.apache.kafka.common.serialization.StringSerializer6org.apache.kafka.common.serialization.StringSerializer\000\016ProcessingTime"

What is wrong with them? Am I missing something?

2 Answers

Stack Overflow user

Answered 2021-09-21 22:35:50

I had this problem too. It turns out the smoking gun is this line in the traceback:

Caused by: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]

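It means there is a null value somewhere that cannot be encoded. In my case, I was calling kafka-python's producer.send function with key=None, and changing the key to a string fixed the problem. For example: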

with open("../data/sample_data.json") as fp:
    for line in fp.readlines():
        producer.send(KAFKA_TOPIC, key=str.encode("foo"), value=str.encode(line))

You can also set key_serializer and value_serializer on the KafkaProducer object.
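
A minimal sketch of the serializer approach, assuming kafka-python's KafkaProducer and its key_serializer / value_serializer constructor arguments. The helper names encode_key, encode_value, and make_producer are made up for this example, and replacing a missing key with an empty byte string is an assumption of the sketch, not something the answer above prescribes. The broker address is the localhost:9092 from the question.

```python
def encode_key(key):
    """Turn a key into bytes; a missing key becomes b"" (an assumption of this sketch)."""
    if key is None:
        return b""
    if isinstance(key, bytes):
        return key
    return str(key).encode("utf-8")


def encode_value(value):
    """Turn a value into UTF-8 bytes, passing bytes through unchanged."""
    if isinstance(value, bytes):
        return value
    return str(value).encode("utf-8")


def make_producer(bootstrap_servers="localhost:9092"):
    """Build a producer that applies the two helpers above to every record."""
    # Imported lazily so the helpers can be tried without kafka-python installed.
    from kafka import KafkaProducer

    return KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        key_serializer=encode_key,
        value_serializer=encode_value,
    )
```

With a producer built this way, a call such as producer.send("my-first-topic", value="2") should no longer put a null key on the topic, provided the serializer is also invoked when no key is passed. Note that giving every record the same key will likely route all of them to one partition.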

Caveat: even after fixing the null read, reading from Kafka in Python outside of the Dataflow runner still has issues. See: https://issues.apache.org/jira/browse/BEAM-11998

Score: 0

Stack Overflow user

Answered 2021-11-18 08:54:37

I had the same problem. As I understand it, ReadFromKafka expects to read data in key:value format. My solution was to start kafka-console-producer.sh with the following options:

--property "parse.key=true" --property "key.separator=:" 

and to enter the data in key:value format (e.g. name:Peter instead of Peter).
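
As a rough illustration of what parse.key=true and key.separator=: do to each input line, here is a Python sketch. It is not the console producer's actual code; it assumes the line is split at the first occurrence of the separator, and the function name split_keyed_line is made up for this example.

```python
def split_keyed_line(line, separator=":"):
    """Split one input line into (key, value) bytes at the first separator."""
    key, found, value = line.partition(separator)
    if not found:
        # No separator means there is no key to extract from this line.
        raise ValueError(f"no key found on line: {line!r}")
    return key.encode("utf-8"), value.encode("utf-8")


print(split_keyed_line("name:Peter"))  # prints (b'name', b'Peter')
```

A bare Peter contains no separator, so this sketch rejects it. Without parse.key=true, the same line is presumably sent with a null key, which would match the "cannot encode a null byte[]" error in the question.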

Score: 0

Original content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/69010506
