回到首页 返回首页
回到顶部 回到顶部
返回上一页 返回上一页

小智AI遇上帅气龙舟(MCP) 简单

头像 rzyzzxw 2025.07.24 20 0

7.24

 

【写在前面】

当参加DF的活动,M10电机扩展板组合发到我手中时,是在端午节时,对它的了解从它和龙舟模型的相遇开始,兜兜转转,又回到了龙舟,从简单的项目,一点点增加难度,由于扩展板的出现和活动的号召,我对M10这块板子增加了认识,M10功能很强大,朋友们用它做出了很多高大上的项目,而小白如我,一直在低水平徘徊,不过也在变化,跟着大家的脚步,有了那么一点点提高。

小智和龙舟结合,它只有一个电机,简单把前面代码改改,就可以控制它了,非常简单。

5c8e727ec77aedd6a3cffb86b54bb90.jpg

材料清单

  • M10+电池扩展板 X1
  • 龙舟模型 X1
  • 乐动小智 X1

步骤1 行空板M10上安装pyenv环境和Python 3.12.7及MCP库安装

步骤2 小智聊天机器人配置及获取MCP接入点地址

步骤3 编写M10程序

系统包含两个核心模块:​​通信管道模块(mcp_pipe.py)​​ 和 ​​动作服务模块(move.py)​​。

先贴上来的是改写的代码,它们可以控制龙舟前进,后退,停止。

 

在Mind+中运行mcp_pipe.py。

image.png

当终端如上图,小智MCP服务连接成功。

此时,小智控制台接入点状态:在线,可用工具:可见。

22.png

下面放的是我修改所得的两个文件。

代码
# move.py
from mcp.server.fastmcp import FastMCP
import sys
import logging

logger = logging.getLogger('MoveServer')

# 修复Windows控制台UTF-8编码问题
if sys.platform == 'win32':
    sys.stderr.reconfigure(encoding='utf-8')
    sys.stdout.reconfigure(encoding='utf-8')

# 创建MCP服务器


mcp = FastMCP("MoveServer")

@mcp.tool()
def forward() -> dict:
    """
    控制龙舟前进。
   
    该函数将设置P5和P6引脚为低电平,P8和P16引脚为50%占空比的PWM输出,使龙舟前进。
   
    Returns:
        dict: 返回一个字典,包含操作结果。
    """

    result = "龙舟前进"
    logger.info(result)
    return {"success": True, "result": result}

@mcp.tool()
def back() -> dict:
    """
    控制龙舟后退。
   
    该函数将设置P5和P6引脚为高电平,P8和P16引脚为50%占空比的PWM输出,使龙舟后退。
   
    Returns:
        dict: 返回一个字典,包含操作结果。
    """

    result = "龙舟后退"
    logger.info(result)
    return {"success": True, "result": result}

@mcp.tool()
def stop() -> dict:
    """
    控制龙舟停止。
   
    该函数将设置P8和P16引脚的PWM输出为0,使龙舟停止。
   
    Returns:
        dict: 返回一个字典,包含操作结果。
    """

    result = "龙舟停止"
    logger.info(result)
    return {"success": True, "result": result}

# 启动服务器
if __name__ == "__main__":
    mcp.run(transport="stdio")
代码
"""
This script is used to connect to the MCP server and pipe the input and output to the websocket endpoint.
Version: 0.1.0

Usage:

export MCP_ENDPOINT=
python mcp_pipe.py 

"""

#  -*- coding: UTF-8 -*-
 
# MindPlus
# Python
from unihiker import GUI
import asyncio
import websockets
import subprocess
import logging
import os
import signal
import sys
import random
from dotenv import load_dotenv
from pinpong.board import Pin
from pinpong.board import Board
from pinpong.board import Board,Pin
#from pinpong.extension.unihiker import *
import json

Board().begin()
u_gui=GUI()
p_p5_out=Pin(Pin.P5, Pin.OUT)
p_p8_pwm=Pin(Pin.P8, Pin.PWM)
p_p6_out=Pin(Pin.P6, Pin.OUT)
p_p16_pwm=Pin(Pin.P16, Pin.PWM)

