回到首页 返回首页
回到顶部 回到顶部
返回上一页 返回上一页

M10仰望小车(MCP)的学习与尝试 简单

头像 rzyzzxw 2025.07.03 145 0

7.3

【项目设想】

在社区中学习各位大神的教程,总觉自己基础差跟不上步伐。比如很多老师在玩小智AI和MCP,云天老师的[M10项目]行空板M10扩展板——行空车(MCP)项目就让我很喜欢,教程写的很详尽友好,让我产生了一种可以复刻一下的感觉,因为我也想让我手中帅气的M10仰望小车通过MCP语音遥控。

生命的美好在于折腾,开始搞起来。

1f3b1646b1272628988ecd4b0e09e24.jpg

材料清单

  • M10仰望小车 X1
  • K10+小智 X1

步骤1 行空板M10上安装pyenv环境和Python 3.12.7

本步骤参考官方教程

https://gitee.com/liliang9693/unihiker-pyenv-python

教程中提供了离线和联网两种方法。

我用了联网安装的方法(具体步骤请点击上面参照链接官方教程进行)。

注意:联网安装方法推荐网络环境好,能正常访问github的情况下使用。

使用方法-联网安装

1.在行空板上安装pyenv环境

行空板联网

国内需切换apt源,下载更快

在行空板终端运行命令。

(请进入上面原文,按序分别复制下面三组代码粘到行空板终端中回车运行)

image.png

安装pyenv工具

在行空板终端运行命令。

(请进入上面原文,复制下面代码粘到行空板终端中回车运行)

image.png

看到pyenv版本号了。

d093a012b1a390d9177d86066c6d0d3.png

2.下载预编译文件传入行空板


下载需要的python版本对应编译后的tar.gz文件,本教材以3.12.7为例则下载python-3.12.7.tar.gz:

下载路径一: github releases:https://github.com/liliang9693/unihiker-pyenv-python/releases
下载路径二: 百度网盘: https://pan.baidu.com/s/171Qao8nwvjqZdT1iYHFb_w?pwd=pyen
上传到行空板上pyenv路径下,即/root/.pyenv/versions/

image.png

按照云天老师的要求,下载了Python 3.12.7版本.

image.png

3.安装Python版本3.12.7并将它设置为全局默认Python

在行空板终端运行命令。

(下面命令按顺序分条复制到行空板终端回车执行)

#进入文件目录
cd /root/.pyenv/versions/

#解压到当前文件夹,如果下载的是其他文件,则应该修改为对应文件名
tar -xzf python-3.12.7.tar.gz

#查看文件目录清单,应该列出3.12.7文件夹
ls

#刷新pyenv的python列表,无输出
pyenv rehash

#列出识别到的版本,此时应该会输出system和3.12.7
pyenv versions

#设置3.12.7为全局默认python
pyenv global 3.12.7

#列出版本,此时3.12.7前面有个*表示被选中了
pyenv versions

#查看python版本,应该输出Python 3.12.7,表示切换版本成功
python --version

#查看pip库清单。应该只输出pip和setuptools,表示这是一个全新的python环境,可以安装自己需要的库
pip list

备注:
.1只用操作一次即可。重复2和3可以安装更多版本的Python

.安装新的Python环境之后,如果需要使用Mind+图形化相关功能,需要手动安装响应的库

.切换回系统自带的Python3.7的方法

pyenv global system

【注意:】一定要用pyenv global 3.12.7命令设置3.12.7为全局默认python。

如下图所示,当*号出现在3.12.7前表示被选中。

image.png

【注:】这一步很重要,因MCP库不能在M10系统自带的python 3.7下运行,所以我们安装了python 3.12.7后要设置3.12.7为全局默认python,然后再进行下一步,安装MCP库。

(7.3,用联网方式安装完成pyevn环境和Python 3.12.7。)

(7.4 补记:我将M10刷机重装了系统,这次按照官方教程用了离线安装完成pyevn环境和Python 3.12.7。)

(7.27补记,再次给行空板刷机安装空系统后用联网方式进行了安装pyevn环境和Python 3.12.7操作,发现空系统怎么装都容易。)

(连接wifi,将刷机后的M10接入网络。)

0f9432edadeef9d78f259f6cf342d65.png

(各种库文件都需在连接网络后安装。)

断开远程终端(行空板M10),重新连接远程终端(行空板M10)--提示缺少库文件(因为安装了新的Python环境):

d42adc0f9c546c8ae861aec190645dc.png

查看官方教程,也提示要手动安装库文件。

7870386e12c813e0213d159a617803c.png

在行空板在网络畅通的情况下安装所有需要的库,直到:

c1df98d594211d26750a79e12ae1826.png

步骤2 行空板M10上安装mcp库

