首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >安信可AI语音开发板AiPi-PalChatV1 + MCP,通过HomeAssistant自动化控制设备

安信可AI语音开发板AiPi-PalChatV1 + MCP,通过HomeAssistant自动化控制设备

原创
作者头像
安信可科技
发布2025-06-25 14:03:22
发布2025-06-25 14:03:22
4420
举报
文章被收录于专栏:DIYDIY

以下作品由安信可社区用户

WT_0213制作

自从拥有了安信可AiPi-PalChatV1

Ai-M61+VC02做的语音控制器不香了

这是之前做的两个版本

通过VC02控制HA灯光的小项目

Ai-M61+VC02语音控制HA设备​bbs.ai-thinker.com/forum.php?mod=viewthread&tid=45021

Ai-M61+VC02语音控制HA设备v1.1版​bbs.ai-thinker.com/forum.php?mod=viewthread&tid=45059

这两个项目都是基于HA的自动化来完成的。

参考:

[智能家居]MQTT控制HomeAssistant设备​bbs.ai-thinker.com/forum.php?mod=viewthread&tid=44644&fromuid=15918

现在都在链接智能化、大模型、MCP、AI,那就接入吧

不得不说虾哥开源的这个小智真的太棒了

方案落地

下面介绍下如何通过AiPi-PalChatV1+MCP控制HA设备。

参考:

【AiPi-PalChatV1语音开发板】小智 MCP 接入 Home Assistant​bbs.ai-thinker.com/forum.php?mod=viewthread&tid=47027

1、下载部署小智MCP服务代码

git clone https://gitee.com/lazy-ai/ai-pi-pal-chat-v1-ha.git

克隆代码后,先运行

pip install paho-mqtt pip install -r requirements.txt 如果是mac pip install paho-mqtt pip install -r requirements_mac.txt

等待安装完成即可

安装好后如果报 XXXX 模块没有找到,就继续执行 pip install xxx把缺失的部分安装全就好了。

2、下载小智客户端

https://github.com/huangjunsen0406/py-xiaozhi.git

github 地址可能间歇性的打不开,在gitee上面找了个

git clone https://gitee.com/tinytaro/py-xiaozhi0.git

克隆下来以后和上面基本一样先安装依赖

pip install -r requirements.txt 如果是mac pip install -r requirements_mac.txt

这里需要注意的是有个包 **cv2**找不到

pip install opencv-python

还有一个修改点,就是将小智客户端中的IoT相关代码注释掉,不然不走MCP

Iot代码在src下面iot目录下

注释 **application.py**第138行

# 初始化物联网设备

# self._initialize_iot_devices()

这样就好了然后进到项目目录打开命令行

执行

python main.py

正常情况下他会播报一个设备码,和AiPi-PalChatV1 配网时效果时是一样的。将设备码添加到智能体。

3、配置MCP接入点

打开 https://xiaozhi.me/

点击**控制台**, 登录后

点击**配置角色**,拉到屏幕最下方

右下角**MCP接入点**

复制接入点地址到第一步命令行

先执行

export MCP_ENDPOINT=接入点地址

再执行命令

python mcp_pipe.py switch_lamp.py

成功运行输出

(base) ➜ mcp git:(main) python mcp_pipe.py switch_lamp.py 2025-06-17 18:15:30,888 - MCP_PIPE - INFO - Connecting to WebSocket server... 2025-06-17 18:15:31,541 - MCP_PIPE - INFO - Successfully connected to WebSocket server 2025-06-17 18:15:31,548 - MCP_PIPE - INFO - Started switch_lamp.py process /Users/tengyun1/AI/mcp-calculator/switch_lamp.py:30: DeprecationWarning: Callback API version 1 is deprecated, update to latest version client = mqtt.Client() # 创建MQTT客户端实例 [06/17/25 18:15:32] INFO Processing request of type server.py:523 ListToolsRequest 2025-06-17 18:15:32,397 - Home Assistant MCP - ERROR - MQTT连接: 0 ERROR MQTT连接: 0 switch_lamp.py:51 2025-06-17 18:15:32,845 - Home Assistant MCP - INFO - Connected with result code 0 INFO Connected with result code 0 switch_lamp.py:33

这样就都跑起来了,然后就是测试

在会话框中输入“开灯/关灯”,右侧会同步限制MQTT接收到的信息。

下面是MCP 源代码

# server.py import sys import json import logging import paho.mqtt.client as mqtt import threading from mcp.server.fastmcp import FastMCP # 配置日志 logger = logging.getLogger('Home Assistant MCP') handler = logging.StreamHandler() formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') handler.setFormatter(formatter) logger.addHandler(handler) logger.setLevel(http://logging.INFO) # Fix UTF-8 encoding for Windows console if sys.platform == 'win32': sys.stderr.reconfigure(encoding='utf-8') sys.stdout.reconfigure(encoding='utf-8') # MQTT配置 MQTT_CONFIG = { "broker_address": "http://broker.emqx.io", "broker_port": 1883, "username": None, "password": None, } client = mqtt.Client() # 创建MQTT客户端实例 def on_connect(client, userdata, flags, rc): http://logger.info(f"Connected with result code {rc}") client.publish("ha/ai", "Hello MQTT") # 连接后发布消息 client.subscribe("ha/ai") # 订阅主题以接收消息 def on_message(client, userdata, message): try: payload = message.payload.decode() except UnicodeDecodeError: payload = f"<无法解码的消息内容: {message.payload}>" http://logger.info(f"Received message: {payload} on topic {message.topic}") def init_mqtt(): try: client.on_connect = on_connect client.on_message = on_message if MQTT_CONFIG["username"] and MQTT_CONFIG["password"]: client.username_pw_set(MQTT_CONFIG["username"], MQTT_CONFIG["password"]) ret = client.connect(MQTT_CONFIG["broker_address"], MQTT_CONFIG["broker_port"], 60) logger.error(f"MQTT连接: {ret}") client.loop_forever() except Exception as e: logger.error(f"MQTT连接失败: {e}") sys.exit(1) # Create an MCP server mcp = FastMCP("Home Assistant MCP") # Add a lamp control tool @mcp.tool() def switchLamp(on_off: bool) -> dict: """这是一个用于控制HomeAssistant的灯的接口""" result = "开灯成功" if on_off else "关灯成功" http://logger.info(f"switchLamp formula: {on_off}, result: {result}") status = 1 if on_off else 0 payload = json.dumps({"status": status}) client.publish("ha/ai", payload) # 连接后发布消息 return {"success": True, "result": result} # Start the server if __name__ == "__main__": # 启动 MQTT 客户端在单独的线程中 mqtt_thread = threading.Thread(target=init_mqtt, daemon=True) mqtt_thread.start() mcp.run(transport="stdio")

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 以下作品由安信可社区用户
  • WT_0213制作
  • 方案落地
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档