u_gui.draw_image(image="back.png",x=0,y=0)
u_gui.draw_text(text="M10龙舟MCP",x=30,y=10,font_size=25, color="#FFFFFF")

# 设置日志记录器
# Load environment variables from .env file
#load_dotenv()

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('MCP_PIPE')

# Reconnection settings
INITIAL_BACKOFF = 1  # Initial wait time in seconds
MAX_BACKOFF = 600  # Maximum wait time in seconds
reconnect_attempt = 0
backoff = INITIAL_BACKOFF

async def connect_with_retry(uri):
    """Connect to WebSocket server with retry mechanism"""
    global reconnect_attempt, backoff
    while True:  # Infinite reconnection
        try:
            if reconnect_attempt > 0:
                wait_time = backoff * (1 + random.random() * 0.1)  # Add some random jitter
                logger.info(f"Waiting {wait_time:.2f} seconds before reconnection attempt {reconnect_attempt}...")
                await asyncio.sleep(wait_time)

            # Attempt to connect
            await connect_to_server(uri)

        except Exception as e:
            reconnect_attempt += 1
            logger.warning(f"Connection closed (attempt: {reconnect_attempt}): {e}")            
            # Calculate wait time for next reconnection (exponential backoff)
            backoff = min(backoff * 2, MAX_BACKOFF)

async def connect_to_server(uri):
    """Connect to WebSocket server and establish bidirectional communication with `mcp_script`"""
    global reconnect_attempt, backoff
    try:
        logger.info(f"Connecting to WebSocket server...")
        async with websockets.connect(uri) as websocket:
            logger.info(f"Successfully connected to WebSocket server")

            # Reset reconnection counter if connection closes normally
            reconnect_attempt = 0
            backoff = INITIAL_BACKOFF

            # Start mcp_script process
            process = subprocess.Popen(
                ['python', mcp_script],
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                encoding='utf-8',
                text=True  # Use text mode
            )
            logger.info(f"Started {mcp_script} process")

            # Create two tasks: read from WebSocket and write to process, read from process and write to WebSocket
            await asyncio.gather(
                pipe_websocket_to_process(websocket, process),
                pipe_process_to_websocket(process, websocket),
                pipe_process_stderr_to_terminal(process)
            )
    except websockets.exceptions.ConnectionClosed as e:
        logger.error(f"WebSocket connection closed: {e}")
        raise  # Re-throw exception to trigger reconnection
    except Exception as e:
        logger.error(f"Connection error: {e}")
        raise  # Re-throw exception
    finally:
        # Ensure the child process is properly terminated
        if 'process' in locals():
            logger.info(f"Terminating {mcp_script} process")
            try:
                process.terminate()
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                process.kill()
            logger.info(f"{mcp_script} process terminated")

async def pipe_websocket_to_process(websocket, process):
    """Read data from WebSocket and write to process stdin"""
    try:
        while True:
            # Read message from WebSocket
            message = await websocket.recv()
            logger.debug(f"<< {message[:120]}...")

            # Write to process stdin (in text mode)
            if isinstance(message, bytes):
                message = message.decode('utf-8')
            process.stdin.write(message + '\n')
            process.stdin.flush()
    except Exception as e:
        logger.error(f"Error in WebSocket to process pipe: {e}")
        raise  # Re-throw exception to trigger reconnection
    finally:
        # Close process stdin
        if not process.stdin.closed:
            process.stdin.close()

