我刚开始使用OptaPy求解器,想用它来解决我的工作量平衡问题。
我的问题是:我有一个用户列表和一个作业列表。每个作业只需根据业务需求从用户列表中选出一个用户,而一个用户可以处理多个作业。分配完成后,各用户的工作负载应尽可能均衡。
一个用户的工作量计算为:(用户的已有成本 + 作业成本)÷ 用户的容量,即 $w = (c_{\text{exists}} + c_{\text{jobs}}) / \text{capacity}$。
方差计算为:$\sigma^2 = \frac{\sum (x - \mu)^2}{N}$,其中 $x$ 是单个用户的工作量,$\mu$ 是所有用户的平均工作量,$N$ 是用户的数量。
我目前的实现可以按照用户最终工作负载最低的原则分配任务。我仍然需要得到平均工作量,然后计算方差。遗憾的是,我没能根据文档和示例实现这个目标。有人能帮帮我吗?先谢谢大家。
下面附上我的代码,供参考。
import optapy.config
from optapy import planning_entity, planning_variable, planning_id, value_range_provider, planning_solution, \
planning_score, planning_entity_collection_property, problem_fact_collection_property, constraint_provider, \
get_class, problem_fact
from optapy import solver_factory_create
from optapy.constraint import ConstraintCollectors
from optapy.constraint import ConstraintFactory
from optapy.score import HardSoftScore
from optapy.types import Duration
@problem_fact
class Users:
    """Problem fact describing one user who can be given jobs.

    Stores the user's identifier, capacity and the cost the user already
    carries (``exists_cost``).
    """

    def __init__(self, id, capacity, exists_cost):
        # ``id`` shadows the builtin, but the parameter name is part of the
        # constructor's interface and is therefore kept.
        self.id, self.capacity, self.exists_cost = id, capacity, exists_cost

    @planning_id
    def get_id(self):
        """Return the identifier registered as the planning id."""
        return self.id

    def __str__(self):
        """Return ``User(id=..., capacity=..., exists_cost=...)``."""
        fields = (
            f"id={self.id}",
            f"capacity={self.capacity}",
            f"exists_cost={self.exists_cost}",
        )
        return "User(" + ", ".join(fields) + ")"
@planning_entity
class Jobs:
    """Planning entity: one candidate pairing of a job with a user.

    ``selected`` is the planning variable; it is declared against the
    ``"selected_list"`` value range.
    """

    def __init__(self, id, job_id, role_id, user, cost, selected=None):
        self.selected = selected
        self.cost = cost
        self.user = user
        self.role_id = role_id
        self.job_id = job_id
        self.id = id

    @planning_id
    def get_id(self):
        """Return the identifier registered as the planning id."""
        return self.id

    @planning_variable(int, ["selected_list"])
    def get_selected(self):
        """Return the current value of the planning variable."""
        return self.selected

    def set_selected(self, selected):
        """Store a new value for the planning variable."""
        self.selected = selected

    def __str__(self):
        """Return ``Job(id=..., job_id=..., ..., cost=...)``."""
        fields = (
            f"id={self.id}",
            f"job_id={self.job_id}",
            f"role_id={self.role_id}",
            f"user={self.user}",
            f"selected={self.selected}",
            f"cost={self.cost}",
        )
        return "Job(" + ", ".join(fields) + ")"
def format_list(a_list):
    """Return ``str()`` of every item in *a_list*, joined by ``',\\n'``.

    An empty iterable yields an empty string.
    """
    return ',\n'.join(str(item) for item in a_list)
