我是OptaPlanner的新手,我想测试它在整数网络流问题上的能力。
我试图建模的问题相对来说很简单:
给出了一个具有节点和边的有向图(其中边是(从节点到节点到)对)。integer.
h 111流守恒:对于每个非源和非接收器节点,总流必须等于总出流(硬penalty).
)。
首先,我使用OptaPy对问题事实进行建模,如下所示:
from optapy import problem_fact, planning_id
@problem_fact
class Node:
def __init__(self, id, lb, ub, cost):
self.id = id
self.lb = lb
self.ub = ub
self.cost = cost
@planning_id
def get_id(self):
return self.id
def __str__(self):
return f'Node(id={self.id},lb={self.lb},ub={self.ub},cost={self.cost})'
@problem_fact
class Edge:
def __init__(self, id, node_from, node_to, cost):
self.id = id
self.node_from = node_from
self.node_to = node_to
self.cost = cost
@planning_id
def get_id(self):
return self.id
def __str__(self):
return f'Node(id={self.id},node_from={self.node_from.id},node_to={self.node_to.id},cost={self.cost})'我猜规划实体和变量是类似的吗?
from optapy import planning_entity, planning_variable
@planning_entity
class NodeFlow:
def __init__(self, id, node, flow=None):
self.id = id
self.node = node
self.flow = flow
@planning_id
def get_id(self):
return self.id
@planning_variable(Node, ['nodeFlowRange'])
def get_flow(self):
return self.flow
def set_flow(self, new_flow):
self.flow = new_flow
@planning_entity
class EdgeFlow:
def __init__(self, id, edge, flow=None):
self.id = id
self.edge = edge
self.flow = flow
@planning_id
def get_id(self):
return self.id
@planning_variable(Node, ['edgeFlowRange'])
def get_flow(self):
return self.flow
def set_flow(self, new_flow):
self.flow = new_flow但是,现在我不得不对约束进行建模。我不清楚该如何建模,例如,流守恒。我的代码框架如下:
from optapy import constraint_provider
from optapy.types import Joiners, HardSoftScore
@constraint_provider
def define_constraints(constraint_factory):
return [
# Hard constraints
flow_conservation(constraint_factory)
]
def flow_conservation(constraint_factory):
return constraint_factory \
.for_each(Node) \
#.group_by(?) # To sum in-flows and out-flows per node?
.join(Node,
# ?
) \
.penalize('Flow conservation conflict', HardSoftScore.ONE_HARD)这是模拟这样一个问题的正确方法吗?我如何建模,例如,流守恒?
谢谢你的帮助!
发布于 2022-08-17 00:03:54
可以将"flow_bounds“建模为”内置“约束,方法是在"@planning_entity”(如https://www.optapy.org/docs/latest/planner-configuration/planner-configuration.html#valueRangeProviderOnPlanningEntity中描述的)上使用"@value_range_provider“,但似乎有一个bug需要用@value_range_provider on @planning_entity修复)(一旦发布了使用修复程序的构建,就会添加如何进行修复)。
由于"flow_conservation“只适用于非源/非接收器,我建议在Node中添加一个类别字段,以确定它是源还是接收器(或仅仅是一个正常节点)。从NodeFlow和EdgeFlow类中,flow被声明为Node,但从问题描述来看,它应该是一个整数。我搞不懂NodeFlow代表什么(对于给定的outgoing - incoming是否应该是outgoing - incoming,因此除了source和sink节点之外,应该始终为0)。
"cost_minimization“可以分为node_cost_minimization和edge_cost_minimization,这两种方法可以将产品item.flow * item.cost相加,并对其进行惩罚。把所有的限制因素加在一起,应该是这样的:
from optapy import constraint_provider
from optapy.constraint import Joiners, ConstraintCollectors
from optapy.score import HardSoftScore
from domain import Node, Edge, NodeFlow, EdgeFlow
@constraint_provider
def flow_constraints(constraint_factory):
return [
flow_conservation(constraint_factory),
minimize_node_cost(constraint_factory),
minimize_edge_cost(constraint_factory),
outgoing_bounds(constraint_factory),
incoming_bounds(constraint_factory),
]
def outgoing_bounds(constraint_factory):
return (
constraint_factory.for_each(Node)
# Join with outgoing edge flows
.join(EdgeFlow, Joiners.equal(lambda node: node.id, lambda edge_flow: edge_flow.edge.node_from.id))
# Get total outgoing flow
.group_by(lambda node, outgoing_edge: node,
ConstraintCollectors.sum(lambda node, outgoing_edge: outgoing_edge.flow))
# Only penalize if outgoing_flow is not in bounds for node
.filter(lambda node, outgoing_flow: outgoing_flow < node.lb or outgoing_flow > node.ub)
.penalize('source_bounds', HardSoftScore.ONE_HARD, lambda node, outgoing_flow: 1000 * max(node.lb - outgoing_flow, outgoing_flow - node.ub))
)
def incoming_bounds(constraint_factory):
return (
constraint_factory.for_each(Node)
# Join with incoming edge flows
.join(EdgeFlow, Joiners.equal(lambda node: node.id,
lambda edge_flow: edge_flow.edge.node_to.id))
# Get total incoming flow
.group_by(lambda node, incoming_edge: node,
ConstraintCollectors.sum(lambda node, incoming_edge: incoming_edge.flow))
# Only penalize if outgoing_flow is not in bounds for node
.filter(lambda node, incoming_flow: incoming_flow < node.lb or incoming_flow > node.ub)
.penalize('sink_bounds', HardSoftScore.ONE_HARD, lambda node, incoming_flow: 1000 * max(node.lb - incoming_flow, incoming_flow - node.ub))
)
def flow_conservation(constraint_factory):
return (
constraint_factory.for_each(Node)
# Do not include source and sink nodes
.filter(lambda node: node.kind != 'source' and node.kind != 'sink')
# Join with incoming edge flows
.join(EdgeFlow, Joiners.equal(lambda node: node.id,
lambda edge_flow: edge_flow.edge.node_from.id))
# Get total incoming flow
.group_by(lambda node, incoming_edge: node,
ConstraintCollectors.sum(lambda node, incoming_edge: incoming_edge.flow))
# Join with outgoing edge flows
.join(EdgeFlow, Joiners.equal(lambda node, incoming_flow: node.id,
lambda edge_flow: edge_flow.edge.node_to.id))
# Get total outgoing flow
.group_by(lambda node, incoming_flow, outgoing_edge: node,
lambda node, incoming_flow, outgoing_edge: incoming_flow,
ConstraintCollectors.sum(lambda node, incoming_flow, outgoing_edge: outgoing_edge.flow))
# Only penalize if incoming flow != outgoing flow
.filter(lambda node, incoming_flow, outgoing_flow: incoming_flow != outgoing_flow)
.penalize('flow_conservation', HardSoftScore.ONE_HARD, lambda node, incoming_flow, outgoing_flow: abs(incoming_flow - outgoing_flow))
)
def minimize_node_cost(constraint_factory):
return (
constraint_factory.for_each(NodeFlow)
.group_by(ConstraintCollectors.sum(lambda node_flow: node_flow.node.cost * node_flow.flow))
.penalize('node flow cost', HardSoftScore.ONE_SOFT, lambda cost: cost)
)
def minimize_edge_cost(constraint_factory):
return (
constraint_factory.for_each(EdgeFlow)
.group_by(ConstraintCollectors.sum(lambda edge_flow: edge_flow.edge.cost * edge_flow.flow))
.penalize('edge flow cost', HardSoftScore.ONE_SOFT, lambda cost: cost)
)我使用的域对象:
from optapy import problem_fact, planning_id
@problem_fact
class Node:
def __init__(self, id, lb, ub, cost, kind):
self.id = id
self.lb = lb
self.ub = ub
self.kind = kind
self.cost = cost
@planning_id
def get_id(self):
return self.id
def __str__(self):
return f'Node(id={self.id},lb={self.lb},ub={self.ub},cost={self.cost})'
@problem_fact
class Edge:
def __init__(self, id, node_from, node_to, lb, ub, cost):
self.id = id
self.node_from = node_from
self.node_to = node_to
self.lb = lb
self.ub = ub
self.cost = cost
@planning_id
def get_id(self):
return self.id
def __str__(self):
return f'Node(id={self.id},node_from={self.node_from.id},node_to={self.node_to.id},cost={self.cost})'
from optapy import planning_entity, planning_variable, planning_solution, planning_score, \
planning_entity_collection_property, problem_fact_collection_property, value_range_provider
from optapy.score import HardSoftScore
@planning_entity
class NodeFlow:
def __init__(self, id, node, flow=None):
self.id = id
self.node = node
self.flow = flow
@planning_id
def get_id(self):
return self.id
@planning_variable(object, ['nodeFlowRange'])
def get_flow(self):
return self.flow
def set_flow(self, new_flow):
self.flow = new_flow
def __str__(self):
return f'{self.node}: {self.flow}'
@planning_entity
class EdgeFlow:
def __init__(self, id, edge, flow=None):
self.id = id
self.edge = edge
self.flow = flow
@planning_id
def get_id(self):
return self.id
@planning_variable(int, ['edgeFlowRange'])
def get_flow(self):
return self.flow
def set_flow(self, new_flow):
self.flow = new_flow
def __str__(self):
return f'{self.edge}: {self.flow}'
@planning_solution
class NetworkFlow:
def __init__(self, nodes, edges, node_flows, edge_flows, score):
self.nodes = nodes
self.edges = edges
self.node_flows = node_flows
self.edge_flows = edge_flows
self.score = score
@problem_fact_collection_property(Node)
def get_nodes(self):
return self.nodes
@problem_fact_collection_property(Edge)
def get_edges(self):
return self.edges
@planning_entity_collection_property(NodeFlow)
def get_node_flows(self):
return self.node_flows
@planning_entity_collection_property(EdgeFlow)
def get_edge_flows(self):
return self.edge_flows
@planning_score(HardSoftScore)
def get_score(self):
return self.score
@problem_fact_collection_property(int)
@value_range_provider('edgeFlowRange')
def get_edge_flow_range(self):
if len(self.edges) == 0:
return [0]
else:
lb = min(map(lambda edge: edge.lb, self.edges))
ub = max(map(lambda edge: edge.ub, self.edges))
return [i for i in range(lb, ub + 1)]
@problem_fact_collection_property(int)
@value_range_provider('nodeFlowRange')
def get_node_flow_range(self):
if len(self.nodes) == 0:
return [0]
else:
lb = min(map(lambda node: node.lb, self.nodes))
ub = max(map(lambda node: node.ub, self.nodes))
return [i for i in range(lb, ub + 1)]
def set_score(self, score):
self.score = score
def __str__(self):
return (f'Nodes:\n' +
f'\n'.join(list(map(str, self.node_flows))) +
f'\nEdges:\n' +
f'\n'.join(list(map(str, self.edge_flows))) +
f'\nScore: {self.score}'
)
def generate_problem():
nodes = [
Node('source', 10, 10, 1, 'source'),
Node('sink', 10, 10, 1, 'sink'),
Node('a', 0, 2, 1, 'node'),
Node('b', 0, 8, 2, 'node'),
Node('c', 0, 3, 1, 'node'),
]
edges = [
Edge('source-a', nodes[0], nodes[2], 0, 2, 1),
Edge('source-b', nodes[0], nodes[3], 0, 10, 2),
Edge('b-c', nodes[3], nodes[4], 0, 3, 1),
Edge('a-sink', nodes[2], nodes[1], 0, 2, 1),
Edge('b-sink', nodes[3], nodes[1], 0, 5, 2),
Edge('c-sink', nodes[4], nodes[1], 0, 3, 1),
]
node_flows = []
edge_flows = list(map(lambda edge: EdgeFlow(edge.id, edge), edges))
return NetworkFlow(nodes, edges, node_flows, edge_flows, None)它可以更简单(例如,您可以将计划变量"flow“直接放入Node和Edge中,并删除NodeFlow和EdgeFlow)。请注意,OptaPy目前比OptaPlanner慢得多,但是我们正在努力改进它的性能(希望在下一个版本中)。
https://stackoverflow.com/questions/73378290
复制相似问题