这不是(应该是)一个意见问题,但它是一个新手问题,所以如果我还没有找到我需要阅读的资源,请指出:)
我正处于基于微服务的ETL系统的设计阶段。特别是有一种服务似乎是通过oop进行函数式编程的自然选择,但我不想成为一个寻找问题的解决方案。
服务将以GraphFragment的形式接收复杂的数据对象(如json)。
[
{
"type":"node",
"label":"Product",
"properties": {}
},
{
"type":"node",
"label":"Factor",
"properties": {},
},
{
"type":"Edge",
"label":"INCLUDES_FACTOR",
"properties": {},
"from": 0, // Product
"to": 1, // Factor
},
]除了具有数百个节点和边(可能是数千个,但没有更多的节点和边)更复杂之外,每个节点和边缘都具有许多属性。
然后,微服务会询问该图形片段并生成一个报告(也是在类似json的对象中)。在该报告中将有两种不同的“类别”领域:
在最终将最终产品放入kafka队列之前,该服务本身没有写任何东西的责任。除了图形片段之外,它不需要其他任何东西。
对于每个函数集合都向报表对象添加一个属性的管道来说,这似乎是一个很好的选择。每个函数都会接收到整个图,并且应该对以前和将来的函数的输出视而不见。
其他一些要求将是快速和资源效率高的。这一具体过程将比其他过程使用得更多,而且必须尽可能地发挥作用。并行运行多个函数是很有吸引力的。
我想我有两个问题:
为什么那声音记忆对我来说是浪费的?
我很感谢你的任何想法,并乐意补充澄清。
发布于 2017-07-19 19:48:44
这是函数式编程的一个很好的用例,可能是Kotlin或Scala (或Python),它们是应用程序整体中使用最多的三种语言。
天真地说,这似乎很好。遍历图收集状态似乎具有明显的功能(如果可以映射/减少节点,而不是按顺序遍历图,则更好)。
如果数据结构是不可变的,那么一个函数如何向报表添加属性?它是否传递了报表的当前状态,然后返回该报告的副本和新属性。
有可能。更好的解决方案(如果实际上需要在将结果写入队列之前收集所有结果)可能是返回一个链接列表类型的对象,该对象是新属性和指向报表的“先前”状态的指针。一旦处理完成,它将返回最后一个属性,该属性指向前一个属性,指向前一个属性,以此类推。
发布于 2017-07-19 20:51:20
对于每个函数集合都向报表对象添加一个属性的管道来说,这似乎是一个很好的选择。每个函数都会接收到整个图,并且应该对以前和将来的函数的输出视而不见。
不要每次只添加一个属性。使用类似地图的操作来创建最终的字典。在python中是这样的:
# start with a dictionary mapping the attribute to the function
# that should produce that attribute
functions_to_apply = {
'foobar': foobar_function,
'goat': goat_function
}
# send the function and graph parameter to an executor who will run
# the operation in parallel and return a future.
with_futures = {key: executor.submit(value, graph) for key, value in functions_to_apply.items()}
# resolve all of the future to get your final object
final_report = {key: value.get() for key, value in with_futures.items()}发布于 2017-07-19 21:11:10
如果数据结构是不可变的,那么一个函数如何向报表添加属性?它是否传递了报表的当前状态,然后返回该报告的副本和新属性。
在函数式语言中,您可以返回一个修改报告的函数,并将报告折叠到返回的函数(Scala):
type ReportModifier = Report => Report
type PipelineFunction = GraphFragment => ReportModifier
def executePipelineFunctionsInParallel(
report: Report,
pipelineFunctions: List[PipelineFunction],
graphFragment: GraphFragment
): Future[Report] = {
// Run our pipeline functions in parallel
// Collect their results into Future[List[ReportModifier]]
Future.traverse(pipelineFunctions)(pipelineFunction => Future {
pipelineFunction(graphFragment)
}).map(reportModifiers => reportModifiers.foldLeft(report) {
case (report, reportModifier) => reportModifier(report)
})
}https://softwareengineering.stackexchange.com/questions/353115
复制相似问题