首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何利用光学中的方差测量工作负载平衡

如何利用光学中的方差测量工作负载平衡
EN

Stack Overflow用户
提问于 2022-10-10 07:41:07
回答 1查看 49关注 0票数 0

我是新使用这个OptaPy解算器。我试图通过使用它来解决我的工作量平衡问题。

我的作业问题是:我有一个用户列表和一个作业列表。每个作业只需要根据业务需求从用户列表中选出一个用户。一个用户可以处理多个作业。在分配之后,用户的工作负载应该尽可能地均匀。

一个用户的工作量计算为:用户的存在成本+作业成本除以用户的容量。

方差计算为:σ2=(Σ(x-μ)2)/ N. X是个人的工作量,μ是所有用户的平均工作量。N是用户的数量。

我的当前实现可以根据用户的最后最低工作负载分配任务。我仍然需要得到平均工作量,然后计算方差。不幸的是,我没有办法在文档和示例的基础上实现这个目标。有人能帮我吗?在此之前,非常感谢您。

把我的代码附在这里供你参考。

代码语言:javascript
复制
import optapy.config
from optapy import planning_entity, planning_variable, planning_id, value_range_provider, planning_solution, \
    planning_score, planning_entity_collection_property, problem_fact_collection_property, constraint_provider, \
    get_class, problem_fact
from optapy import solver_factory_create
from optapy.constraint import ConstraintCollectors
from optapy.constraint import ConstraintFactory
from optapy.score import HardSoftScore
from optapy.types import Duration


@problem_fact
class Users:
    def __init__(self, id, capacity, exists_cost):
        self.id = id
        self.capacity = capacity
        self.exists_cost = exists_cost

    @planning_id
    def get_id(self):
        return self.id

    def __str__(self):
        return (
            f"User("
            f"id={self.id}, "
            f"capacity={self.capacity}, "
            f"exists_cost={self.exists_cost})"
        )


@planning_entity
class Jobs:
    def __init__(self, id, job_id, role_id, user, cost, selected=None):
        self.id = id
        self.job_id = job_id
        self.role_id = role_id
        self.user = user
        self.cost = cost
        self.selected = selected

    @planning_id
    def get_id(self):
        return self.id

    @planning_variable(int, ["selected_list"])
    def get_selected(self):
        return self.selected

    def set_selected(self, selected):
        self.selected = selected

    def __str__(self):
        return (
            f"Job("
            f"id={self.id}, "
            f"job_id={self.job_id}, "
            f"role_id={self.role_id}, "
            f"user={self.user}, "
            f"selected={self.selected}, "
            f"cost={self.cost}"
            f")"
        )


def format_list(a_list):
    return ',\n'.join(map(str, a_list))


@planning_solution
class Assignments:
    def __init__(self, selected_list, job_list, score=None):
        self.selected_list = selected_list
        self.job_list = job_list
        self.score = score

    @problem_fact_collection_property(int)
    @value_range_provider("selected_list")
    def get_selected_list(self):
        return self.selected_list

    @planning_entity_collection_property(Jobs)
    def get_job_list(self):
        return self.job_list

    @planning_score(HardSoftScore)
    def get_score(self):
        return self.score

    def set_score(self, score):
        self.score = score

    def __str__(self):
        return (
            f"Assignments("
            f"selected_list={format_list(self.selected_list)},\n"
            f"job_list={format_list(self.job_list)},\n"
            f"score={str(self.score.toString()) if self.score is not None else 'None'}"
            f")"
        )


@constraint_provider
def constraints(constraint_factory: ConstraintFactory):
    return [job_conflict(constraint_factory),
            job_conflict1(constraint_factory),
            user_conflict(constraint_factory),
            ]


def job_conflict(constraint_factory: ConstraintFactory):
    return constraint_factory.forEach(Jobs) \
        .groupBy(lambda job: job.job_id, ConstraintCollectors.sum(lambda job: job.selected)) \
        .filter(lambda job_id, selected: selected > 1) \
        .penalize("only_one_user", HardSoftScore.ONE_HARD, lambda job_id, selected: selected - 1)


def job_conflict1(constraint_factory: ConstraintFactory):
    return constraint_factory.forEach(Jobs) \
        .groupBy(lambda job: job.job_id, ConstraintCollectors.sum(lambda job: job.selected)) \
        .filter(lambda job_id, selected: selected == 0) \
        .penalize("must_one_user", HardSoftScore.ONE_HARD)


def calculation(user, cost):
    return (user.exists_cost + cost) * 10000 / user.capacity


def user_conflict(constraint_factory: ConstraintFactory):
    return constraint_factory.forEach(Jobs) \
        .groupBy(lambda job: job.user,
                 ConstraintCollectors.conditionally(lambda job: job.selected == 1,
                                                    ConstraintCollectors.sum(lambda job: job.cost))) \
        .penalize("Minimize Total Cost", HardSoftScore.ONE_SOFT,
                  lambda user, cost: int(calculation(user, cost)))


