回到首页 返回首页
回到顶部 回到顶部
返回上一页 返回上一页

小智控制RGB灯柱(MCP) 简单

头像 rzegkly 2025.08.17 35 1

image.png

【项目缘起】

处暑,是二十四节气之第十四个节气。

斗指戊(西南方);太阳黄经达150°;

于每年公历8月22-24日交节。

处暑,即为“出暑”,是炎热离开的意思。

在学习大圣老师的MCP相关文章后,我深受启发。

分享运用AI模型"小智"与行空板M10进行智能交互

——《小智控制RGB灯柱(MCP)》项目。

image.png

《小智控制RGB灯柱(MCP)》项目需要以下三步

1.上位机乐动掌控刷入ESP32固件,配置AI小智角色

2.下位机行空板M10作为刷入V0.4.1系统,配置MCP关键库

3.行空板M10与灯带连接,编写相关程序

项目可以实现:

1.实时数据的高效处理,

2.智能化任务执行,

3.动态灯光效果控制

image.png

材料清单

  • 行空板M10 X1
  • 乐动掌控3.0 X1
  • RGB灯带 X1

步骤1 上位机——将小智刷入乐动掌控中

1.打开软件mPython中设置主控板乐动掌控3.0

image.png
image.png

2.烧录固件

image.png
image.png
image.png

步骤2 上位机——配置乐动掌控小智角色

image.png
1e5f5e1f5d7997cb407c6a0a3341391c_47d8548f616784b9279accf2ce337e23.jpg
image.png
image.png
image.png

步骤3 行空板M10作为下位机——固件刷入

V0.4.1版系统镜像下载与烧录

系统镜像烧录操做步骤

image.png
image.png
image.png

步骤4 行空板M10作为下位机——配网

image.png

步骤5 行空板M10作为下位机——调整Python3.12.7

image.png

步骤6 行空板M10作为下位机——MCP互动的关键库

1.在终端,使用pip install mcp,安装mcp库

image.png

2. 安装websockets 库:pip install websockets

image.png

步骤7 程序测试

1.管道文件mcp_pipe.py

说明:RGB灯带接入M10扩展P4口.

代码
# mcp_pipe.py 
from unihiker import GUI
import asyncio
import websockets
import subprocess
import logging
import signal
import sys
import random
import math
import json
from pinpong.board import Board, Pin, NeoPixel
 
# 初始化硬件
Board().begin()
u_gui = GUI()
 
# 初始化单个灯带 (P4引脚, 7灯)
pin4 = Pin(Pin.P4)
np = NeoPixel(pin4, 7)
np.brightness(100)  # 设置默认亮度
 
# 全局变量
current_effect = None  # 当前运行的效果任务
 
# 初始化UI
#u_gui.draw_image(image="back.png", x=0, y=0)
u_gui.draw_text(text="小智控制RGB灯柱", x=10, y=5, font_size=20, color="#000000")
u_gui.draw_emoji(emoji="Smile",x=0,y=40,duration=0.2)
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('MCP_PIPE')
 
# 重连设置
INITIAL_BACKOFF, MAX_BACKOFF = 1, 600
reconnect_attempt, backoff = 0, INITIAL_BACKOFF
 
# ===================== 核心灯效函数 =====================
async def rainbow_rotate():
    """彩虹色循环旋转效果"""
    try:
        np.rainbow(0, np.num - 1, 1, 360)
        while True:
            np.rotate(1)
            await asyncio.sleep(0.1)
    except asyncio.CancelledError:
        logger.info("彩虹旋转中断")
 
async def blink(color):
    """闪烁效果"""
    try:
        while True:
            np.range_color(0, np.num, color)
            await asyncio.sleep(0.5)
            np.clear()
            await asyncio.sleep(0.5)
    except asyncio.CancelledError:
        logger.info("闪烁中断")
 
async def breathe(color):
    """呼吸灯效果"""
    try:
        np.range_color(0, np.num, color)
        current_brightness = 20
        direction = 1
        
        while True:
            np.brightness(current_brightness)
            await asyncio.sleep(0.03)
            
            current_brightness += direction * 2
            if current_brightness >= 100:
                current_brightness = 100
                direction = -1
            elif current_brightness <= 20:
                current_brightness = 20
                direction = 1
    except asyncio.CancelledError:
        logger.info("呼吸灯中断")
 
async def running_lights(color):
    """跑马灯效果"""
    try:
        np.clear()
        while True:
            for i in range(np.num):
                np[i] = color
                if i > 0:
                    np[i-1] = (color >> 1) & 0x7F7F7F  # 半亮
                if i > 1:
                    np[i-2] = 0  # 熄灭
                await asyncio.sleep(0.1)
    except asyncio.CancelledError:
        logger.info("跑马灯中断")
 