@planning_solution
class Assignments:
    """Planning solution holding the value range, the job entities and the score."""

    def __init__(self, selected_list, job_list, score=None):
        self.score = score
        self.job_list = job_list
        self.selected_list = selected_list

    @problem_fact_collection_property(int)
    @value_range_provider("selected_list")
    def get_selected_list(self):
        """Return the values published as the ``"selected_list"`` range."""
        return self.selected_list

    @planning_entity_collection_property(Jobs)
    def get_job_list(self):
        """Return the ``Jobs`` planning entities."""
        return self.job_list

    @planning_score(HardSoftScore)
    def get_score(self):
        """Return the stored score (``None`` until one is set)."""
        return self.score

    def set_score(self, score):
        """Store the score."""
        self.score = score

    def __str__(self):
        """Return a multi-line dump of the value range, the jobs and the score."""
        if self.score is None:
            score_text = 'None'
        else:
            score_text = str(self.score.toString())
        return (
            "Assignments("
            f"selected_list={format_list(self.selected_list)},\n"
            f"job_list={format_list(self.job_list)},\n"
            f"score={score_text}"
            ")"
        )
@constraint_provider
def constraints(constraint_factory: ConstraintFactory):
    """Return the list of constraints, each built from *constraint_factory*."""
    builders = (job_conflict, job_conflict1, user_conflict)
    return [build(constraint_factory) for build in builders]
def job_conflict(constraint_factory: ConstraintFactory):
    """Hard constraint: a ``job_id`` whose ``selected`` values sum above 1.

    The penalty weight is that sum minus one.
    """
    selected_per_job = constraint_factory.forEach(Jobs).groupBy(
        lambda job: job.job_id,
        ConstraintCollectors.sum(lambda job: job.selected),
    )
    over_assigned = selected_per_job.filter(lambda job_id, total: total > 1)
    return over_assigned.penalize(
        "only_one_user",
        HardSoftScore.ONE_HARD,
        lambda job_id, total: total - 1,
    )
def job_conflict1(constraint_factory: ConstraintFactory):
    """Hard constraint: a ``job_id`` whose ``selected`` values sum to 0.

    Each such ``job_id`` is penalized once, with no custom weight.
    """
    selected_per_job = constraint_factory.forEach(Jobs).groupBy(
        lambda job: job.job_id,
        ConstraintCollectors.sum(lambda job: job.selected),
    )
    unassigned = selected_per_job.filter(lambda job_id, total: total == 0)
    return unassigned.penalize("must_one_user", HardSoftScore.ONE_HARD)
def calculation(user, cost):
    """Return the workload of *user* for *cost*, scaled by 10000.

    Computes ``(user.exists_cost + cost) * 10000 / user.capacity``.  The
    result comes from true division; callers in this file truncate it with
    ``int()`` before using it as a penalty weight.

    Raises:
        ZeroDivisionError: if ``user.capacity`` is 0.
    """
    scaled_cost = (user.exists_cost + cost) * 10000
    return scaled_cost / user.capacity
def user_conflict(constraint_factory: ConstraintFactory):
    """Soft constraint: penalize each user by the truncated ``calculation`` result.

    Per user, the cost of the jobs whose ``selected`` equals 1 is summed and
    passed to ``calculation`` together with the user.
    """
    selected_cost = ConstraintCollectors.conditionally(
        lambda job: job.selected == 1,
        ConstraintCollectors.sum(lambda job: job.cost),
    )
    cost_per_user = constraint_factory.forEach(Jobs).groupBy(
        lambda job: job.user,
        selected_cost,
    )
    return cost_per_user.penalize(
        "Minimize Total Cost",
        HardSoftScore.ONE_SOFT,
        lambda user, cost: int(calculation(user, cost)),
    )
def generate_problem():
    """Build the sample problem: four users and nine job/user candidates."""
    users = {
        name: Users(name, capacity, exists_cost)
        for name, capacity, exists_cost in (
            ('Alice', 100, 45),
            ('Bob', 90, 54),
            ('Chris', 80, 56),
            ('Dave', 80, 52),
        )
    }
    # (job_id, user name, cost) per candidate; ids are assigned 1..9 in order.
    candidates = (
        ('Job2', 'Bob', 10),
        ('Job2', 'Chris', 10),
        ('Job3', 'Alice', 5),
        ('Job3', 'Bob', 5),
        ('Job6', 'Alice', 5),
        ('Job6', 'Bob', 5),
        ('Job6', 'Dave', 5),
        ('Job7', 'Bob', 10),
        ('Job7', 'Dave', 10),
    )
    selected_list = [0, 1]
    job_list = [
        Jobs(index, job_id, 'Leader', users[owner], cost)
        for index, (job_id, owner, cost) in enumerate(candidates, start=1)
    ]
    # The first candidate starts with the value 0 already assigned.
    job_list[0].set_selected(selected_list[0])
    return Assignments(selected_list, job_list)
