7.3
【项目设想】
在社区中学习各位大神的教程,总觉自己基础差跟不上步伐。比如很多老师在玩小智AI和MCP,云天老师的[M10项目]行空板M10扩展板——行空车(MCP)项目就让我很喜欢,教程写的很详尽友好,让我产生了一种可以复刻一下的感觉,因为我也想让我手中帅气的M10仰望小车通过MCP语音遥控。
生命的美好在于折腾,开始搞起来。

材料清单
- M10仰望小车 X1
- K10+小智 X1
步骤1 行空板M10上安装pyenv环境和Python 3.12.7
本步骤参考官方教程
https://gitee.com/liliang9693/unihiker-pyenv-python
教程中提供了离线和联网两种方法。
我用了联网安装的方法(具体步骤请点击上面参照链接李工教程进行)。


看到pyenv版本号了。


按照云天老师的要求,下载了Python 3.12.7版本.


看到了Python 版本号3.12.7。

因为是小白,这一步折腾了挺长时间,好在安装成功了。
【补记部分】
7.4,我将M10刷机重装了系统,这次按照官方教程用了离线安装完成pyevn环境和Python 3.12.7。发现似乎空系统挺适合这种方式,此时M10还没有接入网络,而已经用过一阵子的M10较适合联网安装(昨天 我的M10离线方式安装时报错无法安装)。
(当然连接wifi后应该也可以联网安装pyevn环境和Python 3.12.7。)

下面的各种库文件都需在连接网络后安装。
步骤2 行空板M10上安装mcp库
在终端,使用pip install mcp,安装mcp,同时安装python-dotenv>=1.1.1,websockets>=15.0.1 ,pydantic>=2.11.7

如果没有在列表中找到websockets,用pip install websockets安装。

步骤3 配置小智AI
K10上刷入小智新版固件,行空板官方提供有教程:
行空板小智机器人官方地址为:
https://www.unihiker.com.cn/wiki/k10/xiaozhi_ai
现阶段,提供了四种小智固件。最新版,除了支持调用摄像头识别物体之外,还支持板载RGB的调用。

配置小智AI
进入小智聊天机器人控制台,配置角色。

获取MCP接入点,复制智能体接入点地址备用。

步骤4 给M10仰望小车编写程序
仰望小车接线:
方向舵机--P0
左电机--M1 P5(out) P8(pwm)
右电机--M2 P6(out) P16(pwm)

1、mind+--Python模式--添加行空板M10官方库--代码模式。
2、M10接入电脑--连接远程终端--提示缺少库文件(因为安装了新的Python环境):

查看官方教程,也提示要手动安装库文件。

在行空板在网络畅通的情况下安装所有需要的库,直到:

3、编写mcp服务文件,move.py。
新建文件move.py,复制云天老师写的代码,修改为仰望小车的控制内容(我对仰望小车的控制需求和云天老师相同,只做了少量任务描述方面的修改)。
功能描述:控制M10仰望小车的前进、后退、左转、右转和停车。
# move.py
from mcp.server.fastmcp import FastMCP
import sys
import logging
logger = logging.getLogger('MoveServer')
# 修复Windows控制台UTF-8编码问题
if sys.platform == 'win32':
sys.stderr.reconfigure(encoding='utf-8')
sys.stdout.reconfigure(encoding='utf-8')
# 创建MCP服务器
mcp = FastMCP("MoveServer")
@mcp.tool()
def forward() -> dict:
"""
控制仰望小车前进。
该函数将设置P0舵机87度,P5和P6引脚为高电平,P8和P16引脚为50%占空比的PWM输出,使仰望小车前进。
Returns:
dict: 返回一个字典,包含操作结果。
"""
result = "仰望小车前进"
logger.info(result)
return {"success": True, "result": result}
@mcp.tool()
def back() -> dict:
"""
控制仰望小车后退。
该函数将设置P0舵机87度,P5和P6引脚为低电平,P8和P16引脚为50%占空比的PWM输出,使仰望小车后退。
Returns:
dict: 返回一个字典,包含操作结果。
"""
result = "仰望小车后退"
logger.info(result)
return {"success": True, "result": result}
@mcp.tool()
def left() -> dict:
"""
控制仰望小车左转。
该函数将设置P0舵机60度,P5引脚为高电平,P6引脚为高电平,P8和P16引脚为50%占空比的PWM输出,舵机控制方向使仰望小车左转。
Returns:
dict: 返回一个字典,包含操作结果。
"""
result = "仰望小车左转"
logger.info(result)
return {"success": True, "result": result}
@mcp.tool()
def right() -> dict:
"""
控仰望小车右转。
该函数将设置P0舵机110度,P5引脚为高电平,P6引脚为高电平,P8和P16引脚为50%占空比的PWM输出,舵机控制方向使仰望小车右转。
Returns:
dict: 返回一个字典,包含操作结果。
"""
result = "仰望小车右转"
logger.info(result)
return {"success": True, "result": result}
@mcp.tool()
def stop() -> dict:
"""
控制仰望小车停止。
该函数将设置P0舵机87度,P8和P16引脚的PWM输出为0,使仰望小车停止。
Returns:
dict: 返回一个字典,包含操作结果。
"""
result = "仰望小车停车"
logger.info(result)
return {"success": True, "result": result}
# 启动服务器
if __name__ == "__main__":
mcp.run(transport="stdio")
修改小智AI管道文件mymcp.py。
新建文件mymcp.py,复制云天老师代码,并做控制动作方面的修改,填入自己的MCP接入点地址。(小白的我不会写代码,只好图形化编写UI和控制程序后,将自动生成的代码复制过来修改。)
功能描述:连接到 MCP 服务器,并通过 WebSocket 端点将输入和输出与指定的 Python 脚本进行管道通信。
加入了开机画面。

