回到首页 返回首页
回到顶部 回到顶部
返回上一页 返回上一页

M10仰望小车(MCP)的学习与尝试(2) 简单

头像 rzyzzxw 2025.08.01 8 0

8.1

【写在前面】

7.3,学习云天老师项目后写了M10仰望小车(MCP)的学习与尝试(1)- Makelog(造物记)。基本上一个月的时间主要都在玩小智+M10MCP,有小智的加入,对硬件的操控有趣多了。

1f3b1646b1272628988ecd4b0e09e24.jpg

今天这个帖子,做为续集,记录对代码的优化,同时还有我手搓的roco小智的加入。自从手搓出那个头发蓬乱的小智后,它还没有在视频中出现,今天,它将和仰望小车配合,完成小车新功能的测试。用DF FireBeetle 2 Board ESP32-S3(N16R8)开发板搓一个头发蓬乱的小智- Makelog(造物记)

77f2085ab936326ebdf2668a1dd83fd.jpg

下面有测试通过了两组代码,视频中用的是第二组,可以控制运动时间的版本。

视频是第二组代码第一次测试录制,测试顺利通过。

68f8834dc905945b19ee84166944303.jpg

材料清单

  • M10+M10电池扩展板 X1
  • 仰望小车 X1
  • roco小智 X1

步骤1 准备工作:记录前面两个帖子中了,仰望小车,MCP服务,roco小智

M10仰望小车(MCP)的学习与尝试(1)- Makelog(造物记)

提示:小智的版本不重要,只要有小智就可以,要用它的MCP接入点地址。

用DF FireBeetle 2 Board ESP32-S3(N16R8)开发板搓一个头发蓬乱的小智- Makelog(造物记)

步骤2 roco小智的配置与MCP接入点获取

image.png
image.png

步骤3 M10程序:优化版

这组代码是在M10仰望小车(MCP)的学习与尝试(1)- Makelog(造物记)代码的基础上优化的,主要是中文注释,便于代码阅读,简化代码,便于维护和改写,优化连接效率,更加快捷的与服务器连接。

yw.png

优化版在保持功能不变的情况下,通过:

结构化封装 - 硬件操作独立成类

代码精简 - 消除30%重复代码

错误处理 - 细化异常捕获

维护友好 - 模块化设计

文档完善 - 添加函数说明

使代码更易于理解、扩展和维护,同时提高了系统的稳定性和可靠性。特别是将硬件操作集中管理,极大简化了未来的硬件调整和功能扩展工作。

image.png

1. mcp_pipe.py (核心控制器)

主要功能:

WebSocket客户端: 连接云端MCP服务

硬件控制器: 直接操作小车硬件

进程管理器: 启动和控制move.py进程

指令中继: 在WebSocket和move.py之间传递消息

2. move.py (MCP工具服务器)

主要功能:

定义硬件控制API

接收结构化指令

返回执行结果

组件说明
1. 小智AI (云端)
功能:接收用户自然语言指令,解析为结构化命令

输出:MCP格式的JSON指令

示例指令:

"前进3秒"

"左转90度"

"停车"

2. MCP接入点
功能:云端WebSocket服务端点

作用:作为小智AI与本地代理的通信桥梁

特点:

提供认证(token)

保持长连接

双向通信

3. mcp_pipe.py (WebSocket中继)
核心功能:

WebSocket客户端

硬件控制器

MCP服务器管理器

关键模块:

python
class CarController:
def __init__(self): # 硬件初始化
async def execute_command(): # 执行指令
def _move_forward(): # 前进
def _turn_left(): # 左转
# ...其他动作
async def _auto_stop_after_delay(): # 自动停止
4. move.py (MCP服务器)
功能:

注册硬件控制工具

定义API接口

返回结构化响应

核心工具:

python
@mcp.tool()
def forward(duration: float = None): # 前进
@mcp.tool()
def left(duration: float = None): # 左转
# ...其他工具
5. 仰望小车硬件
控制引脚:

