回到首页 返回首页
回到顶部 回到顶部
返回上一页 返回上一页

当小智AI遇到炫酷灯环(1)MCP 简单

头像 rzyzzxw 2025.07.21 13 0

7.21

【写在前面】

自从学习了云天老师的M10(MCP)课程后,就开始了写作业模式,其实水平也没有提高很多,就是又菜又爱玩那种。

今天继续玩灯环,灯环的各种炫酷灯效,比如彩虹旋转、闪烁、呼吸、跑马灯、火焰,除了彩虹旋转和火焰,其它的灯效还可以随时改变颜色,加上小智AI参与互动,更加有趣可爱。

87a228f17c5be5b238d0a152529d3e0.jpg
03643853d96fd1c58fd471eb7687433.jpg

材料清单

  • 行空板M10 X1
  • M10电池扩展板组合 X1
  • 24灯RGB灯环 X1

步骤1 行空板M10上安装pyenv环境、Python 3.12.7和mcp库

步骤2 配置小智AI

步骤3 M10程序编写

步骤1和2在前面帖子中有详述,这里不再赘述。

1、mcp服务文件,move.py。

功能描述:控制行空板M10 引脚P23 中24RGB灯环的五种灯效。

代码
# move.py (最终简洁版)
from mcp.server.fastmcp import FastMCP
import logging

# 配置日志
logger = logging.getLogger('MoveServer')
mcp = FastMCP("MoveServer")

# 设备控制工具
@mcp.tool()
def set_light_rainbow() -> dict:
    """设置灯环为彩虹渐变色"""
    return {"success": True, "result": "设置灯环为彩虹渐变色"}

@mcp.tool()
def close_light() -> dict:
    """熄灭灯环"""
    return {"success": True, "result": "熄灭灯环"}

@mcp.tool()
def set_light_blink(color: str) -> dict:
    """设置灯环闪烁效果。color: 颜色(如:红色)"""
    return {"success": True, "result": f"设置灯环{color}闪烁"}

@mcp.tool()
def set_light_breathe(color: str) -> dict:
    """设置灯环呼吸效果。color: 颜色(如:蓝色)"""
    return {"success": True, "result": f"设置灯环{color}呼吸"}

@mcp.tool()
def set_running_lights(color: str) -> dict:
    """设置跑马灯效果。color: 颜色(如:黄色)"""
    return {"success": True, "result": f"设置灯环{color}跑马灯"}

@mcp.tool()
def set_fire_effect() -> dict:
    """设置火焰效果"""
    return {"success": True, "result": "设置灯环火焰效果"}

# 启动服务器
if __name__ == "__main__":
    mcp.run(transport="stdio")

小智AI管道文件mcp_pipe.py。

新建文件mcp_pipe.py,修改代码,注意填入自己的MCP接入点地址。

功能描述:连接到 MCP 服务器,并通过 WebSocket 端点将输入和输出与指定的 Python 脚本进行管道通信。

代码
# mcp_pipe.py (最终简洁版)
from unihiker import GUI
import asyncio
import websockets
import subprocess
import logging
import signal
import sys
import random
import math
import json
from pinpong.board import Board, Pin, NeoPixel

# 初始化硬件
Board().begin()
u_gui = GUI()

# 初始化单个灯环 (P23引脚, 24灯)
pin23 = Pin(Pin.P23)
np = NeoPixel(pin23, 24)
np.brightness(100)  # 设置默认亮度

# 全局变量
current_effect = None  # 当前运行的效果任务

# 初始化UI
u_gui.draw_image(image="back.png", x=0, y=0)
u_gui.draw_text(text="M10RGB助手", x=10, y=5, font_size=25, color="#FF00FF")

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('MCP_PIPE')

# 重连设置
INITIAL_BACKOFF, MAX_BACKOFF = 1, 600
reconnect_attempt, backoff = 0, INITIAL_BACKOFF

# ===================== 核心灯效函数 =====================
async def rainbow_rotate():
    """彩虹色循环旋转效果"""
    try:
        np.rainbow(0, np.num - 1, 1, 360)
        while True:
            np.rotate(1)
            await asyncio.sleep(0.1)
    except asyncio.CancelledError:
        logger.info("彩虹旋转中断")

async def blink(color):
    """闪烁效果"""
    try:
        while True:
            np.range_color(0, np.num, color)
            await asyncio.sleep(0.5)
            np.clear()
            await asyncio.sleep(0.5)
    except asyncio.CancelledError:
        logger.info("闪烁中断")

async def breathe(color):
    """呼吸灯效果"""
    try:
        np.range_color(0, np.num, color)
        current_brightness = 20
        direction = 1
        
        while True:
            np.brightness(current_brightness)
            await asyncio.sleep(0.03)
            
            current_brightness += direction * 2
            if current_brightness >= 100:
                current_brightness = 100
                direction = -1
            elif current_brightness <= 20:
                current_brightness = 20
                direction = 1
    except asyncio.CancelledError:
        logger.info("呼吸灯中断")

