首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Apache多个输出行

Apache多个输出行
EN

Stack Overflow用户
提问于 2017-03-28 00:13:18
回答 1查看 1.6K关注 0票数 0

我试图按下面的方式运行flink作业来读取Apache &print中的数据:

Java程序

代码语言:javascript
复制
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "test.net:9092");
    properties.setProperty("group.id", "flink_consumer");
    properties.setProperty("zookeeper.connect", "dev.com:2181,dev2.com:2181,dev.com:2181/dev2");
    properties.setProperty("topic", "topic_name");

    DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer082<>("topic_name", new SimpleStringSchema(), properties));

            messageStream.rebalance().map(new MapFunction<String, String>() {
                private static final long serialVersionUID = -6867736771747690202L;

                public String map(String value) throws Exception {
                    return "Kafka and Flink says: " + value;
                }
            }).print();

            env.execute();

Scala代码

代码语言:javascript
复制
  var properties = new Properties();
  properties.setProperty("bootstrap.servers", "msg01.staging.bigdata.sv2.247-inc.net:9092");
  properties.setProperty("group.id", "flink_consumer");
  properties.setProperty("zookeeper.connect", "host33.dev.swamp.sv2.tellme.com:2181,host37.dev.swamp.sv2.tellme.com:2181,host38.dev.swamp.sv2.tellme.com:2181/staging_sv2");
  properties.setProperty("topic", "sv2.staging.rtdp.idm.events.omnichannel");
  var env = StreamExecutionEnvironment.getExecutionEnvironment();
  var stream:DataStream[(String)] = env
.addSource(new FlinkKafkaConsumer082[String]("sv2.staging.rtdp.idm.events.omnichannel", new SimpleStringSchema(), properties));
  stream.print();
  env.execute();

每当我在eclipse中的app中运行这个程序时,我就会看到下面的内容:

03/27/2017 20:06:19作业执行切换到状态运行。

03/27/2017 20:06:19来源:自定义源-> Sink:未命名(1/4)切换到预定的03/27/2017 20:06:19来源:自定义源-> Sink:未命名(1/4)切换到部署03/27/2017 20:06:19:自定义源-> Sink:未命名(3/4)切换到计划的03/27/2017 20:06:19来源:自定义源-> Sink:未命名(3/4)切换到部署03/27/2017 20:06:19)切换到运行03/27/2017 20:06:19来源:自定义源-> Sink:未命名(2/4)切换到运行03/27/2017 20:06:19

我的问题是:

1)为什么在所有情况下(计划、部署和运行)都会看到4个接收器实例。

2)在Apache中收到的每一行,我都看到这里被打印了很多次,大部分是4次。原因是什么?

理想情况下,我只想读一次每一行,并对其进行进一步的处理。任何输入/帮助都将是值得赞赏的!

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-03-28 07:56:19

如果在LocalStreamEnvironment中运行程序(在IDE中调用StreamExecutionEnvironment.getExecutionEnvironment()时会得到),那么所有操作符的默认并行性都等于CPU核心的数量。

因此,在您的示例中,每个操作符被并行为四个子任务。在日志中,您可以看到这四个子任务中每个子任务的消息(3/4表示这是总共四个任务中的第三个)。

您可以通过调用StreamExecutionEnvironment.setParallelism(int)或对每个操作符调用setParallelism(int)来控制子任务的数量。

根据你的程序,卡夫卡的记录不应该被复制。每个记录只能打印一次。但是,由于记录是并行写入的,所以输出行以x>作为前缀,其中x指示发出该行的并行子任务的id。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/43058533

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档