
大家好!今天我们来深入学习《人工智能导论》第 9 章的内容 —— 智能体与多智能体系统。这一章内容非常重要,因为智能体 (Agent) 是人工智能领域中一个核心概念,而多智能体系统 (MAS) 则广泛应用于分布式问题求解、机器人协作、智能决策等多个领域。

智能体 (Agent) 是人工智能领域的核心概念之一,可以理解为能够自主感知环境、做出决策并执行动作的实体。
智能体是处于某个环境中的实体,它可以通过传感器感知环境,通过执行器作用于环境,并能根据自身的目标自主决策。

图 1:智能体与环境交互示意图
智能体通常具有以下特性:
智能体的基本结构可以分为以下几个部分:

图 2:智能体结构思维导图

反应式 Agent (Reactive Agent) 是最简单的智能体类型,它没有内部状态,仅根据当前环境的感知做出反应,遵循 "感知 - 行动" 规则。
class ReactiveAgent:
"""反应式Agent实现"""
def __init__(self, name):
self.name = name
def perceive(self, environment):
"""感知环境,返回当前环境状态"""
return environment.get_state()
def decide(self, state):
"""根据当前状态做出决策"""
# 简单的规则库:根据不同状态返回不同动作
if state == "危险":
return "逃跑"
elif state == "安全":
return "探索"
elif state == "发现食物":
return "进食"
else:
return "静止"
def act(self, environment):
"""执行动作"""
# 1. 感知环境
state = self.perceive(environment)
print(f"{self.name}感知到环境状态:{state}")
# 2. 做出决策
action = self.decide(state)
print(f"{self.name}决定采取行动:{action}")
# 3. 执行动作并影响环境
environment.update(self.name, action)
return action
class Environment:
"""环境类,模拟Agent所处的环境"""
def __init__(self):
self.state = "安全" # 初始环境状态
self.history = [] # 记录环境历史
def get_state(self):
"""返回当前环境状态"""
return self.state
def update(self, agent_name, action):
"""根据Agent的动作更新环境状态"""
self.history.append(f"{agent_name}执行了{action}动作")
# 简单的环境状态转换规则
if action == "探索" and self.state == "安全":
# 探索时有一定概率发现食物
import random
if random.random() < 0.3: # 30%概率发现食物
self.state = "发现食物"
elif action == "进食":
self.state = "安全"
elif action == "逃跑":
self.state = "安全"
# 测试反应式Agent
if __name__ == "__main__":
# 创建环境
env = Environment()
# 创建反应式Agent
robot = ReactiveAgent("机器人Agent")
# 模拟5个时间步的行为
for i in range(5):
print(f"\n--- 时间步 {i+1} ---")
robot.act(env)
print(f"当前环境状态:{env.get_state()}")

慎思式 Agent (Deliberative Agent) 也称为认知 Agent,它拥有内部状态和目标,能够进行规划和推理,根据当前状态和目标做出决策。
class DeliberativeAgent:
"""慎思式Agent实现"""
def __init__(self, name):
self.name = name
self.beliefs = {} # 信念库,存储Agent对世界的认知
self.desires = [] # 愿望/目标列表
self.intentions = [] # 意图,即Agent承诺要实现的目标
def perceive(self, environment):
"""感知环境,更新信念库"""
state = environment.get_state()
self.beliefs['current_state'] = state
print(f"{self.name}感知到环境状态:{state}")
return state
def generate_desires(self):
"""生成愿望/目标"""
# 根据当前信念生成合理的目标
if self.beliefs.get('current_state') == "低电量":
self.desires = ["寻找充电器", "节省能源"]
elif self.beliefs.get('current_state') == "有任务":
self.desires = ["完成任务"]
else:
self.desires = ["保持待机", "监控环境"]
print(f"{self.name}生成愿望:{self.desires}")
def select_intentions(self):
"""从愿望中选择意图(承诺要实现的目标)"""
# 简单的意图选择策略:选择第一个愿望
if self.desires:
self.intentions = [self.desires[0]]
else:
self.intentions = []
print(f"{self.name}选择意图:{self.intentions}")
def plan(self):
"""根据意图制定计划"""
plans = []
for intention in self.intentions:
if intention == "寻找充电器":
plans.append(["移动到充电站", "连接充电器", "开始充电"])
elif intention == "完成任务":
plans.append(["分析任务", "执行任务步骤", "检查任务结果"])
elif intention == "保持待机":
plans.append(["降低功耗", "定期感知环境"])
print(f"{self.name}制定计划:{plans}")
return plans
def act(self, environment):
"""执行动作"""
# 1. 感知环境,更新信念
self.perceive(environment)
# 2. 生成愿望
self.generate_desires()
# 3. 选择意图
self.select_intentions()
# 4. 制定计划
plans = self.plan()
# 5. 执行计划中的第一个动作
if plans and plans[0]:
action = plans[0][0]
print(f"{self.name}执行动作:{action}")
environment.update(self.name, action)
return action
return None
class Environment:
"""环境类,模拟Agent所处的环境"""
def __init__(self):
self.state = "正常" # 初始环境状态
self.history = [] # 记录环境历史
def get_state(self):
"""返回当前环境状态"""
return self.state
def update(self, agent_name, action):
"""根据Agent的动作更新环境状态"""
self.history.append(f"{agent_name}执行了{action}动作")
# 环境状态转换规则
if action == "移动到充电站":
self.state = "充电站附近"
elif action == "连接充电器":
self.state = "充电中"
elif action == "开始充电":
self.state = "电量恢复"
elif action == "分析任务":
self.state = "任务分析完成"
# 测试慎思式Agent
if __name__ == "__main__":
# 创建环境
env = Environment()
# 创建慎思式Agent
ai_agent = DeliberativeAgent("智能助手Agent")
# 模拟不同环境状态下的Agent行为
print("--- 初始环境状态:正常 ---")
ai_agent.act(env)
print("\n--- 改变环境状态为:低电量 ---")
env.state = "低电量"
ai_agent.act(env)
print("\n--- 改变环境状态为:有任务 ---")
env.state = "有任务"
ai_agent.act(env)

