首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >《人工智能导论》第 9 章 智能体与多智能体系统详解

《人工智能导论》第 9 章 智能体与多智能体系统详解

作者头像
啊阿狸不会拉杆
发布2026-01-21 12:26:05
发布2026-01-21 12:26:05
1710
举报

        大家好!今天我们来深入学习《人工智能导论》第 9 章的内容 —— 智能体与多智能体系统。这一章内容非常重要,因为智能体 (Agent) 是人工智能领域中一个核心概念,而多智能体系统 (MAS) 则广泛应用于分布式问题求解、机器人协作、智能决策等多个领域。

9.1 智能体的概念与结构

        智能体 (Agent) 是人工智能领域的核心概念之一,可以理解为能够自主感知环境、做出决策并执行动作的实体。

智能体的概念

        智能体是处于某个环境中的实体,它可以通过传感器感知环境,通过执行器作用于环境,并能根据自身的目标自主决策。

图 1:智能体与环境交互示意图

智能体的特性

智能体通常具有以下特性:

  • 自主性:能够在没有人类干预的情况下自主操作
  • 反应性:能够感知环境并对环境变化做出反应
  • 主动性:能够主动采取行动以实现目标
  • 社会性:能够与其他智能体进行交互
  • 学习性:能够从经验中学习并改进自身行为
智能体的结构

智能体的基本结构可以分为以下几个部分:

  • 感知模块:负责获取环境信息
  • 决策模块:负责根据感知信息和内部状态做出决策
  • 执行模块:负责执行决策产生的动作
  • 通信模块:负责与其他智能体进行交互(多智能体系统中)
  • 知识模块:负责存储和管理智能体的知识(某些类型的智能体)

图 2:智能体结构思维导图

反应式 Agent

        反应式 Agent (Reactive Agent) 是最简单的智能体类型,它没有内部状态,仅根据当前环境的感知做出反应,遵循 "感知 - 行动" 规则。

代码语言:javascript
复制
class ReactiveAgent:
    """反应式Agent实现"""
    
    def __init__(self, name):
        self.name = name
        
    def perceive(self, environment):
        """感知环境,返回当前环境状态"""
        return environment.get_state()
        
    def decide(self, state):
        """根据当前状态做出决策"""
        # 简单的规则库:根据不同状态返回不同动作
        if state == "危险":
            return "逃跑"
        elif state == "安全":
            return "探索"
        elif state == "发现食物":
            return "进食"
        else:
            return "静止"
            
    def act(self, environment):
        """执行动作"""
        # 1. 感知环境
        state = self.perceive(environment)
        print(f"{self.name}感知到环境状态:{state}")
        
        # 2. 做出决策
        action = self.decide(state)
        print(f"{self.name}决定采取行动:{action}")
        
        # 3. 执行动作并影响环境
        environment.update(self.name, action)
        return action


class Environment:
    """环境类,模拟Agent所处的环境"""
    
    def __init__(self):
        self.state = "安全"  # 初始环境状态
        self.history = []     # 记录环境历史
        
    def get_state(self):
        """返回当前环境状态"""
        return self.state
        
    def update(self, agent_name, action):
        """根据Agent的动作更新环境状态"""
        self.history.append(f"{agent_name}执行了{action}动作")
        
        # 简单的环境状态转换规则
        if action == "探索" and self.state == "安全":
            # 探索时有一定概率发现食物
            import random
            if random.random() < 0.3:  # 30%概率发现食物
                self.state = "发现食物"
        elif action == "进食":
            self.state = "安全"
        elif action == "逃跑":
            self.state = "安全"


# 测试反应式Agent
if __name__ == "__main__":
    # 创建环境
    env = Environment()
    
    # 创建反应式Agent
    robot = ReactiveAgent("机器人Agent")
    
    # 模拟5个时间步的行为
    for i in range(5):
        print(f"\n--- 时间步 {i+1} ---")
        robot.act(env)
        print(f"当前环境状态:{env.get_state()}")
慎思式 Agent

        慎思式 Agent (Deliberative Agent) 也称为认知 Agent,它拥有内部状态和目标,能够进行规划和推理,根据当前状态和目标做出决策。

代码语言:javascript
复制
class DeliberativeAgent:
    """慎思式Agent实现"""
    
    def __init__(self, name):
        self.name = name
        self.beliefs = {}  # 信念库,存储Agent对世界的认知
        self.desires = []  # 愿望/目标列表
        self.intentions = []  # 意图,即Agent承诺要实现的目标
        
    def perceive(self, environment):
        """感知环境,更新信念库"""
        state = environment.get_state()
        self.beliefs['current_state'] = state
        print(f"{self.name}感知到环境状态:{state}")
        return state
        
    def generate_desires(self):
        """生成愿望/目标"""
        # 根据当前信念生成合理的目标
        if self.beliefs.get('current_state') == "低电量":
            self.desires = ["寻找充电器", "节省能源"]
        elif self.beliefs.get('current_state') == "有任务":
            self.desires = ["完成任务"]
        else:
            self.desires = ["保持待机", "监控环境"]
        print(f"{self.name}生成愿望:{self.desires}")
        
    def select_intentions(self):
        """从愿望中选择意图(承诺要实现的目标)"""
        # 简单的意图选择策略:选择第一个愿望
        if self.desires:
            self.intentions = [self.desires[0]]
        else:
            self.intentions = []
        print(f"{self.name}选择意图:{self.intentions}")
        
    def plan(self):
        """根据意图制定计划"""
        plans = []
        for intention in self.intentions:
            if intention == "寻找充电器":
                plans.append(["移动到充电站", "连接充电器", "开始充电"])
            elif intention == "完成任务":
                plans.append(["分析任务", "执行任务步骤", "检查任务结果"])
            elif intention == "保持待机":
                plans.append(["降低功耗", "定期感知环境"])
        print(f"{self.name}制定计划:{plans}")
        return plans
        
    def act(self, environment):
        """执行动作"""
        # 1. 感知环境,更新信念
        self.perceive(environment)
        
        # 2. 生成愿望
        self.generate_desires()
        
        # 3. 选择意图
        self.select_intentions()
        
        # 4. 制定计划
        plans = self.plan()
        
        # 5. 执行计划中的第一个动作
        if plans and plans[0]:
            action = plans[0][0]
            print(f"{self.name}执行动作:{action}")
            environment.update(self.name, action)
            return action
        return None


class Environment:
    """环境类,模拟Agent所处的环境"""
    
    def __init__(self):
        self.state = "正常"  # 初始环境状态
        self.history = []     # 记录环境历史
        
    def get_state(self):
        """返回当前环境状态"""
        return self.state
        
    def update(self, agent_name, action):
        """根据Agent的动作更新环境状态"""
        self.history.append(f"{agent_name}执行了{action}动作")
        
        # 环境状态转换规则
        if action == "移动到充电站":
            self.state = "充电站附近"
        elif action == "连接充电器":
            self.state = "充电中"
        elif action == "开始充电":
            self.state = "电量恢复"
        elif action == "分析任务":
            self.state = "任务分析完成"


# 测试慎思式Agent
if __name__ == "__main__":
    # 创建环境
    env = Environment()
    
    # 创建慎思式Agent
    ai_agent = DeliberativeAgent("智能助手Agent")
    
    # 模拟不同环境状态下的Agent行为
    print("--- 初始环境状态:正常 ---")
    ai_agent.act(env)
    
    print("\n--- 改变环境状态为:低电量 ---")
    env.state = "低电量"
    ai_agent.act(env)
    
    print("\n--- 改变环境状态为:有任务 ---")
    env.state = "有任务"
    ai_agent.act(env)

