回到首页 返回首页
回到顶部 回到顶部
返回上一页 返回上一页

M10仰望小车(MCP)的学习与尝试 简单

头像 rzyzzxw 2025.07.03 22 0

7.3

【项目设想】

在社区中学习各位大神的教程,总觉自己基础差跟不上步伐。比如很多老师在玩小智AI和MCP,云天老师的[M10项目]行空板M10扩展板——行空车(MCP)项目就让我很喜欢,教程写的很详尽友好,让我产生了一种可以复刻一下的感觉,因为我也想让我手中帅气的M10仰望小车通过MCP语音遥控。

生命的美好在于折腾,开始搞起来。

1f3b1646b1272628988ecd4b0e09e24.jpg

材料清单

  • M10仰望小车 X1
  • K10+小智 X1

步骤1 行空板M10上安装pyenv环境和Python 3.12.7

本步骤参考官方教程

https://gitee.com/liliang9693/unihiker-pyenv-python

教程中提供了离线和联网两种方法。

我用了联网安装的方法(具体步骤请点击上面参照链接李工教程进行)。

image.png
image.png

看到pyenv版本号了。

d093a012b1a390d9177d86066c6d0d3.png
image.png

按照云天老师的要求,下载了Python 3.12.7版本.

image.png
image.png

看到了Python 版本号3.12.7。

662ec6fd1ff2bb05c2afad64ce49a0a.png

因为是小白,这一步折腾了挺长时间,好在安装成功了。

【补记部分】

7.4,我将M10刷机重装了系统,这次按照官方教程用了离线安装完成pyevn环境和Python 3.12.7。发现似乎空系统挺适合这种方式,此时M10还没有接入网络,而已经用过一阵子的M10较适合联网安装(昨天 我的M10离线方式安装时报错无法安装)。

(当然连接wifi后应该也可以联网安装pyevn环境和Python 3.12.7。)

0f9432edadeef9d78f259f6cf342d65.png

下面的各种库文件都需在连接网络后安装。

步骤2 行空板M10上安装mcp库

在终端,使用pip install mcp,安装mcp,同时安装python-dotenv>=1.1.1,websockets>=15.0.1 ,pydantic>=2.11.7

image.png

如果没有在列表中找到websockets,用pip install websockets安装。

image.png

步骤3 配置小智AI

K10上刷入小智新版固件,行空板官方提供有教程:

行空板小智机器人官方地址为:

https://www.unihiker.com.cn/wiki/k10/xiaozhi_ai

现阶段,提供了四种小智固件。最新版,除了支持调用摄像头识别物体之外,还支持板载RGB的调用。

image.png

配置小智AI

进入小智聊天机器人控制台,配置角色。

image.png

获取MCP接入点,复制智能体接入点地址备用。

68097aed817a266fd995ff87a0e315f.png

步骤4 给M10仰望小车编写程序

仰望小车接线:

方向舵机--P0

左电机--M1 P5(out) P8(pwm)

右电机--M2 P6(out) P16(pwm)

2a3f91f4995e30864999f905c345dec.jpg

1、mind+--Python模式--添加行空板M10官方库--代码模式。

2、M10接入电脑--连接远程终端--提示缺少库文件(因为安装了新的Python环境):

d42adc0f9c546c8ae861aec190645dc.png

查看官方教程,也提示要手动安装库文件。

7870386e12c813e0213d159a617803c.png

在行空板在网络畅通的情况下安装所有需要的库,直到:

c1df98d594211d26750a79e12ae1826.png

3、编写mcp服务文件,move.py。

新建文件move.py,复制云天老师写的代码,修改为仰望小车的控制内容(我对仰望小车的控制需求和云天老师相同,只做了少量任务描述方面的修改)。

功能描述:控制M10仰望小车的前进、后退、左转、右转和停车。

代码
# move.py
from mcp.server.fastmcp import FastMCP
import sys
import logging

logger = logging.getLogger('MoveServer')

# 修复Windows控制台UTF-8编码问题
if sys.platform == 'win32':
    sys.stderr.reconfigure(encoding='utf-8')
    sys.stdout.reconfigure(encoding='utf-8')

# 创建MCP服务器