async def running_lights(color):
    """跑马灯效果"""
    try:
        np.clear()
        while True:
            for i in range(np.num):
                np[i] = color
                if i > 0:
                    np[i-1] = (color >> 1) & 0x7F7F7F  # 半亮
                if i > 1:
                    np[i-2] = 0  # 熄灭
                await asyncio.sleep(0.1)
    except asyncio.CancelledError:
        logger.info("跑马灯中断")

async def fire_effect():
    """火焰效果"""
    try:
        heat = [0] * np.num
        while True:
            # 冷却
            for i in range(np.num):
                heat[i] = max(0, heat[i] - random.randint(0, 5))
            
            # 传播热量
            for i in range(np.num - 3, 0, -1):
                heat[i] = (heat[i-1] + heat[i-2] + heat[i-2]) // 3
            
            # 添加火花
            if random.randint(0, 255) < 120:
                y = random.randint(0, 7)
                heat[y] = min(255, heat[y] + random.randint(160, 255))
            
            # 转换为颜色
            for i in range(np.num):
                # 热色映射 (黑->红->黄->白)
                t = heat[i]
                if t < 85:
                    r = t * 3
                    g = 0
                    b = 0
                elif t < 170:
                    r = 255
                    g = (t - 85) * 3
                    b = 0
                else:
                    r = 255
                    g = 255
                    b = (t - 170) * 3
                
                np[i] = (min(255, r) << 16) | (min(255, g) << 8) | min(255, b)
            
            await asyncio.sleep(0.05)
    except asyncio.CancelledError:
        logger.info("火焰效果中断")

# ===================== 实用工具函数 =====================
def stop_current_effect():
    """停止当前效果"""
    global current_effect
    if current_effect and not current_effect.done():
        current_effect.cancel()
        current_effect = None

def get_color_by_name(color_str):
    """根据名称获取颜色值"""
    color_map = {
        "红色": 0xFF0000, "绿色": 0x00FF00, "蓝色": 0x0000FF,
        "黄色": 0xFFFF00, "紫色": 0x800080, "青色": 0x00FFFF, 
        "白色": 0xFFFFFF, "橙色": 0xFFA500, "粉色": 0xFFC0CB
    }
    return color_map.get(color_str, 0xFF0000)  # 默认为红色

# ===================== 网络通信部分 =====================
async def connect_with_retry(uri):
    """带重连机制的WebSocket连接"""
    global reconnect_attempt, backoff
    while True:
        try:
            if reconnect_attempt > 0:
                wait_time = min(backoff * (1 + random.random() * 0.1), MAX_BACKOFF)
                logger.info(f"等待 {wait_time:.2f}秒后重试...")
                await asyncio.sleep(wait_time)
                
            await connect_to_server(uri)
        except Exception as e:
            reconnect_attempt += 1
            backoff = min(backoff * 2, MAX_BACKOFF)
            logger.warning(f"连接关闭 (尝试: {reconnect_attempt}): {e}")

async def connect_to_server(uri):
    """连接到WebSocket服务器"""
    global reconnect_attempt, backoff
    try:
        logger.info("连接WebSocket服务器...")
        async with websockets.connect(uri) as websocket:
            logger.info("WebSocket连接成功")
            reconnect_attempt, backoff = 0, INITIAL_BACKOFF
            
            process = subprocess.Popen(
                ['python', 'move.py'],
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                encoding='utf-8',
                text=True
            )
            logger.info(f"启动 move.py 进程")
            
            await asyncio.gather(
                pipe_websocket_to_process(websocket, process),
                pipe_process_to_websocket(process, websocket),
                pipe_process_stderr_to_terminal(process)
            )
    except websockets.exceptions.ConnectionClosed as e:
        logger.error(f"WebSocket连接关闭: {e}")
        raise
    except Exception as e:
        logger.error(f"连接错误: {e}")
        raise
    finally:
        if 'process' in locals():
            logger.info(f"终止 move.py 进程")
            try:
                process.terminate()
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                process.kill()

async def pipe_websocket_to_process(websocket, process):
    """WebSocket到进程的管道"""
    try:
        while True:
            message = await websocket.recv()
            logger.debug(f"<< {message[:120]}...")
            process.stdin.write(message + '\n')
            process.stdin.flush()
    except Exception as e:
        logger.error(f"WebSocket到进程错误: {e}")
        raise
    finally:
        if not process.stdin.closed:
            process.stdin.close()

async def pipe_process_to_websocket(process, websocket):
    """进程到WebSocket的管道"""
    try:
        while True:
            data = await asyncio.get_event_loop().run_in_executor(None, process.stdout.readline)
            if not data:
                break
                
            logger.debug(f">> {data[:120]}...")
            await handle_device_commands(data, websocket)
    except Exception as e:
        logger.error(f"进程到WebSocket错误: {e}")
        raise