在终端,使用pip install mcp,安装mcp,同时安装python-dotenv>=1.1.1,websockets>=15.0.1 ,pydantic>=2.11.7

image.png

等待一小段时间,下载MCP库……直到安装完毕。

image.png

pip list

image.png

如果没有在列表中找到websockets,用pip install websockets安装。

image.png

至此,在M10上完成MCP库安装。

步骤3 配置小智AI

K10上刷入小智新版固件,行空板官方提供有教程:

行空板小智机器人官方地址为:

https://www.unihiker.com.cn/wiki/k10/xiaozhi_ai

现阶段,提供了四种小智固件。最新版,除了支持调用摄像头识别物体之外,还支持板载RGB的调用。

image.png

 

配置小智AI

进入小智聊天机器人控制台,配置角色。

image.png

获取MCP接入点,复制智能体接入点地址备用。

image.png

步骤4 给M10仰望小车编写程序

仰望小车接线:

方向舵机--P0

左电机--M1 P5(out) P8(pwm)

右电机--M2 P6(out) P16(pwm)

2a3f91f4995e30864999f905c345dec.jpg

1、mind+--Python模式--添加行空板M10官方库--代码模式。

2、编写mcp服务文件,move.py。

新建文件move.py,复制云天老师写的代码,修改为仰望小车的控制内容(我对仰望小车的控制需求和云天老师相同,只做了少量任务描述方面的修改)。

功能描述:控制M10仰望小车的前进、后退、左转、右转和停车。

代码
# move.py
from mcp.server.fastmcp import FastMCP
import sys
import logging

logger = logging.getLogger('MoveServer')

# 修复Windows控制台UTF-8编码问题
if sys.platform == 'win32':
    sys.stderr.reconfigure(encoding='utf-8')
    sys.stdout.reconfigure(encoding='utf-8')

# 创建MCP服务器


mcp = FastMCP("MoveServer")

@mcp.tool()
def forward() -> dict:
    """
    控制仰望小车前进。
   
    该函数将设置P0舵机87度,P5和P6引脚为高电平,P8和P16引脚为50%占空比的PWM输出,使仰望小车前进。
   
    Returns:
        dict: 返回一个字典,包含操作结果。
    """

    result = "仰望小车前进"
    logger.info(result)
    return {"success": True, "result": result}

@mcp.tool()
def back() -> dict:
    """
    控制仰望小车后退。
   
    该函数将设置P0舵机87度,P5和P6引脚为低电平,P8和P16引脚为50%占空比的PWM输出,使仰望小车后退。
   
    Returns:
        dict: 返回一个字典,包含操作结果。
    """

    result = "仰望小车后退"
    logger.info(result)
    return {"success": True, "result": result}

@mcp.tool()
def left() -> dict:
    """
    控制仰望小车左转。
   
    该函数将设置P0舵机60度,P5引脚为高电平,P6引脚为高电平,P8和P16引脚为50%占空比的PWM输出,舵机控制方向使仰望小车左转。
   
    Returns:
        dict: 返回一个字典,包含操作结果。
    """

    result = "仰望小车左转"
    logger.info(result)
    return {"success": True, "result": result}

@mcp.tool()
def right() -> dict:
    """
    控仰望小车右转。
   
    该函数将设置P0舵机110度,P5引脚为高电平,P6引脚为高电平,P8和P16引脚为50%占空比的PWM输出,舵机控制方向使仰望小车右转。
   
    Returns:
        dict: 返回一个字典,包含操作结果。
    """

    result = "仰望小车右转"
    logger.info(result)
    return {"success": True, "result": result}

@mcp.tool()
def stop() -> dict:
    """
    控制仰望小车停止。
   
    该函数将设置P0舵机87度,P8和P16引脚的PWM输出为0,使仰望小车停止。
   
    Returns:
        dict: 返回一个字典,包含操作结果。
    """

    result = "仰望小车停车"
    logger.info(result)
    return {"success": True, "result": result}

# 启动服务器
if __name__ == "__main__":
    mcp.run(transport="stdio")

修改小智AI管道文件mymcp.py。

新建文件mymcp.py,复制云天老师代码,并做控制动作方面的修改,填入自己的MCP接入点地址。(小白的我不会写代码,只好图形化编写UI和控制程序后,将自动生成的代码复制过来修改。)

功能描述:连接到 MCP 服务器,并通过 WebSocket 端点将输入和输出与指定的 Python 脚本进行管道通信。

加入了开机画面。

yw.png

代码
"""
This script is used to connect to the MCP server and pipe the input and output to the websocket endpoint.
Version: 0.1.0

Usage:

export MCP_ENDPOINT=
python mcp_pipe.py 

"""

#  -*- coding: UTF-8 -*-
 