async def fire_effect():
    """火焰效果"""
    try:
        heat = [0] * np.num
        while True:
            # 冷却
            for i in range(np.num):
                heat[i] = max(0, heat[i] - random.randint(0, 5))
            
            # 传播热量
            for i in range(np.num - 3, 0, -1):
                heat[i] = (heat[i-1] + heat[i-2] + heat[i-2]) // 3
            
            # 添加火花
            if random.randint(0, 255) < 120:
                y = random.randint(0, 7)
                heat[y] = min(255, heat[y] + random.randint(160, 255))
            
            # 转换为颜色
            for i in range(np.num):
                # 热色映射 (黑->红->黄->白)
                t = heat[i]
                if t < 85:
                    r = t * 3
                    g = 0
                    b = 0
                elif t < 170:
                    r = 255
                    g = (t - 85) * 3
                    b = 0
                else:
                    r = 255
                    g = 255
                    b = (t - 170) * 3
                
                np[i] = (min(255, r) << 16) | (min(255, g) << 8) | min(255, b)
            
            await asyncio.sleep(0.05)
    except asyncio.CancelledError:
        logger.info("火焰效果中断")
 
# ===================== 实用工具函数 =====================
def stop_current_effect():
    """停止当前效果"""
    global current_effect
    if current_effect and not current_effect.done():
        current_effect.cancel()
        current_effect = None
 
def get_color_by_name(color_str):
    """根据名称获取颜色值"""
    color_map = {
        "红色": 0xFF0000, "绿色": 0x00FF00, "蓝色": 0x0000FF,
        "黄色": 0xFFFF00, "紫色": 0x800080, "青色": 0x00FFFF, 
        "白色": 0xFFFFFF, "橙色": 0xFFA500, "粉色": 0xFFC0CB
    }
    return color_map.get(color_str, 0xFF0000)  # 默认为红色
 
# ===================== 网络通信部分 =====================
async def connect_with_retry(uri):
    """带重连机制的WebSocket连接"""
    global reconnect_attempt, backoff
    while True:
        try:
            if reconnect_attempt > 0:
                wait_time = min(backoff * (1 + random.random() * 0.1), MAX_BACKOFF)
                logger.info(f"等待 {wait_time:.2f}秒后重试...")
                await asyncio.sleep(wait_time)
                
            await connect_to_server(uri)
        except Exception as e:
            reconnect_attempt += 1
            backoff = min(backoff * 2, MAX_BACKOFF)
            logger.warning(f"连接关闭 (尝试: {reconnect_attempt}): {e}")
 
async def connect_to_server(uri):
    """连接到WebSocket服务器"""
    global reconnect_attempt, backoff
    try:
        logger.info("连接WebSocket服务器...")
        async with websockets.connect(uri) as websocket:
            logger.info("WebSocket连接成功")
            reconnect_attempt, backoff = 0, INITIAL_BACKOFF
            
            process = subprocess.Popen(
                ['python', 'move.py'],
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                encoding='utf-8',
                text=True
            )
            logger.info(f"启动 move.py 进程")
            
            await asyncio.gather(
                pipe_websocket_to_process(websocket, process),
                pipe_process_to_websocket(process, websocket),
                pipe_process_stderr_to_terminal(process)
            )
    except websockets.exceptions.ConnectionClosed as e:
        logger.error(f"WebSocket连接关闭: {e}")
        raise
    except Exception as e:
        logger.error(f"连接错误: {e}")
        raise
    finally:
        if 'process' in locals():
            logger.info(f"终止 move.py 进程")
            try:
                process.terminate()
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                process.kill()
 
async def pipe_websocket_to_process(websocket, process):
    """WebSocket到进程的管道"""
    try:
        while True:
            message = await websocket.recv()
            logger.debug(f"<< {message[:120]}...")
            process.stdin.write(message + '\n')
            process.stdin.flush()
    except Exception as e:
        logger.error(f"WebSocket到进程错误: {e}")
        raise
    finally:
        if not process.stdin.closed:
            process.stdin.close()
 
async def pipe_process_to_websocket(process, websocket):
    """进程到WebSocket的管道"""
    try:
        while True:
            data = await asyncio.get_event_loop().run_in_executor(None, process.stdout.readline)
            if not data:
                break
                
            logger.debug(f">> {data[:120]}...")
            await handle_device_commands(data, websocket)
    except Exception as e:
        logger.error(f"进程到WebSocket错误: {e}")
        raise
 