图 3:慎思式 Agent 工作流程图
复合式 Agent (Hybrid Agent) 结合了反应式 Agent 和慎思式 Agent 的优点,既能够对环境变化做出快速反应,又能够进行复杂的推理和规划。
class HybridAgent:
"""复合式Agent实现"""
def __init__(self, name):
self.name = name
# 反应式部分
self.reactive_rules = {
"紧急情况": "立即停止并报警",
"障碍物": "避开障碍物",
"危险接近": "迅速撤离"
}
# 慎思式部分
self.beliefs = {} # 信念库
self.goals = [] # 目标列表
self.plan = [] # 计划
def reactive_layer(self, state):
"""反应层:快速响应紧急情况"""
for condition, action in self.reactive_rules.items():
if condition in state:
print(f"{self.name}反应层触发:{condition} -> {action}")
return action
return None
def deliberative_layer(self, environment):
"""慎思层:进行推理和规划"""
# 更新信念
self.beliefs['current_state'] = environment.get_state()
# 设置目标
if "清洁任务" in self.beliefs['current_state']:
self.goals = ["完成清洁任务"]
elif "充电" in self.beliefs['current_state']:
self.goals = ["找到充电器", "充电至80%"]
else:
self.goals = ["巡逻", "监控异常"]
# 制定计划
self.plan = []
for goal in self.goals:
if goal == "完成清洁任务":
self.plan.extend(["规划清洁路线", "开始清洁", "检查清洁效果"])
elif goal == "找到充电器":
self.plan.extend(["搜索充电器位置", "前往充电器", "连接充电"])
print(f"{self.name}慎思层:目标={self.goals}, 计划={self.plan}")
return self.plan[0] if self.plan else None
def act(self, environment):
"""Agent的行为:先检查反应层,再调用慎思层"""
state = environment.get_state()
print(f"\n{self.name}感知到状态:{state}")
# 1. 先检查反应层,看是否有紧急情况需要处理
action = self.reactive_layer(state)
if action:
environment.update(self.name, action)
return action
# 2. 如果没有紧急情况,调用慎思层
action = self.deliberative_layer(environment)
if action:
environment.update(self.name, action)
return action
return "待机"
class Environment:
"""环境类"""
def __init__(self):
self.state = "正常运行"
def get_state(self):
return self.state
def update(self, agent_name, action):
print(f"环境因{agent_name}的{action}动作而更新")
# 根据动作简单更新环境
if action == "避开障碍物":
self.state = "障碍物已避开,继续前进"
elif action == "开始清洁":
self.state = "清洁进行中"
elif action == "连接充电":
self.state = "充电中"
# 测试复合式Agent
if __name__ == "__main__":
env = Environment()
robot = HybridAgent("清洁机器人")
# 测试正常情况(触发慎思层)
robot.act(env)
# 测试紧急情况(触发反应层)
env.state = "发现障碍物,紧急情况"
robot.act(env)
# 测试另一正常情况
env.state = "有清洁任务,区域A"
robot.act(env)

