首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在Storm Trident拓扑中关闭由IBackingMap实现打开的数据库连接?

如何在Storm Trident拓扑中关闭由IBackingMap实现打开的数据库连接?
EN

Stack Overflow用户
提问于 2015-02-26 21:13:12
回答 1查看 348关注 0票数 10

我正在为我的三叉树拓扑实现一个IBackingMap,以便将元组存储到ElasticSearch (我知道GitHub已经有几个集成了三叉树/ElasticSearch的实现,但我决定实现一个更适合我的任务的自定义实现)。

所以我的实现是一个带有工厂的经典实现:

代码语言:javascript
复制
public class ElasticSearchBackingMap implements IBackingMap<OpaqueValue<BatchAggregationResult>> {

    // omitting here some other cool stuff...
    private final Client client;

    public static StateFactory getFactoryFor(final String host, final int port, final String clusterName) {

        return new StateFactory() {

            @Override
            public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {

                ElasticSearchBackingMap esbm = new ElasticSearchBackingMap(host, port, clusterName);
                CachedMap cm = new CachedMap(esbm, LOCAL_CACHE_SIZE);
                MapState ms = OpaqueMap.build(cm);
                return new SnapshottableMap(ms, new Values(GLOBAL_KEY));
            }
        };
    }

    public ElasticSearchBackingMap(String host, int port, String clusterName) {

        Settings settings = ImmutableSettings.settingsBuilder()
                .put("cluster.name", clusterName).build();

        // TODO add a possibility to close the client
        client = new TransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress(host, port));
    }

    // the actual implementation is left out
}

您可以看到,它获取主机/端口/集群名称作为输入参数,并创建一个ElasticSearch客户机作为类的成员,但它从不关闭客户机。

然后在拓扑中以一种非常熟悉的方式使用它:

代码语言:javascript
复制
tridentTopology.newStream("spout", spout)
            // ...some processing steps here...
            .groupBy(aggregationFields)
            .persistentAggregate(
                    ElasticSearchBackingMap.getFactoryFor(
                            ElasticSearchConfig.ES_HOST,
                            ElasticSearchConfig.ES_PORT,
                            ElasticSearchConfig.ES_CLUSTER_NAME
                    ),
                    new Fields(FieldNames.OUTCOME),
                    new BatchAggregator(),
                    new Fields(FieldNames.AGGREGATED));

这个拓扑被包装到某个公共静态void main中,打包在一个jar中,并发送到Storm以供执行。

问题是,我应该担心关闭ElasticSearch连接,还是这是风暴自己的事?如果它不是由Storm完成的,那么在拓扑的生命周期中,我应该如何以及何时做到这一点?

提前感谢!

EN

回答 1

Stack Overflow用户

发布于 2015-08-05 22:35:55

好吧,回答我自己的问题。

首先,再次感谢@dedek的建议,并在Storm's Jira中恢复门票。

最后,由于没有正式的方法可以做到这一点,我决定使用三叉树过滤器的cleanup()方法。到目前为止,我已经验证了以下内容(对于Storm v0.9.4):

使用LocalCluster的

在集群的shutdown

  • cleanup()上调用
  • cleanup()在终止拓扑时没有调用,这不应该是一个悲剧,很可能人们无论如何都不会使用LocalCluster进行真正的部署

具有真实集群

当拓扑被终止以及工作进程使用pkill停止时,'backtype.storm.daemon.worker'

  • it将被调用-u

  • 如果工作进程被kill -9终止,或者当它崩溃时,或者可悲的是,当工作进程由于异常

而死亡时,不会被调用-f

总而言之,这或多或少保证了cleanup()的调用,前提是你要小心异常处理(我倾向于在我的每个Trident原语中添加'thundercatches‘)。

我的代码:

代码语言:javascript
复制
public class CloseFilter implements Filter {

    private static final Logger LOG = LoggerFactory.getLogger(CloseFilter.class);

    private final Closeable[] closeables;

    public CloseFilter(Closeable... closeables) {
        this.closeables = closeables;
    }

    @Override
    public boolean isKeep(TridentTuple tuple) {
        return true;
    }

    @Override
    public void prepare(Map conf, TridentOperationContext context) {

    }

    @Override
    public void cleanup() {
        for (Closeable c : closeables) {
            try {
                c.close();
            } catch (Exception e) {
                LOG.warn("Failed to close an instance of {}", c.getClass(), e);
            }
        }
    }
}

然而,如果有一天用于关闭连接的钩子成为API的一部分,那就太好了。

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/28743434

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档