回到首页 返回首页
回到顶部 回到顶部
返回上一页 返回上一页

当小智AI遇上“hello world”(MCP) 简单

头像 rzyzzxw 2025.07.15 30 0

7.15

[写在前面]

在5月份时,我曾经做过一个hello world墙花的小项目,在夜色中的窗边,它发出七彩炫丽的光芒。

194d651b1d6532ca4efe4efa270a1da.jpg

当小智AI遇到它,现代和传统将会碰擦出什么样子的火花。

ffe7ffaf833c553132ce2412ae4370b.jpg

[目标预设]

1、行空板M10+电池扩展板组合给hello world墙花供电。

2、用小智AI通过MCP控制hello world墙花RGB灯带色彩和亮度的变化。

材料清单

  • M10+电池扩展板组合 X1
  • hello world墙花RGB灯板 X1
  • 充电宝 X1
  • 乐动小智 X1

步骤1 准备hello world RGB 灯板

详见:

用Beetle-ESP32控制器和W2812做一个七彩hello world墙花- Makelog(造物记)

步骤2 行空板M10上安装pyenv环境、Python 3.12.7和mcp库

详见:

M10仰望小车(MCP)的学习与尝试- Makelog(造物记)

步骤3 配置小智AI

K10小智和乐动掌控小智等均可,为了避免RGB控制的冲突,这里选择乐动小智,详见:

乐动小智+M10智能小助手(MCP)- Makelog(造物记)

获取MCP接入点,复制接入点地址备用。

8f8fa2f5915a3051f50841ae2b75c6d.png

步骤4 M10程序编写

1、mcp服务文件,move.py。

功能描述:控制行空板M10 引脚P23 RGB灯光点亮、熄灭,亮度、颜色调整,特别增加彩虹渐变色流水灯效。

代码
# move.py
# MCP服务器设备控制工具
# 简化版

from mcp.server.fastmcp import FastMCP
import logging

# 配置日志
logger = logging.getLogger('MoveServer')
mcp = FastMCP("MoveServer")

# 设备控制工具
@mcp.tool()
def set_light_rainbow() -> dict:
    """点亮灯环(彩虹色),行空板M10引脚P23RGB灯环设为彩虹渐变色。"""
    return {"success": True, "result": "灯环设为彩虹渐变色"}

@mcp.tool()
def open_light() -> dict:
    """点亮灯环(红色),行空板M10引脚P23RGB灯环点亮。"""
    return {"success": True, "result": "灯环已点亮"}

@mcp.tool()
def close_light() -> dict:
    """熄灭灯环,行空板M10引脚P23RGB灯环熄灭。"""
    return {"success": True, "result": "灯环已熄灭"}

@mcp.tool()
def set_light_color(color: str) -> dict:
    """设置行空板M10引脚P23RGB灯环颜色,支持: 红色、绿色,蓝色,黄色,紫色,青色,白色"""
    return {"success": True, "result": f"设置颜色为{color}"}

@mcp.tool()
def set_light_brightness(brightness: int) -> dict:
    """设置行空板M10引脚P23RGB灯环亮度(0-255)"""
    return {"success": True, "result": f"设置亮度为{brightness}"}

# 启动服务器
if __name__ == "__main__":
    mcp.run(transport="stdio")

小智AI管道文件mcp_pipe.py。

新建文件mcp_pipe.py,修改代码,注意填入自己的MCP接入点地址。

功能描述:连接到 MCP 服务器,并通过 WebSocket 端点将输入和输出与指定的 Python 脚本进行管道通信。

加入了开机画面。

yw.png

代码
# mcp_pipe.py
# 连接MCP服务器并通过WebSocket传输输入输出
# 简化版

from unihiker import GUI
import asyncio
import websockets
import subprocess
import logging
import signal
import sys
import random
from pinpong.board import Board, Pin, NeoPixel
import json

# 初始化硬件
Board().begin()
u_gui = GUI()
pin1 = Pin(Pin.P21)
np1 = NeoPixel(pin1,84)
np1.brightness(100)

# 全局变量
rotating = False
rotate_task = None

# 初始化UI
u_gui.draw_image(image="back.png", x=0, y=0)
u_gui.draw_text(text="M10RGB助手", x=10, y=5, font_size=25, color="#FF00FF")

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('MCP_PIPE')