async def pipe_process_to_websocket(process, websocket):
    """Read data from process stdout and send to WebSocket"""
    global p_p5_out,p_p8_pwm,p_p6_out,p_p16_pwm
    try:
        while True:
            # Read data from process stdout
            data = await asyncio.get_event_loop().run_in_executor(
                None, process.stdout.readline
            )

            if not data:  # If no data, the process may have ended
                logger.info("Process has ended output")
                break

            # Send data to WebSocket
            logger.debug(f">> {data[:120]}...")
            print(data)
            # 解析 JSON 字符串
            json_str = json.loads(data)
            print(json_str['id'])

            if json_str['id']>1:
              print(json_str['id'])
              if json_str.get('result', {}):

                text=json.loads(json_str['result']['content'][0]['text'])
                if text['success']:
                   print(text['result'])
                   L=480
                   if "前进" in text['result']:
                             print("******************************")
                             p_p5_out.write_digital(0)
                             p_p8_pwm.write_analog(512)
                             p_p6_out.write_digital(0)
                             p_p16_pwm.write_analog(512)                             
                   if "后退" in text['result']:
                             print("******************************")
                             p_p5_out.write_digital(1)
                             p_p8_pwm.write_analog(512)
                             p_p6_out.write_digital(1)
                             p_p16_pwm.write_analog(512)
                   if "停止" in text['result']:
                             print("******************************")
                             p_p8_pwm.write_analog(0)
                             p_p16_pwm.write_analog(0)
            # In text mode, data is already a string, no need to decode
            await websocket.send(data)
    except Exception as e:
        logger.error(f"Error in process to WebSocket pipe: {e}")
        raise  # Re-throw exception to trigger reconnection

async def pipe_process_stderr_to_terminal(process):
    """Read data from process stderr and print to terminal"""
    try:
        while True:
            # Read data from process stderr
            data = await asyncio.get_event_loop().run_in_executor(
                None, process.stderr.readline
            )

            if not data:  # If no data, the process may have ended
                logger.info("Process has ended stderr output")
                break

            # Print stderr data to terminal (in text mode, data is already a string)
            sys.stderr.write(data)
            sys.stderr.flush()
    except Exception as e:
        logger.error(f"Error in process stderr pipe: {e}")
        raise  # Re-throw exception to trigger reconnection

def signal_handler(sig, frame):
    """Handle interrupt signals"""
    logger.info("Received interrupt signal, shutting down...")
    sys.exit(0)

if __name__ == "__main__":
    # Register signal handler
    signal.signal(signal.SIGINT, signal_handler)

    # mcp_script
    #if len(sys.argv) < 2:
    #    logger.error("Usage: mcp_pipe.py ")
    #    sys.exit(1)

    mcp_script = "move.py"

    # Get token from environment variable or command line arguments
    #endpoint_url = os.environ.get('MCP_ENDPOINT')
    endpoint_url="wss://api.xiaozhi.me/mcp/?token=eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VySWQiOjIxNjQzNywiYWdlbnRJZCI6NDYwNjExLCJlbmRwb2ludElkIjoiYWdlbnRfNDYwNjExIiwicHVycG9zZSI6Im1jcC1lbmRwb2ludCIsImlhdCI6MTc1MzM0Mjg2NH0.-x_-iy2X8ipN9zLxtkM50HhheMb9aCPhJcOhS2TXkZl4wkLPV4EM8JsuT5B6PUH14PD9RKjqCIvlGStH4ClyEQ"
    if not endpoint_url:
        logger.error("Please set the `MCP_ENDPOINT` environment variable")
        sys.exit(1)

    # Start main loop
    try:
        asyncio.run(connect_with_retry(endpoint_url))
    except KeyboardInterrupt:
        logger.info("Program interrupted by user")
    except Exception as e:
        logger.error(f"Program execution error: {e}")

不过我改的代码,只能控制前进、后退、停止。

 

然后把代码抛给了DeepSeek,让它把代码优化一下,同时增加速度控制功能。

看DS思考的过程也是涨知识的,它给出了下面的代码,上面视频运行的就是这组代码。

 

两个核心模块:​​通信管道模块(mcp_pipe.py)​​ 和 ​​动作服务模块(move.py)​​。系统实现了龙舟模型的远程控制功能,具备前进、后退、停止和速度调节能力。

1. 核心控制功能

功能说明速度范围
前进龙舟正向运动0-100%(默认50%)
后退龙舟反向运动0-100%(默认50%)
停止紧急停止龙舟-

2. 速度调节

支持0-100%无级调速(每1%为一个调节档位)

速度-PWM自动转换:速度百分比自动转换为硬件PWM值(0-1023)

默认速度:50%(可通过指令参数修改)

代码
from mcp.server.fastmcp import FastMCP
import logging

logger = logging.getLogger('MoveServer')
mcp = FastMCP("MoveServer")