图 3:慎思式 Agent 工作流程图

复合式 Agent

        复合式 Agent (Hybrid Agent) 结合了反应式 Agent 和慎思式 Agent 的优点,既能够对环境变化做出快速反应,又能够进行复杂的推理和规划。

代码语言:javascript
复制
class HybridAgent:
    """复合式Agent实现"""
    
    def __init__(self, name):
        self.name = name
        # 反应式部分
        self.reactive_rules = {
            "紧急情况": "立即停止并报警",
            "障碍物": "避开障碍物",
            "危险接近": "迅速撤离"
        }
        
        # 慎思式部分
        self.beliefs = {}  # 信念库
        self.goals = []    # 目标列表
        self.plan = []     # 计划
        
    def reactive_layer(self, state):
        """反应层:快速响应紧急情况"""
        for condition, action in self.reactive_rules.items():
            if condition in state:
                print(f"{self.name}反应层触发:{condition} -> {action}")
                return action
        return None
        
    def deliberative_layer(self, environment):
        """慎思层:进行推理和规划"""
        # 更新信念
        self.beliefs['current_state'] = environment.get_state()
        
        # 设置目标
        if "清洁任务" in self.beliefs['current_state']:
            self.goals = ["完成清洁任务"]
        elif "充电" in self.beliefs['current_state']:
            self.goals = ["找到充电器", "充电至80%"]
        else:
            self.goals = ["巡逻", "监控异常"]
            
        # 制定计划
        self.plan = []
        for goal in self.goals:
            if goal == "完成清洁任务":
                self.plan.extend(["规划清洁路线", "开始清洁", "检查清洁效果"])
            elif goal == "找到充电器":
                self.plan.extend(["搜索充电器位置", "前往充电器", "连接充电"])
                
        print(f"{self.name}慎思层:目标={self.goals}, 计划={self.plan}")
        return self.plan[0] if self.plan else None
        
    def act(self, environment):
        """Agent的行为:先检查反应层,再调用慎思层"""
        state = environment.get_state()
        print(f"\n{self.name}感知到状态:{state}")
        
        # 1. 先检查反应层,看是否有紧急情况需要处理
        action = self.reactive_layer(state)
        if action:
            environment.update(self.name, action)
            return action
            
        # 2. 如果没有紧急情况,调用慎思层
        action = self.deliberative_layer(environment)
        if action:
            environment.update(self.name, action)
            return action
            
        return "待机"


class Environment:
    """环境类"""
    
    def __init__(self):
        self.state = "正常运行"
        
    def get_state(self):
        return self.state
        
    def update(self, agent_name, action):
        print(f"环境因{agent_name}的{action}动作而更新")
        # 根据动作简单更新环境
        if action == "避开障碍物":
            self.state = "障碍物已避开,继续前进"
        elif action == "开始清洁":
            self.state = "清洁进行中"
        elif action == "连接充电":
            self.state = "充电中"


# 测试复合式Agent
if __name__ == "__main__":
    env = Environment()
    robot = HybridAgent("清洁机器人")
    
    # 测试正常情况(触发慎思层)
    robot.act(env)
    
    # 测试紧急情况(触发反应层)
    env.state = "发现障碍物,紧急情况"
    robot.act(env)
    
    # 测试另一正常情况
    env.state = "有清洁任务,区域A"
    robot.act(env)

图 4:复合式 Agent 结构流程图

Agent 的应用

        智能体在多个领域都有广泛应用,下面是一个智能家居控制 Agent 的综合案例:

代码语言:javascript
复制
class HomeEnvironment:
    """家庭环境类,模拟智能家居环境状态"""
    
    def __init__(self):
        self.temperature = 24  # 温度,单位℃
        self.light = "off"     # 灯光状态:on/off
        self.humidity = 45     # 湿度,百分比
        self.security = "normal"  # 安防状态:normal/alarm
        self.people_in_room = 0  # 房间人数
        
    def get_state(self):
        """返回当前环境状态"""
        return {
            "temperature": self.temperature,
            "light": self.light,
            "humidity": self.humidity,
            "security": self.security,
            "people_in_room": self.people_in_room
        }
        
    def update(self, action):
        """根据Agent的动作更新环境"""
        print(f"环境执行动作:{action}")
        if action == "turn_on_light":
            self.light = "on"
        elif action == "turn_off_light":
            self.light = "off"
        elif action.startswith("set_temperature"):
            temp = int(action.split(":")[1])
            self.temperature = temp
        elif action == "sound_alarm":
            self.security = "alarm"


class SmartHomeAgent:
    """智能家居控制Agent"""
    
    def __init__(self):
        # 反应式规则:处理紧急情况和简单响应
        self.reactive_rules = {
            ("security", "alarm"): "sound_alarm",
            ("people_in_room", 0): "turn_off_light",
            ("temperature", lambda x: x > 28): "set_temperature:26",
            ("temperature", lambda x: x < 18): "set_temperature:20"
        }
        
        # 慎思式部分:处理复杂规划
        self.schedules = {  # 定时任务
            "morning": {"time": "07:00", "action": "turn_on_light"},
            "night": {"time": "22:30", "action": "turn_off_light"}
        }
        
    def perceive(self, env):
        """感知环境状态"""
        return env.get_state()
        
    def reactive_response(self, state):
        """反应式响应"""
        for (key, condition), action in self.reactive_rules.items():
            value = state[key]
            # 检查条件是否满足
            if isinstance(condition, tuple):
                if value in condition:
                    return action
            elif callable(condition):
                if condition(value):
                    return action
            elif value == condition:
                return action
        return None
        
    def deliberative_planning(self, state, current_time):
        """慎思式规划"""
        # 检查是否到了定时任务时间
        for schedule in self.schedules.values():
            if schedule["time"] == current_time:
                # 检查是否需要执行(例如晚上关灯前检查是否还有人)
                if schedule["action"] == "turn_off_light" and state["people_in_room"] > 0:
                    continue  # 有人则不执行关灯
                return schedule["action"]
        
        # 根据房间人数调整灯光和温度
        if state["people_in_room"] > 0 and state["light"] == "off":
            return "turn_on_light"
            
        return None
        
    def act(self, env, current_time):
        """执行动作"""
        state = self.perceive(env)
        print(f"\n当前环境状态:{state}")
        
        # 1. 先检查反应式响应(优先级高)
        action = self.reactive_response(state)
        if action:
            print(f"反应式响应:{action}")
            env.update(action)
            return action
            
        # 2. 执行慎思式规划
        action = self.deliberative_planning(state, current_time)
        if action:
            print(f"慎思式规划:{action}")
            env.update(action)
            return action
            
        print("无动作需要执行")
        return None


# 测试智能家居Agent
if __name__ == "__main__":
    home_env = HomeEnvironment()
    agent = SmartHomeAgent()
    
    # 模拟一天中的几个时间点
    test_times = [
        ("06:30", {"people_in_room": 0, "temperature": 30}),
        ("07:00", {"people_in_room": 1, "temperature": 24}),
        ("14:00", {"people_in_room": 2, "temperature": 17}),
        ("22:30", {"people_in_room": 0, "temperature": 24}),
        ("23:00", {"people_in_room": 1, "security": "alarm"})
    ]
    
    for time, changes in test_times:
        # 更新环境状态
        for key, value in changes.items():
            setattr(home_env, key, value)
            
        print(f"--- 时间:{time} ---")
        agent.act(home_env, time)

