7.5
【项目说明】
连续两天,都在学习云天老师的小智和M10MCP的教程,这个“智能家”小助手的项目算是一个课后习题,来巩固所学知识。
项目中有小风扇的开启和关闭,灯环的点亮和熄灭,获取环境温度和湿度,做为一个小练习还是蛮好的,下面是作业完成情况。
材料清单
- M10行空板 X1
- M10电池扩展板组合 X1
- RGB灯环(16灯) X1
- 儿童小风扇 X1
- 锂电池盒(风扇供电) X1
- 数字继电器 X1
- K10行空板(安装小智) X1
- 锂电池(K10供电) X1
- USB线 X1
- DHT22温湿度传感器 X1
步骤1 行空板M10上安装pyenv环境和Python 3.12.7
这一步已在上个项目中按照官方教程完成,记录在:M10仰望小车(MCP)的学习与尝试
https://makelogapi.dfrobot.com.cn/api/project/handleShareUrl?pid=165_317714

同时感谢云天老师的教程和指导。
步骤2 K10小智聊天机器人配置
K10刷小智固件见官方教程。
链接上一帖子中都有。


我的仰望小车和智能家共用同一个角色配置,用了同一个MCP接入点地址。
步骤3 硬件连接
硬件接线简图

实物图

步骤4 M10程序编写
1、编写mcp服务文件,move.py。
新建文件move.py,复制云天老师写的代码,修改为智能家的控制内容。
功能描述:控制M10风扇开启、关闭,灯环点亮、熄灭。
# move.py
from mcp.server.fastmcp import FastMCP
import sys
import logging
#配置日志
logger = logging.getLogger('MoveServer')
# 修复Windows控制台UTF-8编码问题
if sys.platform == 'win32':
sys.stderr.reconfigure(encoding='utf-8')
sys.stdout.reconfigure(encoding='utf-8')
# 创建MCP服务器
mcp = FastMCP("MoveServer")
#======================= 设备控制工具 =======================
#添加一个工具
@mcp.tool()
def open_fan() -> dict:
"""
控制风扇开启。
该函数将设置P21引脚为高电平,控制接在该引脚上的继电器闭合电源开关,使风扇开启。
Returns:
dict: 返回一个字典,包含操作结果。
"""
result = "风扇开启"
logger.info(result)
return {"success": True, "result": result}
#添加一个工具
@mcp.tool()
def close_fan() -> dict:
"""
控制风扇关闭。
该函数将设置P21引脚为低电平,控制接在该引脚上的继电器断开电源开关,使风扇关闭。
Returns:
dict: 返回一个字典,包含操作结果。
"""
result = "风扇关闭"
logger.info(result)
return {"success": True, "result": result}
#添加一个工具
@mcp.tool()
def open_light() -> dict:
"""
控制RGB灯环点亮。
该函数将设置P23引脚RGB灯环点亮,设置为红色,亮度100。
Returns:
dict: 返回一个字典,包含操作结果。
"""
result = "灯环已点亮"
logger.info(result)
return {"success": True, "result": result}
#添加一个工具
@mcp.tool()
def close_light() -> dict:
"""
控RGB灯环熄灭。
该函数将设置P23引脚RGB灯环熄灭。
Returns:
dict: 返回一个字典,包含操作结果。
"""
result = "灯环已熄灭"
logger.info(result)
return {"success": True, "result": result}
# 启动服务器
if __name__ == "__main__":
mcp.run(transport="stdio")
这个代码文件放到DeepSeek中优化,得到如下代码,增加了获取实时温度、湿度。。
"""
真实硬件版本:MCP服务器设备控制工具
版本: 0.3.0
"""
import sys
import logging
import time
from typing import Dict
from mcp.server.fastmcp import FastMCP
from pinpong.board import Board, Pin, DHT22, NeoPixel
# 配置日志系统
def setup_logger() -> logging.Logger:
logger = logging.getLogger('MoveServer')
logger.setLevel(logging.INFO)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
file_handler = logging.FileHandler('move_server.log')
file_handler.setLevel(logging.DEBUG)
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)
logger.addHandler(console_handler)
logger.addHandler(file_handler)
return logger
# 修复Windows控制台UTF-8编码问题
if sys.platform == 'win32':
try:
sys.stderr.reconfigure(encoding='utf-8')
sys.stdout.reconfigure(encoding='utf-8')
except AttributeError:
import io
sys.stderr = io.TextIOWrapper(sys.stderr.detach(), encoding='utf-8')
sys.stdout = io.TextIOWrapper(sys.stdout.detach(), encoding='utf-8')
# 创建日志记录器
logger = setup_logger()
# 创建MCP服务器
mcp = FastMCP("MoveServer")
# ======================= 硬件配置 =======================
HARDWARE_CONFIG = {
"FAN_PIN": 21, # 风扇控制引脚
"SENSOR_PIN": 22, # 温湿度传感器引脚
"LIGHT_PIN": 23, # 灯带控制引脚
"NUM_LEDS": 16 # LED数量
}
# ======================= 设备控制工具 =======================
class HardwareController:
"""真实硬件操作类"""
def __init__(self, config: dict):
"""初始化真实硬件"""
try:
# 初始化开发板
Board().begin()
# 初始化风扇控制引脚
self.fan_pin = Pin(config["FAN_PIN"], Pin.OUT)
# 初始化温湿度传感器
self.dht_sensor = DHT22(Pin(config["SENSOR_PIN"]))
# 初始化灯带
self.light_pin = NeoPixel(Pin(config["LIGHT_PIN"]), config["NUM_LEDS"])
self.light_pin.brightness(100)
logger.info("真实硬件初始化成功")
except Exception as e:
logger.critical(f"硬件初始化失败: {e}")
raise
def control_fan(self, state: bool) -> str:
try:
self.fan_pin.write_digital(1 if state else 0)
return f"风扇已{'开启' if state else '关闭'}"
except Exception as e:
logger.error(f"风扇控制失败: {e}")
return f"风扇控制失败: {str(e)}"
def control_light(self, state: bool) -> str:
try:
if state:
self.light_pin.range_color(0, self.light_pin.num, 0xFF0000) # 红色
else:
self.light_pin.clear()
return f"灯带已{'点亮' if state else '熄灭'}"
except Exception as e:
logger.error(f"灯带控制失败: {e}")
return f"灯带控制失败: {str(e)}"
def get_temperature(self) -> str:
"""获取真实温度数据"""
try:
# 尝试最多3次读取,防止偶尔读取失败
for _ in range(3):
try:
temp = self.dht_sensor.temp_c()
return f"当前温度: {temp}℃"
except:
time.sleep(0.2) # 短暂延迟后重试
return "温度读取失败"
except Exception as e:
logger.error(f"温度读取失败: {e}")
return f"温度读取失败: {str(e)}"
def get_humidity(self) -> str:
"""获取真实湿度数据"""
try:
for _ in range(3):
try:
humidity = self.dht_sensor.humidity()
return f"当前湿度: {humidity}%"
except:
time.sleep(0.2)
return "湿度读取失败"
except Exception as e:
logger.error(f"湿度读取失败: {e}")
return f"湿度读取失败: {str(e)}"
def tool_response(success: bool, result: str) -> Dict[str, object]:
return {
"success": success,
"result": result
}
# 创建硬件控制器实例
try:
hardware_controller = HardwareController(HARDWARE_CONFIG)
except Exception as e:
logger.critical("硬件初始化失败,服务器无法启动")
sys.exit(1)
@mcp.tool()
def open_fan() -> Dict[str, object]:
logger.info("执行风扇开启命令")
result = hardware_controller.control_fan(True)
logger.info(result)
return tool_response("失败" not in result, result)
@mcp.tool()
def close_fan() -> Dict[str, object]:
logger.info("执行风扇关闭命令")
result = hardware_controller.control_fan(False)
logger.info(result)
return tool_response("失败" not in result, result)
@mcp.tool()
def open_light() -> Dict[str, object]:
logger.info("执行灯带开启命令")
result = hardware_controller.control_light(True)
logger.info(result)
return tool_response("失败" not in result, result)
@mcp.tool()
def close_light() -> Dict[str, object]:
logger.info("执行灯带关闭命令")
result = hardware_controller.control_light(False)
logger.info(result)
return tool_response("失败" not in result, result)
@mcp.tool()
def get_temp() -> Dict[str, object]:
logger.info("读取温度数据")
result = hardware_controller.get_temperature()
logger.info(result)
return tool_response("失败" not in result, result)
@mcp.tool()
def get_humidity() -> Dict[str, object]:
logger.info("读取湿度数据")
result = hardware_controller.get_humidity()
logger.info(result)
return tool_response("失败" not in result, result)
# 启动服务器
if __name__ == "__main__":
try:
logger.info("启动MCP服务器...")
logger.info(f"硬件配置: {HARDWARE_CONFIG}")
mcp.run(transport="stdio")
logger.info("MCP服务器已停止")
except Exception as e:
logger.critical(f"服务器运行失败: {e}")
sys.exit(1)
修改小智AI管道文件mcp_pipe.py。
新建文件mcp_pipe.py,复制云天老师代码,并做控制动作方面的修改,填入自己的MCP接入点地址。
功能描述:连接到 MCP 服务器,并通过 WebSocket 端点将输入和输出与指定的 Python 脚本进行管道通信。
加入了开机画面。
"""
This script is used to connect to the MCP server and pipe the input and output to the websocket endpoint.
Version: 0.1.0
Usage:
export MCP_ENDPOINT=
python mcp_pipe.py
"""
# -*- coding: UTF-8 -*-
# MindPlus
# Python
from unihiker import GUI
import asyncio
import websockets
import subprocess
import logging
import os
import signal
import sys
import random
from dotenv import load_dotenv
from pinpong.board import Pin
from pinpong.board import Board
from pinpong.board import Board,Pin
from pinpong.board import DHT22
from pinpong.board import NeoPixel
import json
Board().begin()
u_gui=GUI()
pin1 = Pin(Pin.P21, Pin.OUT)
pin2 = Pin(Pin.P22, Pin.IN)
pin3 = Pin(Pin.P23, Pin.PWM)
dht1 = DHT22(pin2)
np1 = NeoPixel(pin3,16)
np1.brightness(128)
np1.brightness(100)
u_gui.draw_image(image="back.png",x=0,y=0)
u_gui.draw_text(text="M10智能助手",x=10,y=20,font_size=25, color="#FF00FF")
# 设置日志记录器
# Load environment variables from .env file
#load_dotenv()
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('MCP_PIPE')
# Reconnection settings
INITIAL_BACKOFF = 1 # Initial wait time in seconds
MAX_BACKOFF = 600 # Maximum wait time in seconds
reconnect_attempt = 0
backoff = INITIAL_BACKOFF
async def connect_with_retry(uri):
"""Connect to WebSocket server with retry mechanism"""
global reconnect_attempt, backoff
while True: # Infinite reconnection
try:
if reconnect_attempt > 0:
wait_time = backoff * (1 + random.random() * 0.1) # Add some random jitter
logger.info(f"Waiting {wait_time:.2f} seconds before reconnection attempt {reconnect_attempt}...")
await asyncio.sleep(wait_time)
# Attempt to connect
await connect_to_server(uri)
except Exception as e:
reconnect_attempt += 1
logger.warning(f"Connection closed (attempt: {reconnect_attempt}): {e}")
# Calculate wait time for next reconnection (exponential backoff)
backoff = min(backoff * 2, MAX_BACKOFF)
async def connect_to_server(uri):
"""Connect to WebSocket server and establish bidirectional communication with `mcp_script`"""
global reconnect_attempt, backoff
try:
logger.info(f"Connecting to WebSocket server...")
async with websockets.connect(uri) as websocket:
logger.info(f"Successfully connected to WebSocket server")
# Reset reconnection counter if connection closes normally
reconnect_attempt = 0
backoff = INITIAL_BACKOFF
# Start mcp_script process
process = subprocess.Popen(
['python', mcp_script],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding='utf-8',
text=True # Use text mode
)
logger.info(f"Started {mcp_script} process")
# Create two tasks: read from WebSocket and write to process, read from process and write to WebSocket
await asyncio.gather(
pipe_websocket_to_process(websocket, process),
pipe_process_to_websocket(process, websocket),
pipe_process_stderr_to_terminal(process)
)
except websockets.exceptions.ConnectionClosed as e:
logger.error(f"WebSocket connection closed: {e}")
raise # Re-throw exception to trigger reconnection
except Exception as e:
logger.error(f"Connection error: {e}")
raise # Re-throw exception
finally:
# Ensure the child process is properly terminated
if 'process' in locals():
logger.info(f"Terminating {mcp_script} process")
try:
process.terminate()
process.wait(timeout=5)
except subprocess.TimeoutExpired:
process.kill()
logger.info(f"{mcp_script} process terminated")
async def pipe_websocket_to_process(websocket, process):
"""Read data from WebSocket and write to process stdin"""
try:
while True:
# Read message from WebSocket
message = await websocket.recv()
logger.debug(f"<< {message[:120]}...")
# Write to process stdin (in text mode)
if isinstance(message, bytes):
message = message.decode('utf-8')
process.stdin.write(message + '\n')
process.stdin.flush()
except Exception as e:
logger.error(f"Error in WebSocket to process pipe: {e}")
raise # Re-throw exception to trigger reconnection
finally:
# Close process stdin
if not process.stdin.closed:
process.stdin.close()
async def pipe_process_to_websocket(process, websocket):
"""Read data from process stdout and send to WebSocket"""
try:
while True:
# Read data from process stdout
data = await asyncio.get_event_loop().run_in_executor(
None, process.stdout.readline
)
if not data: # If no data, the process may have ended
logger.info("Process has ended output")
break
# Send data to WebSocket
logger.debug(f">> {data[:120]}...")
print(data)
# 解析 JSON 字符串
json_str = json.loads(data)
print(json_str['id'])
if json_str['id']>1:
print(json_str['id'])
if json_str.get('result', {}):
text=json.loads(json_str['result']['content'][0]['text'])
if text['success']:
print(text['result'])
L=480
if "开启" in text['result']:
print("******************************")
pin1.write_digital(1)
if "关闭" in text['result']:
print("******************************")
pin1.write_digital(0)
if "点亮" in text['result']:
print("******************************")
np1.range_color(0,15,0xFF0000)
if "熄灭" in text['result']:
print("******************************")
np1.clear()
# In text mode, data is already a string, no need to decode
await websocket.send(data)
except Exception as e:
logger.error(f"Error in process to WebSocket pipe: {e}")
raise # Re-throw exception to trigger reconnection
async def pipe_process_stderr_to_terminal(process):
"""Read data from process stderr and print to terminal"""
try:
while True:
# Read data from process stderr
data = await asyncio.get_event_loop().run_in_executor(
None, process.stderr.readline
)
if not data: # If no data, the process may have ended
logger.info("Process has ended stderr output")
break
# Print stderr data to terminal (in text mode, data is already a string)
sys.stderr.write(data)
sys.stderr.flush()
except Exception as e:
logger.error(f"Error in process stderr pipe: {e}")
raise # Re-throw exception to trigger reconnection
def signal_handler(sig, frame):
"""Handle interrupt signals"""
logger.info("Received interrupt signal, shutting down...")
sys.exit(0)
if __name__ == "__main__":
# Register signal handler
signal.signal(signal.SIGINT, signal_handler)
# mcp_script
#if len(sys.argv) < 2:
# logger.error("Usage: mcp_pipe.py ")
# sys.exit(1)
mcp_script = "move.py"
# Get token from environment variable or command line arguments
#endpoint_url = os.environ.get('MCP_ENDPOINT')
endpoint_url="wss://api.xiaozhi.me/mcp/?token=eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VySWQiOjIxNjQzNywiYWdlbnRJZCI6MTQ5OTAyLCJlbmRwb2ludElkIjoiYWdlbnRfMTQ5OTAyIiwicHVycG9zZSI6Im1jcC1lbmRwb2ludCIsImlhdCI6MTc1MTU0NjIzOH0.ha8DI8RcXNCWntx22plEAtxd2ovVLEFS7H8V7TpED-6FfqYKcjQewWIFlEVfIjoE9UJNTQoytxdlQ5hJ4IsDHg"
if not endpoint_url:
logger.error("Please set the `MCP_ENDPOINT` environment variable")
sys.exit(1)
# Start main loop
try:
asyncio.run(connect_with_retry(endpoint_url))
except KeyboardInterrupt:
logger.info("Program interrupted by user")
except Exception as e:
logger.error(f"Program execution error: {e}")
跑通后放在DeepSeek中优化,得到如下代码。
这个文件配合上一组文件中下面的DS优化版使用。
[小贴士]
教程中放了4个代码文件,上move.pyt和上mcp_pipe.py两个文件一组配合使用,只能控制开关和风扇开关。
下move.pyt和下mcp_pipe.py两个DeepSeek优化版文件一组配合使用,增加了获取温湿度传感器实时读数的功能,从文件风格和内容中可看出来,请不要混合,混合不能运行。
MCP接入地址要换成自己小智的接入点地址。
"""
优化版本:连接MCP服务器并通过WebSocket传输输入输出
版本: 0.2.0
改进内容:
1. 模块化设计 - 功能拆分为独立函数
2. 增强错误处理 - 添加更完善的异常处理
3. 硬件初始化封装 - 集中管理硬件配置
4. JSON处理优化 - 简化解析逻辑
5. 资源管理改进 - 确保正确释放所有资源
6. 日志系统优化 - 提供更详细的错误信息
用法:
export MCP_ENDPOINT=
python mcp_pipe.py
"""
# -*- coding: UTF-8 -*-
import asyncio
import json
import logging
import signal
import sys
import random
import subprocess
from unihiker import GUI
import websockets
from pinpong.board import Board, Pin, DHT22, NeoPixel
# 全局常量
INITIAL_BACKOFF = 1 # 初始重连等待时间(秒)
MAX_BACKOFF = 600 # 最大重连等待时间(秒)
MCP_SCRIPT = "move.py" # MCP脚本名称
# 配置日志系统
def setup_logger():
"""配置并返回日志记录器"""
logger = logging.getLogger('MCP_PIPE_OPT')
logger.setLevel(logging.INFO)
# 创建控制台处理器
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# 创建文件处理器
file_handler = logging.FileHandler('mcp_pipe.log')
file_handler.setLevel(logging.DEBUG)
# 创建日志格式
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)
# 添加处理器到日志记录器
logger.addHandler(console_handler)
logger.addHandler(file_handler)
return logger
logger = setup_logger()
# 硬件初始化
def init_hardware():
"""初始化所有硬件设备并返回对象"""
try:
# 初始化开发板
Board().begin()
# 初始化GUI
gui = GUI()
gui.draw_image(image="back.png", x=0, y=0)
gui.draw_text(
text="M10智能助手",
x=10,
y=20,
font_size=25,
color="#FF00FF"
)
# 初始化引脚
pin1 = Pin(Pin.P21, Pin.OUT)
pin2 = Pin(Pin.P22, Pin.IN)
pin3 = Pin(Pin.P23, Pin.PWM)
# 初始化传感器
dht = DHT22(pin2)
# 初始化NeoPixel
np = NeoPixel(pin3, 16)
np.brightness(100)
logger.info("硬件初始化成功")
return {
'gui': gui,
'pin1': pin1,
'pin2': pin2,
'pin3': pin3,
'dht': dht,
'np': np
}
except Exception as e:
logger.error(f"硬件初始化失败: {e}")
raise
# 处理硬件控制命令
def handle_hardware_command(command: str, hardware: dict):
"""
根据命令执行硬件操作
参数:
command: 接收到的命令字符串
hardware: 包含所有硬件对象的字典
"""
try:
if "开启" in command:
hardware['pin1'].write_digital(1)
logger.info("设备已开启")
elif "关闭" in command:
hardware['pin1'].write_digital(0)
logger.info("设备已关闭")
elif "点亮" in command:
hardware['np'].range_color(0, 15, 0xFF0000)
logger.info("灯带已点亮")
elif "熄灭" in command:
hardware['np'].clear()
logger.info("灯带已熄灭")
elif "温度" in command:
temp = hardware['dht'].temp_c()
logger.info(f"当前温度: {temp}℃")
elif "湿度" in command:
humidity = hardware['dht'].humidity()
logger.info(f"当前湿度: {humidity}%")
except Exception as e:
logger.error(f"执行硬件命令时出错: {command} - {e}")
# 处理JSON数据
def process_json_data(data: str, hardware: dict):
"""
处理JSON数据并执行相应的硬件操作
参数:
data: JSON格式的字符串数据
hardware: 包含所有硬件对象的字典
"""
try:
json_data = json.loads(data)
message_id = json_data.get('id', 0)
# 只处理ID大于1的消息
if message_id <= 1:
return
result = json_data.get('result', {})
if not result:
return
# 提取内容文本
contents = result.get('content', [])
if not contents:
return
first_content = contents[0]
text_content = first_content.get('text', '')
if not text_content:
return
# 尝试解析内部JSON
try:
inner_data = json.loads(text_content)
if inner_data.get('success', False):
command_result = inner_data.get('result', '')
if command_result:
handle_hardware_command(command_result, hardware)
except json.JSONDecodeError:
# 如果不是JSON,直接作为命令处理
handle_hardware_command(text_content, hardware)
except Exception as e:
logger.error(f"处理JSON数据时出错: {e}\n原始数据: {data[:200]}")
# WebSocket到进程的管道
async def pipe_websocket_to_process(websocket, process):
"""从WebSocket读取数据并写入进程的stdin"""
try:
while True:
message = await websocket.recv()
logger.debug(f"<< 收到消息: {message[:120]}...")
if isinstance(message, bytes):
message = message.decode('utf-8')
try:
process.stdin.write(message + '\n')
process.stdin.flush()
except BrokenPipeError:
logger.error("进程stdin管道已断开")
break
except websockets.exceptions.ConnectionClosed:
logger.warning("WebSocket连接已关闭")
except Exception as e:
logger.error(f"WebSocket到进程管道错误: {e}")
finally:
if not process.stdin.closed:
process.stdin.close()
# 进程到WebSocket的管道
async def pipe_process_to_websocket(process, websocket, hardware):
"""从进程的stdout读取数据并发送到WebSocket"""
try:
while True:
data = await asyncio.get_event_loop().run_in_executor(
None, process.stdout.readline
)
if not data:
logger.info("进程输出结束")
break
logger.debug(f">> 发送消息: {data[:120]}...")
# 处理数据并控制硬件
process_json_data(data, hardware)
# 发送到WebSocket
await websocket.send(data)
except Exception as e:
logger.error(f"进程到WebSocket管道错误: {e}")
# 进程stderr到终端的管道
async def pipe_process_stderr_to_terminal(process):
"""从进程的stderr读取数据并打印到终端"""
try:
while True:
err_data = await asyncio.get_event_loop().run_in_executor(
None, process.stderr.readline
)
if not err_data:
logger.info("进程错误输出结束")
break
sys.stderr.write(err_data)
sys.stderr.flush()
except Exception as e:
logger.error(f"进程stderr管道错误: {e}")
# 连接WebSocket服务器
async def connect_to_server(uri, hardware):
"""连接到WebSocket服务器并建立与MCP脚本的双向通信"""
reconnect_attempt = 0
backoff = INITIAL_BACKOFF
while True:
try:
if reconnect_attempt > 0:
wait_time = min(backoff * (1 + random.random() * 0.1), MAX_BACKOFF)
logger.info(f"等待 {wait_time:.2f} 秒后尝试重新连接 #{reconnect_attempt}...")
await asyncio.sleep(wait_time)
logger.info(f"尝试连接到WebSocket服务器: {uri[:60]}...")
async with websockets.connect(uri) as websocket:
logger.info("成功连接到WebSocket服务器")
# 重置重连计数器
reconnect_attempt = 0
backoff = INITIAL_BACKOFF
# 启动MCP脚本进程
process = subprocess.Popen(
['python', MCP_SCRIPT],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding='utf-8',
text=True
)
logger.info(f"已启动 {MCP_SCRIPT} 进程")
# 创建通信任务
try:
await asyncio.gather(
pipe_websocket_to_process(websocket, process),
pipe_process_to_websocket(process, websocket, hardware),
pipe_process_stderr_to_terminal(process)
)
except Exception as e:
logger.error(f"通信任务出错: {e}")
finally:
# 确保子进程正确终止
terminate_process(process)
except websockets.exceptions.ConnectionClosed as e:
reconnect_attempt += 1
logger.warning(f"WebSocket连接关闭: {e}")
backoff = min(backoff * 2, MAX_BACKOFF)
except Exception as e:
reconnect_attempt += 1
logger.error(f"连接错误: {e}")
backoff = min(backoff * 2, MAX_BACKOFF)
# 终止进程
def terminate_process(process):
"""安全终止子进程"""
if process.poll() is None: # 检查进程是否仍在运行
logger.info(f"终止 {MCP_SCRIPT} 进程")
try:
process.terminate()
process.wait(timeout=5)
except subprocess.TimeoutExpired:
logger.warning("进程未响应,强制终止")
process.kill()
logger.info(f"{MCP_SCRIPT} 进程已终止")
# 信号处理
def signal_handler(sig, frame):
"""处理中断信号"""
logger.info("接收到中断信号,正在关闭...")
sys.exit(0)
# 主函数
async def main():
"""主异步函数"""
# 注册信号处理
signal.signal(signal.SIGINT, signal_handler)
# 初始化硬件
try:
hardware = init_hardware()
except Exception as e:
logger.critical(f"硬件初始化失败,程序终止: {e}")
return
# WebSocket端点URL
endpoint_url = (
"wss://api.xiaozhi.me/mcp/?token="
"eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9."
"eyJ1c2VySWQiOjIxNjQzNywiYWdlbnRJZCI6MTQ5OTAyLCJlbmRwb2ludElkIjoiYWdlbnRfMTQ5OTAyIiwicHVycG9zZSI6Im1jcC1lbmRwb2ludCIsImlhdCI6MTc1MTU0NjIzOH0."
"ha8DI8RcXNCWntx22plEAtxd2ovVLEFS7H8V7TpED-6FfqYKcjQewWIFlEVfIjoE9UJNTQoytxdlQ5hJ4IsDHg"
)
# 启动主循环
try:
await connect_to_server(endpoint_url, hardware)
except KeyboardInterrupt:
logger.info("用户中断程序")
except Exception as e:
logger.error(f"程序执行错误: {e}")
if __name__ == "__main__":
asyncio.run(main())
在mind+中点击运行。

稍等一会,成功连接服务器。

此时接入点刷新后是这样的:在线,工具可用。

下面就可以用K10小智和M10智能家(MCP)互动了。

小结:
1、在实操过程中,硬件操作的代码(小白)先用图形化写好再自动生成,在修改move.py和mcp_pipe.py时参考。
2、在本项目中参考云天老师代码顺利完成灯环和小风扇的控制,但是实时读取dht22的温度和湿度数据的代码不会写,这部分交给DeepSeeK优化完成。
评论