我在Scala/Spark中有一个批处理作业,它根据一些输入动态创建Drools规则,然后计算规则。我还有一个输入RDD[T],它对应于要插入到规则引擎中的事实。
到目前为止,我正在逐个插入事实,然后触发关于此事实的所有规则。我正在使用rdd.aggregate来做这件事。
seqOp运算符的定义如下:
/**
* @param broadcastRules the broadcasted KieBase object containing all rules
* @param aggregator used to accumulate values when rule matches
* @param item the fact to run Drools with
* @tparam T the type of the given item
* @return the updated aggregator
*/
def seqOp[T: ClassTag](broadcastRules: Broadcast[KieBase])(
aggregator: MyAggregator,
item: T) : MyAggregator = {
val session = broadcastRules.value.newStatelessKieSession
session.setGlobal("aggregator", aggregator)
session.execute(CommandFactory.newInsert(item))
aggregator
}以下是生成的规则的示例:
dialect "mvel"
global batch.model.MyAggregator aggregator
rule "1"
when condition
then do something on the aggregator
end 对于相同的RDD,批处理花了20分钟来评估3K规则,但花了10小时来评估10K规则!
我想知道按事实插入事实是否是最好的方法。一次插入RDD的所有项比触发所有规则更好吗?对我来说,这似乎不是最好的,因为所有的事实都会同时存在于工作记忆中。
你看到上面的代码有什么问题吗?
发布于 2018-05-31 20:32:29
最后我发现了问题所在,它更多地与规则匹配时在聚合器上所做的操作有关,而不是与规则的评估有关。
https://stackoverflow.com/questions/50600288
复制相似问题