9.2 多智能体系统的概念与结构

        多智能体系统 (Multi-Agent System, MAS) 是由多个相互作用、相互协作的智能体组成的系统。这些智能体可以是同质的(具有相同功能),也可以是异质的(具有不同功能)。

多智能体系统的特点

多智能体系统具有以下特点:

  • 自主性:每个智能体都有一定的自主决策能力
  • 分布性:智能体可以物理分布在不同位置
  • 协作性:智能体可以相互协作完成复杂任务
  • 灵活性:系统可以通过添加或移除智能体来调整功能
  • 鲁棒性:单个智能体的故障不会导致整个系统崩溃
  • 社会性:智能体之间可以进行通信、协商和合作
多智能体系统的基本类型

根据智能体之间的关系和协作方式,多智能体系统可以分为以下几种基本类型:

  1. 协作型多智能体系统:智能体有共同目标,通过协作完成任务
  2. 竞争型多智能体系统:智能体有各自目标,存在资源竞争
  3. 混合类型多智能体系统:同时存在协作和竞争关系
  4. 协商型多智能体系统:通过协商解决冲突和分配资源
多智能体系统的体系结构

多智能体系统的体系结构主要有以下几种:

  1. 集中式结构:存在一个中心智能体,负责控制和协调其他智能体
  2. 分布式结构:没有中心控制,智能体通过相互通信进行协调
  3. 混合式结构:结合集中式和分布式的特点

下面是一个分布式多智能体系统的实现案例,模拟一个简单的物流配送系统:

代码语言:javascript
复制
class Package:
    """包裹类"""
    def __init__(self, package_id, source, destination):
        self.package_id = package_id
        self.source = source  # 起始位置
        self.destination = destination  # 目标位置
        self.status = "pending"  # 状态:pending/in_transit/delivered
        
    def __str__(self):
        return f"包裹{self.package_id}: {self.source} -> {self.destination} ({self.status})"


class DeliveryAgent:
    """配送智能体"""
    def __init__(self, agent_id, location):
        self.agent_id = agent_id
        self.location = location  # 当前位置
        self.load = []  # 装载的包裹
        self.capacity = 3  # 最大装载量
        
    def receive_message(self, message, sender):
        """接收消息"""
        print(f"Agent {self.agent_id} 收到来自 Agent {sender.agent_id} 的消息: {message}")
        if message["type"] == "request_delivery":
            # 处理配送请求
            return self.handle_delivery_request(message["package"])
        elif message["type"] == "status_update":
            # 处理状态更新
            return {"status": "received"}
        return {"status": "unknown_message"}
        
    def handle_delivery_request(self, package):
        """处理配送请求"""
        # 检查是否有容量
        if len(self.load) < self.capacity:
            self.load.append(package)
            package.status = "in_transit"
            return {"status": "accepted", "agent_id": self.agent_id}
        else:
            return {"status": "rejected", "reason": "full_capacity"}
            
    def deliver(self):
        """执行配送"""
        delivered = []
        for package in self.load[:]:  # 使用副本迭代
            # 简化:假设到达目的地
            print(f"Agent {self.agent_id} 配送包裹 {package.package_id} 到 {package.destination}")
            package.status = "delivered"
            delivered.append(package)
            self.load.remove(package)
        return delivered
        
    def __str__(self):
        return f"DeliveryAgent {self.agent_id} (位置: {self.location}, 负载: {len(self.load)}/{self.capacity})"


class DispatcherAgent:
    """调度智能体"""
    def __init__(self, agent_id):
        self.agent_id = agent_id
        self.delivery_agents = []  # 配送智能体列表
        self.packages = []  # 待配送包裹
        
    def register_agent(self, agent):
        """注册配送智能体"""
        self.delivery_agents.append(agent)
        print(f"Dispatcher {self.agent_id} 注册了 Agent {agent.agent_id}")
        
    def add_package(self, package):
        """添加待配送包裹"""
        self.packages.append(package)
        print(f"Dispatcher 收到新包裹: {package}")
        # 立即尝试分配
        self.assign_package(package)
        
    def assign_package(self, package):
        """分配包裹给配送智能体"""
        # 简单策略:依次询问每个智能体
        for agent in self.delivery_agents:
            # 发送配送请求
            response = agent.receive_message({
                "type": "request_delivery",
                "package": package
            }, self)
            
            if response["status"] == "accepted":
                print(f"包裹 {package.package_id} 已分配给 Agent {response['agent_id']}")
                return
        print(f"包裹 {package.package_id} 分配失败,所有Agent均已满载")


# 测试多智能体物流系统
if __name__ == "__main__":
    # 创建调度智能体
    dispatcher = DispatcherAgent("D1")
    
    # 创建配送智能体
    agent1 = DeliveryAgent("A1", "仓库A")
    agent2 = DeliveryAgent("A2", "仓库B")
    
    # 注册配送智能体
    dispatcher.register_agent(agent1)
    dispatcher.register_agent(agent2)
    
    # 添加包裹
    packages = [
        Package(1, "仓库A", "地址1"),
        Package(2, "仓库A", "地址2"),
        Package(3, "仓库A", "地址3"),
        Package(4, "仓库A", "地址4"),  # 这会导致第一个Agent满载
        Package(5, "仓库A", "地址5")
    ]
    
    for pkg in packages:
        dispatcher.add_package(pkg)
    
    # 执行配送
    print("\n--- 开始配送 ---")
    print(agent1)
    print(agent2)
    
    print("\n--- 配送结果 ---")
    agent1.deliver()
    agent2.deliver()

图 5:分布式物流多智能体系统结构

9.3 多智能体系统的通信

        智能体之间的通信是多智能体系统协作的基础,有效的通信机制能够提高系统的整体性能。

智能体通信的类型

智能体通信主要分为以下几种类型:

  • 同步通信:发送方等待接收方的响应
  • 异步通信:发送方不等待接收方的响应
  • 广播通信:一个智能体向多个智能体发送消息
  • 点对点通信:一个智能体直接向另一个智能体发送消息
Agent 通信的方式

常见的智能体通信方式包括:

  • 消息传递:智能体之间直接发送消息
  • 黑板系统:通过共享的 "黑板" 区域进行间接通信
  • 发布 - 订阅:智能体可以发布信息或订阅感兴趣的信息
智能体通信语言

        为了使不同智能体能够理解彼此的消息,需要定义统一的通信语言。最著名的是 FIPA ACL(Agent Communication Language)。

下面是一个实现了多种通信方式的多智能体系统案例:

代码语言:javascript
复制
import time
from collections import defaultdict

