回到首页 返回首页
回到顶部 回到顶部
返回上一页 返回上一页

行空板M10扩展板---基于行空板M10+扩展版组合的四足机器狗(MCP) 简单

头像 韩亚楠 2025.07.04 44 0

屏幕截图 2025-07-06 031925.png956fe7318398fd43984a5141fd7d441.jpg

【项目背景】

最近收到了期待已久的 行空板 + 扩展板组合,扩展板不仅内置电池,还引出了了多个 IO 接口,大大增强了便携性和扩展能力。这种“自带电源+多接口”的组合形式,给我带来了更多移动端项目的灵感。

恰好最近在研究小智机器狗,并且看到社区里不少大佬分享了使用 MCP 协议控制其他设备的案例,我便萌生了一个想法:能不能也做一只可以用小智控制的机器狗?就这样,项目正式启动,又一段折腾之旅开始了!

【功能演示】

原本的想法是直连四个舵机,因为扩展板组合引出了两路5v舵机接口,并且行空板自身还有几路pwm输出的,感觉应该是足够的,但是最终步态的测试效果不尽如人意,具体的原因还在找,目前先用舵机驱动板替代,通过usb转串口来控制,功能正常就是外形丑了点。

31360bcd9ddfe2f996bb513fcf15f44.jpg微信图片_20250706033146.jpg

材料清单

  • 行空板M10 X1
  • 行空板扩展组合 X1
  • 9g舵机 X4
  • RGB灯带 X1
  • 舵机驱动板 X1
  • usb转串口模块 X1
  • 3d打印件 X1

【硬件组装】

步骤1 舵机和RGB灯带装入底盘并固定好,4个舵机接入舵机控制板。

步骤2 usb转串口 插 行空板M10 usb口,rx tx 与舵机驱动板 tx rx 连接。

步骤3 灯带接入扩展组合背面P4,组装其他3d打印组件

【程序逻辑】

主控端:任意版本小智(推荐K10 )

受控端:行空板M10

行空板 M10 端提前启动 MCP 服务端,并正确填入对应的 MCP 接入点。

在 M10 上运行 mcp_pipe.py 和 dog.py,系统将自动连接小智平台。一旦连接成功,小智平台的 MCP 工具栏中将显示本项目注册的工具,表示机器狗已成功上线并可以被远程控制,接下来就可以使用小智来控制机器狗了。

【环境搭建】

行空板默认的3.7 python环境不能运行mcp服务端,需要安装3.12的python版本。

具体安装教程参考 云天大佬的教程

https://makelog.dfrobot.com.cn/article-317610.html

比较推荐用pyenv来安装

https://gitee.com/liliang9693/unihiker-pyenv-python

环境安装好后 依次安装依赖库。

【运行程序】

步骤4 配置好舵机驱动板内的动作步态,如果和我的驱动板型号一样,可以私发动作文件。

步骤5 修改 mcp_pipe.py 中的 endpoint_url 项为自己小智mcp接入点的地址

步骤6 mcp_pipe.py 和dog.py 两个拷贝到行空板文件目录

步骤7 在终端里cd到文件所在目录,然后启动服务端 :python mcp_pipe.py

步骤8 等待连通后,小智mcp后台出现下图工具后即可使用小智来进行控制了

屏幕截图 2025-07-06 045419.png

代码
#mcp_pipe.py

"""
This script is used to connect to the MCP server and pipe the input and output to the websocket endpoint.
Version: 0.1.0

Usage:

export MCP_ENDPOINT=
python mcp_pipe.py

"""
import asyncio
import websockets
import subprocess
import logging
import os
import time
import signal
import sys
import random
from dotenv import load_dotenv
from pinpong.board import Board,Pin,NeoPixel,UART
import serial
from pinpong.extension.unihiker import *
import json
from unihiker import GUI

u_gui=GUI()
ser=serial.Serial("/dev/ttyUSB0",9600)
Board().begin()
np1 = NeoPixel(Pin((Pin.P4)),3) #定义灯带
np1[0] = (0,0,0)
np1[1] = (0,0,0)
np1[2] = (0,0,0)