P0: 舵机(方向控制)

P5/P6: 电机方向

P8/P16: 电机速度(PWM)

运动原理:

前进: 舵机87° + 双电机正转

后退: 舵机87° + 双电机反转

左转: 舵机60° + 双电机正转

右转: 舵机120° + 双电机正转

代码
# mcp_pipe.py
"""
连接MCP服务器并将输入/输出通过WebSocket传输
Version: 0.1.1
Usage: 
python mcp_pipe.py
"""

import asyncio
import json
import logging
import signal
import subprocess
import sys
import random
import websockets
from unihiker import GUI
from pinpong.board import Board, Pin, Servo

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('MCP_PIPE')

# MCP接入点地址 - 直接硬编码在程序中
MCP_ENDPOINT = "wss://api.xiaozhi.me/mcp/?token=eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VySWQiOjIxNjQzNywiYWdlbnRJZCI6MTQ5OTAyLCJlbmRwb2ludE……ntx22plEAtxd2ovVLEFS7H8V7TpED-6FfqYKcjQewWIFlEVfIjoE9UJNTQoytxdlQ5hJ4IsDHg"

# 重连设置
INITIAL_BACKOFF = 1  # 初始等待时间(秒)
MAX_BACKOFF = 600    # 最大等待时间(秒)
reconnect_attempt = 0
backoff = INITIAL_BACKOFF

class CarController:
    """控制仰望小车的硬件操作"""
    def __init__(self):
        Board().begin()
        self.gui = GUI()
        self.gui.draw_image(image="back.png", x=0, y=0)
        self.gui.draw_text(text="M10仰望小车", x=10, y=0, font_size=25, color="#FFFFFF")
        
        # 初始化硬件引脚
        self.p5 = Pin(Pin.P5, Pin.OUT)
        self.p6 = Pin(Pin.P6, Pin.OUT)
        self.p8 = Pin(Pin.P8, Pin.PWM)
        self.p16 = Pin(Pin.P16, Pin.PWM)
        self.servo = Servo(Pin(Pin.P0))
        self.servo.write_angle(87)  # 初始角度

    def execute_command(self, command: str):
        """根据指令执行相应的硬件操作"""
        logger.info(f"执行指令: {command}")
        
        if "前进" in command:
            self._move_forward()
        elif "后退" in command:
            self._move_backward()
        elif "左转" in command:
            self._turn_left()
        elif "右转" in command:
            self._turn_right()
        elif "停车" in command:
            self._stop()

    def _move_forward(self):
        """前进操作"""
        self.servo.write_angle(87)
        self.p5.write_digital(1)
        self.p8.write_analog(1023)  # 100%占空比
        self.p6.write_digital(1)
        self.p16.write_analog(1023)

    def _move_backward(self):
        """后退操作"""
        self.servo.write_angle(87)
        self.p5.write_digital(0)
        self.p8.write_analog(1023)
        self.p6.write_digital(0)
        self.p16.write_analog(1023)

    def _turn_left(self):
        """左转操作"""
        self.servo.write_angle(60)
        self.p5.write_digital(1)
        self.p8.write_analog(512)
        self.p6.write_digital(1)
        self.p16.write_analog(512)

    def _turn_right(self):
        """右转操作"""
        self.servo.write_angle(120)
        self.p5.write_digital(1)
        self.p8.write_analog(512)
        self.p6.write_digital(1)
        self.p16.write_analog(512)

    def _stop(self):
        """停止操作"""
        self.servo.write_angle(87)
        self.p8.write_analog(0)
        self.p16.write_analog(0)

