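Every time I run a simple Flink job, I get this error:
org.apache.kafka.common.errors.InvalidGroupIdException: To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.
I tried adding ConsumerConfig.GROUP_ID_CONFIG, but it didn't work for me.
Can anyone help me?
Here is the code: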
StreamExecutionEnvironment cdcEnv = StreamExecutionEnvironment.getExecutionEnvironment();
cdcEnv.setParallelism(1);
cdcEnv.enableCheckpointing(5000);
cdcEnv.setStateBackend(new FsStateBackend(ResourceUtil.getKey("checkpointUri") + "/flink/checkpoint/cp2"));
cdcEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
cdcEnv.getCheckpointConfig().setCheckpointTimeout(5000);
cdcEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
cdcEnv.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", ResourceUtil.getKey("bootstrapServers"));
properties.setProperty("ConsumerConfig.GROUP_ID_CONFIG", "cloudera_mirrormaker");
DataStream<String> cdcDataStream = cdcEnv.addSource(new FlinkKafkaConsumer<String>("ods_decl", new SimpleStringSchema(), properties));
cdcDataStream
.addSink(new CdcSink(1, 1000L))
.name("cdc sink.");
cdcEnv.execute("CDC Flink Job");

Answered 2022-05-31 07:27:00
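Remove the quotes around "ConsumerConfig.GROUP_ID_CONFIG": replace properties.setProperty("ConsumerConfig.GROUP_ID_CONFIG", "cloudera_mirrormaker"); with properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "cloudera_mirrormaker");
This is because in Java, text inside double quotes is a string literal. The quoted version stores the value under the literal key "ConsumerConfig.GROUP_ID_CONFIG", whereas the constant ConsumerConfig.GROUP_ID_CONFIG evaluates to "group.id", the key the Kafka consumer actually looks up.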
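To make the difference concrete, here is a minimal sketch that uses only java.util.Properties. The class name GroupIdKeyDemo and its helper methods are hypothetical. The local GROUP_ID_CONFIG constant is a stand-in for org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG so that the snippet runs without the Kafka client on the classpath.

```java
import java.util.Properties;

public class GroupIdKeyDemo {
    // Stand-in (assumption for this sketch) for
    // org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG,
    // whose value is the string "group.id".
    static final String GROUP_ID_CONFIG = "group.id";

    // Mirrors the code in the question: the key is the quoted text itself.
    static Properties quotedKey() {
        Properties properties = new Properties();
        properties.setProperty("ConsumerConfig.GROUP_ID_CONFIG", "cloudera_mirrormaker");
        return properties;
    }

    // Mirrors the fix: the key is the value of the constant, i.e. "group.id".
    static Properties constantKey() {
        Properties properties = new Properties();
        properties.setProperty(GROUP_ID_CONFIG, "cloudera_mirrormaker");
        return properties;
    }

    public static void main(String[] args) {
        // The consumer looks up "group.id"; with the quoted key nothing is found.
        System.out.println("quoted key   -> " + quotedKey().getProperty("group.id"));
        System.out.println("constant key -> " + constantKey().getProperty("group.id"));
    }
}
```

In the real job, importing org.apache.kafka.clients.consumer.ConsumerConfig and using its constant, or writing properties.setProperty("group.id", "cloudera_mirrormaker") directly, should have the same effect.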
https://stackoverflow.com/questions/66450477