回到首页 返回首页
回到顶部 回到顶部
返回上一页 返回上一页

小智AI控制房间模型RGB灯光(MCP) 简单

头像 rzyzzxw 2025.07.18 43 0

7.18

【写在前面】

在这个帖子中,记录了一条灯带,六颗RGB灯的事,它们分别布置在模型的六个房间内,充当六个房间的照明灯光,我们尝试用小智来控制它们的点亮与熄灭,颜色及亮度调节。

4f81d1f58c85aba25118d5a2e1c1e89.jpg

开始计划做成这样子,上面是一层哑光薄塑料板和一层纸:

d73bd93fb511dd371a0e872f5090a69.jpg

可是多了一层纸,透光效果不好,改成了这样子,只有一层塑料板了。

78adc711f06cb066326d7657e629fdb.jpg

先看完成后的效果,每个房间一颗灯,亮度不太好,好在只是个模型,给大家做个抛砖引玉的引子吧。

材料清单

  • M10 X1
  • M10电池扩展板组合 X1
  • 乐动小智 X1
  • RGB灯带 6灯 X1

步骤1 行空板M10上安装pyenv环境、Python 3.12.7和mcp库

步骤2 配置小智AI

步骤3 M10程序编写

步骤1和2在前面帖子中有详述,这里不再赘述。

1、mcp服务文件,move.py。

功能描述:控制行空板M10 引脚P21 中6个RGB灯光点亮、熄灭,亮度、颜色调整。

image.png

代码
# move.py
# MCP服务器设备控制工具
# 最终版

from mcp.server.fastmcp import FastMCP
import logging

# 配置日志
logger = logging.getLogger('RoomLightServer')
mcp = FastMCP("RoomLightServer")

# 房间列表
ROOMS = ["客厅", "主卧", "次卧", "书房", "卫生间","厨房"]

# 设备控制工具
@mcp.tool()
def set_light_on(room: str) -> dict:
    """点亮指定房间的灯,房间包括:客厅、主卧、次卧、书房、卫生间、厨房。灯色设为白色,亮度100。"""
    if room not in ROOMS:
        return {"success": False, "result": f"无效的房间: {room}"}
    return {"success": True, "result": f"已点亮{room}的灯(白色, 亮度100)"}

@mcp.tool()
def set_light_off(room: str) -> dict:
    """熄灭指定房间的灯,房间包括:客厅、主卧、次卧、书房、卫生间、厨房。"""
    if room not in ROOMS:
        return {"success": False, "result": f"无效的房间: {room}"}
    return {"success": True, "result": f"已熄灭{room}的灯"}

@mcp.tool()
def set_light_color(room: str, color: str) -> dict:
    """设置指定房间灯的颜色,支持: 红色、绿色,蓝色,黄色,紫色,青色,白色"""
    if room not in ROOMS:
        return {"success": False, "result": f"无效的房间: {room}"}
    valid_colors = ["红色", "绿色", "蓝色", "黄色", "紫色", "青色", "白色"]
    if color not in valid_colors:
        return {"success": False, "result": f"无效的颜色: {color}"}
    return {"success": True, "result": f"已设置{room}的颜色为{color}"}

@mcp.tool()
def set_light_brightness(room: str, brightness: int) -> dict:
    """设置指定房间灯的亮度(0-255)"""
    if room not in ROOMS:
        return {"success": False, "result": f"无效的房间: {room}"}
    # 自动修正亮度值到0-255范围
    brightness = max(0, min(brightness, 255))
    return {"success": True, "result": f"已设置{room}的亮度为{brightness}"}

# 启动服务器
if __name__ == "__main__":
    mcp.run(transport="stdio")

小智AI管道文件mcp_pipe.py。

新建文件mcp_pipe.py,修改代码,注意填入自己的MCP接入点地址。


功能描述:连接到 MCP 服务器,并通过 WebSocket 端点将输入和输出与指定的 Python 脚本进行管道通信。

代码
# mcp_pipe.py
# 连接MCP服务器并通过WebSocket传输输入输出
# 最终稳定版

from unihiker import GUI
import asyncio
import websockets
import subprocess
import logging
import signal
import sys
import random
from pinpong.board import Board, Pin, NeoPixel
import json
import os
import re

# 初始化硬件
Board().begin()
u_gui = GUI()
pin1 = Pin(Pin.P21)
np1 = NeoPixel(pin1, 6)  # 6个灯对应6个房间
np1.brightness(100)