async def connect_with_retry(uri, car_controller):
    """带重试机制的WebSocket连接"""
    global reconnect_attempt, backoff
    while True:
        try:
            if reconnect_attempt > 0:
                wait_time = backoff * (1 + random.random() * 0.1)
                logger.info(f"等待 {wait_time:.2f} 秒后尝试重连 #{reconnect_attempt}...")
                await asyncio.sleep(wait_time)

            await connect_to_server(uri, car_controller)
            # 重置重连状态
            reconnect_attempt = 0
            backoff = INITIAL_BACKOFF

        except websockets.exceptions.ConnectionClosed as e:
            logger.error(f"WebSocket连接关闭: {e}")
            reconnect_attempt += 1
            backoff = min(backoff * 2, MAX_BACKOFF)
        except Exception as e:
            logger.error(f"连接错误: {e}")
            reconnect_attempt += 1
            backoff = min(backoff * 2, MAX_BACKOFF)

async def connect_to_server(uri, car_controller):
    """连接到WebSocket服务器并建立通信管道"""
    logger.info(f"正在连接到WebSocket服务器: {uri}")
    async with websockets.connect(uri) as websocket:
        logger.info("成功连接到WebSocket服务器")
        
        # 启动MCP脚本进程
        process = subprocess.Popen(
            ['python', 'move.py'],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            encoding='utf-8',
            text=True
        )
        logger.info("已启动MCP进程")

        try:
            # 创建通信任务
            await asyncio.gather(
                pipe_websocket_to_process(websocket, process),
                pipe_process_to_websocket(process, websocket, car_controller),
                pipe_process_stderr_to_terminal(process)
            )
        finally:
            # 确保子进程终止
            terminate_process(process)

async def pipe_websocket_to_process(websocket, process):
    """从WebSocket读取数据并写入进程stdin"""
    try:
        while True:
            message = await websocket.recv()
            logger.debug(f"<< 收到消息: {message[:120]}...")
            
            if isinstance(message, bytes):
                message = message.decode('utf-8')
                
            process.stdin.write(message + '\n')
            process.stdin.flush()
    except Exception as e:
        logger.error(f"WebSocket到进程管道错误: {e}")
        raise
    finally:
        if not process.stdin.closed:
            process.stdin.close()

async def pipe_process_to_websocket(process, websocket, car_controller):
    """从进程stdout读取数据并发送到WebSocket"""
    try:
        while True:
            data = await asyncio.get_event_loop().run_in_executor(
                None, process.stdout.readline
            )
            
            if not data:
                logger.info("进程输出结束")
                break

            logger.debug(f">> 发送消息: {data[:120]}...")
            
            # 处理有效指令
            try:
                json_data = json.loads(data)
                if json_data['id'] > 1 and json_data.get('result', {}):
                    content = json_data['result']['content'][0]['text']
                    command_data = json.loads(content)
                    
                    if command_data['success']:
                        car_controller.execute_command(command_data['result'])
            except (json.JSONDecodeError, KeyError) as e:
                logger.warning(f"消息解析错误: {e}")

            await websocket.send(data)
    except Exception as e:
        logger.error(f"进程到WebSocket管道错误: {e}")
        raise

async def pipe_process_stderr_to_terminal(process):
    """将进程stderr输出到终端"""
    try:
        while True:
            data = await asyncio.get_event_loop().run_in_executor(
                None, process.stderr.readline
            )
            
            if not data:
                logger.info("进程错误输出结束")
                break
                
            sys.stderr.write(data)
            sys.stderr.flush()
    except Exception as e:
        logger.error(f"进程错误管道错误: {e}")
        raise

def terminate_process(process):
    """安全终止子进程"""
    if process.poll() is None:
        logger.info("终止MCP进程...")
        try:
            process.terminate()
            process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            process.kill()
        logger.info("MCP进程已终止")

def signal_handler(sig, frame):
    """处理中断信号"""
    logger.info("收到中断信号,正在关闭...")
    sys.exit(0)