图 4:复合式 Agent 结构流程图
智能体在多个领域都有广泛应用,下面是一个智能家居控制 Agent 的综合案例:
class HomeEnvironment:
"""家庭环境类,模拟智能家居环境状态"""
def __init__(self):
self.temperature = 24 # 温度,单位℃
self.light = "off" # 灯光状态:on/off
self.humidity = 45 # 湿度,百分比
self.security = "normal" # 安防状态:normal/alarm
self.people_in_room = 0 # 房间人数
def get_state(self):
"""返回当前环境状态"""
return {
"temperature": self.temperature,
"light": self.light,
"humidity": self.humidity,
"security": self.security,
"people_in_room": self.people_in_room
}
def update(self, action):
"""根据Agent的动作更新环境"""
print(f"环境执行动作:{action}")
if action == "turn_on_light":
self.light = "on"
elif action == "turn_off_light":
self.light = "off"
elif action.startswith("set_temperature"):
temp = int(action.split(":")[1])
self.temperature = temp
elif action == "sound_alarm":
self.security = "alarm"
class SmartHomeAgent:
"""智能家居控制Agent"""
def __init__(self):
# 反应式规则:处理紧急情况和简单响应
self.reactive_rules = {
("security", "alarm"): "sound_alarm",
("people_in_room", 0): "turn_off_light",
("temperature", lambda x: x > 28): "set_temperature:26",
("temperature", lambda x: x < 18): "set_temperature:20"
}
# 慎思式部分:处理复杂规划
self.schedules = { # 定时任务
"morning": {"time": "07:00", "action": "turn_on_light"},
"night": {"time": "22:30", "action": "turn_off_light"}
}
def perceive(self, env):
"""感知环境状态"""
return env.get_state()
def reactive_response(self, state):
"""反应式响应"""
for (key, condition), action in self.reactive_rules.items():
value = state[key]
# 检查条件是否满足
if isinstance(condition, tuple):
if value in condition:
return action
elif callable(condition):
if condition(value):
return action
elif value == condition:
return action
return None
def deliberative_planning(self, state, current_time):
"""慎思式规划"""
# 检查是否到了定时任务时间
for schedule in self.schedules.values():
if schedule["time"] == current_time:
# 检查是否需要执行(例如晚上关灯前检查是否还有人)
if schedule["action"] == "turn_off_light" and state["people_in_room"] > 0:
continue # 有人则不执行关灯
return schedule["action"]
# 根据房间人数调整灯光和温度
if state["people_in_room"] > 0 and state["light"] == "off":
return "turn_on_light"
return None
def act(self, env, current_time):
"""执行动作"""
state = self.perceive(env)
print(f"\n当前环境状态:{state}")
# 1. 先检查反应式响应(优先级高)
action = self.reactive_response(state)
if action:
print(f"反应式响应:{action}")
env.update(action)
return action
# 2. 执行慎思式规划
action = self.deliberative_planning(state, current_time)
if action:
print(f"慎思式规划:{action}")
env.update(action)
return action
print("无动作需要执行")
return None
# 测试智能家居Agent
if __name__ == "__main__":
home_env = HomeEnvironment()
agent = SmartHomeAgent()
# 模拟一天中的几个时间点
test_times = [
("06:30", {"people_in_room": 0, "temperature": 30}),
("07:00", {"people_in_room": 1, "temperature": 24}),
("14:00", {"people_in_room": 2, "temperature": 17}),
("22:30", {"people_in_room": 0, "temperature": 24}),
("23:00", {"people_in_room": 1, "security": "alarm"})
]
for time, changes in test_times:
# 更新环境状态
for key, value in changes.items():
setattr(home_env, key, value)
print(f"--- 时间:{time} ---")
agent.act(home_env, time)
多智能体系统 (Multi-Agent System, MAS) 是由多个相互作用、相互协作的智能体组成的系统。这些智能体可以是同质的(具有相同功能),也可以是异质的(具有不同功能)。
多智能体系统具有以下特点:
根据智能体之间的关系和协作方式,多智能体系统可以分为以下几种基本类型:
多智能体系统的体系结构主要有以下几种:
下面是一个分布式多智能体系统的实现案例,模拟一个简单的物流配送系统:
class Package:
"""包裹类"""
def __init__(self, package_id, source, destination):
self.package_id = package_id
self.source = source # 起始位置
self.destination = destination # 目标位置
self.status = "pending" # 状态:pending/in_transit/delivered
def __str__(self):
return f"包裹{self.package_id}: {self.source} -> {self.destination} ({self.status})"
class DeliveryAgent:
"""配送智能体"""
def __init__(self, agent_id, location):
self.agent_id = agent_id
self.location = location # 当前位置
self.load = [] # 装载的包裹
self.capacity = 3 # 最大装载量
def receive_message(self, message, sender):
"""接收消息"""
print(f"Agent {self.agent_id} 收到来自 Agent {sender.agent_id} 的消息: {message}")
if message["type"] == "request_delivery":
# 处理配送请求
return self.handle_delivery_request(message["package"])
elif message["type"] == "status_update":
# 处理状态更新
return {"status": "received"}
return {"status": "unknown_message"}
def handle_delivery_request(self, package):
"""处理配送请求"""
# 检查是否有容量
if len(self.load) < self.capacity:
self.load.append(package)
package.status = "in_transit"
return {"status": "accepted", "agent_id": self.agent_id}
else:
return {"status": "rejected", "reason": "full_capacity"}
def deliver(self):
"""执行配送"""
delivered = []
for package in self.load[:]: # 使用副本迭代
# 简化:假设到达目的地
print(f"Agent {self.agent_id} 配送包裹 {package.package_id} 到 {package.destination}")
package.status = "delivered"
delivered.append(package)
self.load.remove(package)
return delivered
def __str__(self):
return f"DeliveryAgent {self.agent_id} (位置: {self.location}, 负载: {len(self.load)}/{self.capacity})"
class DispatcherAgent:
"""调度智能体"""
def __init__(self, agent_id):
self.agent_id = agent_id
self.delivery_agents = [] # 配送智能体列表
self.packages = [] # 待配送包裹
def register_agent(self, agent):
"""注册配送智能体"""
self.delivery_agents.append(agent)
print(f"Dispatcher {self.agent_id} 注册了 Agent {agent.agent_id}")
def add_package(self, package):
"""添加待配送包裹"""
self.packages.append(package)
print(f"Dispatcher 收到新包裹: {package}")
# 立即尝试分配
self.assign_package(package)
def assign_package(self, package):
"""分配包裹给配送智能体"""
# 简单策略:依次询问每个智能体
for agent in self.delivery_agents:
# 发送配送请求
response = agent.receive_message({
"type": "request_delivery",
"package": package
}, self)
if response["status"] == "accepted":
print(f"包裹 {package.package_id} 已分配给 Agent {response['agent_id']}")
return
print(f"包裹 {package.package_id} 分配失败,所有Agent均已满载")
# 测试多智能体物流系统
if __name__ == "__main__":
# 创建调度智能体
dispatcher = DispatcherAgent("D1")
# 创建配送智能体
agent1 = DeliveryAgent("A1", "仓库A")
agent2 = DeliveryAgent("A2", "仓库B")
# 注册配送智能体
dispatcher.register_agent(agent1)
dispatcher.register_agent(agent2)
# 添加包裹
packages = [
Package(1, "仓库A", "地址1"),
Package(2, "仓库A", "地址2"),
Package(3, "仓库A", "地址3"),
Package(4, "仓库A", "地址4"), # 这会导致第一个Agent满载
Package(5, "仓库A", "地址5")
]
for pkg in packages:
dispatcher.add_package(pkg)
# 执行配送
print("\n--- 开始配送 ---")
print(agent1)
print(agent2)
print("\n--- 配送结果 ---")
agent1.deliver()
agent2.deliver()