# MindPlus
# Python
from unihiker import GUI
import asyncio
import websockets
import subprocess
import logging
import os
import signal
import sys
import random
from dotenv import load_dotenv
from pinpong.board import Pin
from pinpong.board import Board
from pinpong.board import Servo
from pinpong.board import Board,Pin
#from pinpong.extension.unihiker import *
import json

Board().begin()
u_gui=GUI()
p_p5_out=Pin(Pin.P5, Pin.OUT)
p_p8_pwm=Pin(Pin.P8, Pin.PWM)
p_p6_out=Pin(Pin.P6, Pin.OUT)
p_p16_pwm=Pin(Pin.P16, Pin.PWM)
pin1 = Pin(Pin.P0)
servo1 = Servo(pin1)
servo1.write_angle(87)
u_gui.draw_image(image="yw.png",x=0,y=0)
u_gui.draw_text(text="       M10仰望小车",x=0,y=0,font_size=16, color="#FFFFFF")
#buzzer.pitch(523,1)
# 设置日志记录器
# Load environment variables from .env file
#load_dotenv()

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('MCP_PIPE')

# Reconnection settings
INITIAL_BACKOFF = 1  # Initial wait time in seconds
MAX_BACKOFF = 600  # Maximum wait time in seconds
reconnect_attempt = 0
backoff = INITIAL_BACKOFF

async def connect_with_retry(uri):
    """Connect to WebSocket server with retry mechanism"""
    global reconnect_attempt, backoff
    while True:  # Infinite reconnection
        try:
            if reconnect_attempt > 0:
                wait_time = backoff * (1 + random.random() * 0.1)  # Add some random jitter
                logger.info(f"Waiting {wait_time:.2f} seconds before reconnection attempt {reconnect_attempt}...")
                await asyncio.sleep(wait_time)

            # Attempt to connect
            await connect_to_server(uri)

        except Exception as e:
            reconnect_attempt += 1
            logger.warning(f"Connection closed (attempt: {reconnect_attempt}): {e}")            
            # Calculate wait time for next reconnection (exponential backoff)
            backoff = min(backoff * 2, MAX_BACKOFF)

async def connect_to_server(uri):
    """Connect to WebSocket server and establish bidirectional communication with `mcp_script`"""
    global reconnect_attempt, backoff
    try:
        logger.info(f"Connecting to WebSocket server...")
        async with websockets.connect(uri) as websocket:
            logger.info(f"Successfully connected to WebSocket server")

            # Reset reconnection counter if connection closes normally
            reconnect_attempt = 0
            backoff = INITIAL_BACKOFF

            # Start mcp_script process
            process = subprocess.Popen(
                ['python', mcp_script],
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                encoding='utf-8',
                text=True  # Use text mode
            )
            logger.info(f"Started {mcp_script} process")

            # Create two tasks: read from WebSocket and write to process, read from process and write to WebSocket
            await asyncio.gather(
                pipe_websocket_to_process(websocket, process),
                pipe_process_to_websocket(process, websocket),
                pipe_process_stderr_to_terminal(process)
            )
    except websockets.exceptions.ConnectionClosed as e:
        logger.error(f"WebSocket connection closed: {e}")
        raise  # Re-throw exception to trigger reconnection
    except Exception as e:
        logger.error(f"Connection error: {e}")
        raise  # Re-throw exception
    finally:
        # Ensure the child process is properly terminated
        if 'process' in locals():
            logger.info(f"Terminating {mcp_script} process")
            try:
                process.terminate()
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                process.kill()
            logger.info(f"{mcp_script} process terminated")

async def pipe_websocket_to_process(websocket, process):
    """Read data from WebSocket and write to process stdin"""
    try:
        while True:
            # Read message from WebSocket
            message = await websocket.recv()
            logger.debug(f"<< {message[:120]}...")

            # Write to process stdin (in text mode)
            if isinstance(message, bytes):
                message = message.decode('utf-8')
            process.stdin.write(message + '\n')
            process.stdin.flush()
    except Exception as e:
        logger.error(f"Error in WebSocket to process pipe: {e}")
        raise  # Re-throw exception to trigger reconnection
    finally:
        # Close process stdin
        if not process.stdin.closed:
            process.stdin.close()