def generate_problem():
    user_alice = Users('Alice', 100, 45)
    user_bob = Users('Bob', 90, 54)
    user_chris = Users('Chris', 80, 56)
    user_dave = Users('Dave', 80, 52)

    selected_list = [0, 1]
    job_list = [
        Jobs(1, 'Job2', 'Leader', user_bob, 10),
        Jobs(2, 'Job2', 'Leader', user_chris, 10),
        Jobs(3, 'Job3', 'Leader', user_alice, 5),
        Jobs(4, 'Job3', 'Leader', user_bob, 5),
        Jobs(5, 'Job6', 'Leader', user_alice, 5),
        Jobs(6, 'Job6', 'Leader', user_bob, 5),
        Jobs(7, 'Job6', 'Leader', user_dave, 5),
        Jobs(8, 'Job7', 'Leader', user_bob, 10),
        Jobs(9, 'Job7', 'Leader', user_dave, 10),
    ]
    job = job_list[0]
    job.set_selected(selected_list[0])

    return Assignments(selected_list, job_list)


solver_config = optapy.config.solver.SolverConfig() \
    .withEntityClasses(get_class(Jobs)) \
    .withSolutionClass(get_class(Assignments)) \
    .withConstraintProviderClass(get_class(constraints)) \
    .withTerminationSpentLimit(Duration.ofSeconds(30))

solution = solver_factory_create(solver_config) \
    .buildSolver() \
    .solve(generate_problem())

print(solution)

更新#2

代码语言:javascript
复制
import optapy.config
from optapy import planning_entity, planning_variable, planning_id, value_range_provider, planning_solution, \
    planning_score, planning_entity_collection_property, problem_fact_collection_property, constraint_provider, \
    get_class, problem_fact
from optapy import solver_factory_create
from optapy.constraint import ConstraintCollectors, Joiners
from optapy.constraint import ConstraintFactory
from optapy.score import HardSoftScore
from optapy.types import Duration


@problem_fact
class Users:
    def __init__(self, id, capacity, exists_cost):
        self.id = id
        self.capacity = capacity
        self.exists_cost = exists_cost

    @planning_id
    def get_id(self):
        return self.id

    def __str__(self):
        return (
            f"User("
            f"id={self.id}, "
            f"capacity={self.capacity}, "
            f"exists_cost={self.exists_cost})"
        )


@planning_entity
class Jobs:
    def __init__(self, id, job_id, role_id, user, cost, selected=None):
        self.id = id
        self.job_id = job_id
        self.role_id = role_id
        self.user = user
        self.cost = cost
        self.selected = selected

    @planning_id
    def get_id(self):
        return self.id

    @planning_variable(int, ["selected_list"])
    def get_selected(self):
        return self.selected

    def set_selected(self, selected):
        self.selected = selected

    def __str__(self):
        return (
            f"Job("
            f"id={self.id}, "
            f"job_id={self.job_id}, "
            f"role_id={self.role_id}, "
            f"user={self.user}, "
            f"selected={self.selected}, "
            f"cost={self.cost}"
            f")"
        )


def format_list(a_list):
    return ',\n'.join(map(str, a_list))


@planning_solution
class Assignments:
    def __init__(self, selected_list, job_list, score=None):
        self.selected_list = selected_list
        self.job_list = job_list
        self.score = score

    @problem_fact_collection_property(int)
    @value_range_provider("selected_list")
    def get_selected_list(self):
        return self.selected_list

    @planning_entity_collection_property(Jobs)
    def get_job_list(self):
        return self.job_list

    @planning_score(HardSoftScore)
    def get_score(self):
        return self.score

    def set_score(self, score):
        self.score = score

    def __str__(self):
        return (
            f"Assignments("
            f"selected_list={format_list(self.selected_list)},\n"
            f"job_list={format_list(self.job_list)},\n"
            f"score={str(self.score.toString()) if self.score is not None else 'None'}"
            f")"
        )


@constraint_provider
def constraints(constraint_factory: ConstraintFactory):
    return [job_conflict(constraint_factory),
            job_conflict1(constraint_factory),
            get_total_job_cost_per_user(constraint_factory),
            get_user_workload(constraint_factory),
            ]


def job_conflict(constraint_factory: ConstraintFactory):
    return constraint_factory.forEach(Jobs) \
        .groupBy(lambda job: job.job_id, ConstraintCollectors.sum(lambda job: job.selected)) \
        .filter(lambda job_id, selected: selected > 1) \
        .penalize("only_one_user", HardSoftScore.ONE_HARD, lambda job_id, selected: selected - 1)


def job_conflict1(constraint_factory: ConstraintFactory):
    return constraint_factory.forEach(Jobs) \
        .groupBy(lambda job: job.job_id, ConstraintCollectors.sum(lambda job: job.selected)) \
        .filter(lambda job_id, selected: selected == 0) \
        .penalize("must_one_user", HardSoftScore.ONE_HARD)


