首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Hazelcast Jet查询

Hazelcast Jet查询
EN

Stack Overflow用户
提问于 2018-04-10 06:51:22
回答 1查看 320关注 0票数 0

我有以下关于Hazelcast Jet的疑问

用例,如下所示:

有一个应用程序(部署在集群中的应用程序'A‘)使用Hazelcast IMDG,并在hazelcast IMap中放置数百万条记录/事务。

已为此IMap配置了“事件日刊”。

还有另一个应用程序(部署在集群中的应用程序B )实例化JetInstance并在每个节点上单独运行作业来处理记录。

目前,这项工作从事件日志中读取数据并添加到IList (Reference -IList)中。

由于作业在多个节点上运行,事件日志中的记录由多个节点处理。这将导致IList中的多个条目。

是否可以确保记录仅由“应用程序B”的一个节点处理,而不由其他节点处理以避免重复?

如果不是,这是否意味着作业将由'Application B‘集群的单个节点运行?

这里是一个示例代码(应用程序B)

代码语言:javascript
复制
        Pipeline p = Pipeline.create();
        p.drawFrom(Sources.<Integer, Integer, Integer>remoteMapJournal(MAP_NAME, clientConfig,
                e -> e.getType() == EntryEventType.ADDED, EventJournalMapEvent::getNewValue, true))
         .peek()
         .drainTo(Sinks.list(SINK_NAME));

        JobConfig jc= new JobConfig();
        jc.setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE);

        localJet.newJob(p,jc);

这里是一个完整的代码.

应用--源代码.

代码语言:javascript
复制
public class RemoteMapJournalSourceSrv1 {

private static final String MAP_NAME = "map";
private static final String SINK_NAME = "list";

public static void main(String[] args) throws Exception {
    System.setProperty("remoteHz.logging.type", "log4j");

    Config hzConfig = getConfig();
    HazelcastInstance remoteHz = startRemoteHzCluster(hzConfig);

    try {
        IMap<Integer, Integer> map = remoteHz.getMap(MAP_NAME);
        System.out.println("*************** Initial Map  address " +  map.size() );

        while(true) {
            System.out.println("***************map size "+map.size());  
            TimeUnit.SECONDS.sleep(20);
        }

    } finally {
        Hazelcast.shutdownAll();
    }
}

private static HazelcastInstance startRemoteHzCluster(Config config) {
    HazelcastInstance remoteHz = Hazelcast.newHazelcastInstance(config);
    return remoteHz;
}

private static Config getConfig() {
    Config config = new Config();
    // Add an event journal config for map which has custom capacity of 1000 (default 10_000)
    // and time to live seconds as 10 seconds (default 0 which means infinite)
    config.addEventJournalConfig(new EventJournalConfig().setEnabled(true)
                                                         .setMapName(MAP_NAME)
                                                         .setCapacity(10000)
                                                         .setTimeToLiveSeconds(100));
    return config;
}

这里是应用程序B-节点1示例代码

代码语言:javascript
复制
public class RemoteMapJournalSourceCL1 {

private static final String MAP_NAME = "map";
private static final String SINK_NAME = "list";

public static void main(String[] args) throws Exception {
    System.setProperty("remoteHz.logging.type", "log4j");

      JetInstance localJet = startLocalJetCluster();

    try {
        ClientConfig clientConfig = new ClientConfig();
        GroupConfig groupConfig = new GroupConfig();

        clientConfig.getNetworkConfig().addAddress("localhost:5701");
        clientConfig.setGroupConfig(groupConfig);

        IList list1 = localJet.getList(SINK_NAME);

        int size1 = list1.size();

        System.out.println("***************List Initial size "+size1);

        Pipeline p = Pipeline.create();
        p.drawFrom(Sources.<Integer, Integer, Integer>remoteMapJournal(MAP_NAME, clientConfig,
                e -> e.getType() == EntryEventType.ADDED, EventJournalMapEvent::getNewValue, false))
         .peek()
         .drainTo(Sinks.list(SINK_NAME));

        JobConfig jc= new JobConfig();
        jc.setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE);
        localJet.newJob(p,jc);

        while(true){
            TimeUnit.SECONDS.sleep(10); 
            System.out.println("***************Read " + list1.size() + " entries from remote map journal.");
        }           

    } finally {
        Hazelcast.shutdownAll();
        Jet.shutdownAll();
    }

}

private static String getAddress(HazelcastInstance remoteHz) {
    Address address = remoteHz.getCluster().getLocalMember().getAddress();
    System.out.println("***************Remote address " + address.getHost() + ":" + address.getPort() );
    return address.getHost() + ":" + address.getPort();
}

private static JetInstance startLocalJetCluster() {
      JetInstance localJet = Jet.newJetInstance();
    return localJet;
}

这里是应用程序B-节点2示例代码

代码语言:javascript
复制
public class RemoteMapJournalSourceCL2 {

private static final String MAP_NAME = "map";
private static final String SINK_NAME = "list";

public static void main(String[] args) throws Exception {
    System.setProperty("remoteHz.logging.type", "log4j");

      JetInstance localJet = startLocalJetCluster();

    try {
        ClientConfig clientConfig = new ClientConfig();
        GroupConfig groupConfig = new GroupConfig();

        clientConfig.getNetworkConfig().addAddress("localhost:5701");
        clientConfig.setGroupConfig(groupConfig);

        IList list1 = localJet.getList(SINK_NAME);
        int size1 = list1.size();
        System.out.println("***************List Initial size "+size1);


        Pipeline p = Pipeline.create();
        p.drawFrom(Sources.<Integer, Integer, Integer>remoteMapJournal(MAP_NAME, clientConfig,
                e -> e.getType() == EntryEventType.ADDED, EventJournalMapEvent::getNewValue, true))
         .peek()
         .drainTo(Sinks.list(SINK_NAME));

        JobConfig jc= new JobConfig();
        jc.setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE);

        localJet.newJob(p,jc);

        while(true){
            TimeUnit.SECONDS.sleep(10); 
            System.out.println("***************Read " + list1.size() + " entries from remote map journal.");
        }           
    } finally {
        Hazelcast.shutdownAll();
        Jet.shutdownAll();
    }

}
private static JetInstance startLocalJetCluster() {
      JetInstance localJet = Jet.newJetInstance();
    return localJet;
}

Hazelcast客户端-在Hazelcast地图(应用程序A)中输入条目

代码语言:javascript
复制
public class HZClient {