async def handle_device_commands(data, websocket):
    """处理设备控制命令"""
    global current_effect
    try:
        json_data = json.loads(data)
        if json_data['id'] > 1 and json_data.get('result', {}):
            content = json_data['result']['content'][0]['text']
            text_data = json.loads(content)
            
            if text_data['success']:
                cmd_result = text_data['result']
                logger.info(f"收到命令: {cmd_result}")
                
                # 停止之前的灯效
                stop_current_effect()
                
                # 处理命令
                if "彩虹" in cmd_result or "渐变色" in cmd_result: 
                    current_effect = asyncio.create_task(rainbow_rotate())
                    logger.info("启动彩虹旋转效果")
                
                elif "熄灭" in cmd_result: 
                    np.clear()
                    logger.info("灯环已熄灭")
                
                elif "闪烁" in cmd_result: 
                    # 尝试提取颜色名称
                    color_str = "红色"  # 默认红色
                    for color_name in ["红色", "绿色", "蓝色", "黄色", "紫色", "青色", 
                                    "白色", "橙色", "粉色"]:
                        if color_name in cmd_result:
                            color_str = color_name
                            break
                    color = get_color_by_name(color_str)
                    current_effect = asyncio.create_task(blink(color))
                    logger.info(f"启动{color_str}闪烁效果")
                
                elif "呼吸" in cmd_result: 
                    # 尝试提取颜色名称
                    color_str = "红色"  # 默认红色
                    for color_name in ["红色", "绿色", "蓝色", "黄色", "紫色", "青色", 
                                    "白色", "橙色", "粉色"]:
                        if color_name in cmd_result:
                            color_str = color_name
                            break
                    color = get_color_by_name(color_str)
                    current_effect = asyncio.create_task(breathe(color))
                    logger.info(f"启动{color_str}呼吸效果")
                
                elif "跑马灯" in cmd_result: 
                    # 尝试提取颜色名称
                    color_str = "红色"  # 默认红色
                    for color_name in ["红色", "绿色", "蓝色", "黄色", "紫色", "青色", 
                                    "白色", "橙色", "粉色"]:
                        if color_name in cmd_result:
                            color_str = color_name
                            break
                    color = get_color_by_name(color_str)
                    current_effect = asyncio.create_task(running_lights(color))
                    logger.info(f"启动{color_str}跑马灯效果")
                
                elif "火焰" in cmd_result: 
                    current_effect = asyncio.create_task(fire_effect())
                    logger.info("启动火焰效果")
                
        await websocket.send(data)
    except json.JSONDecodeError as je:
        logger.error(f"JSON解析错误: {je}, 原始数据: {data}")
    except KeyError as ke:
        logger.error(f"缺少必要字段: {ke}, 原始数据: {data}")
    except Exception as e:
        logger.error(f"命令处理错误: {e}")
 
async def pipe_process_stderr_to_terminal(process):
    """进程错误输出到终端"""
    try:
        while True:
            data = await asyncio.get_event_loop().run_in_executor(None, process.stderr.readline)
            if not data: break
            sys.stderr.write(data)
            sys.stderr.flush()
    except Exception as e:
        logger.error(f"错误输出处理错误: {e}")
 
def signal_handler(sig, frame):
    """中断信号处理"""
    logger.info("收到中断信号,正在关闭...")
    # 停止当前灯效
    stop_current_effect()
    # 熄灭灯环
    np.clear()
    sys.exit(0)
 
if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler)
    endpoint_url = "wss://api.xiaozhi.me/mcp/?token=============================="
    
    try:
        asyncio.run(connect_with_retry(endpoint_url))
    except KeyboardInterrupt:
        logger.info("用户中断程序")
    except Exception as e:
        logger.error(f"程序执行错误: {e}")

2.服务文件move.py

代码
# move.py 
from mcp.server.fastmcp import FastMCP
import logging
 
# 配置日志
logger = logging.getLogger('MoveServer')
mcp = FastMCP("MoveServer")
 
# 设备控制工具
@mcp.tool()
def set_light_rainbow() -> dict:
    """设置灯环为彩虹渐变色"""
    return {"success": True, "result": "设置灯环为彩虹渐变色"}
 
@mcp.tool()
def close_light() -> dict:
    """熄灭灯环"""
    return {"success": True, "result": "熄灭灯环"}
 
@mcp.tool()
def set_light_blink(color: str) -> dict:
    """设置灯环闪烁效果。color: 颜色(如:红色)"""
    return {"success": True, "result": f"设置灯环{color}闪烁"}
 
@mcp.tool()
def set_light_breathe(color: str) -> dict:
    """设置灯环呼吸效果。color: 颜色(如:蓝色)"""
    return {"success": True, "result": f"设置灯环{color}呼吸"}
 
@mcp.tool()
def set_running_lights(color: str) -> dict:
    """设置跑马灯效果。color: 颜色(如:黄色)"""
    return {"success": True, "result": f"设置灯环{color}跑马灯"}
 
@mcp.tool()
def set_fire_effect() -> dict:
    """设置火焰效果"""
    return {"success": True, "result": "设置灯环火焰效果"}
 
# 启动服务器
if __name__ == "__main__":
    mcp.run(transport="stdio")

360截图20250816142110492.jpg

三、知识扩展:

MCP

Model Context Protocol (MCP,模型上下文协议) 是一种专为 AI 模型与外部系统交互 设计的开放标准协议,2024 年由 Anthropic 等公司牵头提出,2025 年逐步被主流 AI 平台采纳,旨在提供统一的接口,使大语言模型(LLM)如 ChatGPT、Claude 等能动态访问数据库、API 或企业工具,实现更智能的实时数据处理和任务执行。

评论

user-avatar
  • rzegkly

    rzegkly2025.08.20

    考虑上位机乐动掌控需要访问小智网站,如果出现交互问题的时候,我们可以重启乐动掌控,实现智能交互了! 下位机M10必须切换python3.12.7,否则mcp库无法安装!

    0