回到首页 返回首页
回到顶部 回到顶部
返回上一页 返回上一页

小智AI遇上炫酷灯环(3)两个灯环MCP 简单

头像 rzyzzxw 2025.07.23 5 0

【写在前面】

这个帖子记录的是小智AI和两个灯环炫酷灯效的事,灯效和种类虽不是很多,可是两个灯环可以分别控制,分别用不同的灯效,还是很不错的哦。

d114650b7cce8e6ef2fd6c6c06f37b9.jpg
ea03c2f07b20136f205a2ba685d46dc.jpg

材料清单

  • M10 X1
  • M10电池扩展板组合 X1
  • RGB灯环 X2
  • 乐动小智 X1

步骤1 行空板M10上安装pyenv环境和Python 3.12.7及MCP库安装

步骤2 小智聊天机器人配置

步骤3 M10程序编写

硬件与依赖​​

​​硬件​​:Unihiker开发板 + NeoPixel灯环(24灯珠,P23引脚,16灯珠,P21引脚)

​​关键库​​:

pinpong(硬件控制)

websockets(网络通信)

asyncio(异步任务调度)

简化控制逻辑,确保灯效流畅切换;模块化函数便于扩展效果;异步架构保障实时响应。适合物联网教学及智能硬件开发场景。

mcp服务文件,move.py。

功能描述:控制灯环点亮、熄灭,各种灯效。

代码
# move.py 
from mcp.server.fastmcp import FastMCP
import logging

# 配置日志
logger = logging.getLogger('MoveServer')
mcp = FastMCP("MoveServer")

# 简化工具描述
@mcp.tool()
def set_light_rainbow(ring: str) -> dict:
    """设置灯环为彩虹渐变色。ring: 灯环标识('P21'或'P23')"""
    return {"success": True, "result": f"设置{ring}灯环为彩虹渐变色"}

@mcp.tool()
def close_light(ring: str) -> dict:
    """熄灭灯环。ring: 灯环标识('P21'或'P23')"""
    return {"success": True, "result": f"熄灭{ring}灯环"}

@mcp.tool()
def set_light_blink(ring: str, color: str) -> dict:
    """设置灯环闪烁效果。ring: 灯环标识('P21'或'P23'),color: 颜色(如:红色)"""
    return {"success": True, "result": f"设置{ring}灯环{color}闪烁"}

@mcp.tool()
def set_light_breathe(ring: str, color: str) -> dict:
    """设置灯环呼吸效果。ring: 灯环标识('P21'或'P23'),color: 颜色(如:蓝色)"""
    return {"success": True, "result": f"设置{ring}灯环{color}呼吸"}

@mcp.tool()
def set_police_light() -> dict:
    """设置警车灯效(P21红灯闪烁,P23蓝灯闪烁)"""
    return {"success": True, "result": "设置警车灯效"}

@mcp.tool()
def set_running_lights(ring: str, color: str) -> dict:
    """设置跑马灯效果。ring: 灯环标识('P21'或'P23'),color: 颜色(如:黄色)"""
    return {"success": True, "result": f"设置{ring}灯环{color}跑马灯"}

@mcp.tool()
def set_fire_effect(ring: str) -> dict:
    """设置火焰效果。ring: 灯环标识('P21'或'P23')"""
    return {"success": True, "result": f"设置{ring}灯环火焰效果"}

# 启动服务器
if __name__ == "__main__":
    mcp.run(transport="stdio")

小智AI管道文件mcp_pipe.py。填入自己的MCP接入点地址。


功能描述:连接到 MCP 服务器,并通过 WebSocket 端点将输入和输出与指定的 Python 脚本进行管道通信。

双灯环独立控制

P21引脚:16灯环

P23引脚:24灯环

支持分别控制每个灯环

灯效模式

闪烁效果:指定颜色闪烁

呼吸效果:平滑的亮度变化

警车灯效:P21红灯与P23蓝灯交替闪烁

跑马灯:动态流动的光点

火焰效果:逼真的火焰模拟

彩虹旋转:保留原有功能

命令处理增强

自动识别命令中的灯环标识(P21/P23)

支持中文颜色名称(红/绿/蓝/黄/紫/青/白/橙/粉)

自动停止之前的灯效再启动新效果

状态管理

跟踪每个灯环的当前效果

优雅停止所有任务

使用示例

现在您可以通过小智发送以下类型的命令:

基础控制

"点亮P21灯环"

"熄灭P23灯环"

"设置P21灯环颜色为蓝色"