mcp = FastMCP("MoveServer")

@mcp.tool()
def forward() -> dict:
    """
    控制仰望小车前进。
   
    该函数将设置P0舵机87度,P5和P6引脚为高电平,P8和P16引脚为50%占空比的PWM输出,使仰望小车前进。
   
    Returns:
        dict: 返回一个字典,包含操作结果。
    """

    result = "仰望小车前进"
    logger.info(result)
    return {"success": True, "result": result}

@mcp.tool()
def back() -> dict:
    """
    控制仰望小车后退。
   
    该函数将设置P0舵机87度,P5和P6引脚为低电平,P8和P16引脚为50%占空比的PWM输出,使仰望小车后退。
   
    Returns:
        dict: 返回一个字典,包含操作结果。
    """

    result = "仰望小车后退"
    logger.info(result)
    return {"success": True, "result": result}

@mcp.tool()
def left() -> dict:
    """
    控制仰望小车左转。
   
    该函数将设置P0舵机60度,P5引脚为高电平,P6引脚为高电平,P8和P16引脚为50%占空比的PWM输出,舵机控制方向使仰望小车左转。
   
    Returns:
        dict: 返回一个字典,包含操作结果。
    """

    result = "仰望小车左转"
    logger.info(result)
    return {"success": True, "result": result}

@mcp.tool()
def right() -> dict:
    """
    控仰望小车右转。
   
    该函数将设置P0舵机110度,P5引脚为高电平,P6引脚为高电平,P8和P16引脚为50%占空比的PWM输出,舵机控制方向使仰望小车右转。
   
    Returns:
        dict: 返回一个字典,包含操作结果。
    """

    result = "仰望小车右转"
    logger.info(result)
    return {"success": True, "result": result}

@mcp.tool()
def stop() -> dict:
    """
    控制仰望小车停止。
   
    该函数将设置P0舵机87度,P8和P16引脚的PWM输出为0,使仰望小车停止。
   
    Returns:
        dict: 返回一个字典,包含操作结果。
    """

    result = "仰望小车停车"
    logger.info(result)
    return {"success": True, "result": result}

# 启动服务器
if __name__ == "__main__":
    mcp.run(transport="stdio")

修改小智AI管道文件mymcp.py。

新建文件mymcp.py,复制云天老师代码,并做控制动作方面的修改,填入自己的MCP接入点地址。(小白的我不会写代码,只好图形化编写UI和控制程序后,将自动生成的代码复制过来修改。)

功能描述:连接到 MCP 服务器,并通过 WebSocket 端点将输入和输出与指定的 Python 脚本进行管道通信。

加入了开机画面。

yw.png

代码
"""
This script is used to connect to the MCP server and pipe the input and output to the websocket endpoint.
Version: 0.1.0

Usage:

export MCP_ENDPOINT=
python mcp_pipe.py 

"""

#  -*- coding: UTF-8 -*-
 
# MindPlus
# Python
from unihiker import GUI
import asyncio
import websockets
import subprocess
import logging
import os
import signal
import sys
import random
from dotenv import load_dotenv
from pinpong.board import Pin
from pinpong.board import Board
from pinpong.board import Servo
from pinpong.board import Board,Pin
#from pinpong.extension.unihiker import *
import json

Board().begin()
u_gui=GUI()
p_p5_out=Pin(Pin.P5, Pin.OUT)
p_p8_pwm=Pin(Pin.P8, Pin.PWM)
p_p6_out=Pin(Pin.P6, Pin.OUT)
p_p16_pwm=Pin(Pin.P16, Pin.PWM)
pin1 = Pin(Pin.P0)
servo1 = Servo(pin1)
servo1.write_angle(87)
u_gui.draw_image(image="yw.png",x=0,y=0)
u_gui.draw_text(text="       M10仰望小车",x=0,y=0,font_size=16, color="#FFFFFF")
#buzzer.pitch(523,1)
# 设置日志记录器
# Load environment variables from .env file
#load_dotenv()

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('MCP_PIPE')

# Reconnection settings
INITIAL_BACKOFF = 1  # Initial wait time in seconds
MAX_BACKOFF = 600  # Maximum wait time in seconds
reconnect_attempt = 0
backoff = INITIAL_BACKOFF