class CommunicationAgent:
    """支持多种通信方式的智能体"""
    def __init__(self, agent_id):
        self.agent_id = agent_id
        self.subscriptions = set()  # 订阅的主题
        self.message_queue = []  # 消息队列(用于异步通信)
        
    def send_sync(self, receiver, message):
        """同步通信:等待响应"""
        print(f"\n[同步通信] Agent {self.agent_id} -> Agent {receiver.agent_id}: {message['content']}")
        # 添加消息元数据
        message['sender'] = self.agent_id
        message['timestamp'] = time.time()
        # 发送并等待响应
        response = receiver.receive_message(message)
        return response
        
    def send_async(self, receiver, message):
        """异步通信:不等待响应"""
        print(f"\n[异步通信] Agent {self.agent_id} -> Agent {receiver.agent_id}: {message['content']}")
        message['sender'] = self.agent_id
        message['timestamp'] = time.time()
        # 将消息放入接收者的消息队列
        receiver.message_queue.append(message)
        return {"status": "sent"}
        
    def broadcast(self, agents, message):
        """广播通信:向多个智能体发送消息"""
        print(f"\n[广播通信] Agent {self.agent_id} 向 {len(agents)} 个Agent发送: {message['content']}")
        message['sender'] = self.agent_id
        message['timestamp'] = time.time()
        message['type'] = 'broadcast'
        responses = []
        for agent in agents:
            if agent.agent_id != self.agent_id:  # 不向自己广播
                responses.append(agent.receive_message(message))
        return responses
        
    def subscribe(self, topic, broker):
        """订阅主题"""
        self.subscriptions.add(topic)
        broker.register_subscription(topic, self)
        print(f"Agent {self.agent_id} 订阅了主题: {topic}")
        
    def receive_message(self, message):
        """接收消息并处理"""
        print(f"Agent {self.agent_id} 收到消息: {message['content']}")
        
        # 处理消息
        if message.get('type') == 'query':
            return self.handle_query(message)
        elif message.get('type') == 'request':
            return self.handle_request(message)
        else:
            return {"status": "received", "receiver": self.agent_id}
            
    def handle_query(self, message):
        """处理查询消息"""
        query = message['content']
        # 简单的查询处理
        if "状态" in query:
            return {"status": "response", "content": f"Agent {self.agent_id} 状态正常"}
        return {"status": "response", "content": "无法理解查询"}
        
    def handle_request(self, message):
        """处理请求消息"""
        request = message['content']
        # 简单的请求处理
        if "协作" in request:
            return {"status": "accepted", "content": f"Agent {self.agent_id} 同意协作"}
        return {"status": "rejected", "content": "无法满足请求"}
        
    def process_queue(self):
        """处理消息队列中的异步消息"""
        print(f"\nAgent {self.agent_id} 处理 {len(self.message_queue)} 条异步消息")
        responses = []
        while self.message_queue:
            message = self.message_queue.pop(0)
            response = self.receive_message(message)
            responses.append(response)
        return responses


class MessageBroker:
    """消息代理,用于发布-订阅模式"""
    def __init__(self):
        self.subscriptions = defaultdict(list)  # 主题 -> 订阅者列表
        
    def register_subscription(self, topic, agent):
        """注册订阅"""
        self.subscriptions[topic].append(agent)
        
    def publish(self, topic, message, sender):
        """发布消息到主题"""
        print(f"\n[发布-订阅] Agent {sender.agent_id} 发布到主题 {topic}: {message['content']}")
        message['sender'] = sender.agent_id
        message['topic'] = topic
        message['timestamp'] = time.time()
        
        # 向所有订阅者发送消息
        for agent in self.subscriptions.get(topic, []):
            if agent.agent_id != sender.agent_id:  # 不向发布者自己发送
                agent.message_queue.append(message)
        return {"status": "published", "topic": topic}


# 测试多智能体通信系统
if __name__ == "__main__":
    # 创建智能体
    agent1 = CommunicationAgent("A1")
    agent2 = CommunicationAgent("A2")
    agent3 = CommunicationAgent("A3")
    agents = [agent1, agent2, agent3]
    
    # 创建消息代理
    broker = MessageBroker()
    
    # 测试同步通信
    response = agent1.send_sync(agent2, {
        "type": "query",
        "content": "你的当前状态是什么?"
    })
    print("同步通信响应:", response)
    
    # 测试异步通信
    response = agent1.send_async(agent3, {
        "type": "request",
        "content": "请完成数据收集任务"
    })
    print("异步通信响应:", response)
    # 处理异步消息
    print("\n处理异步消息:")
    agent3.process_queue()
    
    # 测试广播通信
    responses = agent2.broadcast(agents, {
        "content": "我已完成初始化,准备接受任务"
    })
    print("广播通信响应:", responses)
    
    # 测试发布-订阅模式
    # 订阅主题
    agent1.subscribe("weather", broker)
    agent3.subscribe("weather", broker)
    
    # 发布消息
    broker.publish("weather", {
        "content": "明天有雨,请做好准备"
    }, agent2)
    
    # 处理订阅消息
    print("\n处理订阅消息:")
    agent1.process_queue()
    agent3.process_queue()

图 6:多智能体通信方式对比图

9.4 多智能体系统的协调

        多智能体系统的协调 (Coordination) 是指智能体之间通过调整各自的行为,以避免冲突并实现系统整体目标的过程。

协调机制的主要目标包括:

  • 避免智能体之间的资源竞争和操作冲突
  • 确保任务分配的合理性和高效性
  • 提高系统整体性能和资源利用率

下面是一个交通路口智能体协调系统的实现案例:

代码语言:javascript
复制
class TrafficLightAgent:
    """交通灯智能体"""
    def __init__(self, agent_id, direction):
        self.agent_id = agent_id
        self.direction = direction  # 方向:南北/东西
        self.state = "red"  # 初始状态:红灯
        self.timer = 0      # 计时器
        self.green_duration = 30  # 绿灯持续时间(秒)
        self.yellow_duration = 5  # 黄灯持续时间(秒)
        
    def change_state(self, new_state):
        """改变交通灯状态"""
        if new_state != self.state:
            print(f"交通灯 {self.agent_id} ({self.direction}) 从 {self.state} 变为 {new_state}")
            self.state = new_state
            self.timer = 0  # 重置计时器
            
    def update(self):
        """更新交通灯状态(基于时间)"""
        self.timer += 1
        
        # 自动状态转换(无协调情况下)
        if self.state == "green" and self.timer >= self.green_duration:
            self.change_state("yellow")
        elif self.state == "yellow" and self.timer >= self.yellow_duration:
            self.change_state("red")
            
    def send_message(self, receiver, message):
        """发送消息给其他交通灯"""
        print(f"交通灯 {self.agent_id} 发送给 {receiver.agent_id} 消息: {message}")
        return receiver.receive_message(message, self)
        
    def receive_message(self, message, sender):
        """接收消息"""
        if message["type"] == "request_green":
            # 处理绿灯请求
            if self.state == "red":
                # 如果当前是红灯,可以同意请求
                return {"status": "accepted"}
            else:
                # 如果当前不是红灯,拒绝请求
                return {
                    "status": "rejected", 
                    "current_state": self.state,
                    "available_time": self.green_duration - self.timer + self.yellow_duration
                }
        elif message["type"] == "grant_green":
            # 收到绿灯许可
            self.change_state("green")
            return {"status": "received"}
        return {"status": "ok"}
        
    def __str__(self):
        return f"TrafficLight {self.agent_id} ({self.direction}): {self.state} (计时: {self.timer}s)"


class TrafficCoordinator:
    """交通协调器"""
    def __init__(self):
        self.traffic_lights = []
        self.queue = []  # 等待绿灯的队列
        
    def register_light(self, traffic_light):
        """注册交通灯"""
        self.traffic_lights.append(traffic_light)
        print(f"协调器注册了交通灯: {traffic_light.agent_id}")
        
    def request_coordination(self, requester):
        """处理协调请求"""
        print(f"\n协调器收到 {requester.agent_id} 的协调请求")
        
        # 检查是否有其他交通灯处于绿灯状态
        for light in self.traffic_lights:
            if light.agent_id != requester.agent_id and light.state == "green":
                # 请求当前绿灯的交通灯是否可以让行
                response = requester.send_message(light, {"type": "request_green"})
                if response["status"] == "rejected":
                    # 需要等待
                    wait_time = response["available_time"]
                    print(f"需要等待 {wait_time} 秒后才能为 {requester.agent_id} 变绿灯")
                    return {"status": "wait", "time": wait_time}
        
        # 如果没有其他绿灯,可以直接变绿灯
        requester.change_state("green")
        return {"status": "granted"}
        
    def update(self):
        """更新协调状态"""
        # 检查是否有交通灯需要协调
        for light in self.traffic_lights:
            if light.state == "red" and light.timer > 40:  # 红灯时间过长
                # 请求协调
                self.request_coordination(light)