async def pipe_process_to_websocket(process, websocket):
    """Read data from process stdout and send to WebSocket"""
    global p_p5_out,p_p8_pwm,p_p6_out,p_p16_pwm
    try:
        while True:
            # Read data from process stdout
            data = await asyncio.get_event_loop().run_in_executor(
                None, process.stdout.readline
            )

            if not data:  # If no data, the process may have ended
                logger.info("Process has ended output")
                break

            # Send data to WebSocket
            logger.debug(f">> {data[:120]}...")
            print(data)
            # 解析 JSON 字符串
            json_str = json.loads(data)
            print(json_str['id'])

            if json_str['id']>1:
              print(json_str['id'])
              if json_str.get('result', {}):

                text=json.loads(json_str['result']['content'][0]['text'])
                if text['success']:
                   print(text['result'])
                   L=480
                   if "前进" in text['result']:
                             print("******************************")
                             servo1.write_angle(87)
                             p_p5_out.write_digital(1)
                             p_p8_pwm.write_analog(512)
                             p_p6_out.write_digital(1)
                             p_p16_pwm.write_analog(512)                             
                   if "后退" in text['result']:
                             print("******************************")
                             servo1.write_angle(87)
                             p_p5_out.write_digital(0)
                             p_p8_pwm.write_analog(512)
                             p_p6_out.write_digital(0)
                             p_p16_pwm.write_analog(512)                             
                   if "左转" in text['result']:
                             print("******************************")
                             servo1.write_angle(60)
                             p_p5_out.write_digital(1)
                             p_p8_pwm.write_analog(512)
                             p_p6_out.write_digital(1)
                             p_p16_pwm.write_analog(512) 
                   if "右转" in text['result']:
                             print("******************************")
                             servo1.write_angle(120)
                             p_p5_out.write_digital(1)
                             p_p8_pwm.write_analog(512)
                             p_p6_out.write_digital(1)
                             p_p16_pwm.write_analog(512)
                   if "停车" in text['result']:
                             print("******************************")
                             servo1.write_angle(87)
                             p_p8_pwm.write_analog(0)
                             p_p16_pwm.write_analog(0)
            # In text mode, data is already a string, no need to decode
            await websocket.send(data)
    except Exception as e:
        logger.error(f"Error in process to WebSocket pipe: {e}")
        raise  # Re-throw exception to trigger reconnection

async def pipe_process_stderr_to_terminal(process):
    """Read data from process stderr and print to terminal"""
    try:
        while True:
            # Read data from process stderr
            data = await asyncio.get_event_loop().run_in_executor(
                None, process.stderr.readline
            )

            if not data:  # If no data, the process may have ended
                logger.info("Process has ended stderr output")
                break

            # Print stderr data to terminal (in text mode, data is already a string)
            sys.stderr.write(data)
            sys.stderr.flush()
    except Exception as e:
        logger.error(f"Error in process stderr pipe: {e}")
        raise  # Re-throw exception to trigger reconnection

def signal_handler(sig, frame):
    """Handle interrupt signals"""
    logger.info("Received interrupt signal, shutting down...")
    sys.exit(0)

if __name__ == "__main__":
    # Register signal handler
    signal.signal(signal.SIGINT, signal_handler)

    # mcp_script
    #if len(sys.argv) < 2:
    #    logger.error("Usage: mcp_pipe.py ")
    #    sys.exit(1)

    mcp_script = "move.py"

    # Get token from environment variable or command line arguments
    #endpoint_url = os.environ.get('MCP_ENDPOINT')
    endpoint_url="wss://api.xiaozhi.me/mcp/?token=eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VySWQiOjIxNjQzNywiYWdlbnRJZCI6MTQ5OTAyLCJlbmRwb2ludElkIjoiYWdlbnRfMTQ5OTAyIiwicHVycG9zZSI6Im1jcC1lbmRwb2ludCIsImlhdCI6MTc1MTU0NjIzOH0.ha8DI8RcXNCWntx22plEAtxd2ovVLEFS7H8V7TpED-6FfqYKcjQewWIFlEVfIjoE9UJNTQoytxdlQ5hJ4IsDHg"
    if not endpoint_url:
        logger.error("Please set the `MCP_ENDPOINT` environment variable")
        sys.exit(1)

    # Start main loop
    try:
        asyncio.run(connect_with_retry(endpoint_url))
    except KeyboardInterrupt:
        logger.info("Program interrupted by user")
    except Exception as e:
        logger.error(f"Program execution error: {e}")

在mind+中点击运行。

image.png
image.png

成功连接服务器了。

image.png

看到程序中的内容啦。

image.png

这时在小智聊天机器人上可以看到接入点在线并显示可用工具。

image.png

可以用K10小智来控制啦。

9e8b2a9c4f77363dcac8f200729d708.jpg

小结:

1、感谢云天老师的教程和技术指导,让我也能体验到这样子高科技。

2、K10小智+M10(MCP),结合起来可以玩更多的项目了。

3、这个项目我是真喜欢,小智可以根据语意分析你的目的并发出指令。

4、项目需要进一步探究的地方,程序启动当前只能通过mind+,M10中点击mymcp.py会报错不能启动。问题等待高人解决。

87dacdf5061804727fa5365297b2b6c.png

5、程序启动时连接服务器,在繁忙时要耐心等待。今天(7.4)早上6点多,一次就成功接入了,这是我目前第一次一次成功。

评论

user-avatar