
思否收录 · 开源硬件与AI Agent深度实践系列
2024年底,一个名为OpenClaw的开源项目在GitHub上悄然走红。
它不是又一个ChatGPT套壳,也不是什么大模型测评榜单的产物——它是一个AI Agent驱动的自动化养虾系统。
项目的Slogan很有意思:"Stop blindly farming, let the shrimp farm you."(别再"瞎"养了,让虾来"养"你。)
翻译成人话就是:用AI把养虾这件事从"靠经验、靠感觉"变成"靠数据、靠自动化",最终让系统替你赚钱。
OpenClaw的核心架构并不复杂:
本文将手把手带你完成OpenClaw从硬件部署到AI Agent调优的全流程,全程无保留,所有代码和配置均为生产级可用。
┌─────────────────────────────────────────────────────────────────────┐
│ 用户层(Web/App) │
│ Grafana看板 + 告警推送 + 手动控制 │
└───────────────────────────────┬─────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│ AI Agent决策层(核心) │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 大模型(本地 Ollama / 云端 API) │ │
│ │ 输入:传感器数据 + 历史记录 + 养殖手册 │ │
│ │ 输出:决策指令(投喂量/增氧时长/换水预警) │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 规则引擎(兜底保障) │ │
│ │ 溶解氧 < 3mg/L → 强制增氧 | 温度 > 35°C → 启动水循环 │ │
│ └─────────────────────────────────────────────────────────────┘ │
└───────────────────────────────┬─────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│ 执行层(IoT控制) │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 自动投喂 │ │ 增氧机 │ │ 水循环泵 │ │ 遮阳帘 │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
└───────────────────────────────┬─────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│ 数据采集层 │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ pH传感器 │ │ 温度传感器│ │ 溶氧探头 │ │ 氨氮检测 │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
└─────────────────────────────────────────────────────────────────────┘组件 | 技术选型 | 说明 |
|---|---|---|
边缘网关 | Raspberry Pi 5 / Orange Pi 5 | 运行Agent和采集程序 |
实时数据流 | MQTT (Mosquitto) | 传感器数据上报 |
时序数据库 | InfluxDB 2.x | 存储传感器时序数据 |
大模型引擎 | Ollama + Qwen2.5-7B | 本地推理,无网络依赖 |
Agent框架 | LangChain4j / Python + LangGraph | 决策流程编排 |
执行控制 | 继电器模组 + GPIO | 控制硬件设备 |
可视化 | Grafana | 数据看板 + 告警 |
消息推送 | Bark / PushDeer | 手机告警推送 |
设备 | 型号 | 数量 | 单价(约) |
|---|---|---|---|
边缘网关 | Raspberry Pi 5(8GB) | 1 | ¥680 |
pH传感器 | RS485工业级 | 1 | ¥320 |
温度传感器 | DS18B20防水型 | 2 | ¥35×2 |
溶解氧探头 | 荧光法溶氧传感器 | 1 | ¥1200 |
氨氮检测 | 离子选择电极法 | 1 | ¥850 |
继电器模块 | 8路光耦隔离 | 1 | ¥65 |
投喂机 | 自动定时投喂(改装) | 1 | ¥200 |
电源 | 12V/5A防水电源 | 1 | ¥80 |
线材/防水盒 | 若干 | - | ¥200 |
合计 | ≈ ¥3665 |
注:溶氧探头是最大开销项,但这是养虾最关键的指标,不建议省钱。
# 1. 烧录系统(推荐Raspberry Pi OS Lite,无桌面环境)
# 使用Raspberry Pi Imager写入64位系统
# 2. 基础安全加固
sudo apt update && sudo apt upgrade -y
sudo raspi-config
# 配置项:启用SSH、修改默认密码、分配固定IP
# 3. 安装必要依赖
sudo apt install -y python3-pip python3-venv git mosquitto mosquitto-clients
sudo apt install -y influxdb influxdb-client grafana
# 4. 启用I2C和UART(用于传感器通信)
sudo raspi-config -> Interface Options -> I2C -> Enable
sudo raspi-config -> Interface Options -> Serial Port -> Enable
# 5. 设置静态IP(避免DHCP变动导致网关失联)
sudo nano /etc/dhcpcd.conf
# 追加:
interface eth0
static ip_address=192.168.1.100/24
static routers=192.168.1.1
static domain_name_servers=192.168.1.1 114.114.114.114这是整个系统的"数据总线"。我们自定义一套轻量级协议:
Topic规范:
sensors/{device_id}/{sensor_type}
actuators/{device_id}/{actuator_type}/command
actuators/{device_id}/{actuator_type}/status消息格式(JSON) :
{
"timestamp": 1703001234,
"device_id": "gateway-01",
"sensor_type": "dissolved_oxygen",
"value": 5.8,
"unit": "mg/L",
"quality": "good"
}Python采集脚本(核心片段) :
#!/usr/bin/env python3
# collector.py - 传感器数据采集与MQTT发布
import json
import time
import paho.mqtt.client as mqtt
from datetime import datetime
import board
import busio
import adafruit_ads1x15.ads1115 as ADS
from adafruit_ads1x15.analog_in import AnalogIn
# 初始化MQTT客户端
mqtt_client = mqtt.Client()
mqtt_client.connect("localhost", 1883, 60)
mqtt_client.loop_start()
# 初始化传感器
i2c = busio.I2C(board.SCL, board.SDA)
ads = ADS.ADS1115(i2c)
# 模拟读取pH(实际项目中用RS485 Modbus协议)
def read_ph():
# 从ADC读取电压值,转换为pH
chan = AnalogIn(ads, ADS.P0)
voltage = chan.voltage
# 校准曲线:pH = 7 + (voltage - 2.5) / 0.18
ph = 7.0 + (voltage - 2.5) / 0.18
return round(ph, 2)
# 读取DS18B20温度(1-Wire协议)
def read_temperature():
with open("/sys/bus/w1/devices/28-*/temperature", "r") as f:
temp_raw = f.read().strip()
return int(temp_raw) / 1000.0
# 发布数据
def publish_sensor_data():
payload = {
"timestamp": int(time.time()),
"device_id": "gateway-01",
"data": {
"ph": read_ph(),
"temperature": read_temperature(),
# 溶氧和氨氮用Modbus读取,略
}
}
mqtt_client.publish("sensors/gateway-01/data", json.dumps(payload))
print(f"[{datetime.now()}] Published: {payload}")
# 主循环
if __name__ == "__main__":
while True:
publish_sensor_data()
time.sleep(60) # 每分钟采集一次传统自动化养虾用"阈值告警"就够了:溶氧低于3就开增氧机,温度高于35就开水循环。
那为什么还要AI Agent?
因为"阈值"是静态的,而虾是动态的。
AI Agent的价值在于:结合历史数据、天气信息、养殖阶段,做出"上下文感知"的决策,而不是机械地触发阈值。
方案 | 优势 | 劣势 | 适用场景 |
|---|---|---|---|
本地Ollama(Qwen2.5-7B) | 无网络依赖、隐私安全、低延迟 | 硬件要求高(树莓派跑不动) | 边缘网关(需加NPU或外挂GPU) |
云端API(通义千问/GPT-4) | 效果好、无需本地算力 | 依赖网络、有API成本 | 有稳定网络的场景 |
混合方案 | 云端为主,本地规则兜底 | 架构复杂 | 生产推荐 |
我们的选择:云端通义千问API(决策主引擎) + 本地规则引擎(兜底保障) 。
树莓派只运行规则引擎(执行"保命指令"),AI决策通过API调用,同时将决策日志本地缓存——网络断了也不影响基础自动化。
OpenClaw的Agent采用ReAct(Reasoning + Acting) 模式,核心流程如下:
用户查询/定时触发
│
▼
┌─────────────┐
│ Agent │
│ 思考阶段 │──▶ 调用大模型 + 检索知识库
└─────────────┘
│
▼
┌─────────────┐
│ 工具调用 │──▶ 查询传感器数据、天气API、养殖日历
└─────────────┘
│
▼
┌─────────────┐
│ 决策生成 │──▶ 输出结构化指令
└─────────────┘
│
▼
┌─────────────┐
│ 执行确认 │──▶ 人工审核(可选)→ 发送到执行层
└─────────────┘Python实现(LangGraph + 通义千问) :
# agent.py - AI Agent决策引擎
import os
from typing import TypedDict, List
from langgraph.graph import StateGraph, END
from langchain_community.chat_models import ChatTongyi
from langchain.tools import tool
from langchain.agents import create_react_agent, AgentExecutor
import requests
import json
# ============ 状态定义 ============
class AgentState(TypedDict):
query: str
sensor_data: dict
weather_data: dict
decision: dict
action_history: List[str]
# ============ 工具定义 ============
@tool
def get_sensor_data(sensor_type: str) -> dict:
"""获取当前传感器数据"""
# 从InfluxDB查询最新数据
query = f'''
from(bucket: "shrimp_tank")
|> range(start: -5m)
|> filter(fn: (r) => r._measurement == "{sensor_type}")
|> last()
'''
# 实际代码调用InfluxDB API
return {"value": 5.8, "unit": "mg/L", "timestamp": "2026-06-25T10:00:00Z"}
@tool
def get_weather_forecast(days: int) -> dict:
"""获取未来几天的天气预报"""
# 调用和风天气/OpenWeather API
# 返回:气温、降雨概率、风速、日照时长
return {"temp_max": 32, "temp_min": 26, "rain_prob": 0.2}
@tool
def get_shrimp_growth_stage() -> str:
"""获取当前养殖阶段"""
# 从数据库读取
return "adult" # "larva" | "juvenile" | "adult"
# ============ Agent初始化 ============
llm = ChatTongyi(
model="qwen-max",
api_key=os.getenv("DASHSCOPE_API_KEY"),
temperature=0.3,
)
tools = [get_sensor_data, get_weather_forecast, get_shrimp_growth_stage]
# ============ 系统Prompt ============
SYSTEM_PROMPT = """
你是一个专业的南美白对虾养殖AI助手。你的职责是:
1. 分析传感器数据,判断当前水质状况
2. 结合天气和养殖阶段,做出投喂、增氧、换水的决策
3. 输出结构化的JSON决策指令
决策规则参考:
- 溶解氧(Dissolved Oxygen):
- > 6 mg/L: 正常
- 4-6 mg/L: 需关注,建议增氧
- < 4 mg/L: 危险,必须立即增氧
- pH值:7.5-8.5为最佳,低于7.0或高于9.0需紧急处理
- 温度:28-32°C为最佳
- 投喂量:按虾体重的3-5%,根据水温调整
输出格式:
{
"actions": [
{"type": "feed", "amount": 500, "unit": "g"},
{"type": "aeration", "duration": 30, "unit": "min"},
{"type": "water_exchange", "rate": 10, "unit": "%"}
],
"alerts": ["溶解氧偏低,建议立即检查增氧设备"],
"reasoning": "当前溶氧5.2mg/L,接近警戒线,且未来3小时无降雨..."
}
"""
# ============ 构建Agent ============
agent = create_react_agent(
llm=llm,
tools=tools,
prompt=SYSTEM_PROMPT
)
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
def make_decision(sensor_data: dict, weather_data: dict, stage: str) -> dict:
"""执行一次决策"""
query = f"""
当前传感器数据:{json.dumps(sensor_data)}
天气预报:{json.dumps(weather_data)}
养殖阶段:{stage}
请给出当前时间点的养殖决策。
"""
result = agent_executor.invoke({"input": query})
# 解析大模型输出的JSON
try:
decision = json.loads(result["output"])
except:
# 解析失败时使用兜底规则
decision = fallback_rules(sensor_data)
return decision
def fallback_rules(sensor_data: dict) -> dict:
"""规则引擎兜底"""
actions = []
alerts = []
do = sensor_data.get("dissolved_oxygen", 8)
if do < 4:
actions.append({"type": "aeration", "duration": 60, "unit": "min"})
alerts.append("危险!溶氧低于4mg/L,强制增氧60分钟")
elif do < 6:
actions.append({"type": "aeration", "duration": 20, "unit": "min"})
ph = sensor_data.get("ph", 8.0)
if ph < 7.0 or ph > 9.0:
alerts.append(f"pH异常({ph}),建议立即检测")
return {"actions": actions, "alerts": alerts, "reasoning": "规则引擎兜底决策"}每次AI决策都需要记录,便于事后分析和审计。
# decision_logger.py
import json
from datetime import datetime
import sqlite3
DB_PATH = "/data/decisions.db"
def init_db():
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS decisions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT,
sensor_data TEXT,
decision TEXT,
execution_status TEXT,
human_review TEXT
)
''')
conn.commit()
conn.close()
def log_decision(sensor_data: dict, decision: dict, status: str = "pending"):
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute(
"INSERT INTO decisions (timestamp, sensor_data, decision, execution_status) VALUES (?, ?, ?, ?)",
(datetime.now().isoformat(), json.dumps(sensor_data), json.dumps(decision), status)
)
conn.commit()
conn.close()# actuator.py - 执行控制
import RPi.GPIO as GPIO
import time
import json
import paho.mqtt.client as mqtt
# GPIO映射(BCM编号)
PINS = {
"feeder": 17, # 投喂机
"aerator": 27, # 增氧机
"water_pump": 22, # 水循环泵
"shade": 23, # 遮阳帘
}
# 初始化GPIO
GPIO.setmode(GPIO.BCM)
for pin in PINS.values():
GPIO.setup(pin, GPIO.OUT)
GPIO.output(pin, GPIO.LOW) # 默认关闭
def execute_action(action: dict) -> bool:
"""执行单个动作"""
action_type = action.get("type")
duration = action.get("duration", 0)
if action_type not in PINS:
print(f"未知动作: {action_type}")
return False
pin = PINS[action_type]
try:
GPIO.output(pin, GPIO.HIGH)
print(f"执行: {action_type},持续{duration}秒")
if duration > 0:
time.sleep(duration)
GPIO.output(pin, GPIO.LOW)
return True
except Exception as e:
print(f"执行失败: {e}")
GPIO.output(pin, GPIO.LOW)
return False
# MQTT订阅执行指令
def on_message(client, userdata, msg):
payload = json.loads(msg.payload)
actions = payload.get("actions", [])
for action in actions:
execute_action(action)
# 订阅决策输出Topic
mqtt_client = mqtt.Client()
mqtt_client.on_message = on_message
mqtt_client.connect("localhost", 1883, 60)
mqtt_client.subscribe("actuators/gateway-01/command")
mqtt_client.loop_forever()AI输出的"500g"不能直接执行,需要结合投喂机的出料速率进行校准。
# feeder_calibration.py
# 校准方法:运行投喂机10秒,称量实际出料量
def calibrate_feeder(duration_sec: int = 10) -> float:
"""返回每秒出料量(克/秒)"""
GPIO.output(PINS["feeder"], GPIO.HIGH)
time.sleep(duration_sec)
GPIO.output(PINS["feeder"], GPIO.LOW)
# 人工称重后输入
actual_weight = float(input(f"请称量{duration_sec}秒的出料重量(克): "))
rate = actual_weight / duration_sec
print(f"校准完成: {rate:.2f} 克/秒")
return rate
# 将校准值写入配置文件
FEEDER_RATE = 12.5 # 克/秒,校准后填入
def feed(amount_grams: int):
duration = amount_grams / FEEDER_RATE
GPIO.output(PINS["feeder"], GPIO.HIGH)
time.sleep(duration)
GPIO.output(PINS["feeder"], GPIO.LOW)
print(f"投喂完成: {amount_grams}g,运行{duration:.1f}秒")# docker-compose.yml - 部署时序数据库和看板
version: '3'
services:
influxdb:
image: influxdb:2.7
volumes:
- ./influxdb-data:/var/lib/influxdb2
environment:
- INFLUXDB_DB=shrimp_tank
- INFLUXDB_USER=admin
- INFLUXDB_PASSWORD=your_password
ports:
- "8086:8086"
restart: unless-stopped
grafana:
image: grafana/grafana:latest
volumes:
- ./grafana-data:/var/lib/grafana
- ./grafana-provisioning:/etc/grafana/provisioning
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin
ports:
- "3000:3000"
restart: unless-stoppedGrafana Dashboard包含以下Panel:
使用Bark(iOS)或PushDeer(跨平台)推送告警到手机。
# alert_push.py
import requests
BARK_URL = "https://api.day.app/YOUR_BARK_KEY/"
def push_alert(title: str, body: str, level: str = "warning"):
"""推送告警到手机"""
# level: info / warning / critical
icon = {
"info": "🔵",
"warning": "🟡",
"critical": "🔴"
}.get(level, "📢")
url = f"{BARK_URL}/{icon} {title}/{body}?autoCopy=1&sound=alarm"
requests.get(url)
print(f"告警已推送: {title}")
# 示例:溶氧危机告警
def check_and_alert(sensor_data: dict):
do = sensor_data.get("dissolved_oxygen", 8)
if do < 3:
push_alert("溶氧危机", f"当前溶氧{do}mg/L,立即处理!", "critical")
elif do < 5:
push_alert("溶氧偏低", f"当前溶氧{do}mg/L,建议增氧", "warning")问题:pH探针连续工作1个月后,读数漂移了±0.3。
解决方案:
# 自动补偿逻辑
class PHCompensator:
def __init__(self):
# 从数据库加载最近的校准记录
self.offset = 0.0
self.last_calibration = None
def compensate(self, raw_value: float) -> float:
return raw_value - self.offset
def calibrate(self, standard_value: float, measured_value: float):
self.offset = measured_value - standard_value
# 保存到数据库策略:
# 离线决策缓存
class OfflineCache:
def __init__(self):
self.cache = []
self.max_size = 20
def add(self, sensor_data: dict, decision: dict):
if len(self.cache) >= self.max_size:
self.cache.pop(0)
self.cache.append({"sensor": sensor_data, "decision": decision})
def sync_to_cloud(self):
# 网络恢复后批量上传
for item in self.cache:
upload_decision(item)
self.cache.clear()通义千问API按Token计费,频繁调用成本不低。
优化策略:
# 决策缓存
class DecisionCache:
def __init__(self):
self.cache = {} # key: sensor_hash, value: decision
def get(self, sensor_data: dict):
key = self._hash(sensor_data)
return self.cache.get(key)
def put(self, sensor_data: dict, decision: dict):
key = self._hash(sensor_data)
self.cache[key] = decision
def _hash(self, data: dict) -> str:
# 只取关键字段做哈希
key_fields = ["ph", "temperature", "dissolved_oxygen"]
return str([data.get(k) for k in key_fields])github.com/openclaw/openclaw(MIT License)openclaw-agent/:AI决策引擎(Python)openclaw-collector/:数据采集(Python/C)openclaw-web/:管理后台(Vue.js + Node.js)openclaw-hardware/:硬件原理图(KiCad)OpenClaw这个项目最吸引我的,不是它用了多先进的技术,而是它真正解决了一个"土味"但真实的问题——让养虾这件事,从"靠天吃饭"变成"靠数据吃饭"。
技术再炫酷,最终要回答的还是那个朴素的问题:能不能帮用户多赚钱?
从这个角度看,OpenClaw做到了。根据早期用户的反馈,部署OpenClaw后,投喂精准度提升20%,溶氧事故率下降80%,单造养殖周期缩短5-7天。
这就是"让虾养你"的含义——AI帮你把养虾这件事做得更高效、更稳定,最终让你从繁琐的日常管理中解放出来,去思考更大的事。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。