首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >apache Flink中flink连接流面临的竞争情况

apache Flink中flink连接流面临的竞争情况
EN

Stack Overflow用户
提问于 2018-08-01 15:49:36
回答 1查看 308关注 0票数 0

在flink connected streams中实现进程功能时面临竞争条件。我有一个在两个函数processElement1 & processElement2之间共享的Cache Map,它被两个不同的线程并行调用。

Streams1->(发送报价数据)

Streams2->(发送lms(忠诚度管理系统数据))

代码语言:javascript
复制
connect=Streams1.connect(Streams2);

connect.process(new TriggerStream);

TriggerStream Class中,我使用惟一的Id:MemberId存储数据,作为unique Key将& lookup data存储在缓存中。当数据流进来时,我得不到组合的结果

代码语言:javascript
复制
class LRUConcurrentCache<K,V>{
    private final Map<K,V> cache;
    private final int maxEntries;
    public LRUConcurrentCache(final int maxEntries) {
        this.cache = new LinkedHashMap<K,V>(maxEntries, 0.75F, true) {
            private static final long serialVersionUID = -1236481390177598762L;
            @Override
            protected boolean removeEldestEntry(Map.Entry<K,V> eldest){
                return size() > maxEntries;
            }
        };
    }
    //Why we cant lock on the key
    public void put(K key, V value) {
        synchronized(key) {
            cache.put(key, value);
        }
    }

    //get methode
    public V get(K key) {
        synchronized(key) {
            return cache.get(key);
        }
    }



public class TriggerStream extends CoProcessFunction<IOffer, LMSData, String> {


    private static final long serialVersionUID = 1L;
    LRUCache cache; 
    private String offerNode;
    String updatedValue, retrivedValue;
    Subscriber subscriber;

    TriggerStream(){
        this.cache== new LRUCache(10);
    }



@Override
    public void processElement1(IOffer offer) throws Exception {
        try {
            ObjectMapper mapper = new ObjectMapper();
            mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
            mapper.enableDefaultTyping();
            // TODO Auto-generated method stub
            IOffer latestOffer = offer;

            //Check the subscriber is there or not

            retrivedValue = cache.get(latestOffer.getMemberId().toString());
            if ((retrivedValue == null)) {
                //Subscriber is the class that is used and converted as Json String & then store into map
                Subscriber subscriber = new Subscriber();
                subscriber.setMemberId(latestOffer.getMemberId());
                ArrayList<IOffer> offerList = new ArrayList<IOffer>();
                offerList.add(latestOffer);
                subscriber.setOffers(offerList);
                updatedValue = mapper.writeValueAsString(subscriber);
                cache.set(subscriber.getMemberId().toString(), updatedValue);
            } else {
                Subscriber subscriber = mapper.readValue(retrivedValue, Subscriber.class);
                List<IOffer> offers = subscriber.getOffers();
                offers.add(latestOffer);
                updatedValue= mapper.writeValueAsString(subscriber);
                cache.set(subscriber.getMemberId().toString(), subscriberUpdatedValue);
            }
        } catch (Exception pb) {
            applicationlogger.error("Exception in Offer Loading:"+pb);
            applicationlogger.debug("*************************FINISHED OFFER LOADING*******************************");
        }
        applicationlogger.debug("*************************FINISHED OFFER LOADING*******************************");

    }

@Override
    public void processElement2(LMSData lms) throws Exception {
        try {
            ObjectMapper mapper = new ObjectMapper();
            mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
            mapper.enableDefaultTyping();
            // TODO Auto-generated method stub

            //Check the subscriber is there or not

            retrivedValue = cache.get(lms.getMemberId().toString());
            if(retrivedValue !=null){
                Subscriber subscriber = mapper.readValue(retrivedValue, Subscriber.class);
                //do some calculations 
                String updatedValue = mapper.writeValueAsString(subscriber);
                //Update value
                cache.set(subscriber.getMemberId().toString(), updatedValue);
            }

        } catch (Exception pb) {
            applicationlogger.error("Exception in Offer Loading:"+pb);
            applicationlogger.debug("*************************FINISHED OFFER LOADING*******************************");
        }
        applicationlogger.debug("*************************FINISHED OFFER LOADING*******************************");

    }

}   
EN

回答 1

Stack Overflow用户

发布于 2018-08-01 16:00:48

Flink不保证CoProcessFunction (或任何其他Co*函数)摄取数据的顺序。在分布式并行任务中维护某种确定性顺序的代价太高了。

相反,你必须在你的函数中使用状态和可能的计时器来解决这个问题。函数中的状态应该维护为LRUCache (可能是keyed state)。否则,它将在失败的情况下丢失。您可以为第一个流和缓冲区记录添加另一个状态,直到来自第二个流的查找值到达为止。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/51628037

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档