图 5:分布式物流多智能体系统结构
智能体之间的通信是多智能体系统协作的基础,有效的通信机制能够提高系统的整体性能。
智能体通信主要分为以下几种类型:
常见的智能体通信方式包括:
为了使不同智能体能够理解彼此的消息,需要定义统一的通信语言。最著名的是 FIPA ACL(Agent Communication Language)。
下面是一个实现了多种通信方式的多智能体系统案例:
import time
from collections import defaultdict
class CommunicationAgent:
"""支持多种通信方式的智能体"""
def __init__(self, agent_id):
self.agent_id = agent_id
self.subscriptions = set() # 订阅的主题
self.message_queue = [] # 消息队列(用于异步通信)
def send_sync(self, receiver, message):
"""同步通信:等待响应"""
print(f"\n[同步通信] Agent {self.agent_id} -> Agent {receiver.agent_id}: {message['content']}")
# 添加消息元数据
message['sender'] = self.agent_id
message['timestamp'] = time.time()
# 发送并等待响应
response = receiver.receive_message(message)
return response
def send_async(self, receiver, message):
"""异步通信:不等待响应"""
print(f"\n[异步通信] Agent {self.agent_id} -> Agent {receiver.agent_id}: {message['content']}")
message['sender'] = self.agent_id
message['timestamp'] = time.time()
# 将消息放入接收者的消息队列
receiver.message_queue.append(message)
return {"status": "sent"}
def broadcast(self, agents, message):
"""广播通信:向多个智能体发送消息"""
print(f"\n[广播通信] Agent {self.agent_id} 向 {len(agents)} 个Agent发送: {message['content']}")
message['sender'] = self.agent_id
message['timestamp'] = time.time()
message['type'] = 'broadcast'
responses = []
for agent in agents:
if agent.agent_id != self.agent_id: # 不向自己广播
responses.append(agent.receive_message(message))
return responses
def subscribe(self, topic, broker):
"""订阅主题"""
self.subscriptions.add(topic)
broker.register_subscription(topic, self)
print(f"Agent {self.agent_id} 订阅了主题: {topic}")
def receive_message(self, message):
"""接收消息并处理"""
print(f"Agent {self.agent_id} 收到消息: {message['content']}")
# 处理消息
if message.get('type') == 'query':
return self.handle_query(message)
elif message.get('type') == 'request':
return self.handle_request(message)
else:
return {"status": "received", "receiver": self.agent_id}
def handle_query(self, message):
"""处理查询消息"""
query = message['content']
# 简单的查询处理
if "状态" in query:
return {"status": "response", "content": f"Agent {self.agent_id} 状态正常"}
return {"status": "response", "content": "无法理解查询"}
def handle_request(self, message):
"""处理请求消息"""
request = message['content']
# 简单的请求处理
if "协作" in request:
return {"status": "accepted", "content": f"Agent {self.agent_id} 同意协作"}
return {"status": "rejected", "content": "无法满足请求"}
def process_queue(self):
"""处理消息队列中的异步消息"""
print(f"\nAgent {self.agent_id} 处理 {len(self.message_queue)} 条异步消息")
responses = []
while self.message_queue:
message = self.message_queue.pop(0)
response = self.receive_message(message)
responses.append(response)
return responses
class MessageBroker:
"""消息代理,用于发布-订阅模式"""
def __init__(self):
self.subscriptions = defaultdict(list) # 主题 -> 订阅者列表
def register_subscription(self, topic, agent):
"""注册订阅"""
self.subscriptions[topic].append(agent)
def publish(self, topic, message, sender):
"""发布消息到主题"""
print(f"\n[发布-订阅] Agent {sender.agent_id} 发布到主题 {topic}: {message['content']}")
message['sender'] = sender.agent_id
message['topic'] = topic
message['timestamp'] = time.time()
# 向所有订阅者发送消息
for agent in self.subscriptions.get(topic, []):
if agent.agent_id != sender.agent_id: # 不向发布者自己发送
agent.message_queue.append(message)
return {"status": "published", "topic": topic}
# 测试多智能体通信系统
if __name__ == "__main__":
# 创建智能体
agent1 = CommunicationAgent("A1")
agent2 = CommunicationAgent("A2")
agent3 = CommunicationAgent("A3")
agents = [agent1, agent2, agent3]
# 创建消息代理
broker = MessageBroker()
# 测试同步通信
response = agent1.send_sync(agent2, {
"type": "query",
"content": "你的当前状态是什么?"
})
print("同步通信响应:", response)
# 测试异步通信
response = agent1.send_async(agent3, {
"type": "request",
"content": "请完成数据收集任务"
})
print("异步通信响应:", response)
# 处理异步消息
print("\n处理异步消息:")
agent3.process_queue()
# 测试广播通信
responses = agent2.broadcast(agents, {
"content": "我已完成初始化,准备接受任务"
})
print("广播通信响应:", responses)
# 测试发布-订阅模式
# 订阅主题
agent1.subscribe("weather", broker)
agent3.subscribe("weather", broker)
# 发布消息
broker.publish("weather", {
"content": "明天有雨,请做好准备"
}, agent2)
# 处理订阅消息
print("\n处理订阅消息:")
agent1.process_queue()
agent3.process_queue()

图 6:多智能体通信方式对比图
多智能体系统的协调 (Coordination) 是指智能体之间通过调整各自的行为,以避免冲突并实现系统整体目标的过程。
协调机制的主要目标包括:
下面是一个交通路口智能体协调系统的实现案例:
class TrafficLightAgent:
"""交通灯智能体"""
def __init__(self, agent_id, direction):
self.agent_id = agent_id
self.direction = direction # 方向:南北/东西
self.state = "red" # 初始状态:红灯
self.timer = 0 # 计时器
self.green_duration = 30 # 绿灯持续时间(秒)
self.yellow_duration = 5 # 黄灯持续时间(秒)
def change_state(self, new_state):
"""改变交通灯状态"""
if new_state != self.state:
print(f"交通灯 {self.agent_id} ({self.direction}) 从 {self.state} 变为 {new_state}")
self.state = new_state
self.timer = 0 # 重置计时器
def update(self):
"""更新交通灯状态(基于时间)"""
self.timer += 1
# 自动状态转换(无协调情况下)
if self.state == "green" and self.timer >= self.green_duration:
self.change_state("yellow")
elif self.state == "yellow" and self.timer >= self.yellow_duration:
self.change_state("red")
def send_message(self, receiver, message):
"""发送消息给其他交通灯"""
print(f"交通灯 {self.agent_id} 发送给 {receiver.agent_id} 消息: {message}")
return receiver.receive_message(message, self)
def receive_message(self, message, sender):
"""接收消息"""
if message["type"] == "request_green":
# 处理绿灯请求
if self.state == "red":
# 如果当前是红灯,可以同意请求
return {"status": "accepted"}
else:
# 如果当前不是红灯,拒绝请求
return {
"status": "rejected",
"current_state": self.state,
"available_time": self.green_duration - self.timer + self.yellow_duration
}
elif message["type"] == "grant_green":
# 收到绿灯许可
self.change_state("green")
return {"status": "received"}
return {"status": "ok"}
def __str__(self):
return f"TrafficLight {self.agent_id} ({self.direction}): {self.state} (计时: {self.timer}s)"
class TrafficCoordinator:
"""交通协调器"""
def __init__(self):
self.traffic_lights = []
self.queue = [] # 等待绿灯的队列
def register_light(self, traffic_light):
"""注册交通灯"""
self.traffic_lights.append(traffic_light)
print(f"协调器注册了交通灯: {traffic_light.agent_id}")
def request_coordination(self, requester):
"""处理协调请求"""
print(f"\n协调器收到 {requester.agent_id} 的协调请求")
# 检查是否有其他交通灯处于绿灯状态
for light in self.traffic_lights:
if light.agent_id != requester.agent_id and light.state == "green":
# 请求当前绿灯的交通灯是否可以让行
response = requester.send_message(light, {"type": "request_green"})
if response["status"] == "rejected":
# 需要等待
wait_time = response["available_time"]
print(f"需要等待 {wait_time} 秒后才能为 {requester.agent_id} 变绿灯")
return {"status": "wait", "time": wait_time}
# 如果没有其他绿灯,可以直接变绿灯
requester.change_state("green")
return {"status": "granted"}
def update(self):
"""更新协调状态"""
# 检查是否有交通灯需要协调
for light in self.traffic_lights:
if light.state == "red" and light.timer > 40: # 红灯时间过长
# 请求协调
self.request_coordination(light)
# 测试交通灯协调系统
if __name__ == "__main__":
# 创建交通灯
north_south = TrafficLightAgent("TL1", "南北方向")
east_west = TrafficLightAgent("TL2", "东西方向")
# 创建协调器
coordinator = TrafficCoordinator()
coordinator.register_light(north_south)
coordinator.register_light(east_west)
# 初始状态设置
north_south.change_state("green")
east_west.change_state("red")
# 模拟时间流逝
print("\n--- 模拟交通灯运行 ---")
for i in range(1, 60):
if i % 10 == 0: # 每10秒打印一次状态
print(f"\n--- 时间 {i} 秒 ---")
print(north_south)
print(east_west)
# 调用协调器
coordinator.update()
# 更新交通灯状态
north_south.update()
east_west.update()