# 测试交通灯协调系统
if __name__ == "__main__":
    # 创建交通灯
    north_south = TrafficLightAgent("TL1", "南北方向")
    east_west = TrafficLightAgent("TL2", "东西方向")
    
    # 创建协调器
    coordinator = TrafficCoordinator()
    coordinator.register_light(north_south)
    coordinator.register_light(east_west)
    
    # 初始状态设置
    north_south.change_state("green")
    east_west.change_state("red")
    
    # 模拟时间流逝
    print("\n--- 模拟交通灯运行 ---")
    for i in range(1, 60):
        if i % 10 == 0:  # 每10秒打印一次状态
            print(f"\n--- 时间 {i} 秒 ---")
            print(north_south)
            print(east_west)
            
            # 调用协调器
            coordinator.update()
            
        # 更新交通灯状态
        north_south.update()
        east_west.update()

图 7:交通灯协调流程图

9.5 多智能体系统的协作

        多智能体系统的协作 (Collaboration) 是指多个智能体为了实现共同目标而共同工作的过程。

多智能体的协作类型

常见的多智能体协作类型包括:

  • 分工协作:将复杂任务分解为子任务,由不同智能体分别完成
  • 协同协作:多个智能体共同执行同一任务,相互配合
  • 帮助协作:一个智能体在遇到困难时,其他智能体提供帮助
合同网协作方法

        合同网 (Contract Net) 是一种经典的多智能体协作方法,通过 "招标 - 投标 - 中标" 的过程分配任务。

代码语言:javascript
复制
import random

class ContractorAgent:
    """承包商智能体,能够执行任务并投标"""
    def __init__(self, agent_id, capabilities):
        self.agent_id = agent_id
        self.capabilities = capabilities  # 能力列表
        self.current_task = None  # 当前任务
        self.busy = False  # 是否忙碌
        
    def receive_message(self, message, sender):
        """接收消息"""
        if message["type"] == "call_for_proposal":
            # 处理招标信息
            return self.handle_cfp(message, sender)
        elif message["type"] == "award_contract":
            # 处理中标通知
            return self.handle_award(message)
        elif message["type"] == "cancel_contract":
            # 处理取消合同
            return self.handle_cancel(message)
        return {"status": "unknown_message"}
        
    def handle_cfp(self, message, sender):
        """处理招标信息"""
        task = message["task"]
        print(f"Agent {self.agent_id} 收到招标: {task['name']}")
        
        # 检查是否有能力完成任务且不忙碌
        if not self.busy and any(cap in self.capabilities for cap in task["required_capabilities"]):
            # 计算投标价格(简化:基于任务复杂度)
            cost = random.randint(10, 50) * task["complexity"]
            print(f"Agent {self.agent_id} 投标: {cost}")
            return {
                "status": "proposal",
                "agent_id": self.agent_id,
                "cost": cost,
                "estimated_time": task["complexity"] * 5
            }
        else:
            print(f"Agent {self.agent_id} 拒绝投标")
            return {"status": "reject"}
            
    def handle_award(self, message):
        """处理中标通知"""
        self.current_task = message["task"]
        self.busy = True
        print(f"Agent {self.agent_id} 中标: {self.current_task['name']}")
        return {"status": "accepted"}
        
    def handle_cancel(self, message):
        """处理取消合同"""
        self.current_task = None
        self.busy = False
        print(f"Agent {self.agent_id} 的合同被取消")
        return {"status": "cancelled"}
        
    def execute_task(self):
        """执行任务"""
        if self.current_task:
            print(f"Agent {self.agent_id} 正在执行任务: {self.current_task['name']}")
            # 简化:任务执行完成
            task = self.current_task
            self.current_task = None
            self.busy = False
            return {
                "status": "completed",
                "task_id": task["id"],
                "agent_id": self.agent_id
            }
        return {"status": "no_task"}


class ManagerAgent:
    """经理智能体,负责任务分配"""
    def __init__(self, agent_id):
        self.agent_id = agent_id
        self.contractors = []  # 承包商列表
        self.tasks = []  # 任务列表
        self.active_contracts = {}  # 活跃合同
        
    def register_contractor(self, contractor):
        """注册承包商"""
        self.contractors.append(contractor)
        print(f"Manager {self.agent_id} 注册了承包商: {contractor.agent_id}")
        
    def add_task(self, task):
        """添加任务"""
        self.tasks.append(task)
        print(f"Manager {self.agent_id} 收到新任务: {task['name']}")
        # 立即启动招标
        self.call_for_proposals(task)
        
    def call_for_proposals(self, task):
        """发起招标"""
        print(f"\nManager {self.agent_id} 为任务 {task['name']} 发起招标")
        # 向所有承包商发送招标信息
        proposals = []
        for contractor in self.contractors:
            response = contractor.receive_message({
                "type": "call_for_proposal",
                "task": task,
                "deadline": 5  # 投标截止时间(简化)
            }, self)
            
            if response["status"] == "proposal":
                proposals.append(response)
                
        # 评估投标并授予合同
        if proposals:
            # 选择价格最低的投标
            best_proposal = min(proposals, key=lambda x: x["cost"])
            self.award_contract(task, best_proposal["agent_id"])
        else:
            print(f"任务 {task['name']} 没有收到任何投标")
            
    def award_contract(self, task, agent_id):
        """授予合同"""
        # 找到中标Agent
        for contractor in self.contractors:
            if contractor.agent_id == agent_id:
                # 发送中标通知
                response = contractor.receive_message({
                    "type": "award_contract",
                    "task": task
                }, self)
                
                if response["status"] == "accepted":
                    self.active_contracts[task["id"]] = {
                        "task": task,
                        "contractor": agent_id,
                        "status": "active"
                    }
                    print(f"任务 {task['name']} 已授予 Agent {agent_id}")
                break


# 测试合同网协作系统
if __name__ == "__main__":
    # 创建承包商智能体
    agent1 = ContractorAgent("A1", ["数据收集", "数据分析"])
    agent2 = ContractorAgent("A2", ["模型训练", "数据分析"])
    agent3 = ContractorAgent("A3", ["报告生成", "数据可视化"])
    
    # 创建经理智能体
    manager = ManagerAgent("M1")
    
    # 注册承包商
    manager.register_contractor(agent1)
    manager.register_contractor(agent2)
    manager.register_contractor(agent3)
    
    # 添加任务
    tasks = [
        {
            "id": 1,
            "name": "市场数据分析",
            "required_capabilities": ["数据收集", "数据分析"],
            "complexity": 3
        },
        {
            "id": 2,
            "name": "用户行为预测模型",
            "required_capabilities": ["数据分析", "模型训练"],
            "complexity": 5
        },
        {
            "id": 3,
            "name": "季度报告生成",
            "required_capabilities": ["报告生成", "数据可视化"],
            "complexity": 2
        }
    ]
    
    for task in tasks:
        manager.add_task(task)
        
    # 执行任务
    print("\n--- 开始执行任务 ---")
    results = []
    results.append(agent1.execute_task())
    results.append(agent2.execute_task())
    results.append(agent3.execute_task())
    
    print("\n--- 任务执行结果 ---")
    for result in results:
        print(result)
