7.27
【写在前面】
这个帖子记录了双电机组成的小智运载平台的进化过程,从基本的前后左右停到速度与时间可调节,一共给了3组代码,为了方便伙伴们复刻,代码注释全部用中文,代码改造起来更简单。

视频中录的是下面的第三组代码,拍摄场地所限,只展示了小范围活动控制。同时语音控制+小智互动,延时较大,小车这种快速运动的装置娱乐一下就好。
材料清单
- M10+电池扩展板 X1
- 双电机小车平台 X1
- K10小智+锂电池 X
步骤1 行空板M10上安装pyenv环境、Python 3.12.7和mcp库
记录在:M10仰望小车(MCP)的学习与尝试
https://makelogapi.dfrobot.com.cn/api/project/handleShareUrl?pid=165_317714
步骤2 配置小智AI,获取MCP接入点地址
步骤3 M10编程基础功能-小智平台前进、后退、左转、右转、停止
这个版本完全保留了原始代码的功能和结构,中文注释,使用户更容易理解和维护代码,方便有兴趣的伙伴进行修改。
相比较原始版本,代码简洁,更加健壮,连接服务器速度更快。
move.py
小智运动控制服务器
提供前进、后退、左转、右转、停止等基本运动控制功能
"""
小智运动控制服务器
提供前进、后退、左转、右转、停止等基本运动控制功能
"""
from mcp.server.fastmcp import FastMCP
import sys
import logging
# 设置日志记录器
logger = logging.getLogger('MoveServer')
# 修复Windows控制台UTF-8编码问题
if sys.platform == 'win32':
sys.stderr.reconfigure(encoding='utf-8')
sys.stdout.reconfigure(encoding='utf-8')
# 创建MCP服务器实例
mcp = FastMCP("MoveServer")
@mcp.tool()
def forward() -> dict:
"""
控制小智前进
该函数将设置P5和P6引脚为低电平,P8和P16引脚为100%占空比的PWM输出,使小智前进。
返回:
dict: 包含操作结果的字典
"""
result = "小智前进"
logger.info(result)
return {"success": True, "result": result}
@mcp.tool()
def back() -> dict:
"""
控制小智后退
该函数将设置P5和P6引脚为高电平,P8和P16引脚为100%占空比的PWM输出,使小智后退。
返回:
dict: 包含操作结果的字典
"""
result = "小智后退"
logger.info(result)
return {"success": True, "result": result}
@mcp.tool()
def left() -> dict:
"""
控制小智左转
该函数将设置P5引脚为高电平和P6引脚为低电平,P8和P16引脚为100%占空比的PWM输出,使小智左转。
返回:
dict: 包含操作结果的字典
"""
result = "小智左转"
logger.info(result)
return {"success": True, "result": result}
@mcp.tool()
def right() -> dict:
"""
控制小智右转
该函数将设置P5引脚为低电平和P6引脚为高电平,P8和P16引脚为100%占空比的PWM输出,使小智右转。
返回:
dict: 包含操作结果的字典
"""
result = "小智右转"
logger.info(result)
return {"success": True, "result": result}
@mcp.tool()
def stop() -> dict:
"""
控制小智停止
该函数将设置P8和P16引脚的PWM输出为0,使小智停止。
返回:
dict: 包含操作结果的字典
"""
result = "小智停止"
logger.info(result)
return {"success": True, "result": result}
# 启动服务器
if __name__ == "__main__":
mcp.run(transport="stdio")
mcp_pipe.py
此脚本用于连接MCP服务器,并将输入输出通过WebSocket端点传输。
# -*- coding: UTF-8 -*-
"""
此脚本用于连接MCP服务器,并将输入输出通过WebSocket端点传输。
版本: 0.1.0
使用方法:
export MCP_ENDPOINT=
python mcp_pipe.py
"""
from unihiker import GUI
import asyncio
import websockets
import subprocess
import logging
import os
import signal
import sys
import random
import json
from pinpong.board import Board, Pin
# 初始化开发板
Board().begin()
# 创建GUI对象
u_gui = GUI()
# 初始化引脚
p_p5_out = Pin(Pin.P5, Pin.OUT)
p_p8_pwm = Pin(Pin.P8, Pin.PWM)
p_p6_out = Pin(Pin.P6, Pin.OUT)
p_p16_pwm = Pin(Pin.P16, Pin.PWM)
# 绘制UI界面
u_gui.draw_image(image="back.png", x=0, y=0)
u_gui.draw_text(text="M10小智MCP", x=30, y=10, font_size=20, color="#FFFFFF")
# 配置日志记录
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('MCP_PIPE')
# 重连设置
INITIAL_BACKOFF = 1 # 初始重连等待时间(秒)
MAX_BACKOFF = 600 # 最大重连等待时间(秒)
reconnect_attempt = 0 # 重连尝试次数
backoff = INITIAL_BACKOFF # 当前重连等待时间
async def connect_with_retry(uri):
"""带重试机制的WebSocket连接"""
global reconnect_attempt, backoff
while True: # 无限重连
try:
if reconnect_attempt > 0:
# 添加随机抖动避免同时重连
wait_time = backoff * (1 + random.random() * 0.1)
logger.info(f"等待 {wait_time:.2f} 秒后尝试第 {reconnect_attempt} 次重连...")
await asyncio.sleep(wait_time)
# 尝试连接服务器
await connect_to_server(uri)
except Exception as e:
reconnect_attempt += 1
logger.warning(f"连接关闭 (尝试次数: {reconnect_attempt}): {e}")
# 指数退避算法计算下次等待时间
backoff = min(backoff * 2, MAX_BACKOFF)
async def connect_to_server(uri):
"""连接到WebSocket服务器并与move.py进程建立双向通信"""
global reconnect_attempt, backoff
try:
logger.info(f"正在连接WebSocket服务器...")
async with websockets.connect(uri) as websocket:
logger.info(f"成功连接到WebSocket服务器")
# 重置重连计数器
reconnect_attempt = 0
backoff = INITIAL_BACKOFF
# 启动move.py进程
process = subprocess.Popen(
['python', 'move.py'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding='utf-8',
text=True # 文本模式
)
logger.info(f"已启动 move.py 进程")
# 创建并行任务:处理WebSocket和进程间的通信
await asyncio.gather(
pipe_websocket_to_process(websocket, process),
pipe_process_to_websocket(process, websocket),
pipe_process_stderr_to_terminal(process)
)
except websockets.exceptions.ConnectionClosed as e:
logger.error(f"WebSocket连接关闭: {e}")
raise # 触发重连
except Exception as e:
logger.error(f"连接错误: {e}")
raise
finally:
# 确保子进程正确终止
if 'process' in locals():
logger.info(f"终止 move.py 进程")
try:
process.terminate()
process.wait(timeout=5)
except subprocess.TimeoutExpired:
process.kill()
logger.info(f"move.py 进程已终止")
async def pipe_websocket_to_process(websocket, process):
"""从WebSocket读取数据并写入进程stdin"""
try:
while True:
# 从WebSocket读取消息
message = await websocket.recv()
logger.debug(f"<< 收到消息: {message[:120]}...")
# 写入进程stdin(文本模式)
if isinstance(message, bytes):
message = message.decode('utf-8')
process.stdin.write(message + '\n')
process.stdin.flush()
except Exception as e:
logger.error(f"WebSocket到进程管道错误: {e}")
raise
finally:
# 关闭进程stdin
if not process.stdin.closed:
process.stdin.close()
async def pipe_process_to_websocket(process, websocket):
"""从进程stdout读取数据并发送到WebSocket"""
try:
while True:
# 从进程stdout读取数据
data = await asyncio.get_event_loop().run_in_executor(
None, process.stdout.readline
)
if not data: # 无数据表示进程可能已结束
logger.info("进程已停止输出")
break
# 发送数据到WebSocket
logger.debug(f">> 发送数据: {data[:120]}...")
try:
# 解析JSON数据
json_data = json.loads(data)
command_id = json_data.get('id', 0)
# 处理有效命令
if command_id > 1 and json_data.get('result', {}):
content = json_data['result']['content'][0]['text']
command_data = json.loads(content)
if command_data['success']:
result_text = command_data['result']
logger.info(f"收到命令: {result_text}")
# 根据命令文本执行相应动作
if "前进" in result_text:
logger.info("执行前进命令")
p_p5_out.write_digital(0)
p_p8_pwm.write_analog(1023)
p_p6_out.write_digital(0)
p_p16_pwm.write_analog(1023)
elif "后退" in result_text:
logger.info("执行后退命令")
p_p5_out.write_digital(1)
p_p8_pwm.write_analog(1023)
p_p6_out.write_digital(1)
p_p16_pwm.write_analog(1023)
elif "左转" in result_text:
logger.info("执行左转命令")
p_p5_out.write_digital(1)
p_p8_pwm.write_analog(1023)
p_p6_out.write_digital(0)
p_p16_pwm.write_analog(1023)
elif "右转" in result_text:
logger.info("执行右转命令")
p_p5_out.write_digital(0)
p_p8_pwm.write_analog(1023)
p_p6_out.write_digital(1)
p_p16_pwm.write_analog(1023)
elif "停止" in result_text:
logger.info("执行停止命令")
p_p8_pwm.write_analog(0)
p_p16_pwm.write_analog(0)
except json.JSONDecodeError as e:
logger.error(f"JSON解析错误: {e}")
# 发送数据到WebSocket
await websocket.send(data)
except Exception as e:
logger.error(f"进程到WebSocket管道错误: {e}")
raise
async def pipe_process_stderr_to_terminal(process):
"""从进程stderr读取数据并打印到终端"""
try:
while True:
# 从进程stderr读取数据
data = await asyncio.get_event_loop().run_in_executor(
None, process.stderr.readline
)
if not data: # 无数据表示进程可能已结束
logger.info("进程已停止stderr输出")
break
# 打印stderr数据到终端
sys.stderr.write(data)
sys.stderr.flush()
except Exception as e:
logger.error(f"进程stderr管道错误: {e}")
raise
def signal_handler(sig, frame):
"""处理中断信号"""
logger.info("收到中断信号,正在关闭...")
# 停止所有电机
p_p8_pwm.write_analog(0)
p_p16_pwm.write_analog(0)
sys.exit(0)
if __name__ == "__main__":
# 注册信号处理器
signal.signal(signal.SIGINT, signal_handler)
# 设置WebSocket端点
endpoint_url = "wss://api.xiaozhi.me/mcp/?token=eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VySWQiOjIxNjQzNywiYWdlbnRJZCI6MTQ5OTAyLCJlbmRwb2ludElkIjoiYWdlbnRfMTQ5OTAyIiwicHVycG9zZSI6Im1jcC1lbmRwb2ludCIsImlhdCI6MTc1MzUxOTM5M30.CrjLbJ46CDPWhu0raJm79IoO9VTATxb923XpoDjPzWY-33DcLrk6mr_3mtIQM60crOg9jYxkTmfOf6qQEFzC8Q"
if not endpoint_url:
logger.error("请设置 MCP_ENDPOINT 环境变量")
sys.exit(1)
# 启动主循环
try:
asyncio.run(connect_with_retry(endpoint_url))
except KeyboardInterrupt:
logger.info("用户中断程序")
except Exception as e:
logger.error(f"程序执行错误: {e}")
步骤4 M10编程DeepSeek优化版本,仍然是基础功能
我把上面代码给DeepSeek进行优化,得到下面文件。
主要优化点:
硬件初始化优化:
使用更清晰的引脚命名(motor_left_dir等)
简化了UI初始化代码
添加了电机停止函数
错误处理增强:
添加了更多异常捕获
确保程序退出时停止电机
改进了JSON解析错误处理
日志系统优化:
统一了日志格式
添加了更有意义的日志信息
设置了日志级别
代码结构优化:
删除了冗余的导入和代码
使用更清晰的函数命名
优化了代码缩进和格式
资源管理改进:
确保进程正确终止
添加了finally块保证资源释放
改进了信号处理
这个版本保留了所有原始功能,但在可读性、稳定性和可维护性上做了显著改进。代码更加简洁清晰,同时增强了错误处理能力。
"""
小智运动控制服务器
提供前进、后退、左转、右转、停止等基本运动控制
"""
from mcp.server.fastmcp import FastMCP
import sys
import logging
# 配置日志记录器
logger = logging.getLogger('MoveServer')
logger.setLevel(logging.INFO)
# 解决Windows控制台编码问题
if sys.platform == 'win32':
sys.stderr.reconfigure(encoding='utf-8')
sys.stdout.reconfigure(encoding='utf-8')
# 创建MCP服务器实例
mcp = FastMCP("MoveServer")
@mcp.tool()
def forward() -> dict:
"""
控制小智前进
设置电机引脚状态使小车前进
返回:
dict: 操作结果
"""
result = "小智前进"
logger.info(result)
return {"success": True, "result": result}
@mcp.tool()
def back() -> dict:
"""
控制小智后退
设置电机引脚状态使小车后退
返回:
dict: 操作结果
"""
result = "小智后退"
logger.info(result)
return {"success": True, "result": result}
@mcp.tool()
def left() -> dict:
"""
控制小智左转
设置电机引脚状态使小车左转
返回:
dict: 操作结果
"""
result = "小智左转"
logger.info(result)
return {"success": True, "result": result}
@mcp.tool()
def right() -> dict:
"""
控制小智右转
设置电机引脚状态使小车右转
返回:
dict: 操作结果
"""
result = "小智右转"
logger.info(result)
return {"success": True, "result": result}
@mcp.tool()
def stop() -> dict:
"""
控制小智停止
停止所有电机
返回:
dict: 操作结果
"""
result = "小智停止"
logger.info(result)
return {"success": True, "result": result}
# 服务器启动
if __name__ == "__main__":
try:
mcp.run(transport="stdio")
except KeyboardInterrupt:
logger.info("服务器已停止")
except Exception as e:
logger.error(f"服务器错误: {e}")
# -*- coding: UTF-8 -*-
"""
此脚本用于连接MCP服务器,并将输入输出通过WebSocket端点传输。
版本: 0.1.0
主要功能:
1. 建立WebSocket连接
2. 启动move.py进程并建立双向通信
3. 解析JSON命令并控制小智运动
使用方法:
export MCP_ENDPOINT=
python mcp_pipe.py
"""
import asyncio
import websockets
import subprocess
import logging
import signal
import sys
import random
import json
from unihiker import GUI
from pinpong.board import Board, Pin
# === 硬件初始化 ===
Board().begin() # 初始化开发板
# 创建UI对象
ui = GUI()
# 绘制背景和标题
ui.draw_image(image="back.png", x=0, y=0)
ui.draw_text(text="M10小智MCP", x=30, y=10, font_size=20, color="#FFFFFF")
# 初始化电机控制引脚
motor_left_dir = Pin(Pin.P5, Pin.OUT) # 左电机方向控制
motor_left_pwm = Pin(Pin.P8, Pin.PWM) # 左电机PWM控制
motor_right_dir = Pin(Pin.P6, Pin.OUT) # 右电机方向控制
motor_right_pwm = Pin(Pin.P16, Pin.PWM) # 右电机PWM控制
# === 日志配置 ===
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('MCP_PIPE')
# === 重连设置 ===
INITIAL_BACKOFF = 1 # 初始重连等待时间(秒)
MAX_BACKOFF = 600 # 最大重连等待时间(秒)
reconnect_attempt = 0 # 重连尝试次数
backoff = INITIAL_BACKOFF # 当前重连等待时间
async def connect_with_retry(uri):
"""带重试机制的WebSocket连接"""
global reconnect_attempt, backoff
while True: # 无限重连
try:
# 重连等待处理
if reconnect_attempt > 0:
wait_time = backoff * (1 + random.random() * 0.1) # 添加随机抖动
logger.info(f"等待 {wait_time:.2f}秒后尝试第 {reconnect_attempt}次重连...")
await asyncio.sleep(wait_time)
# 尝试连接服务器
await connect_to_server(uri)
except Exception as e:
reconnect_attempt += 1
logger.warning(f"连接关闭 (尝试次数: {reconnect_attempt}): {e}")
# 指数退避算法
backoff = min(backoff * 2, MAX_BACKOFF)
async def connect_to_server(uri):
"""连接到WebSocket服务器并与move.py进程建立双向通信"""
global reconnect_attempt, backoff
try:
logger.info(f"正在连接WebSocket服务器...")
async with websockets.connect(uri) as websocket:
logger.info(f"成功连接到WebSocket服务器")
# 重置重连计数器
reconnect_attempt = 0
backoff = INITIAL_BACKOFF
# 启动move.py进程
process = subprocess.Popen(
['python', 'move.py'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding='utf-8',
text=True
)
logger.info(f"已启动move.py进程")
# 创建并行任务处理通信
await asyncio.gather(
pipe_websocket_to_process(websocket, process),
pipe_process_to_websocket(process, websocket),
pipe_process_stderr_to_terminal(process)
)
except websockets.exceptions.ConnectionClosed as e:
logger.error(f"WebSocket连接关闭: {e}")
raise
except Exception as e:
logger.error(f"连接错误: {e}")
raise
finally:
# 确保子进程终止
if 'process' in locals():
logger.info(f"终止move.py进程")
try:
process.terminate()
process.wait(timeout=5)
except subprocess.TimeoutExpired:
process.kill()
logger.info(f"move.py进程已终止")
async def pipe_websocket_to_process(websocket, process):
"""从WebSocket读取数据并写入进程stdin"""
try:
while True:
# 读取WebSocket消息
message = await websocket.recv()
logger.debug(f"<< 收到消息: {message[:120]}...")
# 写入进程stdin
if isinstance(message, bytes):
message = message.decode('utf-8')
process.stdin.write(message + '\n')
process.stdin.flush()
except Exception as e:
logger.error(f"WebSocket到进程管道错误: {e}")
raise
finally:
# 安全关闭stdin
if not process.stdin.closed:
process.stdin.close()
async def pipe_process_to_websocket(process, websocket):
"""从进程stdout读取数据并发送到WebSocket"""
try:
while True:
# 读取进程输出
data = await asyncio.get_event_loop().run_in_executor(
None, process.stdout.readline
)
if not data: # 进程结束
logger.info("进程输出结束")
break
logger.debug(f">> 发送数据: {data[:120]}...")
try:
# 解析JSON命令
json_data = json.loads(data)
command_id = json_data.get('id', 0)
# 处理有效命令
if command_id > 1 and json_data.get('result', {}):
content = json_data['result']['content'][0]['text']
command_data = json.loads(content)
if command_data['success']:
result_text = command_data['result']
logger.info(f"执行命令: {result_text}")
# 根据命令控制电机
if "前进" in result_text:
motor_left_dir.write_digital(0)
motor_right_dir.write_digital(0)
motor_left_pwm.write_analog(1023)
motor_right_pwm.write_analog(1023)
elif "后退" in result_text:
motor_left_dir.write_digital(1)
motor_right_dir.write_digital(1)
motor_left_pwm.write_analog(1023)
motor_right_pwm.write_analog(1023)
elif "左转" in result_text:
motor_left_dir.write_digital(1)
motor_right_dir.write_digital(0)
motor_left_pwm.write_analog(1023)
motor_right_pwm.write_analog(1023)
elif "右转" in result_text:
motor_left_dir.write_digital(0)
motor_right_dir.write_digital(1)
motor_left_pwm.write_analog(1023)
motor_right_pwm.write_analog(1023)
elif "停止" in result_text:
motor_left_pwm.write_analog(0)
motor_right_pwm.write_analog(0)
except json.JSONDecodeError:
logger.error("JSON解析失败")
except KeyError:
logger.error("命令格式错误")
# 转发数据到WebSocket
await websocket.send(data)
except Exception as e:
logger.error(f"进程到WebSocket管道错误: {e}")
raise
async def pipe_process_stderr_to_terminal(process):
"""从进程stderr读取数据并打印到终端"""
try:
while True:
# 读取错误输出
data = await asyncio.get_event_loop().run_in_executor(
None, process.stderr.readline
)
if not data: # 进程结束
logger.info("进程错误输出结束")
break
# 打印到终端
sys.stderr.write(data)
sys.stderr.flush()
except Exception as e:
logger.error(f"进程stderr管道错误: {e}")
raise
def stop_motors():
"""停止所有电机"""
motor_left_pwm.write_analog(0)
motor_right_pwm.write_analog(0)
logger.info("电机已停止")
def signal_handler(sig, frame):
"""处理中断信号"""
logger.info("收到中断信号,正在关闭...")
stop_motors()
sys.exit(0)
if __name__ == "__main__":
# 注册信号处理器
signal.signal(signal.SIGINT, signal_handler)
# WebSocket端点配置
endpoint_url = "wss://api.xiaozhi.me/mcp/?token=eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VySWQiOjIxNjQzNywiYWdlbnRJZCI6MTQ5OTAyLCJlbmRwb2ludElkIjoiYWdlbnRfMTQ5OTAyIiwicHVycG9zZSI6Im1jcC1lbmRwb2ludCIsImlhdCI6MTc1MzUxOTM5M30.CrjLbJ46CDPWhu0raJm79IoO9VTATxb923XpoDjPzWY-33DcLrk6mr_3mtIQM60crOg9jYxkTmfOf6qQEFzC8Q"
# 启动主循环
try:
asyncio.run(connect_with_retry(endpoint_url))
except KeyboardInterrupt:
logger.info("用户中断程序")
stop_motors()
except Exception as e:
logger.error(f"程序执行错误: {e}")
stop_motors()
步骤5 M10程序DeepSeek加强版,增加速度和时间调节功能,完全拿捏双电机平台
小智运动控制系统功能简介
基础运动控制
前进、后退、左转、右转、停止
智能参数解析
支持自然语言指令:
"前进一点"(0.5秒)
"左转3秒"
"以90%速度后退"
精确调速控制
速度范围:50%-100%
PWM精准映射:50%→512,100%→1023
时间精准控制
毫秒级定时器
支持0.1-60秒精确时长
安全机制
新命令自动取消当前动作
程序退出自动停止电机
异常状态保护
稳定连接
WebSocket自动重连
指数退避算法
这套系统可以通过自然语言精准控制M10双电机小智平台的运动,真正实现了"说走就走,说停就停"的智能交互体验!
"""
小智运动控制服务器
功能:
1. 提供前进、后退、左转、右转、停止等基本运动控制
2. 支持速度调节(50%-100%)
3. 支持时间参数控制运动持续时间
"""
from mcp.server.fastmcp import FastMCP
import sys
import logging
logger = logging.getLogger('MoveServer')
# 修复Windows控制台UTF-8编码问题
if sys.platform == 'win32':
sys.stderr.reconfigure(encoding='utf-8')
sys.stdout.reconfigure(encoding='utf-8')
# 创建MCP服务器
mcp = FastMCP("MoveServer")
@mcp.tool()
def forward(speed: int = 80, duration: float = 0) -> dict:
"""
控制小智前进
参数:
speed (int): 前进速度(50-100%),默认80%
duration (float): 前进持续时间(秒),0表示持续前进直到停止命令
返回:
dict: 包含操作结果的字典
"""
result = f"小智以{speed}%速度前进"
if duration > 0:
result += f" {duration}秒"
logger.info(result)
return {"success": True, "result": result, "speed": speed, "duration": duration}
@mcp.tool()
def back(speed: int = 80, duration: float = 0) -> dict:
"""
控制小智后退
参数:
speed (int): 后退速度(50-100%),默认80%
duration (float): 后退持续时间(秒),0表示持续后退直到停止命令
返回:
dict: 包含操作结果的字典
"""
result = f"小智以{speed}%速度后退"
if duration > 0:
result += f" {duration}秒"
logger.info(result)
return {"success": True, "result": result, "speed": speed, "duration": duration}
@mcp.tool()
def left(speed: int = 80, duration: float = 0) -> dict:
"""
控制小智左转
参数:
speed (int): 转向速度(50-100%),默认80%
duration (float): 转向持续时间(秒),0表示持续转向直到停止命令
返回:
dict: 包含操作结果的字典
"""
result = f"小智以{speed}%速度左转"
if duration > 0:
result += f" {duration}秒"
logger.info(result)
return {"success": True, "result": result, "speed": speed, "duration": duration}
@mcp.tool()
def right(speed: int = 80, duration: float = 0) -> dict:
"""
控制小智右转
参数:
speed (int): 转向速度(50-100%),默认80%
duration (float): 转向持续时间(秒),0表示持续转向直到停止命令
返回:
dict: 包含操作结果的字典
"""
result = f"小智以{speed}%速度右转"
if duration > 0:
result += f" {duration}秒"
logger.info(result)
return {"success": True, "result": result, "speed": speed, "duration": duration}
@mcp.tool()
def stop() -> dict:
"""
控制小智停止
返回:
dict: 包含操作结果的字典
"""
result = "小智停止"
logger.info(result)
return {"success": True, "result": result}
@mcp.tool()
def set_speed(speed: int) -> dict:
"""
设置默认运动速度
参数:
speed (int): 运动速度(50-100%)
返回:
dict: 包含操作结果的字典
"""
if 50 <= speed <= 100:
result = f"设置速度为{speed}%"
logger.info(result)
return {"success": True, "result": result}
else:
result = "速度值必须在50-100之间"
logger.warning(result)
return {"success": False, "result": result}
# 启动服务器
if __name__ == "__main__":
mcp.run(transport="stdio")
# -*- coding: UTF-8 -*-
"""
此脚本用于连接MCP服务器,并将输入输出通过WebSocket端点传输。
版本: 0.1.4
主要修复:
1. 速度和时间参数解析冲突问题
2. 增强参数提取的准确性
"""
import asyncio
import websockets
import subprocess
import logging
import signal
import sys
import random
import json
import time
import re
from unihiker import GUI
from pinpong.board import Board, Pin
# === 硬件初始化 ===
Board().begin() # 初始化开发板
# 创建UI对象
ui = GUI()
# 绘制背景和标题
ui.draw_image(image="back.png", x=0, y=0)
ui.draw_text(text="M10小智MCP", x=30, y=10, font_size=20, color="#FFFFFF")
# 初始化电机控制引脚
motor_left_dir = Pin(Pin.P5, Pin.OUT) # 左电机方向控制
motor_left_pwm = Pin(Pin.P8, Pin.PWM) # 左电机PWM控制
motor_right_dir = Pin(Pin.P6, Pin.OUT) # 右电机方向控制
motor_right_pwm = Pin(Pin.P16, Pin.PWM) # 右电机PWM控制
# === 日志配置 ===
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('MCP_PIPE')
# === 全局状态 ===
CURRENT_SPEED = 80 # 默认速度(80%)
current_action_task = None # 当前动作任务
# === 重连设置 ===
INITIAL_BACKOFF = 1 # 初始重连等待时间(秒)
MAX_BACKOFF = 600 # 最大重连等待时间(秒)
reconnect_attempt = 0 # 重连尝试次数
backoff = INITIAL_BACKOFF # 当前重连等待时间
async def connect_with_retry(uri):
"""带重试机制的WebSocket连接"""
global reconnect_attempt, backoff
while True: # 无限重连
try:
# 重连等待处理
if reconnect_attempt > 0:
wait_time = backoff * (1 + random.random() * 0.1) # 添加随机抖动
logger.info(f"等待 {wait_time:.2f}秒后尝试第 {reconnect_attempt}次重连...")
await asyncio.sleep(wait_time)
# 尝试连接服务器
await connect_to_server(uri)
except Exception as e:
reconnect_attempt += 1
logger.warning(f"连接关闭 (尝试次数: {reconnect_attempt}): {e}")
# 指数退避算法
backoff = min(backoff * 2, MAX_BACKOFF)
async def connect_to_server(uri):
"""连接到WebSocket服务器并与move.py进程建立双向通信"""
global reconnect_attempt, backoff
try:
logger.info(f"正在连接WebSocket服务器...")
async with websockets.connect(uri) as websocket:
logger.info(f"成功连接到WebSocket服务器")
# 重置重连计数器
reconnect_attempt = 0
backoff = INITIAL_BACKOFF
# 启动move.py进程
process = subprocess.Popen(
['python', 'move.py'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding='utf-8',
text=True
)
logger.info(f"已启动move.py进程")
# 创建并行任务处理通信
await asyncio.gather(
pipe_websocket_to_process(websocket, process),
pipe_process_to_websocket(process, websocket),
pipe_process_stderr_to_terminal(process)
)
except websockets.exceptions.ConnectionClosed as e:
logger.error(f"WebSocket连接关闭: {e}")
raise
except Exception as e:
logger.error(f"连接错误: {e}")
raise
finally:
# 确保子进程终止
if 'process' in locals():
logger.info(f"终止move.py进程")
try:
process.terminate()
process.wait(timeout=5)
except subprocess.TimeoutExpired:
process.kill()
logger.info(f"move.py进程已终止")
async def pipe_websocket_to_process(websocket, process):
"""从WebSocket读取数据并写入进程stdin"""
try:
while True:
# 读取WebSocket消息
message = await websocket.recv()
logger.debug(f"<< 收到消息: {message[:120]}...")
# 写入进程stdin
if isinstance(message, bytes):
message = message.decode('utf-8')
process.stdin.write(message + '\n')
process.stdin.flush()
except Exception as e:
logger.error(f"WebSocket到进程管道错误: {e}")
raise
finally:
# 安全关闭stdin
if not process.stdin.closed:
process.stdin.close()
async def stop_motors():
"""停止所有电机"""
motor_left_pwm.write_analog(0)
motor_right_pwm.write_analog(0)
logger.info("电机已停止")
def calculate_speed(speed_percent):
"""计算实际PWM值(50%-100%对应512-1023)"""
speed_percent = max(50, min(100, speed_percent)) # 限制在50-100%范围
return int(512 + (speed_percent - 50) * (1023 - 512) / 50)
async def execute_action(action, speed_percent, duration):
"""执行动作并设置持续时间"""
global current_action_task
# 取消当前正在执行的动作任务
if current_action_task and not current_action_task.done():
current_action_task.cancel()
logger.info("已取消当前动作")
# 计算实际速度值
actual_speed = calculate_speed(speed_percent)
logger.info(f"执行动作: {action}, 速度: {speed_percent}% (PWM: {actual_speed}), 持续时间: {duration}秒")
# 根据动作类型设置电机
if action == "前进":
motor_left_dir.write_digital(0)
motor_right_dir.write_digital(0)
motor_left_pwm.write_analog(actual_speed)
motor_right_pwm.write_analog(actual_speed)
elif action == "后退":
motor_left_dir.write_digital(1)
motor_right_dir.write_digital(1)
motor_left_pwm.write_analog(actual_speed)
motor_right_pwm.write_analog(actual_speed)
elif action == "左转":
motor_left_dir.write_digital(1)
motor_right_dir.write_digital(0)
motor_left_pwm.write_analog(actual_speed)
motor_right_pwm.write_analog(actual_speed)
elif action == "右转":
motor_left_dir.write_digital(0)
motor_right_dir.write_digital(1)
motor_left_pwm.write_analog(actual_speed)
motor_right_pwm.write_analog(actual_speed)
elif action == "停止":
await stop_motors()
return
# 如果有持续时间,设置定时停止
if duration > 0:
async def timed_stop():
start = time.time()
# 精确等待指定时间
await asyncio.sleep(duration)
# 检查是否应该停止(防止任务被取消后仍执行停止)
if time.time() - start >= duration:
await stop_motors()
logger.info(f"动作完成,已停止电机")
current_action_task = asyncio.create_task(timed_stop())
def extract_speed(text):
"""
从文本中提取速度值(50-100%)
使用正则表达式精确匹配速度模式
"""
# 匹配"速度"前的数字(可能包含小数点)
speed_pattern = r'(\d+)%速度'
match = re.search(speed_pattern, text)
if match:
try:
speed = int(match.group(1))
if 50 <= speed <= 100:
return speed
except ValueError:
return None
# 匹配"设置速度"命令
set_speed_pattern = r'设置速度[为]?(\d+)%'
match = re.search(set_speed_pattern, text, re.IGNORECASE)
if match:
try:
speed = int(match.group(1))
if 50 <= speed <= 100:
return speed
except ValueError:
return None
# 匹配独立的百分数
percent_pattern = r'(\d+)%'
match = re.search(percent_pattern, text)
if match:
try:
speed = int(match.group(1))
if 50 <= speed <= 100:
return speed
except ValueError:
return None
return None
def extract_duration(text):
"""
从文本中提取持续时间(秒)
使用正则表达式精确匹配时间模式
"""
# 匹配"秒"前的数字(可能包含小数点)
seconds_pattern = r'(\d+\.?\d*)\s*秒'
match = re.search(seconds_pattern, text)
if match:
try:
duration = float(match.group(1))
if duration > 0:
return duration
except ValueError:
return None
# 匹配"一点"或"一点儿"
if "一点" in text or "点儿" in text:
return 0.5
# 匹配数字+时间单位(秒)
time_pattern = r'(\d+\.?\d*)\s*(秒|s)'
match = re.search(time_pattern, text, re.IGNORECASE)
if match:
try:
duration = float(match.group(1))
if duration > 0:
return duration
except ValueError:
return None
return 0 # 默认0秒(持续执行)
async def pipe_process_to_websocket(process, websocket):
"""从进程stdout读取数据并发送到WebSocket"""
global CURRENT_SPEED
try:
while True:
# 读取进程输出
data = await asyncio.get_event_loop().run_in_executor(
None, process.stdout.readline
)
if not data: # 进程结束
logger.info("进程输出结束")
break
logger.debug(f">> 发送数据: {data[:120]}...")
try:
# 解析JSON命令
json_data = json.loads(data)
command_id = json_data.get('id', 0)
# 处理有效命令
if command_id > 1 and json_data.get('result', {}):
content = json_data['result']['content'][0]['text']
command_data = json.loads(content)
if command_data['success']:
result_text = command_data.get('result', '')
logger.info(f"收到命令: {result_text}")
# 默认参数
action = "停止"
duration = 0
new_speed = None
# 解析动作
if "前进" in result_text:
action = "前进"
elif "后退" in result_text:
action = "后退"
elif "左转" in result_text:
action = "左转"
elif "右转" in result_text:
action = "右转"
elif "停止" in result_text:
action = "停止"
# 解析时间参数
duration = extract_duration(result_text)
if duration > 0:
logger.info(f"解析到持续时间: {duration}秒")
# 解析速度参数
speed_value = extract_speed(result_text)
if speed_value is not None:
if 50 <= speed_value <= 100:
CURRENT_SPEED = speed_value
logger.info(f"更新速度为: {CURRENT_SPEED}%")
else:
logger.warning(f"速度值 {speed_value}% 超出范围(50-100)")
# 执行动作
await execute_action(action, CURRENT_SPEED, duration)
except json.JSONDecodeError:
logger.error("JSON解析失败")
except KeyError:
logger.error("命令格式错误")
except Exception as e:
logger.error(f"命令处理错误: {e}")
# 转发数据到WebSocket
await websocket.send(data)
except Exception as e:
logger.error(f"进程到WebSocket管道错误: {e}")
raise
async def pipe_process_stderr_to_terminal(process):
"""从进程stderr读取数据并打印到终端"""
try:
while True:
# 读取错误输出
data = await asyncio.get_event_loop().run_in_executor(
None, process.stderr.readline
)
if not data: # 进程结束
logger.info("进程错误输出结束")
break
# 打印到终端
sys.stderr.write(data)
sys.stderr.flush()
except Exception as e:
logger.error(f"进程stderr管道错误: {e}")
raise
def signal_handler(sig, frame):
"""处理中断信号"""
logger.info("收到中断信号,正在关闭...")
# 停止所有电机
motor_left_pwm.write_analog(0)
motor_right_pwm.write_analog(0)
sys.exit(0)
if __name__ == "__main__":
# 注册信号处理器
signal.signal(signal.SIGINT, signal_handler)
# WebSocket端点配置
endpoint_url = "wss://api.xiaozhi.me/mcp/?token=eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VySWQiOjIxNjQzNywiYWdlbnRJZCI6MTQ5OTAyLCJlbmRwb2ludElkIjoiYWdlbnRfMTQ5OTAyIiwicHVycG9zZSI6Im1jcC1lbmRwb2ludCIsImlhdCI6MTc1MzUxOTM5M30.CrjLbJ46CDPWhu0raJm79IoO9VTATxb923XpoDjPzWY-33DcLrk6mr_3mtIQM60crOg9jYxkTmfOf6qQEFzC8Q"
# 启动主循环
try:
asyncio.run(connect_with_retry(endpoint_url))
except KeyboardInterrupt:
logger.info("用户中断程序")
# 确保停止电机
motor_left_pwm.write_analog(0)
motor_right_pwm.write_analog(0)
except Exception as e:
logger.error(f"程序执行错误: {e}")
# 确保停止电机
motor_left_pwm.write_analog(0)
motor_right_pwm.write_analog(0)
## 使用说明
1. **部署**
- 将两个文件放在同一目录
- 准备背景图片 `back.png`
2. **运行**
```bash
python mcp_pipe.py
```
在Mind+中运行mcp_pipe.py。

此时,小智控制台接入点状态:在线,可用工具:可见。

用小智AI进行各种指令互动。
评论