首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >从MapState元素内的多个线程读取flink process键值引发ConcurrentModification异常

从MapState元素内的多个线程读取flink process键值引发ConcurrentModification异常
EN

Stack Overflow用户
提问于 2020-04-03 17:21:47
回答 1查看 290关注 0票数 0

在我的flink作业中,我将数据保存在mapstate中几分钟,以便进一步处理,在process元素中,我需要在mapstate值的帮助下执行一组操作,所以我在mapstate内容的帮助下使用多个线程进行处理。线程没有修改状态,它只获取导致CocurrentModification异常的键值,这个TTL异常也出现在COnfigured TTL时间之前,日志如下。

代码语言:javascript
复制
`java.util.concurrent.ExecutionException: org.apache.flink.util.FlinkRuntimeException: Failed to incrementally clean up state with TTL
    at java.util.concurrent.FutureTask.report(Futur`enter code here`eTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at com.XXXXXX.processResults(DataProcessor.java:198)
    at com.XXXXXX.processElement(DataProcessor.java:151)
    at com.XXXXXX.processElement(DataProcessor.java:1)
    at org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator.processElement1(CoBroadcastWithKeyedOperator.java:113)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processRecord1(StreamTwoInputProcessor.java:135)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.lambda$new$0(StreamTwoInputProcessor.java:100)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$$Lambda$753/1478413652.accept(Unknown Source)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessor.java:362)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:182)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$706/2073221180.runDefaultAction(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to incrementally clean up state with TTL
    at org.apache.flink.runtime.state.ttl.TtlIncrementalCleanup.stateAccessed(TtlIncrementalCleanup.java:60)
    at org.apache.flink.runtime.state.ttl.TtlStateFactory$$Lambda$789/1622849283.run(Unknown Source)
    at org.apache.flink.runtime.state.ttl.TtlMapState.getWrapped(TtlMapState.java:59)
    at org.apache.flink.runtime.state.ttl.TtlMapState.get(TtlMapState.java:54)
    at org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
    at com.XXXXXX.treamFunctions.findPreviousData(DATAExecutor.java:216)
    at com.XXXXXX.previousVal(DataExecutor.java:175)
    at com.XXXXXX.$FlinkStreamFunctions.execute(DATAExecutor.java:107)
    at com.XXXXXX.evaluateFunction(ExpressionEvaluator.java:89)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more
Caused by: java.util.ConcurrentModificationException
    at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
    at java.util.ArrayList$Itr.next(ArrayList.java:851)
    at org.apache.flink.runtime.state.ttl.TtlIncrementalCleanup.runCleanup(TtlIncrementalCleanup.java:78)
    at org.apache.flink.runtime.state.ttl.TtlIncrementalCleanup.stateAccessed(TtlIncrementalCleanup.java:58)
    ... 17 more

java apache-flink`

EN

回答 1

Stack Overflow用户

发布于 2020-04-03 19:32:03

简而言之,Flink的状态抽象不是为并发访问而设计的,不应该在多个线程之间共享。

长长的答案是,Flink的状态抽象不是简单的Map或仅存储某些值的值容器。在幕后,当用户通过这些抽象访问Flink状态时,可能会发生多种事情。例如,如果您配置了TTL,它将确保TTL未过期。此外,如果您使用键控流,它将确保您根据当前密钥获得正确的状态值。根据StateBackend操作的不同,抽象可能必须从磁盘读取状态并对其进行反序列化。

话虽如此,在满足某些条件的情况下,仍然可以在线程之间共享实际状态值。如果您的线程只使用状态,而不是写入状态,那么您可以读出状态并将实际值传递给其他线程。

如果您需要将并发线程的更新写回状态,那么事情就会变得有点复杂。您必须确保从Flink的Task线程访问该状态。此外,当您想要将状态从FutureTask写回Flink状态时,您必须确保您正在处理正确的密钥。因此,您需要一些记账来建立键与并发任务之间的映射。不过,请注意,这可能会变得相当棘手,特别是当您想要确保正确的故障转移行为时。

或者,你可以看看Flink的async I/O operator。使用异步I/O操作符时,不能从操作符访问状态。您必须输出结果,并在下游运算符中保持状态。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61008710

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档