@mcp.tool()
def forward(speed: int = 50) -> dict:
    """控制龙舟前进(可调速)[3](@ref)
    Args:
        speed: 速度百分比(0-100),默认50%
    Returns:
        dict: 包含动作类型和实际速度
    """
    speed = max(0, min(100, speed))  # 限制速度范围
    logger.info(f"前进速度: {speed}%")
    return {"success": True, "result": f"龙舟前进 {speed}%", "action": "forward", "speed": speed}

@mcp.tool()
def back(speed: int = 50) -> dict:
    """控制龙舟后退(可调速)
    Args:
        speed: 速度百分比(0-100)
    """
    speed = max(0, min(100, speed))
    logger.info(f"后退速度: {speed}%")
    return {"success": True, "result": f"龙舟后退 {speed}%", "action": "back", "speed": speed}

@mcp.tool()
def stop() -> dict:
    """立即停止龙舟"""
    logger.info("停止指令")
    return {"success": True, "result": "龙舟停止", "action": "stop"}

if __name__ == "__main__":
    mcp.run(transport="stdio")  # 使用标准IO通信[6](@ref)
代码
# -*- coding: UTF-8 -*-
from unihiker import GUI
import asyncio
import websockets
import subprocess
import logging
import os
import signal
import sys
import random
import json
from pinpong.board import Board, Pin

# 初始化硬件
Board().begin()
u_gui = GUI()
p_p5_out = Pin(Pin.P5, Pin.OUT)
p_p8_pwm = Pin(Pin.P8, Pin.PWM)
p_p6_out = Pin(Pin.P6, Pin.OUT)
p_p16_pwm = Pin(Pin.P16, Pin.PWM)

# 界面初始化
u_gui.draw_image(image="back.png", x=0, y=0)
u_gui.draw_text(text="M10龙舟MCP", x=30, y=10, font_size=20, color="#FFFFFF")

# 日志配置
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('MCP_PIPE')

# 重连设置
INITIAL_BACKOFF, MAX_BACKOFF = 1, 600
reconnect_attempt, backoff = 0, INITIAL_BACKOFF

def set_motor_speed(direction: int, speed: int):
    """统一设置电机速度和方向[3](@ref)
    Args:
        direction: 0(前进)/1(后退)
        speed: 0-100的速度百分比
    """
    pwm_value = int(max(0, min(100, speed)) / 100 * 1023)  # 速度转PWM值
    p_p5_out.write_digital(direction)
    p_p6_out.write_digital(direction)
    p_p8_pwm.write_analog(pwm_value)
    p_p16_pwm.write_analog(pwm_value)

async def connect_with_retry(uri):
    """Connect to WebSocket server with retry mechanism"""
    global reconnect_attempt, backoff
    while True:  # Infinite reconnection
        try:
            if reconnect_attempt > 0:
                wait_time = backoff * (1 + random.random() * 0.1)  # Add some random jitter
                logger.info(f"Waiting {wait_time:.2f} seconds before reconnection attempt {reconnect_attempt}...")
                await asyncio.sleep(wait_time)

            # Attempt to connect
            await connect_to_server(uri)

        except Exception as e:
            reconnect_attempt += 1
            logger.warning(f"Connection closed (attempt: {reconnect_attempt}): {e}")            
            # Calculate wait time for next reconnection (exponential backoff)
            backoff = min(backoff * 2, MAX_BACKOFF)

async def connect_to_server(uri):
    """Connect to WebSocket server and establish bidirectional communication with `mcp_script`"""
    global reconnect_attempt, backoff
    try:
        logger.info(f"Connecting to WebSocket server...")
        async with websockets.connect(uri) as websocket:
            logger.info(f"Successfully connected to WebSocket server")

            # Reset reconnection counter if connection closes normally
            reconnect_attempt = 0
            backoff = INITIAL_BACKOFF

            # Start mcp_script process
            process = subprocess.Popen(
                ['python', mcp_script],
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                encoding='utf-8',
                text=True  # Use text mode
            )
            logger.info(f"Started {mcp_script} process")

            # Create two tasks: read from WebSocket and write to process, read from process and write to WebSocket
            await asyncio.gather(
                pipe_websocket_to_process(websocket, process),
                pipe_process_to_websocket(process, websocket),
                pipe_process_stderr_to_terminal(process)
            )
    except websockets.exceptions.ConnectionClosed as e:
        logger.error(f"WebSocket connection closed: {e}")
        raise  # Re-throw exception to trigger reconnection
    except Exception as e:
        logger.error(f"Connection error: {e}")
        raise  # Re-throw exception
    finally:
        # Ensure the child process is properly terminated
        if 'process' in locals():
            logger.info(f"Terminating {mcp_script} process")
            try:
                process.terminate()
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                process.kill()
            logger.info(f"{mcp_script} process terminated")