"设置P23灯环亮度为150"

特效控制

"设置P21灯环彩虹渐变色"

"设置P23灯环红色闪烁"

"设置P21灯环绿色呼吸"

"设置警车灯效"

"设置P21灯环橙色跑马灯"

"设置P23灯环火焰效果"

代码
# mcp_pipe.py 
from unihiker import GUI
import asyncio
import websockets
import subprocess
import logging
import signal
import sys
import random
import math
import json  # 添加缺失的json模块导入
from pinpong.board import Board, Pin, NeoPixel
from collections import defaultdict

# 初始化硬件
Board().begin()
u_gui = GUI()

# 初始化两个灯环
pin21 = Pin(Pin.P21)
np21 = NeoPixel(pin21, 16)  # P21引脚接16灯环
pin23 = Pin(Pin.P23)
np23 = NeoPixel(pin23, 24)  # P23引脚接24灯环

# 设置默认亮度
for np in [np21, np23]:
    np.brightness(100)

# 全局变量
tasks = defaultdict(list)  # 存储每个灯环的任务

# 初始化UI
u_gui.draw_image(image="back.png", x=0, y=0)
u_gui.draw_text(text="M10RGB助手", x=10, y=5, font_size=25, color="#FF00FF")

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('MCP_PIPE')

# 重连设置
INITIAL_BACKOFF, MAX_BACKOFF = 1, 600
reconnect_attempt, backoff = 0, INITIAL_BACKOFF

# 灯效函数
async def rainbow_rotate(np, speed=0.1):
    """彩虹色循环旋转效果"""
    np.rainbow(0, np.num - 1, 1, 360)
    try:
        while True:
            np.rotate(1)
            await asyncio.sleep(speed)
    except asyncio.CancelledError:
        logger.info(f"{np}彩虹旋转中断")

async def blink(np, color, interval=0.5):
    """闪烁效果"""
    try:
        while True:
            np.range_color(0, np.num, color)
            await asyncio.sleep(interval)
            np.clear()
            await asyncio.sleep(interval)
    except asyncio.CancelledError:
        logger.info(f"{np}闪烁中断")

async def breathe(np, color, interval=0.03, min_brightness=10, max_brightness=100):
    """呼吸灯效果"""
    try:
        np.range_color(0, np.num, color)
        current_brightness = min_brightness
        direction = 1
        
        while True:
            np.brightness(current_brightness)
            await asyncio.sleep(interval)
            
            current_brightness += direction * 2
            if current_brightness >= max_brightness:
                current_brightness = max_brightness
                direction = -1
            elif current_brightness <= min_brightness:
                current_brightness = min_brightness
                direction = 1
    except asyncio.CancelledError:
        logger.info(f"{np}呼吸灯中断")

async def police_flash(np1, np2, interval=0.2):
    """警车灯效(红蓝交替闪烁)"""
    try:
        while True:
            np1.range_color(0, np1.num, 0xFF0000)  # 红色
            np2.range_color(0, np2.num, 0x0000FF)  # 蓝色
            await asyncio.sleep(interval)
            np1.clear()
            np2.clear()
            await asyncio.sleep(interval)
    except asyncio.CancelledError:
        logger.info("警车灯效中断")

async def running_lights(np, color, interval=0.1):
    """跑马灯效果"""
    try:
        np.clear()
        while True:
            for i in range(np.num):
                np[i] = color
                if i > 0:
                    np[i-1] = (color >> 1) & 0x7F7F7F  # 半亮
                if i > 1:
                    np[i-2] = 0  # 熄灭
                await asyncio.sleep(interval)
    except asyncio.CancelledError:
        logger.info(f"{np}跑马灯中断")