黑板模型协作方法

        黑板模型 (Blackboard Model) 是一种通过共享 "黑板" 进行间接协作的方法,智能体可以读取黑板上的信息并写入自己的结果。

代码语言:javascript
复制
class Blackboard:
    """黑板,用于存储和共享信息"""
    def __init__(self):
        self.data = {}  # 存储数据
        self.tasks = []  # 任务列表
        self.results = {}  # 结果存储
        
    def write_data(self, key, value, agent_id):
        """写入数据"""
        self.data[key] = {
            "value": value,
            "agent_id": agent_id,
            "timestamp": len(self.data)  # 简单的时间戳
        }
        print(f"黑板: Agent {agent_id} 写入数据 {key} = {value}")
        
    def read_data(self, key):
        """读取数据"""
        if key in self.data:
            return self.data[key]["value"]
        return None
        
    def add_task(self, task, agent_id):
        """添加任务"""
        self.tasks.append({
            "task_id": len(self.tasks) + 1,
            "description": task,
            "status": "pending",
            "creator": agent_id,
            "assignee": None
        })
        print(f"黑板: Agent {agent_id} 添加任务: {task}")
        
    def claim_task(self, agent_id, capabilities):
        """认领任务"""
        for task in self.tasks:
            if task["status"] == "pending":
                # 简单检查:任务描述是否匹配能力
                if any(cap in task["description"].lower() for cap in capabilities):
                    task["status"] = "in_progress"
                    task["assignee"] = agent_id
                    print(f"Agent {agent_id} 认领了任务: {task['description']}")
                    return task
        return None
        
    def submit_result(self, task_id, result, agent_id):
        """提交结果"""
        self.results[task_id] = {
            "result": result,
            "agent_id": agent_id,
            "timestamp": len(self.results)
        }
        
        # 更新任务状态
        for task in self.tasks:
            if task["task_id"] == task_id:
                task["status"] = "completed"
                print(f"Agent {agent_id} 完成了任务 {task_id}: {task['description']}")
                break


class BlackboardAgent:
    """基于黑板模型的智能体"""
    def __init__(self, agent_id, capabilities):
        self.agent_id = agent_id
        self.capabilities = capabilities  # 能力列表
        self.blackboard = None  # 黑板引用
        
    def set_blackboard(self, blackboard):
        """设置黑板引用"""
        self.blackboard = blackboard
        
    def contribute_data(self, key, value):
        """向黑板贡献数据"""
        if self.blackboard:
            self.blackboard.write_data(key, value, self.agent_id)
            
    def create_task(self, task_description):
        """创建任务"""
        if self.blackboard:
            self.blackboard.add_task(task_description, self.agent_id)
            
    def work(self):
        """执行工作:认领并完成任务"""
        if not self.blackboard:
            return
            
        # 认领任务
        task = self.blackboard.claim_task(self.agent_id, self.capabilities)
        if task:
            # 执行任务(简化)
            print(f"Agent {self.agent_id} 正在执行任务: {task['description']}")
            
            # 模拟任务执行
            result = f"{task['description']} 的执行结果"
            
            # 提交结果
            self.blackboard.submit_result(task["task_id"], result, self.agent_id)
            
            # 可能产生新数据
            self.contribute_data(f"task_{task['task_id']}_result", result)
            
            # 可能创建新任务
            if "分析" in task["description"]:
                self.create_task(f"基于 {task['task_id']} 的结果生成报告")


# 测试黑板模型协作系统
if __name__ == "__main__":
    # 创建黑板
    blackboard = Blackboard()
    
    # 创建智能体
    data_agent = BlackboardAgent("A1", ["收集", "数据"])
    analysis_agent = BlackboardAgent("A2", ["分析", "处理"])
    report_agent = BlackboardAgent("A3", ["报告", "总结", "可视化"])
    
    # 设置黑板
    data_agent.set_blackboard(blackboard)
    analysis_agent.set_blackboard(blackboard)
    report_agent.set_blackboard(blackboard)
    
    # 初始数据贡献
    data_agent.contribute_data("raw_sales_data", [1500, 2300, 1800, 3200])
    data_agent.contribute_data("customer_count", 450)
    
    # 创建初始任务
    data_agent.create_task("分析销售数据趋势")
    
    # 智能体工作
    print("\n--- 智能体开始工作 ---")
    data_agent.work()
    analysis_agent.work()
    report_agent.work()
    
    # 查看结果
    print("\n--- 黑板中的结果 ---")
    for task_id, result in blackboard.results.items():
        print(f"任务 {task_id} 结果: {result['result']} (由 Agent {result['agent_id']} 完成)")
    
    print("\n--- 任务状态 ---")
    for task in blackboard.tasks:
        print(f"任务 {task['task_id']}: {task['description']} - {task['status']} (由 Agent {task['assignee']} 执行)")
市场机制协作方法

        市场机制协作方法模拟经济市场,通过价格机制来分配资源和任务。

代码语言:javascript
复制
class Resource:
    """资源类"""
    def __init__(self, resource_id, resource_type, quantity=1):
        self.resource_id = resource_id
        self.resource_type = resource_type
        self.quantity = quantity
        self.owner = None
        
    def __str__(self):
        return f"资源 {self.resource_id}: {self.resource_type} (数量: {self.quantity})"


class MarketAgent:
    """市场中的智能体"""
    def __init__(self, agent_id, budget=100):
        self.agent_id = agent_id
        self.budget = budget  # 预算
        self.resources = []  # 拥有的资源
        self.needs = []  # 需要的资源
        self.offers = []  # 提供的报价
        
    def add_resource(self, resource):
        """获得资源"""
        resource.owner = self.agent_id
        self.resources.append(resource)
        print(f"Agent {self.agent_id} 获得了 {resource}")
        
    def need_resource(self, resource_type, quantity=1):
        """声明需要的资源"""
        self.needs.append({
            "type": resource_type,
            "quantity": quantity,
            "fulfilled": False
        })
        print(f"Agent {self.agent_id} 需要 {quantity} 个 {resource_type}")
        
    def create_offer(self, resource_id, price):
        """为资源创建报价"""
        # 检查是否拥有该资源
        for resource in self.resources:
            if resource.resource_id == resource_id:
                offer = {
                    "offer_id": len(self.offers) + 1,
                    "resource_id": resource_id,
                    "resource_type": resource.resource_type,
                    "price": price,
                    "seller": self.agent_id,
                    "status": "active"
                }
                self.offers.append(offer)
                print(f"Agent {self.agent_id} 为 {resource} 报价 {price} 元")
                return offer
        return None
        
    def make_bid(self, offer, bid_price):
        """对报价进行投标"""
        if bid_price > offer["price"] and self.budget >= bid_price:
            print(f"Agent {self.agent_id} 对报价 {offer['offer_id']} 投标 {bid_price} 元")
            return {
                "status": "valid",
                "bid_price": bid_price,
                "bidder": self.agent_id
            }
        return {
            "status": "invalid",
            "reason": "insufficient_budget" if self.budget < bid_price else "low_price"
        }
        
    def buy_resource(self, offer, price):
        """购买资源"""
        if self.budget >= price:
            # 扣钱
            self.budget -= price
            
            # 从卖家处获取资源
            for seller in market_agents:
                if seller.agent_id == offer["seller"]:
                    for resource in seller.resources[:]:
                        if resource.resource_id == offer["resource_id"]:
                            # 转移资源
                            seller.resources.remove(resource)
                            self.add_resource(resource)
                            
                            # 更新预算
                            seller.budget += price
                            
                            # 更新报价状态
                            for o in seller.offers:
                                if o["offer_id"] == offer["offer_id"]:
                                    o["status"] = "sold"
                                    break
                            
                            print(f"交易完成: Agent {self.agent_id} 以 {price} 元购买了 {resource}")
                            return True
        return False