# 房间与灯索引映射
ROOM_MAP = {
    "客厅": 0,
    "主卧": 1,
    "次卧": 3,
    "书房": 4,
    "卫生间": 2,
    "厨房": 5
}

# 颜色映射
COLOR_MAP = {
    "红色": (255, 0, 0),
    "绿色": (0, 255, 0),
    "蓝色": (0, 0, 255),
    "黄色": (255, 255, 0),
    "紫色": (128, 0, 128),
    "青色": (0, 255, 255),
    "白色": (255, 255, 255)
}

# 确保背景图片存在
if not os.path.exists("back.png"):
    # 创建默认黑色背景
    u_gui.fill_rect(x=0, y=0, w=240, h=320, color="#000000")
else:
    u_gui.draw_image(image="back.png", x=0, y=0)

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('MCP_PIPE')

# 重连设置
INITIAL_BACKOFF, MAX_BACKOFF = 1, 600
reconnect_attempt, backoff = 0, INITIAL_BACKOFF

# 房间状态跟踪
room_states = {
    "客厅": {"on": False, "color": "白色", "brightness": 100},
    "主卧": {"on": False, "color": "白色", "brightness": 100},
    "次卧": {"on": False, "color": "白色", "brightness": 100},
    "书房": {"on": False, "color": "白色", "brightness": 100},
    "卫生间": {"on": False, "color": "白色", "brightness": 100},
    "厨房": {"on": False, "color": "白色", "brightness": 100}
}

# UI文本对象存储
ui_texts = {
    "客厅": None,
    "主卧": None,
    "次卧": None,
    "书房": None,
    "卫生间": None,
    "厨房": None
}

# 状态显示位置(垂直排列,字体大小18)
ROOM_POSITIONS = {
    "客厅": (20, 50),
    "主卧": (20, 90),
    "次卧": (20, 130),
    "书房": (20, 170),
    "卫生间": (20, 210),
    "厨房":(20, 250)
}

def init_ui():
    """初始化用户界面"""
    # 清除屏幕
    if os.path.exists("back.png"):
        u_gui.draw_image(image="back.png", x=0, y=0)
    else:
        u_gui.fill_rect(x=0, y=0, w=240, h=320, color="#000000")
    
    # 标题
    u_gui.draw_text(text="房间灯光控制", x=10, y=5, font_size=25, color="#FF00FF")
    
    # 创建房间状态文本(字体大小18)
    for room, pos in ROOM_POSITIONS.items():
        color = "#FFFFFF"
        ui_texts[room] = u_gui.draw_text(text=f"{room}: 关", x=pos[0], y=pos[1], font_size=18, color=color)

def update_ui(room):
    """更新单个房间的UI显示 - 使用重绘方法"""
    # 清除旧文本
    if ui_texts[room]:
        # 用背景色覆盖旧文本
        pos = ROOM_POSITIONS[room]
        u_gui.fill_rect(x=pos[0]-5, y=pos[1]-5, w=150, h=30, color="#000000")
    
    # 绘制新文本(字体大小18)
    color = "#FFFF00" if room_states[room]["on"] else "#FFFFFF"
    status = "开" if room_states[room]["on"] else "关"
    pos = ROOM_POSITIONS[room]
    ui_texts[room] = u_gui.draw_text(text=f"{room}: {status}", x=pos[0], y=pos[1], font_size=18, color=color)

def update_room_light(room, action, color=None, brightness=None):
    """更新房间灯光状态并控制硬件"""
    idx = ROOM_MAP[room]
    
    if action == "on":
        # 点亮房间灯
        color_rgb = COLOR_MAP[room_states[room]["color"]]
        np1[idx] = color_rgb
        room_states[room]["on"] = True
        logger.info(f"{room}灯已点亮 ({room_states[room]['color']}, 亮度{room_states[room]['brightness']})")
    elif action == "off":
        # 熄灭房间灯
        np1[idx] = (0, 0, 0)
        room_states[room]["on"] = False
        logger.info(f"{room}灯已熄灭")
    elif action == "color" and color:
        # 设置房间灯颜色
        room_states[room]["color"] = color
        if room_states[room]["on"]:
            np1[idx] = COLOR_MAP[color]
        logger.info(f"{room}灯颜色设置为{color}")
    elif action == "brightness" and brightness is not None:
        # 设置房间灯亮度
        room_states[room]["brightness"] = brightness
        
        # 重要:更新全局亮度(NeoPixel只能设置全局亮度)
        np1.brightness(brightness)
        logger.info(f"{room}灯亮度设置为{brightness}")
    
    # 更新UI显示
    update_ui(room)