async def fire_effect(np, cooling=55, sparking=120, interval=0.05):
    """火焰效果"""
    try:
        heat = [0] * np.num
        while True:
            # 冷却
            for i in range(np.num):
                heat[i] = max(0, heat[i] - random.randint(0, cooling // 10))
            
            # 传播热量
            for i in range(np.num - 3, 0, -1):
                heat[i] = (heat[i-1] + heat[i-2] + heat[i-2]) // 3
            
            # 添加火花
            if random.randint(0, 255) < sparking:
                y = random.randint(0, 7)
                heat[y] = min(255, heat[y] + random.randint(160, 255))
            
            # 转换为颜色
            for i in range(np.num):
                # 热色映射 (黑->红->黄->白)
                t = heat[i]
                if t < 85:
                    r = t * 3
                    g = 0
                    b = 0
                elif t < 170:
                    r = 255
                    g = (t - 85) * 3
                    b = 0
                else:
                    r = 255
                    g = 255
                    b = (t - 170) * 3
                
                np[i] = (min(255, r) << 16) | (min(255, g) << 8) | min(255, b)
            
            await asyncio.sleep(interval)
    except asyncio.CancelledError:
        logger.info(f"{np}火焰效果中断")

def stop_ring_effects(ring_id):
    """停止指定灯环的所有效果"""
    if ring_id in tasks:
        for task in tasks[ring_id]:
            if not task.done():
                task.cancel()
        tasks[ring_id] = []

def get_color_by_name(color_str):
    """根据名称获取颜色值"""
    color_map = {
        "红色": 0xFF0000, "绿色": 0x00FF00, "蓝色": 0x0000FF,
        "黄色": 0xFFFF00, "紫色": 0x800080, "青色": 0x00FFFF, 
        "白色": 0xFFFFFF, "橙色": 0xFFA500, "粉色": 0xFFC0CB
    }
    return color_map.get(color_str, 0xFF0000)  # 默认为红色

async def connect_with_retry(uri):
    """带重连机制的WebSocket连接"""
    global reconnect_attempt, backoff
    while True:
        try:
            if reconnect_attempt > 0:
                wait_time = min(backoff * (1 + random.random() * 0.1), MAX_BACKOFF)
                logger.info(f"等待 {wait_time:.2f}秒后重试...")
                await asyncio.sleep(wait_time)
                
            await connect_to_server(uri)
        except Exception as e:
            reconnect_attempt += 1
            backoff = min(backoff * 2, MAX_BACKOFF)
            logger.warning(f"连接关闭 (尝试: {reconnect_attempt}): {e}")

async def connect_to_server(uri):
    """连接到WebSocket服务器"""
    global reconnect_attempt, backoff
    try:
        logger.info("连接WebSocket服务器...")
        async with websockets.connect(uri) as websocket:
            logger.info("WebSocket连接成功")
            reconnect_attempt, backoff = 0, INITIAL_BACKOFF
            
            process = subprocess.Popen(
                ['python', 'move.py'],
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                encoding='utf-8',
                text=True
            )
            logger.info(f"启动 move.py 进程")
            
            await asyncio.gather(
                pipe_websocket_to_process(websocket, process),
                pipe_process_to_websocket(process, websocket),
                pipe_process_stderr_to_terminal(process)
            )
    except websockets.exceptions.ConnectionClosed as e:
        logger.error(f"WebSocket连接关闭: {e}")
        raise
    except Exception as e:
        logger.error(f"连接错误: {e}")
        raise
    finally:
        if 'process' in locals():
            logger.info(f"终止 move.py 进程")
            try:
                process.terminate()
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                process.kill()

async def pipe_websocket_to_process(websocket, process):
    """WebSocket到进程的管道"""
    try:
        while True:
            message = await websocket.recv()
            logger.debug(f"<< {message[:120]}...")
            process.stdin.write(message + '\n')
            process.stdin.flush()
    except Exception as e:
        logger.error(f"WebSocket到进程错误: {e}")
        raise
    finally:
        if not process.stdin.closed:
            process.stdin.close()

async def pipe_process_to_websocket(process, websocket):
    """进程到WebSocket的管道"""
    try:
        while True:
            data = await asyncio.get_event_loop().run_in_executor(None, process.stdout.readline)
            if not data:
                break
                
            logger.debug(f">> {data[:120]}...")
            await handle_device_commands(data, websocket)
    except Exception as e:
        logger.error(f"进程到WebSocket错误: {e}")
        raise

async def handle_device_commands(data, websocket):
    """处理设备控制命令"""
    try:
        json_data = json.loads(data)
        if json_data['id'] > 1 and json_data.get('result', {}):
            content = json_data['result']['content'][0]['text']
            text_data = json.loads(content)
            
            if text_data['success']:
                cmd_result = text_data['result']
                logger.info(f"收到命令: {cmd_result}")
                
                # 确定操作的灯环
                ring_name = "P21"
                if "P23" in cmd_result or "24" in cmd_result:
                    ring_name = "P23"
                
                # 停止之前的灯效
                stop_ring_effects(ring_name)
                
                # 处理命令
                np = np21 if ring_name == "P21" else np23
                
                if "彩虹" in cmd_result or "渐变色" in cmd_result: 
                    task = asyncio.create_task(rainbow_rotate(np))
                    tasks[ring_name].append(task)
                    logger.info(f"{ring_name}灯环启动彩虹旋转效果")
                
                elif "熄灭" in cmd_result: 
                    np.clear()
                    logger.info(f"{ring_name}灯环已熄灭")
                
                elif "闪烁" in cmd_result: 
                    # 尝试提取颜色名称
                    color_str = "红色"  # 默认红色
                    for color_name in ["红色", "绿色", "蓝色", "黄色", "紫色", "青色", "白色", "橙色", "粉色"]:
                        if color_name in cmd_result:
                            color_str = color_name
                            break
                    color = get_color_by_name(color_str)
                    task = asyncio.create_task(blink(np, color))
                    tasks[ring_name].append(task)
                    logger.info(f"{ring_name}灯环启动{color_str}闪烁效果")
                
                elif "呼吸" in cmd_result: 
                    # 尝试提取颜色名称
                    color_str = "红色"  # 默认红色
                    for color_name in ["红色", "绿色", "蓝色", "黄色", "紫色", "青色", "白色", "橙色", "粉色"]:
                        if color_name in cmd_result:
                            color_str = color_name
                            break
                    color = get_color_by_name(color_str)
                    task = asyncio.create_task(breathe(np, color))
                    tasks[ring_name].append(task)
                    logger.info(f"{ring_name}灯环启动{color_str}呼吸效果")
                
                elif "警车" in cmd_result: 
                    task1 = asyncio.create_task(blink(np21, 0xFF0000))
                    task2 = asyncio.create_task(blink(np23, 0x0000FF, interval=0.4))
                    tasks["P21"].append(task1)
                    tasks["P23"].append(task2)
                    logger.info("启动警车灯效")
                
                elif "跑马灯" in cmd_result: 
                    # 尝试提取颜色名称
                    color_str = "红色"  # 默认红色
                    for color_name in ["红色", "绿色", "蓝色", "黄色", "紫色", "青色", "白色", "橙色", "粉色"]:
                        if color_name in cmd_result:
                            color_str = color_name
                            break
                    color = get_color_by_name(color_str)
                    task = asyncio.create_task(running_lights(np, color))
                    tasks[ring_name].append(task)
                    logger.info(f"{ring_name}灯环启动{color_str}跑马灯效果")
                
                elif "火焰" in cmd_result: 
                    task = asyncio.create_task(fire_effect(np))
                    tasks[ring_name].append(task)
                    logger.info(f"{ring_name}灯环启动火焰效果")
                
        await websocket.send(data)
    except Exception as e:
        logger.error(f"命令处理错误: {e}")

async def pipe_process_stderr_to_terminal(process):
    """进程错误输出到终端"""
    try:
        while True:
            data = await asyncio.get_event_loop().run_in_executor(None, process.stderr.readline)
            if not data: break
            sys.stderr.write(data)
            sys.stderr.flush()
    except Exception as e:
        logger.error(f"错误输出处理错误: {e}")

def signal_handler(sig, frame):
    """中断信号处理"""
    logger.info("收到中断信号,正在关闭...")
    # 停止所有灯效
    for ring_id in list(tasks.keys()):
        for task in tasks[ring_id]:
            if not task.done():
                task.cancel()
        tasks[ring_id] = []
    # 熄灭所有灯环
    np21.clear()
    np23.clear()
    sys.exit(0)

if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler)
    endpoint_url = "wss://api.xiaozhi.me/mcp/?token=eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VySWQiOjIxNjQzNywiYWdlbnRJZCI6NDYwNjExLCJlbmRwb2ludElkIjoiYWdlbnRfNDYwNjExIiwicHVycG9zZSI6Im1jcC1lbmRwb2ludCIsImlhdCI6MTc1MjU0NjM1OH0.7y6CVkzE2dqILqBBoImu0qenKuPhMy27Yi-95lCawmXU_tSGaHcF5f8k6Vbp6kWYLjwVe6v2M-0TBOeoRVxzvQ"
    
    try:
        asyncio.run(connect_with_retry(endpoint_url))
    except KeyboardInterrupt:
        logger.info("用户中断程序")
    except Exception as e:
        logger.error(f"程序执行错误: {e}")

在Mind+中运行mcp_pipe.py。

等待成功连接服务器,期间可能有红字报连不上服务器,耐心等待(有时快,有时慢)。

用乐动小智和M10(MCP)互动控制灯光。

本项目代码由DeepSeek协助完成,致谢。

评论

user-avatar