def sendx(x,t):
    buf = bytearray(b'\x55\x55')
    buf.append(5 & 0xff)
    buf.append(6 & 0xff)
    buf.append(x & 0xff)
    buf.append(t & 0xff)
    buf.append(0 & 0xff)
    ser.write(buf)

def stopx():
    buf = bytearray(b'\x55\x55')
    buf.append(2 & 0xff)
    buf.append(7 & 0xff)
    ser.write(buf)
    sendx(0,1)

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('MCP_PIPE')

# Reconnection settings
INITIAL_BACKOFF = 1  # Initial wait time in seconds
MAX_BACKOFF = 600  # Maximum wait time in seconds
reconnect_attempt = 0
backoff = INITIAL_BACKOFF

def pose_is(target_lf, target_rf, target_rb, target_lb, tolerance=1):
    return (abs(lf_angle - target_lf) < tolerance and
            abs(rf_angle - target_rf) < tolerance and
            abs(rb_angle - target_rb) < tolerance and
            abs(lb_angle - target_lb) < tolerance)


async def connect_with_retry(uri):
    """Connect to WebSocket server with retry mechanism"""
    global reconnect_attempt, backoff
    while True:  # Infinite reconnection
        try:
            if reconnect_attempt > 0:
                wait_time = backoff * (1 + random.random() * 0.1)  # Add some random jitter
                logger.info(f"Waiting {wait_time:.2f} seconds before reconnection attempt {reconnect_attempt}...")
                await asyncio.sleep(wait_time)

            # Attempt to connect
            await connect_to_server(uri)

        except Exception as e:
            reconnect_attempt += 1
            logger.warning(f"Connection closed (attempt: {reconnect_attempt}): {e}")            
            # Calculate wait time for next reconnection (exponential backoff)
            backoff = min(backoff * 2, MAX_BACKOFF)

async def connect_to_server(uri):
    """Connect to WebSocket server and establish bidirectional communication with `mcp_script`"""
    global reconnect_attempt, backoff
    try:
        logger.info(f"正在连接服务器...")
        async with websockets.connect(uri) as websocket:
            logger.info(f"已连接 WebSocket")

            # Reset reconnection counter if connection closes normally
            reconnect_attempt = 0
            backoff = INITIAL_BACKOFF

            # Start mcp_script process
            process = subprocess.Popen(
                ['python', mcp_script],
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                encoding='utf-8',
                text=True  # Use text mode
            )
            logger.info(f"启动子进程: {mcp_script} ")
            u_gui.draw_emoji(emoji="Peace",x=0,y=0,duration=5)
            sendx(0,1)

            # Create two tasks: read from WebSocket and write to process, read from process and write to WebSocket
            await asyncio.gather(
                pipe_websocket_to_process(websocket, process),
                pipe_process_to_websocket(process, websocket),
                pipe_process_stderr_to_terminal(process)
            )
    except websockets.exceptions.ConnectionClosed as e:
        logger.error(f"WebSocket 连接已关闭: {e}")
        raise  # Re-throw exception to trigger reconnection
    except Exception as e:
        logger.error(f"连接错误: {e}")
        raise  # Re-throw exception
    finally:
        # Ensure the child process is properly terminated
        if 'process' in locals():
            logger.info(f"正在终止子程序 {mcp_script} ")
            try:
                process.terminate()
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                process.kill()
            logger.info(f"{mcp_script} 已终止")

async def pipe_websocket_to_process(websocket, process):
    """Read data from WebSocket and write to process stdin"""
    try:
        while True:
            # Read message from WebSocket
            message = await websocket.recv()
            logger.debug(f"<< {message[:120]}...")

            # Write to process stdin (in text mode)
            if isinstance(message, bytes):
                message = message.decode('utf-8')
            process.stdin.write(message + '\n')
            process.stdin.flush()
    except Exception as e:
        logger.error(f"Error in WebSocket to process pipe: {e}")
        raise  # Re-throw exception to trigger reconnection
    finally:
        # Close process stdin
        if not process.stdin.closed:
            process.stdin.close()