# 重连设置
INITIAL_BACKOFF, MAX_BACKOFF = 1, 600
reconnect_attempt, backoff = 0, INITIAL_BACKOFF

# 彩虹旋转函数
async def rainbow_rotate():
    """彩虹色循环旋转效果"""
    global rotating
    rotating = True
    try:
        while rotating:
            np1.rotate(1)
            await asyncio.sleep(0.1)  # 控制旋转速度
    except asyncio.CancelledError:
        logger.info("彩虹旋转中断")
    finally:
        rotating = False

async def connect_with_retry(uri):
    """带重连机制的WebSocket连接"""
    global reconnect_attempt, backoff
    while True:
        try:
            if reconnect_attempt > 0:
                wait_time = min(backoff * (1 + random.random() * 0.1), MAX_BACKOFF)
                logger.info(f"等待 {wait_time:.2f}秒后重试...")
                await asyncio.sleep(wait_time)
                
            await connect_to_server(uri)
        except Exception as e:
            reconnect_attempt += 1
            backoff = min(backoff * 2, MAX_BACKOFF)
            logger.warning(f"连接关闭 (尝试: {reconnect_attempt}): {e}")

async def connect_to_server(uri):
    """连接到WebSocket服务器"""
    global reconnect_attempt, backoff
    try:
        logger.info("连接WebSocket服务器...")
        async with websockets.connect(uri) as websocket:
            logger.info("WebSocket连接成功")
            reconnect_attempt, backoff = 0, INITIAL_BACKOFF
            
            process = subprocess.Popen(
                ['python', mcp_script],
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                encoding='utf-8',
                text=True
            )
            logger.info(f"启动 {mcp_script} 进程")
            
            await asyncio.gather(
                pipe_websocket_to_process(websocket, process),
                pipe_process_to_websocket(process, websocket),
                pipe_process_stderr_to_terminal(process)
            )
    except websockets.exceptions.ConnectionClosed as e:
        logger.error(f"WebSocket连接关闭: {e}")
        raise
    except Exception as e:
        logger.error(f"连接错误: {e}")
        raise
    finally:
        if 'process' in locals():
            logger.info(f"终止 {mcp_script} 进程")
            try:
                process.terminate()
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                process.kill()

async def pipe_websocket_to_process(websocket, process):
    """WebSocket到进程的管道"""
    try:
        while True:
            message = await websocket.recv()
            logger.debug(f"<< {message[:120]}...")
            process.stdin.write(message + '\n')
            process.stdin.flush()
    except Exception as e:
        logger.error(f"WebSocket到进程错误: {e}")
        raise
    finally:
        if not process.stdin.closed:
            process.stdin.close()

async def pipe_process_to_websocket(process, websocket):
    """进程到WebSocket的管道"""
    try:
        while True:
            data = await asyncio.get_event_loop().run_in_executor(None, process.stdout.readline)
            if not data:
                break
                
            logger.debug(f">> {data[:120]}...")
            await handle_device_commands(data, websocket)
    except Exception as e:
        logger.error(f"进程到WebSocket错误: {e}")
        raise

async def handle_device_commands(data, websocket):
    """处理设备控制命令"""
    global rotate_task
    
    try:
        json_str = json.loads(data)
        if json_str['id'] > 1 and json_str.get('result', {}):
            text = json.loads(json_str['result']['content'][0]['text'])
            
            if text['success']:
                cmd_result = text['result']

                # 先停止任何正在进行的旋转
                if rotate_task and not rotate_task.done():
                    rotate_task.cancel()
                    try:
                        await rotate_task
                    except asyncio.CancelledError:
                        pass
                    rotate_task = None

                # RGB灯环控制
                if "点亮" in cmd_result: 
                    np1.range_color(0, 83, 0xFF0000)
                elif "彩虹" in cmd_result: 
                    np1.rainbow(0, 83, 1, 360)
                    # 启动彩虹旋转任务
                    rotate_task = asyncio.create_task(rainbow_rotate())
                elif "渐变色" in cmd_result: 
                    np1.rainbow(0, 83, 1, 360)
                    # 启动彩虹旋转任务
                    rotate_task = asyncio.create_task(rainbow_rotate())
                elif "熄灭" in cmd_result: 
                    np1.clear()
                elif "设置颜色" in cmd_result: 
                    handle_color(cmd_result)
                elif "设置亮度" in cmd_result: 
                    handle_brightness(cmd_result)
                
        await websocket.send(data)
    except Exception as e:
        logger.error(f"命令处理错误: {e}")

