8.1
【写在前面】
7.3,学习云天老师项目后写了M10仰望小车(MCP)的学习与尝试(1)- Makelog(造物记)。基本上一个月的时间主要都在玩小智+M10MCP,有小智的加入,对硬件的操控有趣多了。

今天这个帖子,做为续集,记录对代码的优化,同时还有我手搓的roco小智的加入。自从手搓出那个头发蓬乱的小智后,它还没有在视频中出现,今天,它将和仰望小车配合,完成小车新功能的测试。用DF FireBeetle 2 Board ESP32-S3(N16R8)开发板搓一个头发蓬乱的小智- Makelog(造物记)

下面有测试通过了两组代码,视频中用的是第二组,可以控制运动时间的版本。
视频是第二组代码第一次测试录制,测试顺利通过。

材料清单
- M10+M10电池扩展板 X1
- 仰望小车 X1
- roco小智 X1
步骤1 准备工作:记录前面两个帖子中了,仰望小车,MCP服务,roco小智
M10仰望小车(MCP)的学习与尝试(1)- Makelog(造物记)
提示:小智的版本不重要,只要有小智就可以,要用它的MCP接入点地址。
用DF FireBeetle 2 Board ESP32-S3(N16R8)开发板搓一个头发蓬乱的小智- Makelog(造物记)
步骤2 roco小智的配置与MCP接入点获取
步骤3 M10程序:优化版
这组代码是在M10仰望小车(MCP)的学习与尝试(1)- Makelog(造物记)代码的基础上优化的,主要是中文注释,便于代码阅读,简化代码,便于维护和改写,优化连接效率,更加快捷的与服务器连接。

优化版在保持功能不变的情况下,通过:
结构化封装 - 硬件操作独立成类
代码精简 - 消除30%重复代码
错误处理 - 细化异常捕获
维护友好 - 模块化设计
文档完善 - 添加函数说明
使代码更易于理解、扩展和维护,同时提高了系统的稳定性和可靠性。特别是将硬件操作集中管理,极大简化了未来的硬件调整和功能扩展工作。