图 7:交通灯协调流程图
多智能体系统的协作 (Collaboration) 是指多个智能体为了实现共同目标而共同工作的过程。
常见的多智能体协作类型包括:
合同网 (Contract Net) 是一种经典的多智能体协作方法,通过 "招标 - 投标 - 中标" 的过程分配任务。
import random
class ContractorAgent:
"""承包商智能体,能够执行任务并投标"""
def __init__(self, agent_id, capabilities):
self.agent_id = agent_id
self.capabilities = capabilities # 能力列表
self.current_task = None # 当前任务
self.busy = False # 是否忙碌
def receive_message(self, message, sender):
"""接收消息"""
if message["type"] == "call_for_proposal":
# 处理招标信息
return self.handle_cfp(message, sender)
elif message["type"] == "award_contract":
# 处理中标通知
return self.handle_award(message)
elif message["type"] == "cancel_contract":
# 处理取消合同
return self.handle_cancel(message)
return {"status": "unknown_message"}
def handle_cfp(self, message, sender):
"""处理招标信息"""
task = message["task"]
print(f"Agent {self.agent_id} 收到招标: {task['name']}")
# 检查是否有能力完成任务且不忙碌
if not self.busy and any(cap in self.capabilities for cap in task["required_capabilities"]):
# 计算投标价格(简化:基于任务复杂度)
cost = random.randint(10, 50) * task["complexity"]
print(f"Agent {self.agent_id} 投标: {cost}")
return {
"status": "proposal",
"agent_id": self.agent_id,
"cost": cost,
"estimated_time": task["complexity"] * 5
}
else:
print(f"Agent {self.agent_id} 拒绝投标")
return {"status": "reject"}
def handle_award(self, message):
"""处理中标通知"""
self.current_task = message["task"]
self.busy = True
print(f"Agent {self.agent_id} 中标: {self.current_task['name']}")
return {"status": "accepted"}
def handle_cancel(self, message):
"""处理取消合同"""
self.current_task = None
self.busy = False
print(f"Agent {self.agent_id} 的合同被取消")
return {"status": "cancelled"}
def execute_task(self):
"""执行任务"""
if self.current_task:
print(f"Agent {self.agent_id} 正在执行任务: {self.current_task['name']}")
# 简化:任务执行完成
task = self.current_task
self.current_task = None
self.busy = False
return {
"status": "completed",
"task_id": task["id"],
"agent_id": self.agent_id
}
return {"status": "no_task"}
class ManagerAgent:
"""经理智能体,负责任务分配"""
def __init__(self, agent_id):
self.agent_id = agent_id
self.contractors = [] # 承包商列表
self.tasks = [] # 任务列表
self.active_contracts = {} # 活跃合同
def register_contractor(self, contractor):
"""注册承包商"""
self.contractors.append(contractor)
print(f"Manager {self.agent_id} 注册了承包商: {contractor.agent_id}")
def add_task(self, task):
"""添加任务"""
self.tasks.append(task)
print(f"Manager {self.agent_id} 收到新任务: {task['name']}")
# 立即启动招标
self.call_for_proposals(task)
def call_for_proposals(self, task):
"""发起招标"""
print(f"\nManager {self.agent_id} 为任务 {task['name']} 发起招标")
# 向所有承包商发送招标信息
proposals = []
for contractor in self.contractors:
response = contractor.receive_message({
"type": "call_for_proposal",
"task": task,
"deadline": 5 # 投标截止时间(简化)
}, self)
if response["status"] == "proposal":
proposals.append(response)
# 评估投标并授予合同
if proposals:
# 选择价格最低的投标
best_proposal = min(proposals, key=lambda x: x["cost"])
self.award_contract(task, best_proposal["agent_id"])
else:
print(f"任务 {task['name']} 没有收到任何投标")
def award_contract(self, task, agent_id):
"""授予合同"""
# 找到中标Agent
for contractor in self.contractors:
if contractor.agent_id == agent_id:
# 发送中标通知
response = contractor.receive_message({
"type": "award_contract",
"task": task
}, self)
if response["status"] == "accepted":
self.active_contracts[task["id"]] = {
"task": task,
"contractor": agent_id,
"status": "active"
}
print(f"任务 {task['name']} 已授予 Agent {agent_id}")
break
# 测试合同网协作系统
if __name__ == "__main__":
# 创建承包商智能体
agent1 = ContractorAgent("A1", ["数据收集", "数据分析"])
agent2 = ContractorAgent("A2", ["模型训练", "数据分析"])
agent3 = ContractorAgent("A3", ["报告生成", "数据可视化"])
# 创建经理智能体
manager = ManagerAgent("M1")
# 注册承包商
manager.register_contractor(agent1)
manager.register_contractor(agent2)
manager.register_contractor(agent3)
# 添加任务
tasks = [
{
"id": 1,
"name": "市场数据分析",
"required_capabilities": ["数据收集", "数据分析"],
"complexity": 3
},
{
"id": 2,
"name": "用户行为预测模型",
"required_capabilities": ["数据分析", "模型训练"],
"complexity": 5
},
{
"id": 3,
"name": "季度报告生成",
"required_capabilities": ["报告生成", "数据可视化"],
"complexity": 2
}
]
for task in tasks:
manager.add_task(task)
# 执行任务
print("\n--- 开始执行任务 ---")
results = []
results.append(agent1.execute_task())
results.append(agent2.execute_task())
results.append(agent3.execute_task())
print("\n--- 任务执行结果 ---")
for result in results:
print(result)
黑板模型 (Blackboard Model) 是一种通过共享 "黑板" 进行间接协作的方法,智能体可以读取黑板上的信息并写入自己的结果。
class Blackboard:
"""黑板,用于存储和共享信息"""
def __init__(self):
self.data = {} # 存储数据
self.tasks = [] # 任务列表
self.results = {} # 结果存储
def write_data(self, key, value, agent_id):
"""写入数据"""
self.data[key] = {
"value": value,
"agent_id": agent_id,
"timestamp": len(self.data) # 简单的时间戳
}
print(f"黑板: Agent {agent_id} 写入数据 {key} = {value}")
def read_data(self, key):
"""读取数据"""
if key in self.data:
return self.data[key]["value"]
return None
def add_task(self, task, agent_id):
"""添加任务"""
self.tasks.append({
"task_id": len(self.tasks) + 1,
"description": task,
"status": "pending",
"creator": agent_id,
"assignee": None
})
print(f"黑板: Agent {agent_id} 添加任务: {task}")
def claim_task(self, agent_id, capabilities):
"""认领任务"""
for task in self.tasks:
if task["status"] == "pending":
# 简单检查:任务描述是否匹配能力
if any(cap in task["description"].lower() for cap in capabilities):
task["status"] = "in_progress"
task["assignee"] = agent_id
print(f"Agent {agent_id} 认领了任务: {task['description']}")
return task
return None
def submit_result(self, task_id, result, agent_id):
"""提交结果"""
self.results[task_id] = {
"result": result,
"agent_id": agent_id,
"timestamp": len(self.results)
}
# 更新任务状态
for task in self.tasks:
if task["task_id"] == task_id:
task["status"] = "completed"
print(f"Agent {agent_id} 完成了任务 {task_id}: {task['description']}")
break
class BlackboardAgent:
"""基于黑板模型的智能体"""
def __init__(self, agent_id, capabilities):
self.agent_id = agent_id
self.capabilities = capabilities # 能力列表
self.blackboard = None # 黑板引用
def set_blackboard(self, blackboard):
"""设置黑板引用"""
self.blackboard = blackboard
def contribute_data(self, key, value):
"""向黑板贡献数据"""
if self.blackboard:
self.blackboard.write_data(key, value, self.agent_id)
def create_task(self, task_description):
"""创建任务"""
if self.blackboard:
self.blackboard.add_task(task_description, self.agent_id)
def work(self):
"""执行工作:认领并完成任务"""
if not self.blackboard:
return
# 认领任务
task = self.blackboard.claim_task(self.agent_id, self.capabilities)
if task:
# 执行任务(简化)
print(f"Agent {self.agent_id} 正在执行任务: {task['description']}")
# 模拟任务执行
result = f"{task['description']} 的执行结果"
# 提交结果
self.blackboard.submit_result(task["task_id"], result, self.agent_id)
# 可能产生新数据
self.contribute_data(f"task_{task['task_id']}_result", result)
# 可能创建新任务
if "分析" in task["description"]:
self.create_task(f"基于 {task['task_id']} 的结果生成报告")
# 测试黑板模型协作系统
if __name__ == "__main__":
# 创建黑板
blackboard = Blackboard()
# 创建智能体
data_agent = BlackboardAgent("A1", ["收集", "数据"])
analysis_agent = BlackboardAgent("A2", ["分析", "处理"])
report_agent = BlackboardAgent("A3", ["报告", "总结", "可视化"])
# 设置黑板
data_agent.set_blackboard(blackboard)
analysis_agent.set_blackboard(blackboard)
report_agent.set_blackboard(blackboard)
# 初始数据贡献
data_agent.contribute_data("raw_sales_data", [1500, 2300, 1800, 3200])
data_agent.contribute_data("customer_count", 450)
# 创建初始任务
data_agent.create_task("分析销售数据趋势")
# 智能体工作
print("\n--- 智能体开始工作 ---")
data_agent.work()
analysis_agent.work()
report_agent.work()
# 查看结果
print("\n--- 黑板中的结果 ---")
for task_id, result in blackboard.results.items():
print(f"任务 {task_id} 结果: {result['result']} (由 Agent {result['agent_id']} 完成)")
print("\n--- 任务状态 ---")
for task in blackboard.tasks:
print(f"任务 {task['task_id']}: {task['description']} - {task['status']} (由 Agent {task['assignee']} 执行)")
市场机制协作方法模拟经济市场,通过价格机制来分配资源和任务。
class Resource:
"""资源类"""
def __init__(self, resource_id, resource_type, quantity=1):
self.resource_id = resource_id
self.resource_type = resource_type
self.quantity = quantity
self.owner = None
def __str__(self):
return f"资源 {self.resource_id}: {self.resource_type} (数量: {self.quantity})"
class MarketAgent:
"""市场中的智能体"""
def __init__(self, agent_id, budget=100):
self.agent_id = agent_id
self.budget = budget # 预算
self.resources = [] # 拥有的资源
self.needs = [] # 需要的资源
self.offers = [] # 提供的报价
def add_resource(self, resource):
"""获得资源"""
resource.owner = self.agent_id
self.resources.append(resource)
print(f"Agent {self.agent_id} 获得了 {resource}")
def need_resource(self, resource_type, quantity=1):
"""声明需要的资源"""
self.needs.append({
"type": resource_type,
"quantity": quantity,
"fulfilled": False
})
print(f"Agent {self.agent_id} 需要 {quantity} 个 {resource_type}")
def create_offer(self, resource_id, price):
"""为资源创建报价"""
# 检查是否拥有该资源
for resource in self.resources:
if resource.resource_id == resource_id:
offer = {
"offer_id": len(self.offers) + 1,
"resource_id": resource_id,
"resource_type": resource.resource_type,
"price": price,
"seller": self.agent_id,
"status": "active"
}
self.offers.append(offer)
print(f"Agent {self.agent_id} 为 {resource} 报价 {price} 元")
return offer
return None
def make_bid(self, offer, bid_price):
"""对报价进行投标"""
if bid_price > offer["price"] and self.budget >= bid_price:
print(f"Agent {self.agent_id} 对报价 {offer['offer_id']} 投标 {bid_price} 元")
return {
"status": "valid",
"bid_price": bid_price,
"bidder": self.agent_id
}
return {
"status": "invalid",
"reason": "insufficient_budget" if self.budget < bid_price else "low_price"
}
def buy_resource(self, offer, price):
"""购买资源"""
if self.budget >= price:
# 扣钱
self.budget -= price
# 从卖家处获取资源
for seller in market_agents:
if seller.agent_id == offer["seller"]:
for resource in seller.resources[:]:
if resource.resource_id == offer["resource_id"]:
# 转移资源
seller.resources.remove(resource)
self.add_resource(resource)
# 更新预算
seller.budget += price
# 更新报价状态
for o in seller.offers:
if o["offer_id"] == offer["offer_id"]:
o["status"] = "sold"
break
print(f"交易完成: Agent {self.agent_id} 以 {price} 元购买了 {resource}")
return True
return False
class Market:
"""市场,负责管理交易"""
def __init__(self):
self.offers = [] # 所有报价
self.transactions = [] # 交易记录
def add_offer(self, offer):
"""添加报价"""
self.offers.append(offer)
print(f"市场添加了报价: {offer['offer_id']} (资源类型: {offer['resource_type']}, 价格: {offer['price']})")
def find_offers(self, resource_type):
"""查找特定类型的有效报价"""
return [
offer for offer in self.offers
if offer["resource_type"] == resource_type and offer["status"] == "active"
]
def process_bids(self):
"""处理投标,完成交易"""
# 简化:为每个资源类型找到最高投标
for resource_type in set(offer["resource_type"] for offer in self.offers if offer["status"] == "active"):
# 获取该类型的所有有效报价
type_offers = self.find_offers(resource_type)
# 对每个报价,找到最高投标
for offer in type_offers:
# 查找该报价的所有投标(简化:这里我们假设已经有投标)
# 在实际系统中,应该有投标记录
highest_bid = offer["price"] * 1.1 # 假设最高投标比报价高10%
# 找到愿意出这个价格的买家
for agent in market_agents:
if agent.agent_id != offer["seller"] and agent.budget >= highest_bid:
# 检查买家是否需要该资源
for need in agent.needs:
if need["type"] == resource_type and not need["fulfilled"]:
# 完成交易
if agent.buy_resource(offer, highest_bid):
need["fulfilled"] = True
self.transactions.append({
"transaction_id": len(self.transactions) + 1,
"offer_id": offer["offer_id"],
"buyer": agent.agent_id,
"seller": offer["seller"],
"price": highest_bid,
"resource_type": resource_type
})
break
# 测试市场机制协作系统
if __name__ == "__main__":
# 创建市场智能体
agent1 = MarketAgent("A1", budget=200)
agent2 = MarketAgent("A2", budget=300)
agent3 = MarketAgent("A3", budget=150)
market_agents = [agent1, agent2, agent3]
# 创建资源
resources = [
Resource(1, "计算资源"),
Resource(2, "存储资源"),
Resource(3, "网络带宽"),
Resource(4, "计算资源")
]
# 分配初始资源
agent1.add_resource(resources[0])
agent1.add_resource(resources[1])
agent2.add_resource(resources[2])
agent3.add_resource(resources[3])
# 声明需求
agent1.need_resource("网络带宽")
agent2.need_resource("计算资源")
agent3.need_resource("存储资源")
# 创建市场
market = Market()
# 创建报价
offer1 = agent1.create_offer(1, 50) # 出售计算资源
offer2 = agent1.create_offer(2, 30) # 出售存储资源
offer3 = agent2.create_offer(3, 40) # 出售网络带宽
offer4 = agent3.create_offer(4, 60) # 出售计算资源
# 将报价添加到市场
market.add_offer(offer1)
market.add_offer(offer2)
market.add_offer(offer3)
market.add_offer(offer4)
# 处理投标和交易
print("\n--- 开始市场交易 ---")
market.process_bids()
# 显示最终状态
print("\n--- 最终状态 ---")
for agent in market_agents:
print(f"\nAgent {agent.agent_id}:")
print(f" 预算: {agent.budget} 元")
print(" 拥有资源:")
for resource in agent.resources:
print(f" - {resource}")
print(" 需求满足情况:")
for need in agent.needs:
status = "已满足" if need["fulfilled"] else "未满足"
print(f" - {need['quantity']} 个 {need['type']}: {status}")
协商 (Negotiation) 是多智能体系统中解决冲突和分配资源的重要机制,当智能体之间存在利益冲突或资源竞争时,通过协商达成共识。
class NegotiationAgent:
"""参与协商的智能体"""
def __init__(self, agent_id, resources, goals):
self.agent_id = agent_id
self.resources = resources # 拥有的资源(集合)
self.goals = goals # 目标列表
self.preferences = {} # 偏好:资源的重要性(1-10,越高越重要)
self.concessions = 0 # 让步次数
self.max_concessions = 3 # 最大让步次数
self.utilities = {} # 资源对自身的效用值
def set_preferences(self, preferences):
"""设置偏好(资源重要性)"""
self.preferences = preferences
# 根据偏好计算效用值
self.calculate_utilities()
print(f"Agent {self.agent_id} 的偏好设置: {preferences}")
def calculate_utilities(self):
"""根据偏好计算资源效用值"""
total = sum(self.preferences.values())
for resource, importance in self.preferences.items():
self.utilities[resource] = importance / total # 归一化到0-1之间
def get_desired_resource(self):
"""获取希望得到的资源(自己没有但重要性高的)"""
# 假设知道其他资源类型(实际系统中可能需要通过感知或通信获取)
all_resources = set(self.preferences.keys())
missing_resources = all_resources - self.resources
if not missing_resources:
return None
# 按重要性排序缺失的资源
sorted_missing = sorted(
missing_resources,
key=lambda x: self.preferences[x],
reverse=True
)
return sorted_missing[0] # 返回最重要的缺失资源
def receive_proposal(self, proposal, issue, sender):
"""接收提议并处理"""
print(f"Agent {self.agent_id} 收到 Agent {sender.agent_id} 的提议: {proposal}")
# 评估提议
if self.evaluate_proposal(proposal) > 0.6: # 效用大于0.6则接受
return {"status": "accept"}
elif self.concessions < self.max_concessions: # 还能让步则提出反提议
counter_proposal = self.make_counter_proposal(proposal, issue)
self.concessions += 1
return {"status": "counter", "proposal": counter_proposal}
else: # 无法再让步则拒绝
return {"status": "reject"}
def evaluate_proposal(self, proposal):
"""评估提议的效用"""
# 计算收益:得到请求资源的效用
gain = self.utilities.get(proposal["request"], 0) if proposal["request"] in self.preferences else 0
# 计算损失:失去提供资源的效用
loss = self.utilities.get(proposal["offer"], 0) if proposal["offer"] in self.preferences else 0
# 净效用
net_utility = gain - loss
print(f"Agent {self.agent_id} 评估提议效用: {net_utility:.2f} (收益: {gain:.2f}, 损失: {loss:.2f})")
return net_utility
def make_counter_proposal(self, original_proposal, issue):
"""提出反提议(让步)"""
# 反提议策略:逐渐降低要求,增加提供
if issue == "资源交换":
# 找到次重要的资源作为提议
sorted_resources = sorted(
self.preferences.items(),
key=lambda x: x[1]
)
# 让步:提供比原来更有价值的资源,或接受次优的请求
offer_index = min(self.concessions, len(sorted_resources) - 1)
new_offer = sorted_resources[offer_index][0]
# 调整请求:如果让步次数多,接受次优资源
desired = self.get_desired_resource()
if self.concessions >= 1:
all_resources = set(self.preferences.keys())
missing_resources = sorted(
all_resources - self.resources,
key=lambda x: self.preferences[x],
reverse=True
)
desired = missing_resources[min(1, len(missing_resources) - 1)] # 次优选择
counter_proposal = {
"offer": new_offer,
"request": desired
}
print(f"Agent {self.agent_id} 提出反提议: {counter_proposal} (让步次数: {self.concessions})")
return counter_proposal
return {}
def initiate_negotiation(self, other_agent, issue):
"""发起协商"""
print(f"\nAgent {self.agent_id} 向 Agent {other_agent.agent_id} 发起关于 {issue} 的协商")
# 提出初始提议
initial_proposal = self.make_proposal(issue)
response = other_agent.receive_proposal(initial_proposal, issue, self)
# 处理响应
if response["status"] == "accept":
print(f"Agent {self.agent_id} 的提议被接受,协商成功")
return self.execute_agreement(initial_proposal, other_agent)
elif response["status"] == "counter":
# 进入协商循环
return self.negotiation_loop(other_agent, issue, response["proposal"])
else:
print(f"Agent {self.agent_id} 的提议被拒绝,协商失败")
return {"status": "failed"}
def make_proposal(self, issue):
"""提出提议"""
# 简化:根据偏好提出提议
if issue == "资源交换":
# 找出自己不太重要但对方可能需要的资源
least_important = min(self.preferences.items(), key=lambda x: x[1])[0]
return {
"offer": least_important,
"request": self.get_desired_resource()
}
return {}
def negotiation_loop(self, other_agent, issue, proposal):
"""协商循环:处理多轮提议与反提议"""
current_proposal = proposal
round = 1
while round <= 5: # 最多5轮协商
print(f"\n--- 协商轮次 {round} ---")
# 评估对方的反提议
if self.evaluate_proposal(current_proposal) > 0.5: # 降低接受阈值
print(f"Agent {self.agent_id} 接受提议: {current_proposal}")
return self.execute_agreement(current_proposal, other_agent)
# 提出新的反提议
if self.concessions < self.max_concessions:
counter = self.make_counter_proposal(current_proposal, issue)
self.concessions += 1
# 发送反提议并获取响应
response = other_agent.receive_proposal(counter, issue, self)
if response["status"] == "accept":
print(f"Agent {self.agent_id} 的反提议被接受,协商成功")
return self.execute_agreement(counter, other_agent)
elif response["status"] == "counter":
current_proposal = response["proposal"]
else:
print(f"Agent {self.agent_id} 的反提议被拒绝,协商失败")
return {"status": "failed"}
else:
print(f"Agent {self.agent_id} 无法再让步,协商失败")
return {"status": "failed"}
round += 1
print("协商轮次耗尽,协商失败")
return {"status": "failed"}
def execute_agreement(self, proposal, other_agent):
"""执行协商达成的协议"""
# 交换资源
if proposal["offer"] in self.resources and proposal["request"] in other_agent.resources:
# 移除自己提供的资源
self.resources.remove(proposal["offer"])
# 添加获得的资源
self.resources.add(proposal["request"])
# 对方移除资源
other_agent.resources.remove(proposal["request"])
# 对方添加资源
other_agent.resources.add(proposal["offer"])
print(f"协议执行成功:Agent {self.agent_id} 用 {proposal['offer']} 交换到 {proposal['request']}")
return {
"status": "success",
"agent_resources": self.resources,
"other_resources": other_agent.resources
}
else:
print("资源交换失败:一方不拥有约定的资源")
return {"status": "failed", "reason": "resource_unavailable"}
# 测试多智能体协商系统
if __name__ == "__main__":
# 创建两个协商智能体
agent_a = NegotiationAgent(
agent_id="A",
resources={"木材", "石料"},
goals=["建造房屋", "制作工具"]
)
agent_b = NegotiationAgent(
agent_id="B",
resources={"金属", "布料"},
goals=["制作工具", "缝制衣物"]
)
# 设置资源偏好(1-10,数字越大越重要)
agent_a.set_preferences({
"木材": 3,
"石料": 4,
"金属": 10, # 非常需要金属来制作工具
"布料": 2
})
agent_b.set_preferences({
"木材": 8, # 非常需要木材
"石料": 2,
"金属": 5,
"布料": 6
})
# 发起资源交换协商
result = agent_a.initiate_negotiation(agent_b, "资源交换")
# 显示最终结果
print("\n--- 协商最终结果 ---")
print(f"状态: {result['status']}")
if result["status"] == "success":
print(f"Agent A 最终资源: {result['agent_resources']}")
print(f"Agent B 最终资源: {result['other_resources']}")

图 8:多智能体协商流程图
本章详细介绍了智能体与多智能体系统的核心概念、结构和关键技术,主要内容包括:
多智能体系统在分布式人工智能、机器人协作、智能决策支持等领域有广泛应用,随着物联网和边缘计算的发展,其重要性将进一步提升。

图 10:智能体与多智能体系统知识思维导图