这是一个例子,我们有3个kafka主题(每个有50个分区),它们有不同的消息,而所有这些消息都有字段‘用户名’,
topic_1 --> Message01 {String username; ...}, about 50,000 messages per minute
topic_2 --> Message02 {String username; ...}, about 3,000,000 messages per minute
topic_3 --> Message03 {String username; ...}, about 70,000 messages per minute我们定义了一个包装器类,
MessageWrapper{
List<Message01> list01;
List<Message02> list02;
List<Message03> list03;
}我们有一个flatMap,可以将原始消息‘转换’成tuple3,
String field --> username
Integer field --> type
MessageWrapper field --> the wrapper object所有3个流都由类似的flatMap()函数处理,
public void flatMap(Message01 value, Collector<Tuple3<String, Integer, MessageWrapper>> out)
throws Exception {
String name = value.getUsername();
if (!StringUtils.isBlank(name)) {
MessageWrapper wrapper = new MessageWrapper();
List<Message01> list = new ArrayList<>();
list.add(value);
wrapper.setList01(list);
out.collect(new Tuple3<>(name, 1, wrapper));
}
}在flatMap()之后,我们合并这3个流,
stream1.union(stream2, stream3).keyBy(0).timeWindow(Time.seconds(300))
.process(
new ProcessWindowFunction<Tuple3<String, Integer, MessageWrapper>, MessageWrapper, Tuple, TimeWindow>() {
@Override
public void process(Tuple key,
ProcessWindowFunction<Tuple3<String, Integer, MessageWrapper>, MessageWrapper, Tuple, TimeWindow>.Context ctx,
Iterable<Tuple3<String, Integer, MessageWrapper>> elements,
Collector<MessageWrapper> out) throws Exception {
// merge all entities which have same username, to get a big fat wrapper object
MessageWrapper w = new MessageWrapper();
for (Tuple3<String, Integer, MessageWrapper> t3 : elements) {
MessageWrapper ret = t3.f2;
Integer type = t3.f1;
if (type == 1) {
// add to list01
} else if (type == 2) {
// add to list02
} else if (type == 3) {
// add to list03
}
}
if (all 3 lists are not empty) {
out.collect(ret);
}
}
});目前我们使用20个任务管理器,每个4核+ 16G,总共80个插槽,我们使用50个并行通过。
我们总是遇到任务管理器没有响应的问题,因为有太多的完整gc,
Connecting to remote task manager + 'xxxxxxxxxxxxx' has failed. This might indicate that the remote task manager has been lost".如果我们将时间窗口从5分钟减少到1分钟,一切都很好。根据这一点,看起来flink集群没有足够的资源,但是对于数百万条消息(每条消息的大小约为5KB)来说,80核+ 320G应该足够了,对吧?
有没有人能在这里说点什么?或者可能在代码中有一些问题?
发布于 2019-04-03 23:56:43
我通过在所有机器的/etc/hosts文件上使用127.0.1.1注释行,解决了集群设置中的这个问题。并且我在conf/flink-conf.yaml文件的属性taskmanager.numberOfTaskSlots:上增加了槽的并行度。
https://stackoverflow.com/questions/55155839
复制相似问题