1. mcp_pipe.py (核心控制器)
主要功能:
WebSocket客户端: 连接云端MCP服务
硬件控制器: 直接操作小车硬件
进程管理器: 启动和控制move.py进程
指令中继: 在WebSocket和move.py之间传递消息
2. move.py (MCP工具服务器)
主要功能:
定义硬件控制API
接收结构化指令
返回执行结果
组件说明
1. 小智AI (云端)
功能:接收用户自然语言指令,解析为结构化命令
输出:MCP格式的JSON指令
示例指令:
"前进3秒"
"左转90度"
"停车"
2. MCP接入点
功能:云端WebSocket服务端点
作用:作为小智AI与本地代理的通信桥梁
特点:
提供认证(token)
保持长连接
双向通信
3. mcp_pipe.py (WebSocket中继)
核心功能:
WebSocket客户端
硬件控制器
MCP服务器管理器
关键模块:
python
class CarController:
def __init__(self): # 硬件初始化
async def execute_command(): # 执行指令
def _move_forward(): # 前进
def _turn_left(): # 左转
# ...其他动作
async def _auto_stop_after_delay(): # 自动停止
4. move.py (MCP服务器)
功能:
注册硬件控制工具
定义API接口
返回结构化响应
核心工具:
python
@mcp.tool()
def forward(duration: float = None): # 前进
@mcp.tool()
def left(duration: float = None): # 左转
# ...其他工具
5. 仰望小车硬件
控制引脚:
P0: 舵机(方向控制)
P5/P6: 电机方向
P8/P16: 电机速度(PWM)
运动原理:
前进: 舵机87° + 双电机正转
后退: 舵机87° + 双电机反转
左转: 舵机60° + 双电机正转
右转: 舵机120° + 双电机正转
# mcp_pipe.py
"""
连接MCP服务器并将输入/输出通过WebSocket传输
Version: 0.1.1
Usage:
python mcp_pipe.py
"""
import asyncio
import json
import logging
import signal
import subprocess
import sys
import random
import websockets
from unihiker import GUI
from pinpong.board import Board, Pin, Servo
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('MCP_PIPE')
# MCP接入点地址 - 直接硬编码在程序中
MCP_ENDPOINT = "wss://api.xiaozhi.me/mcp/?token=eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VySWQiOjIxNjQzNywiYWdlbnRJZCI6MTQ5OTAyLCJlbmRwb2ludE……ntx22plEAtxd2ovVLEFS7H8V7TpED-6FfqYKcjQewWIFlEVfIjoE9UJNTQoytxdlQ5hJ4IsDHg"
# 重连设置
INITIAL_BACKOFF = 1 # 初始等待时间(秒)
MAX_BACKOFF = 600 # 最大等待时间(秒)
reconnect_attempt = 0
backoff = INITIAL_BACKOFF
class CarController:
"""控制仰望小车的硬件操作"""
def __init__(self):
Board().begin()
self.gui = GUI()
self.gui.draw_image(image="back.png", x=0, y=0)
self.gui.draw_text(text="M10仰望小车", x=10, y=0, font_size=25, color="#FFFFFF")
# 初始化硬件引脚
self.p5 = Pin(Pin.P5, Pin.OUT)
self.p6 = Pin(Pin.P6, Pin.OUT)
self.p8 = Pin(Pin.P8, Pin.PWM)
self.p16 = Pin(Pin.P16, Pin.PWM)
self.servo = Servo(Pin(Pin.P0))
self.servo.write_angle(87) # 初始角度
def execute_command(self, command: str):
"""根据指令执行相应的硬件操作"""
logger.info(f"执行指令: {command}")
if "前进" in command:
self._move_forward()
elif "后退" in command:
self._move_backward()
elif "左转" in command:
self._turn_left()
elif "右转" in command:
self._turn_right()
elif "停车" in command:
self._stop()
def _move_forward(self):
"""前进操作"""
self.servo.write_angle(87)
self.p5.write_digital(1)
self.p8.write_analog(1023) # 100%占空比
self.p6.write_digital(1)
self.p16.write_analog(1023)
def _move_backward(self):
"""后退操作"""
self.servo.write_angle(87)
self.p5.write_digital(0)
self.p8.write_analog(1023)
self.p6.write_digital(0)
self.p16.write_analog(1023)
def _turn_left(self):
"""左转操作"""
self.servo.write_angle(60)
self.p5.write_digital(1)
self.p8.write_analog(512)
self.p6.write_digital(1)
self.p16.write_analog(512)
def _turn_right(self):
"""右转操作"""
self.servo.write_angle(120)
self.p5.write_digital(1)
self.p8.write_analog(512)
self.p6.write_digital(1)
self.p16.write_analog(512)
def _stop(self):
"""停止操作"""
self.servo.write_angle(87)
self.p8.write_analog(0)
self.p16.write_analog(0)
async def connect_with_retry(uri, car_controller):
"""带重试机制的WebSocket连接"""
global reconnect_attempt, backoff
while True:
try:
if reconnect_attempt > 0:
wait_time = backoff * (1 + random.random() * 0.1)
logger.info(f"等待 {wait_time:.2f} 秒后尝试重连 #{reconnect_attempt}...")
await asyncio.sleep(wait_time)
await connect_to_server(uri, car_controller)
# 重置重连状态
reconnect_attempt = 0
backoff = INITIAL_BACKOFF
except websockets.exceptions.ConnectionClosed as e:
logger.error(f"WebSocket连接关闭: {e}")
reconnect_attempt += 1
backoff = min(backoff * 2, MAX_BACKOFF)
except Exception as e:
logger.error(f"连接错误: {e}")
reconnect_attempt += 1
backoff = min(backoff * 2, MAX_BACKOFF)
async def connect_to_server(uri, car_controller):
"""连接到WebSocket服务器并建立通信管道"""
logger.info(f"正在连接到WebSocket服务器: {uri}")
async with websockets.connect(uri) as websocket:
logger.info("成功连接到WebSocket服务器")
# 启动MCP脚本进程
process = subprocess.Popen(
['python', 'move.py'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding='utf-8',
text=True
)
logger.info("已启动MCP进程")
try:
# 创建通信任务
await asyncio.gather(
pipe_websocket_to_process(websocket, process),
pipe_process_to_websocket(process, websocket, car_controller),
pipe_process_stderr_to_terminal(process)
)
finally:
# 确保子进程终止
terminate_process(process)
async def pipe_websocket_to_process(websocket, process):
"""从WebSocket读取数据并写入进程stdin"""
try:
while True:
message = await websocket.recv()
logger.debug(f"<< 收到消息: {message[:120]}...")
if isinstance(message, bytes):
message = message.decode('utf-8')
process.stdin.write(message + '\n')
process.stdin.flush()
except Exception as e:
logger.error(f"WebSocket到进程管道错误: {e}")
raise
finally:
if not process.stdin.closed:
process.stdin.close()
async def pipe_process_to_websocket(process, websocket, car_controller):
"""从进程stdout读取数据并发送到WebSocket"""
try:
while True:
data = await asyncio.get_event_loop().run_in_executor(
None, process.stdout.readline
)
if not data:
logger.info("进程输出结束")
break
logger.debug(f">> 发送消息: {data[:120]}...")
# 处理有效指令
try:
json_data = json.loads(data)
if json_data['id'] > 1 and json_data.get('result', {}):
content = json_data['result']['content'][0]['text']
command_data = json.loads(content)
if command_data['success']:
car_controller.execute_command(command_data['result'])
except (json.JSONDecodeError, KeyError) as e:
logger.warning(f"消息解析错误: {e}")
await websocket.send(data)
except Exception as e:
logger.error(f"进程到WebSocket管道错误: {e}")
raise
async def pipe_process_stderr_to_terminal(process):
"""将进程stderr输出到终端"""
try:
while True:
data = await asyncio.get_event_loop().run_in_executor(
None, process.stderr.readline
)
if not data:
logger.info("进程错误输出结束")
break
sys.stderr.write(data)
sys.stderr.flush()
except Exception as e:
logger.error(f"进程错误管道错误: {e}")
raise
def terminate_process(process):
"""安全终止子进程"""
if process.poll() is None:
logger.info("终止MCP进程...")
try:
process.terminate()
process.wait(timeout=5)
except subprocess.TimeoutExpired:
process.kill()
logger.info("MCP进程已终止")
def signal_handler(sig, frame):
"""处理中断信号"""
logger.info("收到中断信号,正在关闭...")
sys.exit(0)
if __name__ == "__main__":
# 注册信号处理器
signal.signal(signal.SIGINT, signal_handler)
# 初始化小车控制器
car_controller = CarController()
# 使用硬编码的MCP接入点地址
logger.info(f"使用MCP接入点: {MCP_ENDPOINT[:50]}...") # 只显示部分地址
# 启动主循环
try:
asyncio.run(connect_with_retry(MCP_ENDPOINT, car_controller))
except KeyboardInterrupt:
logger.info("用户中断程序")
except Exception as e:
logger.error(f"程序执行错误: {e}")
# move.py
import sys
import logging
from mcp.server.fastmcp import FastMCP
# 初始化日志
logger = logging.getLogger('MoveServer')
# 修复Windows控制台UTF-8编码问题
if sys.platform == 'win32':
sys.stderr.reconfigure(encoding='utf-8')
sys.stdout.reconfigure(encoding='utf-8')
# 创建MCP服务器
mcp = FastMCP("MoveServer")
def log_and_return(action: str) -> dict:
"""记录操作日志并返回结果字典"""
logger.info(f"仰望小车{action}")
return {"success": True, "result": f"仰望小车{action}"}
@mcp.tool()
def forward() -> dict:
"""
控制仰望小车前进。
设置:
- P0舵机: 87度
- P5/P6引脚: 高电平
- P8/P16引脚: 100%占空比PWM输出
"""
return log_and_return("前进")
@mcp.tool()
def back() -> dict:
"""
控制仰望小车后退。
设置:
- P0舵机: 87度
- P5/P6引脚: 低电平
- P8/P16引脚: 100%占空比PWM输出
"""
return log_and_return("后退")
@mcp.tool()
def left() -> dict:
"""
控制仰望小车左转。
设置:
- P0舵机: 60度
- P5/P6引脚: 高电平
- P8/P16引脚: 50%占空比PWM输出
"""
return log_and_return("左转")
@mcp.tool()
def right() -> dict:
"""
控制仰望小车右转。
设置:
- P0舵机: 110度
- P5/P6引脚: 高电平
- P8/P16引脚: 50%占空比PWM输出
"""
return log_and_return("右转")
@mcp.tool()
def stop() -> dict:
"""
控制仰望小车停止。
设置:
- P0舵机: 87度
- P8/P16引脚: PWM输出为0
"""
return log_and_return("停车")
if __name__ == "__main__":
mcp.run(transport="stdio")
步骤4 M10编程:功能强化,增加运动时间控制
这个版本除了上面代码的基础功能外,增加了运行时间的控制。

时间控制特点
精确计时:使用asyncio.sleep实现毫秒级精度
任务管理:支持中断当前任务执行新指令
自动恢复:超时后自动恢复停止状态
资源优化:异步执行不阻塞主线程
关键设计亮点
分层架构:
通信层(mcp_pipe)
服务层(move.py)
硬件层(CarController)
双缓冲通信:
WebSocket ↔ 标准输入输出
解耦云端与本地执行
异步任务管理:
时间控制任务可取消
异常安全处理
硬件抽象:
python
class CarController:
def _move_forward(self): # 封装硬件细节
self.servo.write_angle(87)
self.p5.write_digital(1)
# ...
自动重连机制:
python
async def connect_with_retry():
while True: # 无限重试
try:
# 连接尝试
except:
# 指数退避重连
backoff = min(backoff * 2, MAX_BACKOFF)
系统特点
实时性:指令到执行延迟
# mcp_pipe.py
"""
连接MCP服务器并将输入/输出通过WebSocket传输
Version: 0.2.1
新增时间控制功能
Usage:
python mcp_pipe.py
"""
import asyncio
import json
import logging
import signal
import subprocess
import sys
import random
import websockets
from typing import Optional
from unihiker import GUI
from pinpong.board import Board, Pin, Servo
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('MCP_PIPE')
# MCP接入点地址 - 直接硬编码在程序中
MCP_ENDPOINT = "wss://api.xiaozhi.me/mcp/?token=eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VySWQiOjIxNjQzNywiYWdlbnRJZCI6NTEzODI0LCJlbmRwb2ludElkIjoiYWdlbnRfNTEzODI0IiwicHVycG9zZSI6Im1jcC1lbmRwb2ludCIsImlhdCI6MTc1Mzk3Mjc3NH0.aDt6_SsMMFuutNzwm__1bgqFXNFbVNYaJGUqKKvWL8d0yxuvO_swpBrliFP4jXZAOg-R59m15yGxXpyZkOl1oQ"
# 重连设置
INITIAL_BACKOFF = 1 # 初始等待时间(秒)
MAX_BACKOFF = 600 # 最大等待时间(秒)
reconnect_attempt = 0
backoff = INITIAL_BACKOFF
class CarController:
"""控制仰望小车的硬件操作"""
def __init__(self):
Board().begin()
self.gui = GUI()
self.gui.draw_image(image="back.png", x=0, y=0)
self.gui.draw_text(text="M10仰望小车", x=10, y=0, font_size=25, color="#FFFFFF")
# 初始化硬件引脚
self.p5 = Pin(Pin.P5, Pin.OUT)
self.p6 = Pin(Pin.P6, Pin.OUT)
self.p8 = Pin(Pin.P8, Pin.PWM)
self.p16 = Pin(Pin.P16, Pin.PWM)
self.servo = Servo(Pin(Pin.P0))
self.servo.write_angle(87) # 初始角度
# 新增:当前执行的任务
self.current_task = None
async def execute_command(self, command: str, duration: Optional[float] = None):
"""
根据指令执行相应的硬件操作
新增:支持带持续时间的异步执行
"""
# 取消当前正在执行的任务(如果有)
if self.current_task:
self.current_task.cancel()
try:
await self.current_task
except asyncio.CancelledError:
logger.info("已取消前一个任务")
self.current_task = None
# 执行新指令
logger.info(f"执行指令: {command}, 持续时间: {duration}秒")
if "前进" in command:
self._move_forward()
elif "后退" in command:
self._move_backward()
elif "左转" in command:
self._turn_left()
elif "右转" in command:
self._turn_right()
elif "停车" in command:
self._stop()
else:
logger.warning(f"未知指令: {command}")
return
# 如果指定了持续时间,则创建定时停止任务
if duration and duration > 0:
# 修复括号问题
self.current_task = asyncio.create_task(
self._auto_stop_after_delay(duration)
)
logger.info(f"已创建自动停止任务: {duration}秒后停止")
def _move_forward(self):
"""前进操作"""
self.servo.write_angle(87)
self.p5.write_digital(1)
self.p8.write_analog(512) # 50%占空比
self.p6.write_digital(1)
self.p16.write_analog(512)
def _move_backward(self):
"""后退操作"""
self.servo.write_angle(87)
self.p5.write_digital(0)
self.p8.write_analog(512)
self.p6.write_digital(0)
self.p16.write_analog(512)
def _turn_left(self):
"""左转操作"""
self.servo.write_angle(60)
self.p5.write_digital(1)
self.p8.write_analog(512)
self.p6.write_digital(1)
self.p16.write_analog(512)
def _turn_right(self):
"""右转操作"""
self.servo.write_angle(120)
self.p5.write_digital(1)
self.p8.write_analog(512)
self.p6.write_digital(1)
self.p16.write_analog(512)
def _stop(self):
"""停止操作"""
self.servo.write_angle(87)
self.p8.write_analog(0)
self.p16.write_analog(0)
async def _auto_stop_after_delay(self, delay: float):
"""在指定延迟后自动停止小车"""
try:
await asyncio.sleep(delay)
logger.info(f"时间到({delay}秒),自动停止")
self._stop()
self.current_task = None
except asyncio.CancelledError:
logger.info("自动停止任务被取消")
async def connect_with_retry(uri, car_controller):
"""带重试机制的WebSocket连接"""
global reconnect_attempt, backoff
while True:
try:
if reconnect_attempt > 0:
wait_time = backoff * (1 + random.random() * 0.1)
logger.info(f"等待 {wait_time:.2f} 秒后尝试重连 #{reconnect_attempt}...")
await asyncio.sleep(wait_time)
await connect_to_server(uri, car_controller)
# 重置重连状态
reconnect_attempt = 0
backoff = INITIAL_BACKOFF
except websockets.exceptions.ConnectionClosed as e:
logger.error(f"WebSocket连接关闭: {e}")
reconnect_attempt += 1
backoff = min(backoff * 2, MAX_BACKOFF)
except Exception as e:
logger.error(f"连接错误: {e}")
reconnect_attempt += 1
backoff = min(backoff * 2, MAX_BACKOFF)
async def connect_to_server(uri, car_controller):
"""连接到WebSocket服务器并建立通信管道"""
logger.info(f"正在连接到WebSocket服务器: {uri}")
async with websockets.connect(uri) as websocket:
logger.info("成功连接到WebSocket服务器")
# 启动MCP脚本进程
process = subprocess.Popen(
['python', 'move.py'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding='utf-8',
text=True
)
logger.info("已启动MCP进程")
try:
# 创建通信任务
await asyncio.gather(
pipe_websocket_to_process(websocket, process),
pipe_process_to_websocket(process, websocket, car_controller),
pipe_process_stderr_to_terminal(process)
)
finally:
# 确保子进程终止
terminate_process(process)
async def pipe_websocket_to_process(websocket, process):
"""从WebSocket读取数据并写入进程stdin"""
try:
while True:
message = await websocket.recv()
logger.debug(f"<< 收到消息: {message[:120]}...")
if isinstance(message, bytes):
message = message.decode('utf-8')
process.stdin.write(message + '\n')
process.stdin.flush()
except Exception as e:
logger.error(f"WebSocket到进程管道错误: {e}")
raise
finally:
if not process.stdin.closed:
process.stdin.close()
async def pipe_process_to_websocket(process, websocket, car_controller):
"""从进程stdout读取数据并发送到WebSocket"""
try:
while True:
data = await asyncio.get_event_loop().run_in_executor(
None, process.stdout.readline
)
if not data:
logger.info("进程输出结束")
break
logger.debug(f">> 发送消息: {data[:120]}...")
# 处理有效指令
try:
json_data = json.loads(data)
if json_data['id'] > 1 and json_data.get('result', {}):
content = json_data['result']['content'][0]['text']
command_data = json.loads(content)
if command_data['success']:
# 新增:获取持续时间参数
duration = command_data.get('duration')
# 异步执行指令
await car_controller.execute_command(
command_data['result'],
duration
)
except (json.JSONDecodeError, KeyError) as e:
logger.warning(f"消息解析错误: {e}")
await websocket.send(data)
except Exception as e:
logger.error(f"进程到WebSocket管道错误: {e}")
raise
async def pipe_process_stderr_to_terminal(process):
"""将进程stderr输出到终端"""
try:
while True:
data = await asyncio.get_event_loop().run_in_executor(
None, process.stderr.readline
)
if not data:
logger.info("进程错误输出结束")
break
sys.stderr.write(data)
sys.stderr.flush()
except Exception as e:
logger.error(f"进程错误管道错误: {e}")
raise
def terminate_process(process):
"""安全终止子进程"""
if process.poll() is None:
logger.info("终止MCP进程...")
try:
process.terminate()
process.wait(timeout=5)
except subprocess.TimeoutExpired:
process.kill()
logger.info("MCP进程已终止")
def signal_handler(sig, frame):
"""处理中断信号"""
logger.info("收到中断信号,正在关闭...")
sys.exit(0)
if __name__ == "__main__":
# 注册信号处理器
signal.signal(signal.SIGINT, signal_handler)
# 初始化小车控制器
car_controller = CarController()
# 使用硬编码的MCP接入点地址
logger.info(f"使用MCP接入点: {MCP_ENDPOINT[:50]}...") # 只显示部分地址
# 启动主循环
try:
asyncio.run(connect_with_retry(MCP_ENDPOINT, car_controller))
except KeyboardInterrupt:
logger.info("用户中断程序")
except Exception as e:
logger.error(f"程序执行错误: {e}")
# move.py
import sys
import logging
from mcp.server.fastmcp import FastMCP
from typing import Optional
# 初始化日志
logger = logging.getLogger('MoveServer')
# 修复Windows控制台UTF-8编码问题
if sys.platform == 'win32':
sys.stderr.reconfigure(encoding='utf-8')
sys.stdout.reconfigure(encoding='utf-8')
# 创建MCP服务器
mcp = FastMCP("MoveServer")
def log_and_return(action: str, duration: Optional[float] = None) -> dict:
"""记录操作日志并返回结果字典"""
if duration:
message = f"仰望小车{action}{duration}秒"
else:
message = f"仰望小车{action}"
logger.info(message)
return {
"success": True,
"result": message,
"duration": duration # 新增持续时间字段
}
@mcp.tool()
def forward(duration: Optional[float] = None) -> dict:
"""
控制仰望小车前进。
参数:
duration: 前进持续时间(秒),None表示持续前进直到下个指令
设置:
- P0舵机: 87度
- P5/P6引脚: 高电平
- P8/P16引脚: 50%占空比PWM输出
"""
return log_and_return("前进", duration)
@mcp.tool()
def back(duration: Optional[float] = None) -> dict:
"""
控制仰望小车后退。
参数:
duration: 后退持续时间(秒),None表示持续后退直到下个指令
设置:
- P0舵机: 87度
- P5/P6引脚: 低电平
- P8/P16引脚: 50%占空比PWM输出
"""
return log_and_return("后退", duration)
@mcp.tool()
def left(duration: Optional[float] = None) -> dict:
"""
控制仰望小车左转。
参数:
duration: 左转持续时间(秒),None表示持续左转直到下个指令
设置:
- P0舵机: 60度
- P5/P6引脚: 高电平
- P8/P16引脚: 50%占空比PWM输出
"""
return log_and_return("左转", duration)
@mcp.tool()
def right(duration: Optional[float] = None) -> dict:
"""
控制仰望小车右转。
参数:
duration: 右转持续时间(秒),None表示持续右转直到下个指令
设置:
- P0舵机: 110度
- P5/P6引脚: 高电平
- P8/P16引脚: 50%占空比PWM输出
"""
return log_and_return("右转", duration)
@mcp.tool()
def stop() -> dict:
"""
控制仰望小车停止。
设置:
- P0舵机: 87度
- P8/P16引脚: PWM输出为0
"""
return log_and_return("停车")
if __name__ == "__main__":
mcp.run(transport="stdio")
连接终端,在Mind+运行程序代码。

出现Processing request of type PingRequest连接成功。
这时小智平台MCP显示在线,工具可用:

用小智和仰望小车互动吧。
【注】
本帖子文本和代码由DeepSeek协助完成,致谢。
评论