我想要创建自定义的累加器,在使用它们时我感到不安全,因为我现在只能在本地测试它们。
我的问题是:
在创建累加器时,不变性是“必须”还是“应该”?
虽然我现在找不到链接/引用,但我读到只有不可变的对象才允许累加器。然而,在spark的api(1.6)中,addInPlace方法AccumulableParam和AccumulatorParam的描述如下:“合并两个累加值。允许修改并返回第一个值以提高效率(以避免分配对象)。”
哪一个是对的?如果允许可变对象,如何使用它们来安全地创建累加器?
比方说,我有一个带一个字段的可变类,让这个字段是一个整数数组。当我们有一个可变的类时,如何覆盖addInPlace方法?
我应该写(Option1):
public MyClass addInPlace(MyClass c1, MyClass c2){
c1.update(c2); //Where int array of c1 is updated(let's say we add two arrays) and c1 itself is returned.
return c1;
}或者我应该写(Option2):
public MyClass addInPlace(MyClass c1, MyClass c2){
return update2(c1,c2); //Where a new MyClass object is returned with an array(created by adding arrays of c1 and c2)
}Option2看起来更安全,但需要额外的分配。然而,上面引用API的话说,允许修改以避免分配。
此外,如果我有一个对象数组(比如MyClass2),而不是整数数组,那么我应该克隆对象还是使用对象本身。假设我想为PriorityQueue of MyClass2创建一个累加器(也许我应该为这个问题输入另一个条目?)
我将感谢任何关于累加器/火花的答复和高级参考/文档,尤其是在java中。
编辑:
我感谢zero323的回答。
我希望我能找到让我困惑的联系,但是现在事情变得更清楚了。不过,我还有两个问题要问。
1)我遇到了以下累加器实现,以跟踪日志文件中浏览器类型出现的次数。您可以从(https://brosinski.com/post/extending-spark-accumulators/)中看到详细信息。
以下是实现:
public class MapAccumulator implements AccumulatorParam<Map<String, Long>>, Serializable {
@Override
public Map<String, Long> addAccumulator(Map<String, Long> t1, Map<String, Long> t2) {
return mergeMap(t1, t2);
}
@Override
public Map<String, Long> addInPlace(Map<String, Long> r1, Map<String, Long> r2) {
return mergeMap(r1, r2);
}
@Override
public Map<String, Long> zero(final Map<String, Long> initialValue) {
return new HashMap<>();
}
private Map<String, Long> mergeMap( Map<String, Long> map1, Map<String, Long> map2) {
Map<String, Long> result = new HashMap<>(map1);
map2.forEach((k, v) -> result.merge(k, v, (a, b) -> a + b));
return result;
}
}我的问题是:
为什么我们没有
map2.forEach((k, v) -> map1.merge(k, v, (a, b) -> a + b));另外,假设我想要一个
Map<Integer, ArrayList<MyClass>> or ArrayList<ArrayList<MyClass>>我可以要(Option1)这样的东西吗?
public ArrayList<ArrayList<MyClass>> addInPlace(ArrayList<ArrayList<MyClass>> a1, ArrayList<ArrayList<MyClass>> a2) {
//For now, assume that a1 and a2 have the same size
for(int i=0;i<a2.size();i++){
a1.get(i).addAll(a2.get(i))
}
return a1;
}或者我应该写(Option2):
public ArrayList<ArrayList<MyClass>> addInPlace(ArrayList<ArrayList<MyClass>> a1, ArrayList<ArrayList<MyClass>> a2) {
//For now, assume that a1 and a2 have the same size
ArrayList<ArrayList<MyClass>> result= new ArrayList<ArrayList<MyClass>>();
for(int i=0;i<a1.size();i++){
result.add(new ArrayList<MyClass>());
result.get(i).addAll(a1.get(i));
result.get(i).addAll(a2.get(i));
}
return result;
}那么,就蓄电池的安全性而言,两种选择之间是否有区别?
2)说累加器不是线程安全的,这是否意味着rdd元素可以多次更新累加器?或者您的意思是在处理过程中使用的对象可以由另一个线程从代码中的其他地方更改?
还是只有当将累加器传送到驱动程序时才会出现问题,如链接zero323 shared (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Accumulable.scala#L43)中所写的那样:
“如果这个[可累积]是内部的。内部[可累积]s将通过心跳向驱动程序报告。对于内部[可累积]s,R必须是线程安全的,这样才能正确地报告它们。”
我对长期的加入表示歉意,但我希望这也会对社会有所帮助。
发布于 2016-03-23 23:33:37
创建自定义累加器时是否需要不变性?不不是的。您已经发现AccumulableParam.addAccumulator和AccumulableParam.addInPlace都显式地允许修改第一个参数。如果您深入研究,您将看到这个场景实际上是在AccumulatorSuite中测试的,其中使用了下面的param:
new AccumulableParam[mutable.Set[A], A] {
def addInPlace(t1: mutable.Set[A], t2: mutable.Set[A]) : mutable.Set[A] = {
t1 ++= t2
t1
}
def addAccumulator(t1: mutable.Set[A], t2: A) : mutable.Set[A] = {
t1 += t2
t1
}
def zero(t: mutable.Set[A]) : mutable.Set[A] = {
new mutable.HashSet[A]()
}
}直观地说,因为每个任务都有自己的累加器,并且以顺序的方式在分区上操作,所以不应该出现易变成为问题的情况。
尽管如此,正如在其他地方所说的累加器并不是线程安全的。因此,您可能应该忘记在分区级别上将累加器与并行处理组合在一起。
https://stackoverflow.com/questions/36188617
复制相似问题