async def pipe_process_to_websocket(process, websocket):
    """Read data from process stdout and send to WebSocket"""
    try:
        while True:
            # Read data from process stdout
            data = await asyncio.get_event_loop().run_in_executor(
                None, process.stdout.readline
            )

            if not data:  # If no data, the process may have ended
                logger.info("Process has ended output")
                break

            # Send data to WebSocket
            logger.debug(f">> {data[:120]}...")
            logger.info(data)
            # 解析 JSON 字符串
            json_str = json.loads(data)
            logger.info(json_str['id'])

            if json_str['id']>1:
              logger.info(json_str['id'])
              if json_str.get('result', {}):
                text=json.loads(json_str['result']['content'][0]['text'])
                if text['success']:
                    print(text['result'])
                    if "起立" in text['result']:
                        sendx(0,1)
                    if "趴下" in text['result']:
                        sendx(1,1)
                    if "坐下" in text['result']:
                        sendx(2,1)
                    if "前进" in text['result']:
                        sendx(3,text.get("steps"))
                    if "后退" in text['result']:
                        sendx(4,text.get("steps"))
                    if "左转" in text['result']:
                        sendx(5,text.get("times"))
                    if "右转" in text['result']:
                        sendx(6,text.get("times"))
                    if "懒腰" in text['result']:
                        sendx(7,4)
                    if "打断" in text['result']:
                        stopx()
                    if "灯" in text['result']:
                        led_colors = text.get("led_colors", [])
                        if len(led_colors) == 3:
                            for i, color in enumerate(led_colors):
                                r = color.get("r", 0)
                                g = color.get("g", 0)
                                b = color.get("b", 0)
                                brightness = color.get("brightness", 255)

                                # 根据亮度比例调整颜色
                                r = int(r * brightness / 255)
                                g = int(g * brightness / 255)
                                b = int(b * brightness / 255)

                                logger.info(f"设置LED{i}: R={r}, G={g}, B={b}")
                                np1[i] = (r, g, b)     
            # In text mode, data is already a string, no need to decode
            await websocket.send(data)
    except Exception as e:
        logger.error(f"Error in process to WebSocket pipe: {e}")
        raise  # Re-throw exception to trigger reconnection

async def pipe_process_stderr_to_terminal(process):
    """Read data from process stderr and print to terminal"""
    try:
        while True:
            # Read data from process stderr
            data = await asyncio.get_event_loop().run_in_executor(
                None, process.stderr.readline
            )

            if not data:  # If no data, the process may have ended
                logger.info("Process has ended stderr output")
                break

            # Print stderr data to terminal (in text mode, data is already a string)
            sys.stderr.write(data)
            sys.stderr.flush()
    except Exception as e:
        logger.error(f"Error in process stderr pipe: {e}")
        raise  # Re-throw exception to trigger reconnection

def signal_handler(sig, frame):
    """Handle interrupt signals"""
    logger.info("Received interrupt signal, shutting down...")
    sys.exit(0)

if __name__ == "__main__":
    # Register signal handler
    signal.signal(signal.SIGINT, signal_handler)
    mcp_script = "dog.py"
    # Get token from environment variable or command line arguments
    #endpoint_url = os.environ.get('MCP_ENDPOINT')
    endpoint_url="填入自己的mcp接入点地址"
    if not endpoint_url:
        logger.error("Please set the `MCP_ENDPOINT` environment variable")
        sys.exit(1)
    try:
        asyncio.run(connect_with_retry(endpoint_url))
    except KeyboardInterrupt:
        logger.info("Program interrupted by user")
    except Exception as e:
        logger.error(f"Program execution error: {e}")
代码
# dog.py
from mcp.server.fastmcp import FastMCP
import sys
import logging

logger = logging.getLogger("DogBot")

# 修复Windows控制台UTF-8编码问题
if sys.platform == 'win32':
    sys.stderr.reconfigure(encoding='utf-8')
    sys.stdout.reconfigure(encoding='utf-8')


# 创建MCP服务器
mcp = FastMCP("DogBot")