async def connect_with_retry(uri):
    """带重连机制的WebSocket连接"""
    global reconnect_attempt, backoff
    while True:
        try:
            if reconnect_attempt > 0:
                wait_time = min(backoff * (1 + random.random() * 0.1), MAX_BACKOFF)
                logger.info(f"等待 {wait_time:.2f}秒后重试...")
                await asyncio.sleep(wait_time)
                
            await connect_to_server(uri)
        except Exception as e:
            reconnect_attempt += 1
            backoff = min(backoff * 2, MAX_BACKOFF)
            logger.warning(f"连接关闭 (尝试: {reconnect_attempt}): {e}")

async def connect_to_server(uri):
    """连接到WebSocket服务器"""
    global reconnect_attempt, backoff
    try:
        logger.info("连接WebSocket服务器...")
        async with websockets.connect(uri) as websocket:
            logger.info("WebSocket连接成功")
            reconnect_attempt, backoff = 0, INITIAL_BACKOFF
            
            process = subprocess.Popen(
                ['python', mcp_script],
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                encoding='utf-8',
                text=True
            )
            logger.info(f"启动 {mcp_script} 进程")
            
            await asyncio.gather(
                pipe_websocket_to_process(websocket, process),
                pipe_process_to_websocket(process, websocket),
                pipe_process_stderr_to_terminal(process)
            )
    except websockets.exceptions.ConnectionClosed as e:
        logger.error(f"WebSocket连接关闭: {e}")
        raise
    except Exception as e:
        logger.error(f"连接错误: {e}")
        raise
    finally:
        if 'process' in locals():
            logger.info(f"终止 {mcp_script} 进程")
            try:
                process.terminate()
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                process.kill()

async def pipe_websocket_to_process(websocket, process):
    """WebSocket到进程的管道"""
    try:
        while True:
            message = await websocket.recv()
            logger.debug(f"<< {message[:120]}...")
            process.stdin.write(message + '\n')
            process.stdin.flush()
    except Exception as e:
        logger.error(f"WebSocket到进程错误: {e}")
        raise
    finally:
        if not process.stdin.closed:
            process.stdin.close()

async def pipe_process_to_websocket(process, websocket):
    """进程到WebSocket的管道"""
    try:
        while True:
            data = await asyncio.get_event_loop().run_in_executor(None, process.stdout.readline)
            if not data:
                break
                
            logger.debug(f">> {data[:120]}...")
            await handle_device_commands(data, websocket)
    except Exception as e:
        logger.error(f"进程到WebSocket错误: {e}")
        raise

async def handle_device_commands(data, websocket):
    """处理设备控制命令 - 确保正确返回结果给MCP服务器"""
    try:
        # 解析JSON数据
        json_data = json.loads(data)
        logger.debug(f"收到MCP消息: {json_data}")
        
        # 确保正确处理工具调用结果
        if 'id' in json_data and 'result' in json_data:
            # 检查是否包含内容数据
            result = json_data['result']
            
            # 直接转发结果给MCP服务器
            await websocket.send(data)
            
            # 检查是否包含内容数据
            if 'content' in result and isinstance(result['content'], list) and len(result['content']) > 0:
                first_content = result['content'][0]
                
                # 检查是否包含文本内容
                if 'text' in first_content:
                    text_content = first_content['text']
                    
                    # 尝试解析文本内容为JSON
                    try:
                        content_data = json.loads(text_content)
                    except json.JSONDecodeError:
                        content_data = text_content
                    
                    # 检查是否是成功的结果
                    if isinstance(content_data, dict) and 'success' in content_data and content_data['success']:
                        cmd_result = content_data['result']
                        
                        # 处理房间灯光控制命令
                        if any(room in cmd_result for room in ROOM_MAP):
                            handle_room_light_control(cmd_result)
                    else:
                        logger.debug(f"非成功结果: {content_data}")
                else:
                    logger.debug(f"内容不包含文本: {first_content}")
            else:
                logger.debug(f"结果不包含有效内容: {result}")
        else:
            # 转发其他类型消息
            await websocket.send(data)
            logger.debug(f"转发非结果消息: {json_data}")
            
    except Exception as e:
        logger.error(f"命令处理错误: {e}")
        logger.error(f"原始数据: {data}")
        # 确保即使出错也转发数据
        await websocket.send(data)