if __name__ == "__main__":
    # 注册信号处理器
    signal.signal(signal.SIGINT, signal_handler)
    
    # 初始化小车控制器
    car_controller = CarController()
    
    # 使用硬编码的MCP接入点地址
    logger.info(f"使用MCP接入点: {MCP_ENDPOINT[:50]}...")  # 只显示部分地址
    
    # 启动主循环
    try:
        asyncio.run(connect_with_retry(MCP_ENDPOINT, car_controller))
    except KeyboardInterrupt:
        logger.info("用户中断程序")
    except Exception as e:
        logger.error(f"程序执行错误: {e}")
代码
# move.py
import sys
import logging
from mcp.server.fastmcp import FastMCP

# 初始化日志
logger = logging.getLogger('MoveServer')

# 修复Windows控制台UTF-8编码问题
if sys.platform == 'win32':
    sys.stderr.reconfigure(encoding='utf-8')
    sys.stdout.reconfigure(encoding='utf-8')

# 创建MCP服务器
mcp = FastMCP("MoveServer")

def log_and_return(action: str) -> dict:
    """记录操作日志并返回结果字典"""
    logger.info(f"仰望小车{action}")
    return {"success": True, "result": f"仰望小车{action}"}

@mcp.tool()
def forward() -> dict:
    """
    控制仰望小车前进。
    
    设置:
    - P0舵机: 87度
    - P5/P6引脚: 高电平
    - P8/P16引脚: 100%占空比PWM输出
    """
    return log_and_return("前进")

@mcp.tool()
def back() -> dict:
    """
    控制仰望小车后退。
    
    设置:
    - P0舵机: 87度
    - P5/P6引脚: 低电平
    - P8/P16引脚: 100%占空比PWM输出
    """
    return log_and_return("后退")

@mcp.tool()
def left() -> dict:
    """
    控制仰望小车左转。
    
    设置:
    - P0舵机: 60度
    - P5/P6引脚: 高电平
    - P8/P16引脚: 50%占空比PWM输出
    """
    return log_and_return("左转")

@mcp.tool()
def right() -> dict:
    """
    控制仰望小车右转。
    
    设置:
    - P0舵机: 110度
    - P5/P6引脚: 高电平
    - P8/P16引脚: 50%占空比PWM输出
    """
    return log_and_return("右转")

@mcp.tool()
def stop() -> dict:
    """
    控制仰望小车停止。
    
    设置:
    - P0舵机: 87度
    - P8/P16引脚: PWM输出为0
    """
    return log_and_return("停车")

if __name__ == "__main__":
    mcp.run(transport="stdio")

步骤4 M10编程:功能强化,增加运动时间控制

这个版本除了上面代码的基础功能外,增加了运行时间的控制。

image.png

时间控制特点
精确计时:使用asyncio.sleep实现毫秒级精度

任务管理:支持中断当前任务执行新指令

自动恢复:超时后自动恢复停止状态

资源优化:异步执行不阻塞主线程

关键设计亮点
分层架构:

通信层(mcp_pipe)

服务层(move.py)

硬件层(CarController)

双缓冲通信:

WebSocket ↔ 标准输入输出

解耦云端与本地执行

异步任务管理:

时间控制任务可取消

异常安全处理

硬件抽象:

python
class CarController:
def _move_forward(self): # 封装硬件细节
self.servo.write_angle(87)
self.p5.write_digital(1)
# ...
自动重连机制:

python
async def connect_with_retry():
while True: # 无限重试
try:
# 连接尝试
except:
# 指数退避重连
backoff = min(backoff * 2, MAX_BACKOFF)
系统特点
实时性:指令到执行延迟

代码
# mcp_pipe.py
"""
连接MCP服务器并将输入/输出通过WebSocket传输
Version: 0.2.1
新增时间控制功能
Usage: 
python mcp_pipe.py
"""

import asyncio
import json
import logging
import signal
import subprocess
import sys
import random
import websockets
from typing import Optional
from unihiker import GUI
from pinpong.board import Board, Pin, Servo

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('MCP_PIPE')