async def handle_device_commands(data, websocket):
    """处理设备控制命令"""
    global current_effect
    try:
        json_data = json.loads(data)
        if json_data['id'] > 1 and json_data.get('result', {}):
            content = json_data['result']['content'][0]['text']
            text_data = json.loads(content)
            
            if text_data['success']:
                cmd_result = text_data['result']
                logger.info(f"收到命令: {cmd_result}")
                
                # 停止之前的灯效
                stop_current_effect()
                
                # 处理命令
                if "彩虹" in cmd_result or "渐变色" in cmd_result: 
                    current_effect = asyncio.create_task(rainbow_rotate())
                    logger.info("启动彩虹旋转效果")
                
                elif "熄灭" in cmd_result: 
                    np.clear()
                    logger.info("灯环已熄灭")
                
                elif "闪烁" in cmd_result: 
                    # 尝试提取颜色名称
                    color_str = "红色"  # 默认红色
                    for color_name in ["红色", "绿色", "蓝色", "黄色", "紫色", "青色", 
                                    "白色", "橙色", "粉色"]:
                        if color_name in cmd_result:
                            color_str = color_name
                            break
                    color = get_color_by_name(color_str)
                    current_effect = asyncio.create_task(blink(color))
                    logger.info(f"启动{color_str}闪烁效果")
                
                elif "呼吸" in cmd_result: 
                    # 尝试提取颜色名称
                    color_str = "红色"  # 默认红色
                    for color_name in ["红色", "绿色", "蓝色", "黄色", "紫色", "青色", 
                                    "白色", "橙色", "粉色"]:
                        if color_name in cmd_result:
                            color_str = color_name
                            break
                    color = get_color_by_name(color_str)
                    current_effect = asyncio.create_task(breathe(color))
                    logger.info(f"启动{color_str}呼吸效果")
                
                elif "跑马灯" in cmd_result: 
                    # 尝试提取颜色名称
                    color_str = "红色"  # 默认红色
                    for color_name in ["红色", "绿色", "蓝色", "黄色", "紫色", "青色", 
                                    "白色", "橙色", "粉色"]:
                        if color_name in cmd_result:
                            color_str = color_name
                            break
                    color = get_color_by_name(color_str)
                    current_effect = asyncio.create_task(running_lights(color))
                    logger.info(f"启动{color_str}跑马灯效果")
                
                elif "火焰" in cmd_result: 
                    current_effect = asyncio.create_task(fire_effect())
                    logger.info("启动火焰效果")
                
        await websocket.send(data)
    except json.JSONDecodeError as je:
        logger.error(f"JSON解析错误: {je}, 原始数据: {data}")
    except KeyError as ke:
        logger.error(f"缺少必要字段: {ke}, 原始数据: {data}")
    except Exception as e:
        logger.error(f"命令处理错误: {e}")

async def pipe_process_stderr_to_terminal(process):
    """进程错误输出到终端"""
    try:
        while True:
            data = await asyncio.get_event_loop().run_in_executor(None, process.stderr.readline)
            if not data: break
            sys.stderr.write(data)
            sys.stderr.flush()
    except Exception as e:
        logger.error(f"错误输出处理错误: {e}")

def signal_handler(sig, frame):
    """中断信号处理"""
    logger.info("收到中断信号,正在关闭...")
    # 停止当前灯效
    stop_current_effect()
    # 熄灭灯环
    np.clear()
    sys.exit(0)

if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler)
    endpoint_url = "wss://api.xiaozhi.me/mcp/?token=eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VySWQiOjIxNjQzNywiYWdlbnRJZCI6NDYwNjExLCJlbmRwb2ludElkIjoiYWdlbnRfNDYwNjExIiwicHVycG9zZSI6Im1jcC1lbmRwb2ludCIsImlhdCI6MTc1MjU0NjM1OH0.7y6CVkzE2dqILqBBoImu0qenKuPhMy27Yi-95lCawmXU_tSGaHcF5f8k6Vbp6kWYLjwVe6v2M-0TBOeoRVxzvQ"
    
    try:
        asyncio.run(connect_with_retry(endpoint_url))
    except KeyboardInterrupt:
        logger.info("用户中断程序")
    except Exception as e:
        logger.error(f"程序执行错误: {e}")

在Mind+中运行mcp_pipe.py。

等待成功连接服务器,期间可能有红字报连不上服务器,耐心等待(有时快,有时慢)。

用乐动小智和M10(MCP)互动控制灯光。

【小结】
极简设计:5个核心灯效:彩虹旋转、闪烁、呼吸、跑马灯、火焰

稳定可靠:所有灯效参数固定,避免意外错误,简化错误处理逻辑,优化资源管理

易用性强:语音命令保持简单直观,开箱即用,无需复杂配置

性能优化:减少内存占用,降低CPU使用率,提高系统响应速度

使用指南:

# 基础控制:熄灭灯环

# 特效控制:

设置灯环为彩虹渐变色

设置灯环红色闪烁
设置灯环蓝色呼吸
设置灯环黄色跑马灯
设置灯环火焰效果

本项目代码由DeepSeek协助完成,特此致谢。

评论

user-avatar