async def pipe_websocket_to_process(websocket, process):
    """Read data from WebSocket and write to process stdin"""
    try:
        while True:
            # Read message from WebSocket
            message = await websocket.recv()
            logger.debug(f"<< {message[:120]}...")

            # Write to process stdin (in text mode)
            if isinstance(message, bytes):
                message = message.decode('utf-8')
            process.stdin.write(message + '\n')
            process.stdin.flush()
    except Exception as e:
        logger.error(f"Error in WebSocket to process pipe: {e}")
        raise  # Re-throw exception to trigger reconnection
    finally:
        # Close process stdin
        if not process.stdin.closed:
            process.stdin.close()

async def pipe_process_to_websocket(process, websocket):
    global reconnect_attempt
    try:
        while True:
            data = await asyncio.get_event_loop().run_in_executor(None, process.stdout.readline)
            if not data: 
                logger.info("Process output ended")
                break

            json_str = json.loads(data)
            if json_str['id'] > 1 and json_str.get('result'):
                content = json_str['result']['content'][0]['text']
                try:
                    # 解析动作和速度参数[6](@ref)
                    resp_data = json.loads(content) if isinstance(content, str) else content
                    action = resp_data.get("action")
                    speed = resp_data.get("speed", 50)  # 默认50%速度
                    
                    # 执行电机控制
                    if action == "forward":
                        set_motor_speed(0, speed)  # 前进方向
                    elif action == "back":
                        set_motor_speed(1, speed)  # 后退方向
                    elif action == "stop":
                        set_motor_speed(0, 0)  # 停止
                except Exception as e:
                    logger.error(f"控制解析失败: {e}")
            
            await websocket.send(data)
    except Exception as e:
        logger.error(f"Process-to-WebSocket error: {e}")
        raise

async def pipe_process_stderr_to_terminal(process):
    """Read data from process stderr and print to terminal"""
    try:
        while True:
            # Read data from process stderr
            data = await asyncio.get_event_loop().run_in_executor(
                None, process.stderr.readline
            )

            if not data:  # If no data, the process may have ended
                logger.info("Process has ended stderr output")
                break

            # Print stderr data to terminal (in text mode, data is already a string)
            sys.stderr.write(data)
            sys.stderr.flush()
    except Exception as e:
        logger.error(f"Error in process stderr pipe: {e}")
        raise  # Re-throw exception to trigger reconnection

def signal_handler(sig, frame):
    """Handle interrupt signals"""
    logger.info("Received interrupt signal, shutting down...")
    sys.exit(0)

if __name__ == "__main__":
    signal.signal(signal.SIGINT, lambda sig, frame: sys.exit(0))
    mcp_script = "move.py"
    endpoint_url = "wss://api.xiaozhi.me/mcp/?token=eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VySWQiOjIxNjQzNywiYWdlbnRJZCI6NDYwNjExLCJlbmRwb2ludElkIjoiYWdlbnRfNDYwNjExIiwicHVycG9zZSI6Im1jcC1lbmRwb2ludCIsImlhdCI6MTc1MzM0Mjg2NH0.-x_-iy2X8ipN9zLxtkM50HhheMb9aCPhJcOhS2TXkZl4wkLPV4EM8JsuT5B6PUH14PD9RKjqCIvlGStH4ClyEQ"  # 实际token
    
    try:
        asyncio.run(connect_with_retry(endpoint_url))
    except KeyboardInterrupt:
        logger.info("用户中断程序")

评论

user-avatar