class Market:
    """市场,负责管理交易"""
    def __init__(self):
        self.offers = []  # 所有报价
        self.transactions = []  # 交易记录
        
    def add_offer(self, offer):
        """添加报价"""
        self.offers.append(offer)
        print(f"市场添加了报价: {offer['offer_id']} (资源类型: {offer['resource_type']}, 价格: {offer['price']})")
        
    def find_offers(self, resource_type):
        """查找特定类型的有效报价"""
        return [
            offer for offer in self.offers 
            if offer["resource_type"] == resource_type and offer["status"] == "active"
        ]
        
    def process_bids(self):
        """处理投标,完成交易"""
        # 简化:为每个资源类型找到最高投标
        for resource_type in set(offer["resource_type"] for offer in self.offers if offer["status"] == "active"):
            # 获取该类型的所有有效报价
            type_offers = self.find_offers(resource_type)
            
            # 对每个报价,找到最高投标
            for offer in type_offers:
                # 查找该报价的所有投标(简化:这里我们假设已经有投标)
                # 在实际系统中,应该有投标记录
                highest_bid = offer["price"] * 1.1  # 假设最高投标比报价高10%
                
                # 找到愿意出这个价格的买家
                for agent in market_agents:
                    if agent.agent_id != offer["seller"] and agent.budget >= highest_bid:
                        # 检查买家是否需要该资源
                        for need in agent.needs:
                            if need["type"] == resource_type and not need["fulfilled"]:
                                # 完成交易
                                if agent.buy_resource(offer, highest_bid):
                                    need["fulfilled"] = True
                                    self.transactions.append({
                                        "transaction_id": len(self.transactions) + 1,
                                        "offer_id": offer["offer_id"],
                                        "buyer": agent.agent_id,
                                        "seller": offer["seller"],
                                        "price": highest_bid,
                                        "resource_type": resource_type
                                    })
                                break


# 测试市场机制协作系统
if __name__ == "__main__":
    # 创建市场智能体
    agent1 = MarketAgent("A1", budget=200)
    agent2 = MarketAgent("A2", budget=300)
    agent3 = MarketAgent("A3", budget=150)
    market_agents = [agent1, agent2, agent3]
    
    # 创建资源
    resources = [
        Resource(1, "计算资源"),
        Resource(2, "存储资源"),
        Resource(3, "网络带宽"),
        Resource(4, "计算资源")
    ]
    
    # 分配初始资源
    agent1.add_resource(resources[0])
    agent1.add_resource(resources[1])
    agent2.add_resource(resources[2])
    agent3.add_resource(resources[3])
    
    # 声明需求
    agent1.need_resource("网络带宽")
    agent2.need_resource("计算资源")
    agent3.need_resource("存储资源")
    
    # 创建市场
    market = Market()
    
    # 创建报价
    offer1 = agent1.create_offer(1, 50)  # 出售计算资源
    offer2 = agent1.create_offer(2, 30)  # 出售存储资源
    offer3 = agent2.create_offer(3, 40)  # 出售网络带宽
    offer4 = agent3.create_offer(4, 60)  # 出售计算资源
    
    # 将报价添加到市场
    market.add_offer(offer1)
    market.add_offer(offer2)
    market.add_offer(offer3)
    market.add_offer(offer4)
    
    # 处理投标和交易
    print("\n--- 开始市场交易 ---")
    market.process_bids()
    
    # 显示最终状态
    print("\n--- 最终状态 ---")
    for agent in market_agents:
        print(f"\nAgent {agent.agent_id}:")
        print(f"  预算: {agent.budget} 元")
        print("  拥有资源:")
        for resource in agent.resources:
            print(f"    - {resource}")
        print("  需求满足情况:")
        for need in agent.needs:
            status = "已满足" if need["fulfilled"] else "未满足"
            print(f"    - {need['quantity']} 个 {need['type']}: {status}")

9.6 多智能体系统的协商

        协商 (Negotiation) 是多智能体系统中解决冲突和分配资源的重要机制,当智能体之间存在利益冲突或资源竞争时,通过协商达成共识。