# Solver configuration: entity, solution and constraint-provider classes,
# with a 30-second time limit.
solver_config = (
    optapy.config.solver.SolverConfig()
    .withEntityClasses(get_class(Jobs))
    .withSolutionClass(get_class(Assignments))
    .withConstraintProviderClass(get_class(constraints))
    .withTerminationSpentLimit(Duration.ofSeconds(30))
)
# Build a solver from the configuration and solve the sample problem.
solution = (
    solver_factory_create(solver_config)
    .buildSolver()
    .solve(generate_problem())
)
print(solution)

更新 #2
import optapy.config
from optapy import planning_entity, planning_variable, planning_id, value_range_provider, planning_solution, \
planning_score, planning_entity_collection_property, problem_fact_collection_property, constraint_provider, \
get_class, problem_fact
from optapy import solver_factory_create
from optapy.constraint import ConstraintCollectors, Joiners
from optapy.constraint import ConstraintFactory
from optapy.score import HardSoftScore
from optapy.types import Duration
@problem_fact
class Users:
    """Problem fact describing one user who can be given jobs.

    Stores the user's identifier, capacity and the cost the user already
    carries (``exists_cost``).
    """

    def __init__(self, id, capacity, exists_cost):
        # ``id`` shadows the builtin, but the parameter name is part of the
        # constructor's interface and is therefore kept.
        self.id, self.capacity, self.exists_cost = id, capacity, exists_cost

    @planning_id
    def get_id(self):
        """Return the identifier registered as the planning id."""
        return self.id

    def __str__(self):
        """Return ``User(id=..., capacity=..., exists_cost=...)``."""
        fields = (
            f"id={self.id}",
            f"capacity={self.capacity}",
            f"exists_cost={self.exists_cost}",
        )
        return "User(" + ", ".join(fields) + ")"
@planning_entity
class Jobs:
    """Planning entity: one candidate pairing of a job with a user.

    ``selected`` is the planning variable; it is declared against the
    ``"selected_list"`` value range.
    """

    def __init__(self, id, job_id, role_id, user, cost, selected=None):
        self.selected = selected
        self.cost = cost
        self.user = user
        self.role_id = role_id
        self.job_id = job_id
        self.id = id

    @planning_id
    def get_id(self):
        """Return the identifier registered as the planning id."""
        return self.id

    @planning_variable(int, ["selected_list"])
    def get_selected(self):
        """Return the current value of the planning variable."""
        return self.selected

    def set_selected(self, selected):
        """Store a new value for the planning variable."""
        self.selected = selected

    def __str__(self):
        """Return ``Job(id=..., job_id=..., ..., cost=...)``."""
        fields = (
            f"id={self.id}",
            f"job_id={self.job_id}",
            f"role_id={self.role_id}",
            f"user={self.user}",
            f"selected={self.selected}",
            f"cost={self.cost}",
        )
        return "Job(" + ", ".join(fields) + ")"
def format_list(a_list):
    """Return ``str()`` of every item in *a_list*, joined by ``',\\n'``.

    An empty iterable yields an empty string.
    """
    return ',\n'.join(str(item) for item in a_list)