def calculation(user, cost):
    return (user.exists_cost + cost) * 10000 / user.capacity                                            


def get_total_job_cost_per_user(constraint_factory: ConstraintFactory):
    return constraint_factory.for_each(Jobs) \
        .groupBy(lambda job: job.user,
                 ConstraintCollectors.conditionally(lambda job: job.selected == 1,
                                                    ConstraintCollectors.sum(lambda job: job.cost)))


def get_user_workload(constraint_factory: ConstraintFactory):
    return get_total_job_cost_per_user(constraint_factory) \
        .groupBy(lambda user, cost: int(calculation(user, cost))) \
        .penalize('Minimize', HardSoftScore.ONE_SOFT, lambda user, cost, workload: workload)


def generate_problem():
    user_alice = Users('Alice', 100, 45)
    user_bob = Users('Bob', 90, 54)
    user_chris = Users('Chris', 80, 56)
    user_dave = Users('Dave', 80, 52)

    selected_list = [0, 1]
    job_list = [
        Jobs(1, 'Job2', 'Leader', user_bob, 10),
        Jobs(2, 'Job2', 'Leader', user_chris, 10),
        Jobs(3, 'Job3', 'Leader', user_alice, 5),
        Jobs(4, 'Job3', 'Leader', user_bob, 5),
        Jobs(5, 'Job6', 'Leader', user_alice, 5),
        Jobs(6, 'Job6', 'Leader', user_bob, 5),
        Jobs(7, 'Job6', 'Leader', user_dave, 5),
        Jobs(8, 'Job7', 'Leader', user_bob, 10),
        Jobs(9, 'Job7', 'Leader', user_dave, 10),
    ]
    job = job_list[0]
    job.set_selected(selected_list[0])

    return Assignments(selected_list, job_list)


solver_config = optapy.config.solver.SolverConfig() \
    .withEntityClasses(get_class(Jobs)) \
    .withSolutionClass(get_class(Assignments)) \
    .withConstraintProviderClass(get_class(constraints)) \
    .withTerminationSpentLimit(Duration.ofSeconds(30))

solution = solver_factory_create(solver_config) \
    .buildSolver() \
    .solve(generate_problem())

print(solution)
EN

回答 1

Stack Overflow用户

发布于 2022-10-10 17:34:29

方差可以计算为ConstraintCollectors的组合(下面假设有一个类DataPoint,您希望最小化‘值’的方差):

代码语言:javascript
复制
def minimize_variance(constraint_factory: ConstraintFactory):
    return (
        constraint_factory.for_each(DataPoint)
            .group_by(ConstraintCollectors.average(lambda point: point.value))
            .join(DataPoint)
            .group_by(ConstraintCollectors.compose(
                ConstraintCollectors.sum(lambda avg, point: (point.value - avg)**2),
                ConstraintCollectors.count_bi(),
                lambda diff_sum, count: int(((diff_sum / max(1, count)) * 100))
            ))
            .penalize('Minimize variance', SimpleScore.ONE,
                      lambda variance: variance)
    )

int(((diff_sum / max(1, count)) * 100))是将方差和转换为整数(保持2小数的精度)。您可以根据需要增加或减少常量,以获得更高/更低的精度。你需要惩罚在OptaPlanner/OptaPy中的整数。

为了适应特定的问题,您需要首先从Users/Jobs对计算特定用户的负载:

代码语言:javascript
复制
def get_users_loads(constraint_factory: ConstraintFactory):
    return (
        constraint_factory.for_each(Users)
            .join(constraint_factory.for_each(Jobs).filter(lambda job: job.selected == 1), 
                  Joiners.equal(lambda user: user, lambda job: job.user))
            .group_by(lambda user, job: user, ConstraintCollectors.count_bi())
    )

可以计算平均负载。

代码语言:javascript
复制
def get_average_load(constraint_factory: ConstraintFactory):
    return (
        get_users_loads(constraint_factory)
            .group_by(ConstraintCollectors.average(lambda user, load: load))
    )

并把这一切结合在一起:

代码语言:javascript
复制
def minimize_user_load_variance(constraint_factory: ConstraintFactory):
    (
        get_user_loads(constraint_factory)
            .join(get_average_load(constraint_factory))
            .group_by(ConstraintCollectors.compose(
                      ConstraintCollectors.sum(lambda user, load, avg: (load - avg)**2),
                      ConstraintCollectors.count_tri(),
                      lambda diff_sum, count: int(((diff_sum / max(1, count)) * 100))
            ))
            .penalize('Minimize variance', HardSoftScore.ONE_SOFT,
                      lambda variance: variance)
    )

这篇关于OptaPlanner中公平性的博文可能会引起人们的兴趣:https://www.optaplanner.org/blog/2017/02/03/FormulaForMeasuringUnfairness.html

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/74011580

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档