我试图按下面的方式运行flink作业来读取Apache &print中的数据:
Java程序
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "test.net:9092");
properties.setProperty("group.id", "flink_consumer");
properties.setProperty("zookeeper.connect", "dev.com:2181,dev2.com:2181,dev.com:2181/dev2");
properties.setProperty("topic", "topic_name");
DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer082<>("topic_name", new SimpleStringSchema(), properties));
messageStream.rebalance().map(new MapFunction<String, String>() {
private static final long serialVersionUID = -6867736771747690202L;
public String map(String value) throws Exception {
return "Kafka and Flink says: " + value;
}
}).print();
env.execute();Scala代码
var properties = new Properties();
properties.setProperty("bootstrap.servers", "msg01.staging.bigdata.sv2.247-inc.net:9092");
properties.setProperty("group.id", "flink_consumer");
properties.setProperty("zookeeper.connect", "host33.dev.swamp.sv2.tellme.com:2181,host37.dev.swamp.sv2.tellme.com:2181,host38.dev.swamp.sv2.tellme.com:2181/staging_sv2");
properties.setProperty("topic", "sv2.staging.rtdp.idm.events.omnichannel");
var env = StreamExecutionEnvironment.getExecutionEnvironment();
var stream:DataStream[(String)] = env
.addSource(new FlinkKafkaConsumer082[String]("sv2.staging.rtdp.idm.events.omnichannel", new SimpleStringSchema(), properties));
stream.print();
env.execute();每当我在eclipse中的app中运行这个程序时,我就会看到下面的内容:
03/27/2017 20:06:19作业执行切换到状态运行。
03/27/2017 20:06:19来源:自定义源-> Sink:未命名(1/4)切换到预定的03/27/2017 20:06:19来源:自定义源-> Sink:未命名(1/4)切换到部署03/27/2017 20:06:19:自定义源-> Sink:未命名(3/4)切换到计划的03/27/2017 20:06:19来源:自定义源-> Sink:未命名(3/4)切换到部署03/27/2017 20:06:19)切换到运行03/27/2017 20:06:19来源:自定义源-> Sink:未命名(2/4)切换到运行03/27/2017 20:06:19
我的问题是:
1)为什么在所有情况下(计划、部署和运行)都会看到4个接收器实例。
2)在Apache中收到的每一行,我都看到这里被打印了很多次,大部分是4次。原因是什么?
理想情况下,我只想读一次每一行,并对其进行进一步的处理。任何输入/帮助都将是值得赞赏的!
发布于 2017-03-28 07:56:19
如果在LocalStreamEnvironment中运行程序(在IDE中调用StreamExecutionEnvironment.getExecutionEnvironment()时会得到),那么所有操作符的默认并行性都等于CPU核心的数量。
因此,在您的示例中,每个操作符被并行为四个子任务。在日志中,您可以看到这四个子任务中每个子任务的消息(3/4表示这是总共四个任务中的第三个)。
您可以通过调用StreamExecutionEnvironment.setParallelism(int)或对每个操作符调用setParallelism(int)来控制子任务的数量。
根据你的程序,卡夫卡的记录不应该被复制。每个记录只能打印一次。但是,由于记录是并行写入的,所以输出行以x>作为前缀,其中x指示发出该行的并行子任务的id。
https://stackoverflow.com/questions/43058533
复制相似问题