# MCP接入点地址 - 直接硬编码在程序中
MCP_ENDPOINT = "wss://api.xiaozhi.me/mcp/?token=eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VySWQiOjIxNjQzNywiYWdlbnRJZCI6NTEzODI0LCJlbmRwb2ludElkIjoiYWdlbnRfNTEzODI0IiwicHVycG9zZSI6Im1jcC1lbmRwb2ludCIsImlhdCI6MTc1Mzk3Mjc3NH0.aDt6_SsMMFuutNzwm__1bgqFXNFbVNYaJGUqKKvWL8d0yxuvO_swpBrliFP4jXZAOg-R59m15yGxXpyZkOl1oQ"

# 重连设置
INITIAL_BACKOFF = 1  # 初始等待时间(秒)
MAX_BACKOFF = 600    # 最大等待时间(秒)
reconnect_attempt = 0
backoff = INITIAL_BACKOFF

class CarController:
    """控制仰望小车的硬件操作"""
    def __init__(self):
        Board().begin()
        self.gui = GUI()
        self.gui.draw_image(image="back.png", x=0, y=0)
        self.gui.draw_text(text="M10仰望小车", x=10, y=0, font_size=25, color="#FFFFFF")
        
        # 初始化硬件引脚
        self.p5 = Pin(Pin.P5, Pin.OUT)
        self.p6 = Pin(Pin.P6, Pin.OUT)
        self.p8 = Pin(Pin.P8, Pin.PWM)
        self.p16 = Pin(Pin.P16, Pin.PWM)
        self.servo = Servo(Pin(Pin.P0))
        self.servo.write_angle(87)  # 初始角度
        
        # 新增:当前执行的任务
        self.current_task = None

    async def execute_command(self, command: str, duration: Optional[float] = None):
        """
        根据指令执行相应的硬件操作
        新增:支持带持续时间的异步执行
        """
        # 取消当前正在执行的任务(如果有)
        if self.current_task:
            self.current_task.cancel()
            try:
                await self.current_task
            except asyncio.CancelledError:
                logger.info("已取消前一个任务")
            self.current_task = None
        
        # 执行新指令
        logger.info(f"执行指令: {command}, 持续时间: {duration}秒")
        
        if "前进" in command:
            self._move_forward()
        elif "后退" in command:
            self._move_backward()
        elif "左转" in command:
            self._turn_left()
        elif "右转" in command:
            self._turn_right()
        elif "停车" in command:
            self._stop()
        else:
            logger.warning(f"未知指令: {command}")
            return
        
        # 如果指定了持续时间,则创建定时停止任务
        if duration and duration > 0:
            # 修复括号问题
            self.current_task = asyncio.create_task(
                self._auto_stop_after_delay(duration)
            )
            logger.info(f"已创建自动停止任务: {duration}秒后停止")

    def _move_forward(self):
        """前进操作"""
        self.servo.write_angle(87)
        self.p5.write_digital(1)
        self.p8.write_analog(512)  # 50%占空比
        self.p6.write_digital(1)
        self.p16.write_analog(512)

    def _move_backward(self):
        """后退操作"""
        self.servo.write_angle(87)
        self.p5.write_digital(0)
        self.p8.write_analog(512)
        self.p6.write_digital(0)
        self.p16.write_analog(512)

    def _turn_left(self):
        """左转操作"""
        self.servo.write_angle(60)
        self.p5.write_digital(1)
        self.p8.write_analog(512)
        self.p6.write_digital(1)
        self.p16.write_analog(512)

    def _turn_right(self):
        """右转操作"""
        self.servo.write_angle(120)
        self.p5.write_digital(1)
        self.p8.write_analog(512)
        self.p6.write_digital(1)
        self.p16.write_analog(512)

    def _stop(self):
        """停止操作"""
        self.servo.write_angle(87)
        self.p8.write_analog(0)
        self.p16.write_analog(0)
    
    async def _auto_stop_after_delay(self, delay: float):
        """在指定延迟后自动停止小车"""
        try:
            await asyncio.sleep(delay)
            logger.info(f"时间到({delay}秒),自动停止")
            self._stop()
            self.current_task = None
        except asyncio.CancelledError:
            logger.info("自动停止任务被取消")