@planning_solution
class Assignments:
    """Planning solution holding the value range, the job entities and the score."""

    def __init__(self, selected_list, job_list, score=None):
        self.score = score
        self.job_list = job_list
        self.selected_list = selected_list

    @problem_fact_collection_property(int)
    @value_range_provider("selected_list")
    def get_selected_list(self):
        """Return the values published as the ``"selected_list"`` range."""
        return self.selected_list

    @planning_entity_collection_property(Jobs)
    def get_job_list(self):
        """Return the ``Jobs`` planning entities."""
        return self.job_list

    @planning_score(HardSoftScore)
    def get_score(self):
        """Return the stored score (``None`` until one is set)."""
        return self.score

    def set_score(self, score):
        """Store the score."""
        self.score = score

    def __str__(self):
        """Return a multi-line dump of the value range, the jobs and the score."""
        if self.score is None:
            score_text = 'None'
        else:
            score_text = str(self.score.toString())
        return (
            "Assignments("
            f"selected_list={format_list(self.selected_list)},\n"
            f"job_list={format_list(self.job_list)},\n"
            f"score={score_text}"
            ")"
        )
@constraint_provider
def constraints(constraint_factory: ConstraintFactory):
    """Return the list of constraints, each built from *constraint_factory*.

    ``get_total_job_cost_per_user`` is deliberately not listed here: it
    returns a grouped stream with no penalize/reward call on it, so it is a
    building block for ``get_user_workload`` rather than a constraint.
    """
    return [
        job_conflict(constraint_factory),
        job_conflict1(constraint_factory),
        get_user_workload(constraint_factory),
    ]
def job_conflict(constraint_factory: ConstraintFactory):
    """Hard constraint: a ``job_id`` whose ``selected`` values sum above 1.

    The penalty weight is that sum minus one.
    """
    selected_per_job = constraint_factory.forEach(Jobs).groupBy(
        lambda job: job.job_id,
        ConstraintCollectors.sum(lambda job: job.selected),
    )
    over_assigned = selected_per_job.filter(lambda job_id, total: total > 1)
    return over_assigned.penalize(
        "only_one_user",
        HardSoftScore.ONE_HARD,
        lambda job_id, total: total - 1,
    )
def job_conflict1(constraint_factory: ConstraintFactory):
    """Hard constraint: a ``job_id`` whose ``selected`` values sum to 0.

    Each such ``job_id`` is penalized once, with no custom weight.
    """
    selected_per_job = constraint_factory.forEach(Jobs).groupBy(
        lambda job: job.job_id,
        ConstraintCollectors.sum(lambda job: job.selected),
    )
    unassigned = selected_per_job.filter(lambda job_id, total: total == 0)
    return unassigned.penalize("must_one_user", HardSoftScore.ONE_HARD)
def calculation(user, cost):
    """Return the workload of *user* for *cost*, scaled by 10000.

    Computes ``(user.exists_cost + cost) * 10000 / user.capacity``.  The
    result comes from true division; the caller in this file truncates it
    with ``int()``.

    Raises:
        ZeroDivisionError: if ``user.capacity`` is 0.
    """
    scaled_cost = (user.exists_cost + cost) * 10000
    return scaled_cost / user.capacity
def get_total_job_cost_per_user(constraint_factory: ConstraintFactory):
    """Return a (user, cost) stream grouped by each job's ``user``.

    ``cost`` sums ``job.cost`` over the jobs whose ``selected`` equals 1.
    No penalty or reward is applied here; the grouped stream is returned
    as-is.
    """
    selected_cost = ConstraintCollectors.conditionally(
        lambda job: job.selected == 1,
        ConstraintCollectors.sum(lambda job: job.cost),
    )
    jobs = constraint_factory.for_each(Jobs)
    return jobs.groupBy(lambda job: job.user, selected_cost)
def get_user_workload(constraint_factory: ConstraintFactory):
    """Soft constraint: penalize every user by their truncated scaled workload.

    The weight for a (user, cost) pair is ``int(calculation(user, cost))``.
    """
    # Penalize the (user, cost) stream directly.  Re-grouping it by the
    # workload alone leaves a one-element stream, so a three-argument weigher
    # has the wrong arity, and users with equal workloads would be merged
    # into a single group and counted once.
    return get_total_job_cost_per_user(constraint_factory).penalize(
        'Minimize',
        HardSoftScore.ONE_SOFT,
        lambda user, cost: int(calculation(user, cost)),
    )