async def connect_with_retry(uri):
    """Connect to WebSocket server with retry mechanism"""
    global reconnect_attempt, backoff
    while True:  # Infinite reconnection
        try:
            if reconnect_attempt > 0:
                wait_time = backoff * (1 + random.random() * 0.1)  # Add some random jitter
                logger.info(f"Waiting {wait_time:.2f} seconds before reconnection attempt {reconnect_attempt}...")
                await asyncio.sleep(wait_time)

            # Attempt to connect
            await connect_to_server(uri)

        except Exception as e:
            reconnect_attempt += 1
            logger.warning(f"Connection closed (attempt: {reconnect_attempt}): {e}")            
            # Calculate wait time for next reconnection (exponential backoff)
            backoff = min(backoff * 2, MAX_BACKOFF)

async def connect_to_server(uri):
    """Connect to WebSocket server and establish bidirectional communication with `mcp_script`"""
    global reconnect_attempt, backoff
    try:
        logger.info(f"Connecting to WebSocket server...")
        async with websockets.connect(uri) as websocket:
            logger.info(f"Successfully connected to WebSocket server")

            # Reset reconnection counter if connection closes normally
            reconnect_attempt = 0
            backoff = INITIAL_BACKOFF

            # Start mcp_script process
            process = subprocess.Popen(
                ['python', mcp_script],
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                encoding='utf-8',
                text=True  # Use text mode
            )
            logger.info(f"Started {mcp_script} process")

            # Create two tasks: read from WebSocket and write to process, read from process and write to WebSocket
            await asyncio.gather(
                pipe_websocket_to_process(websocket, process),
                pipe_process_to_websocket(process, websocket),
                pipe_process_stderr_to_terminal(process)
            )
    except websockets.exceptions.ConnectionClosed as e:
        logger.error(f"WebSocket connection closed: {e}")
        raise  # Re-throw exception to trigger reconnection
    except Exception as e:
        logger.error(f"Connection error: {e}")
        raise  # Re-throw exception
    finally:
        # Ensure the child process is properly terminated
        if 'process' in locals():
            logger.info(f"Terminating {mcp_script} process")
            try:
                process.terminate()
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                process.kill()
            logger.info(f"{mcp_script} process terminated")

async def pipe_websocket_to_process(websocket, process):
    """Read data from WebSocket and write to process stdin"""
    try:
        while True:
            # Read message from WebSocket
            message = await websocket.recv()
            logger.debug(f"<< {message[:120]}...")

            # Write to process stdin (in text mode)
            if isinstance(message, bytes):
                message = message.decode('utf-8')
            process.stdin.write(message + '\n')
            process.stdin.flush()
    except Exception as e:
        logger.error(f"Error in WebSocket to process pipe: {e}")
        raise  # Re-throw exception to trigger reconnection
    finally:
        # Close process stdin
        if not process.stdin.closed:
            process.stdin.close()

async def pipe_process_to_websocket(process, websocket):
    """Read data from process stdout and send to WebSocket"""
    global p_p5_out,p_p8_pwm,p_p6_out,p_p16_pwm
    try:
        while True:
            # Read data from process stdout
            data = await asyncio.get_event_loop().run_in_executor(
                None, process.stdout.readline
            )

            if not data:  # If no data, the process may have ended
                logger.info("Process has ended output")
                break

            # Send data to WebSocket
            logger.debug(f">> {data[:120]}...")
            print(data)
            # 解析 JSON 字符串
            json_str = json.loads(data)
            print(json_str['id'])

            if json_str['id']>1:
              print(json_str['id'])
              if json_str.get('result', {}):

                text=json.loads(json_str['result']['content'][0]['text'])
                if text['success']:
                   print(text['result'])
                   L=480
                   if "前进" in text['result']:
                             print("******************************")
                             servo1.write_angle(87)
                             p_p5_out.write_digital(1)
                             p_p8_pwm.write_analog(512)
                             p_p6_out.write_digital(1)
                             p_p16_pwm.write_analog(512)                             
                   if "后退" in text['result']:
                             print("******************************")
                             servo1.write_angle(87)
                             p_p5_out.write_digital(0)
                             p_p8_pwm.write_analog(512)
                             p_p6_out.write_digital(0)
                             p_p16_pwm.write_analog(512)                             
                   if "左转" in text['result']:
                             print("******************************")
                             servo1.write_angle(60)
                             p_p5_out.write_digital(1)
                             p_p8_pwm.write_analog(512)
                             p_p6_out.write_digital(1)
                             p_p16_pwm.write_analog(512) 
                   if "右转" in text['result']:
                             print("******************************")
                             servo1.write_angle(120)
                             p_p5_out.write_digital(1)
                             p_p8_pwm.write_analog(512)
                             p_p6_out.write_digital(1)
                             p_p16_pwm.write_analog(512)
                   if "停车" in text['result']:
                             print("******************************")
                             servo1.write_angle(87)
                             p_p8_pwm.write_analog(0)
                             p_p16_pwm.write_analog(0)
            # In text mode, data is already a string, no need to decode
            await websocket.send(data)
    except Exception as e:
        logger.error(f"Error in process to WebSocket pipe: {e}")
        raise  # Re-throw exception to trigger reconnection

