首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >闪烁,TaskManager无响应

闪烁,TaskManager无响应
EN

Stack Overflow用户
提问于 2019-03-14 13:57:53
回答 1查看 382关注 0票数 1

这是一个例子,我们有3个kafka主题(每个有50个分区),它们有不同的消息,而所有这些消息都有字段‘用户名’,

代码语言:javascript
复制
topic_1 --> Message01 {String username; ...}, about 50,000 messages per minute
topic_2 --> Message02 {String username; ...}, about 3,000,000 messages per minute
topic_3 --> Message03 {String username; ...}, about 70,000 messages per minute

我们定义了一个包装器类,

代码语言:javascript
复制
MessageWrapper{
 List<Message01> list01;
 List<Message02> list02;
 List<Message03> list03;
}

我们有一个flatMap,可以将原始消息‘转换’成tuple3,

代码语言:javascript
复制
String field --> username
Integer field --> type
MessageWrapper field --> the wrapper object

所有3个流都由类似的flatMap()函数处理,

代码语言:javascript
复制
public void flatMap(Message01 value, Collector<Tuple3<String, Integer, MessageWrapper>> out)
        throws Exception {
    String name = value.getUsername();
    if (!StringUtils.isBlank(name)) {
        MessageWrapper wrapper = new MessageWrapper();
        List<Message01> list = new ArrayList<>();
        list.add(value);
        wrapper.setList01(list);
        out.collect(new Tuple3<>(name, 1, wrapper));
    }
}

在flatMap()之后,我们合并这3个流,

代码语言:javascript
复制
stream1.union(stream2, stream3).keyBy(0).timeWindow(Time.seconds(300))
        .process(
                new ProcessWindowFunction<Tuple3<String, Integer, MessageWrapper>, MessageWrapper, Tuple, TimeWindow>() {

                    @Override
                    public void process(Tuple key,
                            ProcessWindowFunction<Tuple3<String, Integer, MessageWrapper>, MessageWrapper, Tuple, TimeWindow>.Context ctx,
                            Iterable<Tuple3<String, Integer, MessageWrapper>> elements,
                            Collector<MessageWrapper> out) throws Exception {
                        // merge all entities which have same username, to get a big fat wrapper object
                        MessageWrapper w = new MessageWrapper();
                        for (Tuple3<String, Integer, MessageWrapper> t3 : elements) {
                            MessageWrapper ret = t3.f2;
                            Integer type = t3.f1;
                            if (type == 1) {
                                // add to list01
                            } else if (type == 2) {
                                // add to list02
                            } else if (type == 3) {
                                // add to list03
                            }
                        }

                        if (all 3 lists are not empty) {
                            out.collect(ret);
                        }
                    }
                });

目前我们使用20个任务管理器,每个4核+ 16G,总共80个插槽,我们使用50个并行通过。

我们总是遇到任务管理器没有响应的问题,因为有太多的完整gc,

代码语言:javascript
复制
Connecting to remote task manager + 'xxxxxxxxxxxxx' has failed. This might indicate that the remote task manager has been lost".

如果我们将时间窗口从5分钟减少到1分钟,一切都很好。根据这一点,看起来flink集群没有足够的资源,但是对于数百万条消息(每条消息的大小约为5KB)来说,80核+ 320G应该足够了,对吧?

有没有人能在这里说点什么?或者可能在代码中有一些问题?

EN

回答 1

Stack Overflow用户

发布于 2019-04-03 23:56:43

我通过在所有机器的/etc/hosts文件上使用127.0.1.1注释行,解决了集群设置中的这个问题。并且我在conf/flink-conf.yaml文件的属性taskmanager.numberOfTaskSlots:上增加了槽的并行度。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55155839

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档