async def connect_with_retry(uri, car_controller):
    """带重试机制的WebSocket连接"""
    global reconnect_attempt, backoff
    while True:
        try:
            if reconnect_attempt > 0:
                wait_time = backoff * (1 + random.random() * 0.1)
                logger.info(f"等待 {wait_time:.2f} 秒后尝试重连 #{reconnect_attempt}...")
                await asyncio.sleep(wait_time)

            await connect_to_server(uri, car_controller)
            # 重置重连状态
            reconnect_attempt = 0
            backoff = INITIAL_BACKOFF

        except websockets.exceptions.ConnectionClosed as e:
            logger.error(f"WebSocket连接关闭: {e}")
            reconnect_attempt += 1
            backoff = min(backoff * 2, MAX_BACKOFF)
        except Exception as e:
            logger.error(f"连接错误: {e}")
            reconnect_attempt += 1
            backoff = min(backoff * 2, MAX_BACKOFF)

async def connect_to_server(uri, car_controller):
    """连接到WebSocket服务器并建立通信管道"""
    logger.info(f"正在连接到WebSocket服务器: {uri}")
    async with websockets.connect(uri) as websocket:
        logger.info("成功连接到WebSocket服务器")
        
        # 启动MCP脚本进程
        process = subprocess.Popen(
            ['python', 'move.py'],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            encoding='utf-8',
            text=True
        )
        logger.info("已启动MCP进程")

        try:
            # 创建通信任务
            await asyncio.gather(
                pipe_websocket_to_process(websocket, process),
                pipe_process_to_websocket(process, websocket, car_controller),
                pipe_process_stderr_to_terminal(process)
            )
        finally:
            # 确保子进程终止
            terminate_process(process)

async def pipe_websocket_to_process(websocket, process):
    """从WebSocket读取数据并写入进程stdin"""
    try:
        while True:
            message = await websocket.recv()
            logger.debug(f"<< 收到消息: {message[:120]}...")
            
            if isinstance(message, bytes):
                message = message.decode('utf-8')
                
            process.stdin.write(message + '\n')
            process.stdin.flush()
    except Exception as e:
        logger.error(f"WebSocket到进程管道错误: {e}")
        raise
    finally:
        if not process.stdin.closed:
            process.stdin.close()

async def pipe_process_to_websocket(process, websocket, car_controller):
    """从进程stdout读取数据并发送到WebSocket"""
    try:
        while True:
            data = await asyncio.get_event_loop().run_in_executor(
                None, process.stdout.readline
            )
            
            if not data:
                logger.info("进程输出结束")
                break

            logger.debug(f">> 发送消息: {data[:120]}...")
            
            # 处理有效指令
            try:
                json_data = json.loads(data)
                if json_data['id'] > 1 and json_data.get('result', {}):
                    content = json_data['result']['content'][0]['text']
                    command_data = json.loads(content)
                    
                    if command_data['success']:
                        # 新增:获取持续时间参数
                        duration = command_data.get('duration')
                        # 异步执行指令
                        await car_controller.execute_command(
                            command_data['result'], 
                            duration
                        )
            except (json.JSONDecodeError, KeyError) as e:
                logger.warning(f"消息解析错误: {e}")

            await websocket.send(data)
    except Exception as e:
        logger.error(f"进程到WebSocket管道错误: {e}")
        raise

async def pipe_process_stderr_to_terminal(process):
    """将进程stderr输出到终端"""
    try:
        while True:
            data = await asyncio.get_event_loop().run_in_executor(
                None, process.stderr.readline
            )
            
            if not data:
                logger.info("进程错误输出结束")
                break
                
            sys.stderr.write(data)
            sys.stderr.flush()
    except Exception as e:
        logger.error(f"进程错误管道错误: {e}")
        raise