def generate_problem():
    """Build the sample problem: four users and nine job/user candidates."""
    users = {
        name: Users(name, capacity, exists_cost)
        for name, capacity, exists_cost in (
            ('Alice', 100, 45),
            ('Bob', 90, 54),
            ('Chris', 80, 56),
            ('Dave', 80, 52),
        )
    }
    # (job_id, user name, cost) per candidate; ids are assigned 1..9 in order.
    candidates = (
        ('Job2', 'Bob', 10),
        ('Job2', 'Chris', 10),
        ('Job3', 'Alice', 5),
        ('Job3', 'Bob', 5),
        ('Job6', 'Alice', 5),
        ('Job6', 'Bob', 5),
        ('Job6', 'Dave', 5),
        ('Job7', 'Bob', 10),
        ('Job7', 'Dave', 10),
    )
    selected_list = [0, 1]
    job_list = [
        Jobs(index, job_id, 'Leader', users[owner], cost)
        for index, (job_id, owner, cost) in enumerate(candidates, start=1)
    ]
    # The first candidate starts with the value 0 already assigned.
    job_list[0].set_selected(selected_list[0])
    return Assignments(selected_list, job_list)
# Solver configuration: entity, solution and constraint-provider classes,
# with a 30-second time limit.
solver_config = (
    optapy.config.solver.SolverConfig()
    .withEntityClasses(get_class(Jobs))
    .withSolutionClass(get_class(Assignments))
    .withConstraintProviderClass(get_class(constraints))
    .withTerminationSpentLimit(Duration.ofSeconds(30))
)
# Build a solver from the configuration and solve the sample problem.
solution = (
    solver_factory_create(solver_config)
    .buildSolver()
    .solve(generate_problem())
)
print(solution)

发布于 2022-10-10 17:34:29
方差可以通过组合多个ConstraintCollectors来计算(下面假设有一个DataPoint类,您希望最小化其`value`属性的方差):
def minimize_variance(constraint_factory: ConstraintFactory):
return (
constraint_factory.for_each(DataPoint)
.group_by(ConstraintCollectors.average(lambda point: point.value))
.join(DataPoint)
.group_by(ConstraintCollectors.compose(
ConstraintCollectors.sum(lambda avg, point: (point.value - avg)**2),
ConstraintCollectors.count_bi(),
lambda diff_sum, count: int(((diff_sum / max(1, count)) * 100))
))
.penalize('Minimize variance', SimpleScore.ONE,
lambda variance: variance)
)int(((diff_sum / max(1, count)) * 100))是将方差和转换为整数(保持2小数的精度)。您可以根据需要增加或减少常量,以获得更高/更低的精度。你需要惩罚在OptaPlanner/OptaPy中的整数。
要将其应用到您的具体问题,首先需要根据Users/Jobs的配对计算每个用户的负载:
def get_users_loads(constraint_factory: ConstraintFactory):
return (
constraint_factory.for_each(Users)
.join(constraint_factory.for_each(Jobs).filter(lambda job: job.selected == 1),
Joiners.equal(lambda user: user, lambda job: job.user))
.group_by(lambda user, job: user, ConstraintCollectors.count_bi())
)可以计算平均负载。
def get_average_load(constraint_factory: ConstraintFactory):
return (
get_users_loads(constraint_factory)
.group_by(ConstraintCollectors.average(lambda user, load: load))
)并把这一切结合在一起:
def minimize_user_load_variance(constraint_factory: ConstraintFactory):
(
get_user_loads(constraint_factory)
.join(get_average_load(constraint_factory))
.group_by(ConstraintCollectors.compose(
ConstraintCollectors.sum(lambda user, load, avg: (load - avg)**2),
ConstraintCollectors.count_tri(),
lambda diff_sum, count: int(((diff_sum / max(1, count)) * 100))
))
.penalize('Minimize variance', HardSoftScore.ONE_SOFT,
lambda variance: variance)
)这篇关于OptaPlanner中公平性的博文可能会引起人们的兴趣:https://www.optaplanner.org/blog/2017/02/03/FormulaForMeasuringUnfairness.html
https://stackoverflow.com/questions/74011580
复制相似问题