"""
This script is used to connect to the MCP server and pipe the input and output to the websocket endpoint.
Version: 0.1.0
Usage:
export MCP_ENDPOINT=
python mcp_pipe.py
"""
# -*- coding: UTF-8 -*-
# MindPlus
# Python
from unihiker import GUI
import asyncio
import websockets
import subprocess
import logging
import os
import signal
import sys
import random
from dotenv import load_dotenv
from pinpong.board import Pin
from pinpong.board import Board
from pinpong.board import Servo
from pinpong.board import Board,Pin
#from pinpong.extension.unihiker import *
import json
Board().begin()
u_gui=GUI()
p_p5_out=Pin(Pin.P5, Pin.OUT)
p_p8_pwm=Pin(Pin.P8, Pin.PWM)
p_p6_out=Pin(Pin.P6, Pin.OUT)
p_p16_pwm=Pin(Pin.P16, Pin.PWM)
pin1 = Pin(Pin.P0)
servo1 = Servo(pin1)
servo1.write_angle(87)
u_gui.draw_image(image="yw.png",x=0,y=0)
u_gui.draw_text(text=" M10仰望小车",x=0,y=0,font_size=16, color="#FFFFFF")
#buzzer.pitch(523,1)
# 设置日志记录器
# Load environment variables from .env file
#load_dotenv()
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('MCP_PIPE')
# Reconnection settings
INITIAL_BACKOFF = 1 # Initial wait time in seconds
MAX_BACKOFF = 600 # Maximum wait time in seconds
reconnect_attempt = 0
backoff = INITIAL_BACKOFF
async def connect_with_retry(uri):
"""Connect to WebSocket server with retry mechanism"""
global reconnect_attempt, backoff
while True: # Infinite reconnection
try:
if reconnect_attempt > 0:
wait_time = backoff * (1 + random.random() * 0.1) # Add some random jitter
logger.info(f"Waiting {wait_time:.2f} seconds before reconnection attempt {reconnect_attempt}...")
await asyncio.sleep(wait_time)
# Attempt to connect
await connect_to_server(uri)
except Exception as e:
reconnect_attempt += 1
logger.warning(f"Connection closed (attempt: {reconnect_attempt}): {e}")
# Calculate wait time for next reconnection (exponential backoff)
backoff = min(backoff * 2, MAX_BACKOFF)
async def connect_to_server(uri):
"""Connect to WebSocket server and establish bidirectional communication with `mcp_script`"""
global reconnect_attempt, backoff
try:
logger.info(f"Connecting to WebSocket server...")
async with websockets.connect(uri) as websocket:
logger.info(f"Successfully connected to WebSocket server")
# Reset reconnection counter if connection closes normally
reconnect_attempt = 0
backoff = INITIAL_BACKOFF
# Start mcp_script process
process = subprocess.Popen(
['python', mcp_script],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding='utf-8',
text=True # Use text mode
)
logger.info(f"Started {mcp_script} process")
# Create two tasks: read from WebSocket and write to process, read from process and write to WebSocket
await asyncio.gather(
pipe_websocket_to_process(websocket, process),
pipe_process_to_websocket(process, websocket),
pipe_process_stderr_to_terminal(process)
)
except websockets.exceptions.ConnectionClosed as e:
logger.error(f"WebSocket connection closed: {e}")
raise # Re-throw exception to trigger reconnection
except Exception as e:
logger.error(f"Connection error: {e}")
raise # Re-throw exception
finally:
# Ensure the child process is properly terminated
if 'process' in locals():
logger.info(f"Terminating {mcp_script} process")
try:
process.terminate()
process.wait(timeout=5)
except subprocess.TimeoutExpired:
process.kill()
logger.info(f"{mcp_script} process terminated")
async def pipe_websocket_to_process(websocket, process):
"""Read data from WebSocket and write to process stdin"""
try:
while True:
# Read message from WebSocket
message = await websocket.recv()
logger.debug(f"<< {message[:120]}...")
# Write to process stdin (in text mode)
if isinstance(message, bytes):
message = message.decode('utf-8')
process.stdin.write(message + '\n')
process.stdin.flush()
except Exception as e:
logger.error(f"Error in WebSocket to process pipe: {e}")
raise # Re-throw exception to trigger reconnection
finally:
# Close process stdin
if not process.stdin.closed:
process.stdin.close()
async def pipe_process_to_websocket(process, websocket):
"""Read data from process stdout and send to WebSocket"""
global p_p5_out,p_p8_pwm,p_p6_out,p_p16_pwm
try:
while True:
# Read data from process stdout
data = await asyncio.get_event_loop().run_in_executor(
None, process.stdout.readline
)
if not data: # If no data, the process may have ended
logger.info("Process has ended output")
break
# Send data to WebSocket
logger.debug(f">> {data[:120]}...")
print(data)
# 解析 JSON 字符串
json_str = json.loads(data)
print(json_str['id'])
if json_str['id']>1:
print(json_str['id'])
if json_str.get('result', {}):
text=json.loads(json_str['result']['content'][0]['text'])
if text['success']:
print(text['result'])
L=480
if "前进" in text['result']:
print("******************************")
servo1.write_angle(87)
p_p5_out.write_digital(1)
p_p8_pwm.write_analog(512)
p_p6_out.write_digital(1)
p_p16_pwm.write_analog(512)
if "后退" in text['result']:
print("******************************")
servo1.write_angle(87)
p_p5_out.write_digital(0)
p_p8_pwm.write_analog(512)
p_p6_out.write_digital(0)
p_p16_pwm.write_analog(512)
if "左转" in text['result']:
print("******************************")
servo1.write_angle(60)
p_p5_out.write_digital(1)
p_p8_pwm.write_analog(512)
p_p6_out.write_digital(1)
p_p16_pwm.write_analog(512)
if "右转" in text['result']:
print("******************************")
servo1.write_angle(120)
p_p5_out.write_digital(1)
p_p8_pwm.write_analog(512)
p_p6_out.write_digital(1)
p_p16_pwm.write_analog(512)
if "停车" in text['result']:
print("******************************")
servo1.write_angle(87)
p_p8_pwm.write_analog(0)
p_p16_pwm.write_analog(0)
# In text mode, data is already a string, no need to decode
await websocket.send(data)
except Exception as e:
logger.error(f"Error in process to WebSocket pipe: {e}")
raise # Re-throw exception to trigger reconnection
async def pipe_process_stderr_to_terminal(process):
"""Read data from process stderr and print to terminal"""
try:
while True:
# Read data from process stderr
data = await asyncio.get_event_loop().run_in_executor(
None, process.stderr.readline
)
if not data: # If no data, the process may have ended
logger.info("Process has ended stderr output")
break
# Print stderr data to terminal (in text mode, data is already a string)
sys.stderr.write(data)
sys.stderr.flush()
except Exception as e:
logger.error(f"Error in process stderr pipe: {e}")
raise # Re-throw exception to trigger reconnection
def signal_handler(sig, frame):
"""Handle interrupt signals"""
logger.info("Received interrupt signal, shutting down...")
sys.exit(0)
if __name__ == "__main__":
# Register signal handler
signal.signal(signal.SIGINT, signal_handler)
# mcp_script
#if len(sys.argv) < 2:
# logger.error("Usage: mcp_pipe.py ")
# sys.exit(1)
mcp_script = "move.py"
# Get token from environment variable or command line arguments
#endpoint_url = os.environ.get('MCP_ENDPOINT')
endpoint_url="wss://api.xiaozhi.me/mcp/?token=eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VySWQiOjIxNjQzNywiYWdlbnRJZCI6MTQ5OTAyLCJlbmRwb2ludElkIjoiYWdlbnRfMTQ5OTAyIiwicHVycG9zZSI6Im1jcC1lbmRwb2ludCIsImlhdCI6MTc1MTU0NjIzOH0.ha8DI8RcXNCWntx22plEAtxd2ovVLEFS7H8V7TpED-6FfqYKcjQewWIFlEVfIjoE9UJNTQoytxdlQ5hJ4IsDHg"
if not endpoint_url:
logger.error("Please set the `MCP_ENDPOINT` environment variable")
sys.exit(1)
# Start main loop
try:
asyncio.run(connect_with_retry(endpoint_url))
except KeyboardInterrupt:
logger.info("Program interrupted by user")
except Exception as e:
logger.error(f"Program execution error: {e}")
在mind+中点击运行。


成功连接服务器了。

看到程序中的内容啦。

可以用K10小智来控制啦。

小结:
1、感谢云天老师的教程和技术指导,让我也能体验到这样子高科技。
2、K10小智+M10(MCP),结合起来可以玩更多的项目了。
3、这个项目我是真喜欢,小智可以根据语意分析你的目的并发出指令。
4、项目需要进一步探究的地方,程序启动当前只能通过mind+,M10中点击mymcp.py会报错不能启动。问题等待高人解决。

5、程序启动时连接服务器,在繁忙时要耐心等待。今天早上6点多,一次就成功接入了,这是我目前第一次一次成功。
评论