def terminate_process(process):
    """安全终止子进程"""
    if process.poll() is None:
        logger.info("终止MCP进程...")
        try:
            process.terminate()
            process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            process.kill()
        logger.info("MCP进程已终止")

def signal_handler(sig, frame):
    """处理中断信号"""
    logger.info("收到中断信号,正在关闭...")
    sys.exit(0)

if __name__ == "__main__":
    # 注册信号处理器
    signal.signal(signal.SIGINT, signal_handler)
    
    # 初始化小车控制器
    car_controller = CarController()
    
    # 使用硬编码的MCP接入点地址
    logger.info(f"使用MCP接入点: {MCP_ENDPOINT[:50]}...")  # 只显示部分地址
    
    # 启动主循环
    try:
        asyncio.run(connect_with_retry(MCP_ENDPOINT, car_controller))
    except KeyboardInterrupt:
        logger.info("用户中断程序")
    except Exception as e:
        logger.error(f"程序执行错误: {e}")
代码
# move.py
import sys
import logging
from mcp.server.fastmcp import FastMCP
from typing import Optional

# 初始化日志
logger = logging.getLogger('MoveServer')

# 修复Windows控制台UTF-8编码问题
if sys.platform == 'win32':
    sys.stderr.reconfigure(encoding='utf-8')
    sys.stdout.reconfigure(encoding='utf-8')

# 创建MCP服务器
mcp = FastMCP("MoveServer")

def log_and_return(action: str, duration: Optional[float] = None) -> dict:
    """记录操作日志并返回结果字典"""
    if duration:
        message = f"仰望小车{action}{duration}秒"
    else:
        message = f"仰望小车{action}"
    
    logger.info(message)
    return {
        "success": True, 
        "result": message,
        "duration": duration  # 新增持续时间字段
    }

@mcp.tool()
def forward(duration: Optional[float] = None) -> dict:
    """
    控制仰望小车前进。
    
    参数:
    duration: 前进持续时间(秒),None表示持续前进直到下个指令
    
    设置:
    - P0舵机: 87度
    - P5/P6引脚: 高电平
    - P8/P16引脚: 50%占空比PWM输出
    """
    return log_and_return("前进", duration)

@mcp.tool()
def back(duration: Optional[float] = None) -> dict:
    """
    控制仰望小车后退。
    
    参数:
    duration: 后退持续时间(秒),None表示持续后退直到下个指令
    
    设置:
    - P0舵机: 87度
    - P5/P6引脚: 低电平
    - P8/P16引脚: 50%占空比PWM输出
    """
    return log_and_return("后退", duration)

@mcp.tool()
def left(duration: Optional[float] = None) -> dict:
    """
    控制仰望小车左转。
    
    参数:
    duration: 左转持续时间(秒),None表示持续左转直到下个指令
    
    设置:
    - P0舵机: 60度
    - P5/P6引脚: 高电平
    - P8/P16引脚: 50%占空比PWM输出
    """
    return log_and_return("左转", duration)

@mcp.tool()
def right(duration: Optional[float] = None) -> dict:
    """
    控制仰望小车右转。
    
    参数:
    duration: 右转持续时间(秒),None表示持续右转直到下个指令
    
    设置:
    - P0舵机: 110度
    - P5/P6引脚: 高电平
    - P8/P16引脚: 50%占空比PWM输出
    """
    return log_and_return("右转", duration)

@mcp.tool()
def stop() -> dict:
    """
    控制仰望小车停止。
    
    设置:
    - P0舵机: 87度
    - P8/P16引脚: PWM输出为0
    """
    return log_and_return("停车")

if __name__ == "__main__":
    mcp.run(transport="stdio")

连接终端,在Mind+运行程序代码。

image.png

出现Processing request of type PingRequest连接成功。

这时小智平台MCP显示在线,工具可用:

image.png

用小智和仰望小车互动吧。

【注】

本帖子文本和代码由DeepSeek协助完成,致谢。

评论

user-avatar