async def pipe_process_stderr_to_terminal(process):
    """Read data from process stderr and print to terminal"""
    try:
        while True:
            # Read data from process stderr
            data = await asyncio.get_event_loop().run_in_executor(
                None, process.stderr.readline
            )

            if not data:  # If no data, the process may have ended
                logger.info("Process has ended stderr output")
                break

            # Print stderr data to terminal (in text mode, data is already a string)
            sys.stderr.write(data)
            sys.stderr.flush()
    except Exception as e:
        logger.error(f"Error in process stderr pipe: {e}")
        raise  # Re-throw exception to trigger reconnection

def signal_handler(sig, frame):
    """Handle interrupt signals"""
    logger.info("Received interrupt signal, shutting down...")
    sys.exit(0)

if __name__ == "__main__":
    # Register signal handler
    signal.signal(signal.SIGINT, signal_handler)

    # mcp_script
    #if len(sys.argv) < 2:
    #    logger.error("Usage: mcp_pipe.py ")
    #    sys.exit(1)

    mcp_script = "move.py"

    # Get token from environment variable or command line arguments
    #endpoint_url = os.environ.get('MCP_ENDPOINT')
    endpoint_url="wss://api.xiaozhi.me/mcp/?token=eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VySWQiOjIxNjQzNywiYWdlbnRJZCI6MTQ5OTAyLCJlbmRwb2ludElkIjoiYWdlbnRfMTQ5OTAyIiwicHVycG9zZSI6Im1jcC1lbmRwb2ludCIsImlhdCI6MTc1MTU0NjIzOH0.ha8DI8RcXNCWntx22plEAtxd2ovVLEFS7H8V7TpED-6FfqYKcjQewWIFlEVfIjoE9UJNTQoytxdlQ5hJ4IsDHg"
    if not endpoint_url:
        logger.error("Please set the `MCP_ENDPOINT` environment variable")
        sys.exit(1)

    # Start main loop
    try:
        asyncio.run(connect_with_retry(endpoint_url))
    except KeyboardInterrupt:
        logger.info("Program interrupted by user")
    except Exception as e:
        logger.error(f"Program execution error: {e}")

在mind+中点击运行。

image.png
image.png

成功连接服务器了。

image.png

看到程序中的内容啦。

image.png

可以用K10小智来控制啦。

9e8b2a9c4f77363dcac8f200729d708.jpg

小结:

1、感谢云天老师的教程和技术指导,让我也能体验到这样子高科技。

2、K10小智+M10(MCP),结合起来可以玩更多的项目了。

3、这个项目我是真喜欢,小智可以根据语意分析你的目的并发出指令。

4、项目需要进一步探究的地方,程序启动当前只能通过mind+,M10中点击mymcp.py会报错不能启动。问题等待高人解决。

87dacdf5061804727fa5365297b2b6c.png

5、程序启动时连接服务器,在繁忙时要耐心等待。今天早上6点多,一次就成功接入了,这是我目前第一次一次成功。

评论

user-avatar