def handle_room_light_control(cmd_result):
    """处理房间灯光控制命令"""
    # 识别房间
    room = None
    for room_name in ROOM_MAP:
        if room_name in cmd_result:
            room = room_name
            break
    
    if not room:
        logger.warning(f"未识别到房间: {cmd_result}")
        return
    
    # 解析操作
    if "点亮" in cmd_result or "开启" in cmd_result or "打开" in cmd_result:
        update_room_light(room, "on")
    elif "熄灭" in cmd_result or "关闭" in cmd_result or "关掉" in cmd_result:
        update_room_light(room, "off")
    elif "设置颜色" in cmd_result or "颜色" in cmd_result or "设为" in cmd_result:
        # 解析颜色
        color = None
        for color_name in COLOR_MAP:
            if color_name in cmd_result:
                color = color_name
                break
        if color:
            update_room_light(room, "color", color=color)
            logger.info(f"设置{room}颜色为: {color}")
        else:
            logger.warning(f"未识别颜色: {cmd_result}")
    elif "设置亮度" in cmd_result or "亮度" in cmd_result or "调至" in cmd_result:
        # 解析亮度值
        try:
            # 使用正则表达式提取数字
            match = re.search(r'\d+', cmd_result)
            if match:
                brightness = int(match.group())
                brightness = max(0, min(brightness, 255))
                update_room_light(room, "brightness", brightness=brightness)
                logger.info(f"设置{room}亮度为: {brightness}")
            else:
                logger.warning(f"未找到亮度值: {cmd_result}")
        except ValueError:
            logger.warning(f"无效的亮度值: {cmd_result}")
    else:
        logger.info(f"未识别操作: {cmd_result}")

async def pipe_process_stderr_to_terminal(process):
    """进程错误输出到终端"""
    try:
        while True:
            data = await asyncio.get_event_loop().run_in_executor(None, process.stderr.readline)
            if not data: break
            sys.stderr.write(data)
            sys.stderr.flush()
    except Exception as e:
        logger.error(f"错误输出处理错误: {e}")

def signal_handler(sig, frame):
    """中断信号处理"""
    logger.info("收到中断信号,正在关闭...")
    sys.exit(0)

if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler)
    mcp_script = "move.py"
    endpoint_url = "wss://api.xiaozhi.me/mcp/?token=eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VySWQiOjIxNjQzNywiYWdlbnRJZCI6NDYwNjExLCJlbmRwb2ludElkIjoiYWdlbnRfNDYwNjExIiwicHVycG9zZSI6Im1jcC1lbmRwb2ludCIsImlhdCI6MTc1MjU0NjM1OH0.7y6CVkzE2dqILqBBoImu0qenKuPhMy27Yi-95lCawmXU_tSGaHcF5f8k6Vbp6kWYLjwVe6v2M-0TBOeoRVxzvQ"
    
    # 初始化UI
    init_ui()
    
    try:
        asyncio.run(connect_with_retry(endpoint_url))
    except KeyboardInterrupt:
        logger.info("用户中断程序")
    except Exception as e:
        logger.error(f"程序执行错误: {e}")

在Mind+中运行mcp_pipe.py。

等待成功连接服务器,期间可能有红字报连不上服务器,耐心等待(有时快,有时慢)。

用乐动小智和M10(MCP)互动控制灯光。


【使用示例:】

设置颜色:

"将客厅灯设为红色"

"设置主卧颜色为蓝色"

"把书房灯颜色改成绿色"

设置亮度:

"调整客厅亮度到150"

"设置主卧灯亮度为200"

"把书房灯亮度调到50"

控制灯光:

"打开客厅灯"

"关闭主卧灯"

"点亮书房灯"

"熄灭卫生间灯"

【项目总结:】

独立控制每个房间的灯光开关

支持7种颜色设置(红、绿、蓝、黄、紫、青、白)

亮度调节范围0-255

支持多种表达方式("开启"/"打开"、"关闭"/"关掉")

智能识别颜色和亮度值

全局亮度控制(同一条灯带,调整一个灯亮度其它灯也同步变化)

每个LED独立颜色控制

状态同步管理


(本项目代码由DeepSeek协助完成)

评论

user-avatar