@mcp.tool()
def lay_down() -> dict:
    """
    机器狗趴下
    发送串口指令给舵机控制板 实现趴下来的效果
    Returns:
        dict: 返回一个字典,包含操作结果。
    """
    result = "机器狗趴下"
    logger.info(result)
    return {"success": True, "result": result}

@mcp.tool()
def stand_up() -> dict:
    """
    机器狗站起来
    发送串口指令给舵机控制板 实现站起来的效果
    Returns:
        dict: 返回一个字典,包含操作结果。
    """
    result = "机器狗起立"
    logger.info(result)
    return {"success": True, "result": result}

@mcp.tool()
def sit_down() -> dict:
    """
    机器狗坐下来
    发送串口指令给舵机控制板 实现坐下的效果
    Returns:
        dict: 返回一个字典,包含操作结果。
    """
    result = "机器狗坐下"
    logger.info(result)
    return {"success": True, "result": result}

@mcp.tool()
def forward(steps:int) -> dict:
    """
    机器狗往前走一步
    发送串口指令给舵机控制板 实现前进一步的效果,如果命令中包含步数那就照做,不包含的话就2到4步随机。
    Returns:
        dict: 返回一个字典,包含操作结果,和步数。
    """
    result = "机器狗前进"
    logger.info(result)
    return {"success": True, "result": result,"steps":steps}

@mcp.tool()
def backward(steps:int) -> dict:
    """
    机器狗往后退一步
    发送串口指令给舵机控制板 实现后退一步的效果,如果命令中包含步数那就照做,不包含的话就2到4步随机。
    Returns:
        dict: 返回一个字典,包含操作结果,和步数。
    """
    result = "机器狗后退"
    logger.info(result)
    return {"success": True, "result": result,"steps":steps}

@mcp.tool()
def turn_left(times:int) -> dict:
    """
    机器狗往左转30° 
    发送串口指令给舵机控制板 实现左转的效果,你可以根据命令对话里边的角度,计算下需要运行多少次
    然后返回出去,小于30°的按30°来旋转。
    Returns:
        dict: 返回一个字典,包含操作结果。
    """
    result = "机器狗左转"
    logger.info(result)
    return {"success": True, "result": result,"times":times}

@mcp.tool()
def turn_right(times:int) -> dict:
    """
    机器狗往右转30°
    发送串口指令给舵机控制板 实现右转的效果,你可以根据命令对话里边的角度,计算下需要运行多少次
    然后返回出去,小于30°的按30°来旋转。
    Returns:
        dict: 返回一个字典,包含操作结果。
    """
    result = "机器狗右转"
    logger.info(result)
    return {"success": True, "result": result,"times":times}

@mcp.tool()
def lazy() -> dict:
    """
    机器狗伸懒腰
    发送串口指令给舵机控制板 实现伸懒腰的效果
    Returns:
        dict: 返回一个字典,包含操作结果。
    """
    result = "机器狗伸懒腰"
    logger.info(result)
    return {"success": True, "result": result}

@mcp.tool()
def stop() -> dict:
    """
    机器狗动作打断
    立即打断机器狗目前所执行的动作,让机器狗停下来。
    Returns:
        dict: 返回一个字典,包含操作结果。
    """
    result = "机器狗打断"
    logger.info(result)
    return {"success": True, "result": result}

@mcp.tool()
def set_rgb_lights(led_colors: list[dict]) -> dict:
    """
    设置3个RGB灯珠的颜色,每个灯包含字段 r, g, b, brightness(范围 0~255)
    如果只是简单的开灯 那就都设置为255,如果需要关灯就都设置为0。
    参数示例:
    [
        {"r": 255, "g": 0, "b": 0, "brightness": 255},
        {"r": 0, "g": 255, "b": 0, "brightness": 128},
        {"r": 0, "g": 0, "b": 255, "brightness": 200}
    ]
    """
    # 这里返回默认值,AI 会覆盖这个返回值
    result = "灯光设置成功"
    return {"success": True, "result": result, "led_colors": led_colors}

# 启动服务器
if __name__ == "__main__":
    logger.info("子程序启动成功!")
    mcp.run(transport="stdio")

评论

user-avatar