def handle_color(cmd_result):
    """处理颜色设置"""
    color_str = cmd_result.split("为")[-1].strip()
    color_map = {
        "红色": 0xFF0000, "绿色": 0x00FF00, "蓝色": 0x0000FF,
        "黄色": 0xFFFF00, "紫色": 0x800080, "青色": 0x00FFFF, "白色": 0xFFFFFF
    }
    np1.range_color(0, 83, color_map.get(color_str, 0xFF0000))

def handle_brightness(cmd_result):
    """处理亮度设置"""
    brightness_str = cmd_result.split("为")[-1].strip()
    try:
        brightness = max(0, min(int(brightness_str), 255))
        np1.brightness(brightness)
    except ValueError:
        logger.error(f"无效亮度值: {brightness_str}")

async def pipe_process_stderr_to_terminal(process):
    """进程错误输出到终端"""
    try:
        while True:
            data = await asyncio.get_event_loop().run_in_executor(None, process.stderr.readline)
            if not data: break
            sys.stderr.write(data)
            sys.stderr.flush()
    except Exception as e:
        logger.error(f"错误输出处理错误: {e}")

def signal_handler(sig, frame):
    """中断信号处理"""
    logger.info("收到中断信号,正在关闭...")
    sys.exit(0)

if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler)
    mcp_script = "move.py"
    endpoint_url = "wss://api.xiaozhi.me/mcp/?token=eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VySWQiOjIxNjQzNywiYWdlbnRJZCI6NDYwNjExLCJlbmRwb2ludElkIjoiYWdlbnRfNDYwNjExIiwicHVycG9zZSI6Im1jcC1lbmRwb2ludCIsImlhdCI6MTc1MjU0NjM1OH0.7y6CVkzE2dqILqBBoImu0qenKuPhMy27Yi-95lCawmXU_tSGaHcF5f8k6Vbp6kWYLjwVe6v2M-0TBOeoRVxzvQ"
    
    try:
        asyncio.run(connect_with_retry(endpoint_url))
    except KeyboardInterrupt:
        logger.info("用户中断程序")
    except Exception as e:
        logger.error(f"程序执行错误: {e}")

【关于程序】

工作原理:

当收到"渐变色"命令时,创建异步旋转任务

任务在后台以100ms间隔循环旋转灯环

当收到其他命令(点亮/熄灭/设置颜色/设置亮度)时:

取消正在进行的旋转任务

执行新命令的操作

旋转循环会自动退出

注意事项:

旋转速度调整:修改 await asyncio.sleep(0.1) 中的值:

值越小旋转越快(0.05秒更快)

值越大旋转越慢(0.2秒更慢)

硬件兼容性:确保 np1.rotate(1) 在硬件上正常工作:

参数 1 表示每次旋转1个LED

如果旋转方向不对,可尝试 -1 反向旋转

内存管理:任务取消后,Python的垃圾回收会处理任务对象,无需手动释放。

这种实现方式既保持了彩虹效果的流畅性,又能及时响应其他命令中断旋转,同时充分利用了异步编程的优势,不会阻塞主程序执行。

在Mind+中运行mcp_pipe.py。

屏幕截图 2025-07-15 165057.png

稍等一会,等待成功连接服务器。

可以用乐动小智和M10(MCP)互动控制灯带了。

【小结】

1、这个用DeepSeek辅助完成的作业,重点是增加了RGB灯带彩虹渐变色灯效。

2、84颗灯,有点费电,电池扩展板的电池还是小了点,加了小充电宝支援一下。

3、程序先用灯环16灯写的,后来改hello world时代码中灯环没有修改成灯带,小智不知道不会自动改口的。

4、DeepSeek是个很好的老师,可以帮助解答问题,修改程序。

评论

user-avatar