代码语言:javascript
复制
class NegotiationAgent:
    """参与协商的智能体"""

    def __init__(self, agent_id, resources, goals):
        self.agent_id = agent_id
        self.resources = resources  # 拥有的资源(集合)
        self.goals = goals  # 目标列表
        self.preferences = {}  # 偏好:资源的重要性(1-10,越高越重要)
        self.concessions = 0  # 让步次数
        self.max_concessions = 3  # 最大让步次数
        self.utilities = {}  # 资源对自身的效用值

    def set_preferences(self, preferences):
        """设置偏好(资源重要性)"""
        self.preferences = preferences
        # 根据偏好计算效用值
        self.calculate_utilities()
        print(f"Agent {self.agent_id} 的偏好设置: {preferences}")

    def calculate_utilities(self):
        """根据偏好计算资源效用值"""
        total = sum(self.preferences.values())
        for resource, importance in self.preferences.items():
            self.utilities[resource] = importance / total  # 归一化到0-1之间

    def get_desired_resource(self):
        """获取希望得到的资源(自己没有但重要性高的)"""
        # 假设知道其他资源类型(实际系统中可能需要通过感知或通信获取)
        all_resources = set(self.preferences.keys())
        missing_resources = all_resources - self.resources

        if not missing_resources:
            return None

        # 按重要性排序缺失的资源
        sorted_missing = sorted(
            missing_resources,
            key=lambda x: self.preferences[x],
            reverse=True
        )
        return sorted_missing[0]  # 返回最重要的缺失资源

    def receive_proposal(self, proposal, issue, sender):
        """接收提议并处理"""
        print(f"Agent {self.agent_id} 收到 Agent {sender.agent_id} 的提议: {proposal}")

        # 评估提议
        if self.evaluate_proposal(proposal) > 0.6:  # 效用大于0.6则接受
            return {"status": "accept"}
        elif self.concessions < self.max_concessions:  # 还能让步则提出反提议
            counter_proposal = self.make_counter_proposal(proposal, issue)
            self.concessions += 1
            return {"status": "counter", "proposal": counter_proposal}
        else:  # 无法再让步则拒绝
            return {"status": "reject"}

    def evaluate_proposal(self, proposal):
        """评估提议的效用"""
        # 计算收益:得到请求资源的效用
        gain = self.utilities.get(proposal["request"], 0) if proposal["request"] in self.preferences else 0

        # 计算损失:失去提供资源的效用
        loss = self.utilities.get(proposal["offer"], 0) if proposal["offer"] in self.preferences else 0

        # 净效用
        net_utility = gain - loss
        print(f"Agent {self.agent_id} 评估提议效用: {net_utility:.2f} (收益: {gain:.2f}, 损失: {loss:.2f})")
        return net_utility

    def make_counter_proposal(self, original_proposal, issue):
        """提出反提议(让步)"""
        # 反提议策略:逐渐降低要求,增加提供
        if issue == "资源交换":
            # 找到次重要的资源作为提议
            sorted_resources = sorted(
                self.preferences.items(),
                key=lambda x: x[1]
            )

            # 让步:提供比原来更有价值的资源,或接受次优的请求
            offer_index = min(self.concessions, len(sorted_resources) - 1)
            new_offer = sorted_resources[offer_index][0]

            # 调整请求:如果让步次数多,接受次优资源
            desired = self.get_desired_resource()
            if self.concessions >= 1:
                all_resources = set(self.preferences.keys())
                missing_resources = sorted(
                    all_resources - self.resources,
                    key=lambda x: self.preferences[x],
                    reverse=True
                )
                desired = missing_resources[min(1, len(missing_resources) - 1)]  # 次优选择

            counter_proposal = {
                "offer": new_offer,
                "request": desired
            }
            print(f"Agent {self.agent_id} 提出反提议: {counter_proposal} (让步次数: {self.concessions})")
            return counter_proposal
        return {}

    def initiate_negotiation(self, other_agent, issue):
        """发起协商"""
        print(f"\nAgent {self.agent_id} 向 Agent {other_agent.agent_id} 发起关于 {issue} 的协商")

        # 提出初始提议
        initial_proposal = self.make_proposal(issue)
        response = other_agent.receive_proposal(initial_proposal, issue, self)

        # 处理响应
        if response["status"] == "accept":
            print(f"Agent {self.agent_id} 的提议被接受,协商成功")
            return self.execute_agreement(initial_proposal, other_agent)
        elif response["status"] == "counter":
            # 进入协商循环
            return self.negotiation_loop(other_agent, issue, response["proposal"])
        else:
            print(f"Agent {self.agent_id} 的提议被拒绝,协商失败")
            return {"status": "failed"}

    def make_proposal(self, issue):
        """提出提议"""
        # 简化:根据偏好提出提议
        if issue == "资源交换":
            # 找出自己不太重要但对方可能需要的资源
            least_important = min(self.preferences.items(), key=lambda x: x[1])[0]
            return {
                "offer": least_important,
                "request": self.get_desired_resource()
            }
        return {}

    def negotiation_loop(self, other_agent, issue, proposal):
        """协商循环:处理多轮提议与反提议"""
        current_proposal = proposal
        round = 1

        while round <= 5:  # 最多5轮协商
            print(f"\n--- 协商轮次 {round} ---")

            # 评估对方的反提议
            if self.evaluate_proposal(current_proposal) > 0.5:  # 降低接受阈值
                print(f"Agent {self.agent_id} 接受提议: {current_proposal}")
                return self.execute_agreement(current_proposal, other_agent)

            # 提出新的反提议
            if self.concessions < self.max_concessions:
                counter = self.make_counter_proposal(current_proposal, issue)
                self.concessions += 1

                # 发送反提议并获取响应
                response = other_agent.receive_proposal(counter, issue, self)

                if response["status"] == "accept":
                    print(f"Agent {self.agent_id} 的反提议被接受,协商成功")
                    return self.execute_agreement(counter, other_agent)
                elif response["status"] == "counter":
                    current_proposal = response["proposal"]
                else:
                    print(f"Agent {self.agent_id} 的反提议被拒绝,协商失败")
                    return {"status": "failed"}
            else:
                print(f"Agent {self.agent_id} 无法再让步,协商失败")
                return {"status": "failed"}

            round += 1

        print("协商轮次耗尽,协商失败")
        return {"status": "failed"}

    def execute_agreement(self, proposal, other_agent):
        """执行协商达成的协议"""
        # 交换资源
        if proposal["offer"] in self.resources and proposal["request"] in other_agent.resources:
            # 移除自己提供的资源
            self.resources.remove(proposal["offer"])
            # 添加获得的资源
            self.resources.add(proposal["request"])

            # 对方移除资源
            other_agent.resources.remove(proposal["request"])
            # 对方添加资源
            other_agent.resources.add(proposal["offer"])

            print(f"协议执行成功:Agent {self.agent_id} 用 {proposal['offer']} 交换到 {proposal['request']}")
            return {
                "status": "success",
                "agent_resources": self.resources,
                "other_resources": other_agent.resources
            }
        else:
            print("资源交换失败:一方不拥有约定的资源")
            return {"status": "failed", "reason": "resource_unavailable"}


# 测试多智能体协商系统
if __name__ == "__main__":
    # 创建两个协商智能体
    agent_a = NegotiationAgent(
        agent_id="A",
        resources={"木材", "石料"},
        goals=["建造房屋", "制作工具"]
    )

    agent_b = NegotiationAgent(
        agent_id="B",
        resources={"金属", "布料"},
        goals=["制作工具", "缝制衣物"]
    )

    # 设置资源偏好(1-10,数字越大越重要)
    agent_a.set_preferences({
        "木材": 3,
        "石料": 4,
        "金属": 10,  # 非常需要金属来制作工具
        "布料": 2
    })

    agent_b.set_preferences({
        "木材": 8,  # 非常需要木材
        "石料": 2,
        "金属": 5,
        "布料": 6
    })

    # 发起资源交换协商
    result = agent_a.initiate_negotiation(agent_b, "资源交换")

    # 显示最终结果
    print("\n--- 协商最终结果 ---")
    print(f"状态: {result['status']}")
    if result["status"] == "success":
        print(f"Agent A 最终资源: {result['agent_resources']}")
        print(f"Agent B 最终资源: {result['other_resources']}")

图 8:多智能体协商流程图

9.7 小结

本章详细介绍了智能体与多智能体系统的核心概念、结构和关键技术,主要内容包括:

  1. 智能体的概念与结构
    • 智能体是能够自主感知环境、做出决策并执行动作的实体
    • 主要类型包括反应式 Agent、慎思式 Agent 和复合式 Agent
    • 每种智能体都有其适用场景,复合式 Agent 结合了多种类型的优点
  2. 多智能体系统的概念与结构
    • 多智能体系统由多个相互作用的智能体组成
    • 具有自主性、分布性、协作性等特点
    • 主要体系结构包括集中式、分布式和混合式
  3. 多智能体系统的通信
    • 通信方式包括同步通信、异步通信、广播通信等
    • 智能体通信语言(如 FIPA ACL)确保不同智能体能够理解彼此
  4. 多智能体系统的协调与协作
    • 协调用于避免冲突,确保系统有序运行
    • 协作通过合同网、黑板模型等机制实现任务分配和共同问题求解
  5. 多智能体系统的协商
    • 协商是解决智能体间冲突的重要机制
    • 包含提议生成、评估、反提议等过程
    • 合理的让步策略是协商成功的关键

        多智能体系统在分布式人工智能、机器人协作、智能决策支持等领域有广泛应用,随着物联网和边缘计算的发展,其重要性将进一步提升。

图 10:智能体与多智能体系统知识思维导图

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2026-01-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 9.1 智能体的概念与结构
    • 智能体的概念
    • 智能体的特性
    • 智能体的结构
    • 反应式 Agent
    • 慎思式 Agent
    • 复合式 Agent
    • Agent 的应用
  • 9.2 多智能体系统的概念与结构
    • 多智能体系统的特点
    • 多智能体系统的基本类型
    • 多智能体系统的体系结构
  • 9.3 多智能体系统的通信
    • 智能体通信的类型
    • Agent 通信的方式
    • 智能体通信语言
  • 9.4 多智能体系统的协调
  • 9.5 多智能体系统的协作
    • 多智能体的协作类型
    • 合同网协作方法
    • 黑板模型协作方法
    • 市场机制协作方法
  • 9.6 多智能体系统的协商
  • 9.7 小结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档