假设我有一个具有多个节点的点燃集群和一个分区的非空IgniteCache,名为"TEST_CACHE“。然后在其中一个节点中运行以下代码:
ignite.compute().run(new IgniteRunnable(){
@IgniteInstanceResource
private Ignite ignite;
@Override
public void run() {
IgniteDataStreamer<String,Long> ds = ignite.dataStreamer("TEST_CACHE");
ds.receiver(new StreamTransformer<String,Long>(){
@Override
public Object process(MutableEntry<String, Long> entry, Object... arguments)
throws EntryProcessorException {
Long value = entry.getValue();
entry.setValue(value==null?1L:(value.longValue()+1L));
return null;
}
});
//loop for adding lots of String data
while(...)
ds.addData(...);
}
});这类似于普通的StreamTransformerExample代码,但是每个节点都将获得同一个缓存的DataStreamer实例,并同时调用addData方法。换句话说,对于不同节点中相同的字符串数据,可能有一个节点通过"Long value = entry.getValue()“获得了值,但没有执行下一行代码来设置值并将其更新到缓存中,那么另一个节点正在执行"entry.getValue()”。那么,在这个并发的StreamTransformer用例中是否可能更新错误的值呢?
发布于 2017-09-15 09:54:52
StreamReceiver.receive用您的入口处理器调用cache.invoke,因此条目被锁定在此操作中。所以是的,它是并行安全的。
顺便问一下,您在allowOverwrite中启用了DataStreamer吗?
https://stackoverflow.com/questions/46234856
复制相似问题