  public static void main(String[] args) {

    ClientConfig clientConfig = new ClientConfig();
    GroupConfig groupConfig = new GroupConfig();

    clientConfig.getNetworkConfig().addAddress("localhost:5701");
    clientConfig.setGroupConfig(groupConfig);

    HazelcastInstance client = HazelcastClient.newHazelcastClient(clientConfig);
    IMap<Integer, Integer> map = client.getMap("map");
    Scanner in = new Scanner(System.in);
    int startIndex= 0;
    int endIndex= 0;

    while(true) {
        if(args !=null && args.length > 0 && args[0].equals("BATCH")) {

            System.out.println("Please input the batch size");
            int b = in.nextInt();
            startIndex= endIndex + 1;
            endIndex+= b;
            System.out.println("Batch starts from  "+ startIndex +"ends at"+endIndex);
            putBatch(map,startIndex,endIndex);

        }
        else {
            System.out.println("Please input the map entry");
            int a = in.nextInt();
            System.out.println("You entered integer "+a);
            put(map,a,a);
        }
    }
}

public static void putBatch(IMap map,int startIndex, int endIndex) {
    int index= startIndex;
    System.out.println("Start Index" + startIndex +"End Index"+endIndex );
    while(index<=endIndex){
        System.out.println("Map Values"+ index);
        put(map,index,index);
        index+=1;
    }

}

public static void put(IMap map,int key,int value) {
    map.set(key, value);
}

这里是执行这个.的步骤

  1. 运行Application程序RemoteMapJournalSourceSrv1
  2. 运行应用程序B节点1- Java程序RemoteMapJournalSourceCL1
  3. 运行应用程序B节点2- Java程序RemoteMapJournalSourceCL2
  4. 运行应用程序A程序HZClient的Hazelcast客户端

此客户端程序基于控制台输入将条目放入映射中。请提供整数输入。

Observations

在执行时,.peek()记录应用程序B的两个节点的值,在应用程序A映射中插入一个条目时,列表计数变为2。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-04-10 09:25:34

这似乎是你提交两个独立的工作从两个喷气机客户端。每个作业接收所有IMap事件日志项并将它们推送到相同的IList,因此预期的结果是IList包含每个项的两个实例。

请记住,您只从Jet客户端提交作业,但它实际上在Jet集群中同时运行在其所有成员上。如果您只想在接收器中只提交一份数据,请不要提交同一作业两次。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/49747170

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档