回到首页 返回首页
回到顶部 回到顶部
返回上一页 返回上一页

小智智能助手:RGB灯环颜色亮度控制(MCP) 简单

头像 rzyzzxw 2025.07.11 13 0

7.14

[写在前面]

这个帖子,写的是用K10小智AI通过MCP服务控制接在M10上的灯环改变颜色和亮度的事,顺便控制小风扇的开关。

c3d2a83ccd9f0272dffef59a2e684c0.jpg
bc68cebef0027783ce37b259f9bb3ae.png

材料清单

  • M10 X1
  • M10扩展板组合 X1
  • K10小智+电池 X1
  • RGB灯环 X1
  • 数字继电器模块 X1
  • 小风扇及电源 X1

步骤1 行空板M10上安装pyenv环境和Python 3.12.7

这一步已在前面项目中按照官方教程完成,记录在:M10仰望小车(MCP)的学习与尝试

https://makelogapi.dfrobot.com.cn/api/project/handleShareUrl?pid=165_317714

步骤2 K10小智聊天机器人配置

K10刷小智固件见官方教程。

行空板小智机器人官方地址为:

https://www.unihiker.com.cn/wiki/k10/xiaozhi_ai

现阶段,提供了四种小智固件。

最新版,除了支持调用摄像头识别物体之外,还支持板载RGB的调用。

配置小智AI

进入小智聊天机器人控制台,配置角色。

image.png
image.png

获取MCP接入点,复制智能体接入点地址备用。

步骤3 M10程序编写

1、编写mcp服务文件,move.py。

新建文件move.py,复制云天老师写的代码,修改为智能家的控制内容。

功能描述:控制M10风扇开启、关闭,灯环点亮、熄灭,灯环亮度、颜色调整。

image.png

代码
# move.py
from mcp.server.fastmcp import FastMCP
import sys
import logging

#配置日志
logger = logging.getLogger('MoveServer')

# 修复Windows控制台UTF-8编码问题
if sys.platform == 'win32':
    sys.stderr.reconfigure(encoding='utf-8')
    sys.stdout.reconfigure(encoding='utf-8')

# 创建MCP服务器
mcp = FastMCP("MoveServer")

#======================= 设备控制工具 =======================
#添加一个工具
@mcp.tool()
def open_fan() -> dict:
    """
    控制风扇开启。   
    该函数将设置P21引脚为高电平,控制接在该引脚上的继电器闭合电源开关,使风扇开启。   
    Returns:
        dict: 返回一个字典,包含操作结果。
    """
    result = "风扇开启"
    logger.info(result)
    return {"success": True, "result": result}
#添加一个工具
@mcp.tool()
def close_fan() -> dict:
    """
    控制风扇关闭。   
    该函数将设置P21引脚为低电平,控制接在该引脚上的继电器断开电源开关,使风扇关闭。   
    Returns:
        dict: 返回一个字典,包含操作结果。
    """
    result = "风扇关闭"
    logger.info(result)
    return {"success": True, "result": result}
#添加一个工具
@mcp.tool()
def open_light() -> dict:
    """
    控制RGB灯环点亮。   
    该函数将设置P23引脚RGB灯环点亮,设置为红色,亮度100。   
    Returns:
        dict: 返回一个字典,包含操作结果。
    """
    result = "灯环已点亮"
    logger.info(result)
    return {"success": True, "result": result}
#添加一个工具
@mcp.tool()
def close_light() -> dict:
    """
    控RGB灯环熄灭。   
    该函数将设置P23引脚RGB灯环熄灭。   
    Returns:
        dict: 返回一个字典,包含操作结果。
    """
    result = "灯环已熄灭"
    logger.info(result)
    return {"success": True, "result": result}

# 新增RGB颜色控制工具
@mcp.tool()
def set_light_color(color: str) -> dict:
    """
    设置RGB灯环颜色。   
    该函数将设置P23引脚RGB灯环的颜色。
    参数:
        color (str): 颜色名称(如红色、绿色、蓝色等)
    Returns:
        dict: 返回一个字典,包含操作结果。
    """
    result = f"设置颜色为{color}"
    logger.info(f"设置灯环颜色: {color}")
    return {"success": True, "result": result}

# 新增RGB亮度控制工具
@mcp.tool()
def set_light_brightness(brightness: int) -> dict:
    """
    设置RGB灯环亮度。   
    该函数将设置P23引脚RGB灯环的亮度。
    参数:
        brightness (int): 亮度值(0-255)
    Returns:
        dict: 返回一个字典,包含操作结果。
    """
    result = f"设置亮度为{brightness}"
    logger.info(f"设置灯环亮度: {brightness}")
    return {"success": True, "result": result}

# 启动服务器
if __name__ == "__main__":
    mcp.run(transport="stdio")

修改小智AI管道文件mcp_pipe.py。

新建文件mcp_pipe.py,复制云天老师代码,并做控制动作方面的修改,填入自己的MCP接入点地址。

功能描述:连接到 MCP 服务器,并通过 WebSocket 端点将输入和输出与指定的 Python 脚本进行管道通信。

加入了开机画面。

back.png

代码
# mcp_pipe.py

"""
This script is used to connect to the MCP server and pipe the input and output to the websocket endpoint.
Version: 0.1.0

Usage:

export MCP_ENDPOINT=
python mcp_pipe.py 

"""

#  -*- coding: UTF-8 -*-
 
# MindPlus
# Python
from unihiker import GUI
import asyncio
import websockets
import subprocess
import logging
import os
import signal
import sys
import random
from dotenv import load_dotenv
from pinpong.board import Pin
from pinpong.board import Board
from pinpong.board import Board,Pin
from pinpong.board import NeoPixel
import json

Board().begin()
u_gui=GUI()
pin1 = Pin(Pin.P21, Pin.OUT)
pin3 = Pin(Pin.P23, Pin.PWM)
np1 = NeoPixel(pin3,16)
np1.brightness(100)

u_gui.draw_image(image="back.png",x=0,y=0)
u_gui.draw_text(text="M10智能助手",x=10,y=20,font_size=25, color="#FF00FF")

# 设置日志记录器
# Load environment variables from .env file
#load_dotenv()

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('MCP_PIPE')

# Reconnection settings
INITIAL_BACKOFF = 1  # Initial wait time in seconds
MAX_BACKOFF = 600  # Maximum wait time in seconds
reconnect_attempt = 0
backoff = INITIAL_BACKOFF

async def connect_with_retry(uri):
    """Connect to WebSocket server with retry mechanism"""
    global reconnect_attempt, backoff
    while True:  # Infinite reconnection
        try:
            if reconnect_attempt > 0:
                wait_time = backoff * (1 + random.random() * 0.1)  # Add some random jitter
                logger.info(f"Waiting {wait_time:.2f} seconds before reconnection attempt {reconnect_attempt}...")
                await asyncio.sleep(wait_time)

            # Attempt to connect
            await connect_to_server(uri)

        except Exception as e:
            reconnect_attempt += 1
            logger.warning(f"Connection closed (attempt: {reconnect_attempt}): {e}")            
            # Calculate wait time for next reconnection (exponential backoff)
            backoff = min(backoff * 2, MAX_BACKOFF)

async def connect_to_server(uri):
    """Connect to WebSocket server and establish bidirectional communication with `mcp_script`"""
    global reconnect_attempt, backoff
    try:
        logger.info(f"Connecting to WebSocket server...")
        async with websockets.connect(uri) as websocket:
            logger.info(f"Successfully connected to WebSocket server")

            # Reset reconnection counter if connection closes normally
            reconnect_attempt = 0
            backoff = INITIAL_BACKOFF

            # Start mcp_script process
            process = subprocess.Popen(
                ['python', mcp_script],
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                encoding='utf-8',
                text=True  # Use text mode
            )
            logger.info(f"Started {mcp_script} process")

            # Create two tasks: read from WebSocket and write to process, read from process and write to WebSocket
            await asyncio.gather(
                pipe_websocket_to_process(websocket, process),
                pipe_process_to_websocket(process, websocket),
                pipe_process_stderr_to_terminal(process)
            )
    except websockets.exceptions.ConnectionClosed as e:
        logger.error(f"WebSocket connection closed: {e}")
        raise  # Re-throw exception to trigger reconnection
    except Exception as e:
        logger.error(f"Connection error: {e}")
        raise  # Re-throw exception
    finally:
        # Ensure the child process is properly terminated
        if 'process' in locals():
            logger.info(f"Terminating {mcp_script} process")
            try:
                process.terminate()
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                process.kill()
            logger.info(f"{mcp_script} process terminated")

async def pipe_websocket_to_process(websocket, process):
    """Read data from WebSocket and write to process stdin"""
    try:
        while True:
            # Read message from WebSocket
            message = await websocket.recv()
            logger.debug(f"<< {message[:120]}...")

            # Write to process stdin (in text mode)
            if isinstance(message, bytes):
                message = message.decode('utf-8')
            process.stdin.write(message + '\n')
            process.stdin.flush()
    except Exception as e:
        logger.error(f"Error in WebSocket to process pipe: {e}")
        raise  # Re-throw exception to trigger reconnection
    finally:
        # Close process stdin
        if not process.stdin.closed:
            process.stdin.close()

async def pipe_process_to_websocket(process, websocket):
    """Read data from process stdout and send to WebSocket"""
    try:
        while True:
            # Read data from process stdout
            data = await asyncio.get_event_loop().run_in_executor(
                None, process.stdout.readline
            )

            if not data:  # If no data, the process may have ended
                logger.info("Process has ended output")
                break

            # Send data to WebSocket
            logger.debug(f">> {data[:120]}...")
            print(data)
            # 解析 JSON 字符串
            json_str = json.loads(data)
            print(json_str['id'])

            if json_str['id']>1:
              print(json_str['id'])
              if json_str.get('result', {}):

                text=json.loads(json_str['result']['content'][0]['text'])
                if text['success']:
                   print(text['result'])
                   L=480
                   
                   # 处理RGB灯环控制命令
                   if "开启" in text['result']:
                       pin1.write_digital(1)
                   if "关闭" in text['result']:
                       pin1.write_digital(0)
                   if "点亮" in text['result']:
                       np1.range_color(0,15,0xFF0000) 
                   if "熄灭" in text['result']:
                       np1.clear()
                       
                   # 新增颜色控制处理
                   if "设置颜色" in text['result']:
                       # 从文本中提取颜色值
                       color_str = text['result'].split("为")[-1].strip()
                       color_map = {
                           "红色": 0xFF0000,
                           "绿色": 0x00FF00,
                           "蓝色": 0x0000FF,
                           "黄色": 0xFFFF00,
                           "紫色": 0x800080,
                           "青色": 0x00FFFF,
                           "白色": 0xFFFFFF
                       }
                       color_code = color_map.get(color_str, 0xFF0000)  # 默认为红色
                       np1.range_color(0,15,color_code)
                       
                   # 新增亮度控制处理
                   if "设置亮度" in text['result']:
                       # 从文本中提取亮度值
                       brightness_str = text['result'].split("为")[-1].strip()
                       try:
                           brightness = int(brightness_str)
                           # 确保亮度在0-255范围内
                           brightness = max(0, min(brightness, 255))
                           np1.brightness(brightness)
                       except ValueError:
                           logger.error(f"无效的亮度值: {brightness_str}")
                           
            # In text mode, data is already a string, no need to decode
            await websocket.send(data)
    except Exception as e:
        logger.error(f"Error in process to WebSocket pipe: {e}")
        raise  # Re-throw exception to trigger reconnection

async def pipe_process_stderr_to_terminal(process):
    """Read data from process stderr and print to terminal"""
    try:
        while True:
            # Read data from process stderr
            data = await asyncio.get_event_loop().run_in_executor(
                None, process.stderr.readline
            )

            if not data:  # If no data, the process may have ended
                logger.info("Process has ended stderr output")
                break

            # Print stderr data to terminal (in text mode, data is already a string)
            sys.stderr.write(data)
            sys.stderr.flush()
    except Exception as e:
        logger.error(f"Error in process stderr pipe: {e}")
        raise  # Re-throw exception to trigger reconnection

def signal_handler(sig, frame):
    """Handle interrupt signals"""
    logger.info("Received interrupt signal, shutting down...")
    sys.exit(0)

if __name__ == "__main__":
    # Register signal handler
    signal.signal(signal.SIGINT, signal_handler)

    # mcp_script
    #if len(sys.argv) < 2:
    #    logger.error("Usage: mcp_pipe.py ")
    #    sys.exit(1)

    mcp_script = "move.py"

    # Get token from environment variable or command line arguments
    #endpoint_url = os.environ.get('MCP_ENDPOINT')
    endpoint_url="wss://api.xiaozhi.me/mcp/?token=eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VySWQiOjIxNjQzNywiYWdlbnRJZCI6NDYwNjExLCJlbmRwb2ludElkIjoiYWdlbnRfNDYwNjExIiwicHVycG9zZSI6Im1jcC1lbmRwb2ludCIsImlhdCI6MTc1MjUwMTgyNn0.mJP_f0oQRsWwEK2Vq6u9yrfqkaZQpuziRRN45o6kE_5UCo9Ls3J3GKfjo4nddaY0mV9JCRkGueP6eg4cqzSlHg"
    if not endpoint_url:
        logger.error("Please set the `MCP_ENDPOINT` environment variable")
        sys.exit(1)

    # Start main loop
    try:
        asyncio.run(connect_with_retry(endpoint_url))
    except KeyboardInterrupt:
        logger.info("Program interrupted by user")
    except Exception as e:
        logger.error(f"Program execution error: {e}")

注意:MCP接入地址要换成自己小智的接入点地址。

在mind+中点击运行mcp_pipe.py。

稍等一会,成功连接服务器。

下面简化版连接服务器后终端显示如下图:

屏幕截图 2025-07-11 095255.png

此时接入点刷新后是这样的:在线,工具可用。

下面就可以用K10小智和M10智能家(MCP)互动了。

屏幕截图 2025-07-11 095328.png

上面代码有些地方不够简练,放在DeepSeek中优化了下,得到下面简化版,逻辑更清晰。

代码
# 简化版
# move.py
from mcp.server.fastmcp import FastMCP
import logging

# 配置日志
logger = logging.getLogger('MoveServer')
mcp = FastMCP("MoveServer")

# 设备控制工具
@mcp.tool()
def open_fan() -> dict:
    """开启风扇,行空板M10引脚P21高电平,继电器闭合,小风扇电源接通,开始转动。"""
    return {"success": True, "result": "风扇开启"}

@mcp.tool()
def close_fan() -> dict:
    """关闭风扇,行空板M10引脚P21低电平,继电器打开,小风扇电源断开,停止转动。"""
    return {"success": True, "result": "风扇关闭"}

@mcp.tool()
def open_light() -> dict:
    """点亮灯环(红色),行空板M10引脚P23RGB灯环点亮。"""
    return {"success": True, "result": "灯环已点亮"}

@mcp.tool()
def close_light() -> dict:
    """熄灭灯环,行空板M10引脚P23RGB灯环熄灭。"""
    return {"success": True, "result": "灯环已熄灭"}

@mcp.tool()
def set_light_color(color: str) -> dict:
    """设置行空板M10引脚P23RGB灯环颜色,支持: 红色、绿色,蓝色,黄色,紫色,青色,白色"""
    return {"success": True, "result": f"设置颜色为{color}"}

@mcp.tool()
def set_light_brightness(brightness: int) -> dict:
    """设置行空板M10引脚P23RGB灯环亮度(0-255)"""
    return {"success": True, "result": f"设置亮度为{brightness}"}

# 启动服务器
if __name__ == "__main__":
    mcp.run(transport="stdio")
代码
# mcp_pipe.py
from unihiker import GUI
import asyncio
import websockets
import subprocess
import logging
import signal
import sys
import random
from pinpong.board import Board, Pin, NeoPixel
import json

# 初始化硬件
Board().begin()
u_gui = GUI()
pin1 = Pin(Pin.P21, Pin.OUT)
pin3 = Pin(Pin.P23, Pin.PWM)  
np1 = NeoPixel(pin3, 16)     
np1.brightness(100)

# 初始化UI
u_gui.draw_image(image="back.png", x=0, y=0)
u_gui.draw_text(text="M10智能助手", x=10, y=20, font_size=25, color="#FF00FF")

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('MCP_PIPE')

# 重连设置
INITIAL_BACKOFF, MAX_BACKOFF = 1, 600
reconnect_attempt, backoff = 0, INITIAL_BACKOFF

async def connect_with_retry(uri):
    """带重连机制的WebSocket连接"""
    global reconnect_attempt, backoff
    while True:
        try:
            if reconnect_attempt > 0:
                wait_time = min(backoff * (1 + random.random() * 0.1), MAX_BACKOFF)
                logger.info(f"等待 {wait_time:.2f}秒后重试...")
                await asyncio.sleep(wait_time)
                
            await connect_to_server(uri)
        except Exception as e:
            reconnect_attempt += 1
            backoff = min(backoff * 2, MAX_BACKOFF)
            logger.warning(f"连接关闭 (尝试: {reconnect_attempt}): {e}")

async def connect_to_server(uri):
    """连接到WebSocket服务器"""
    global reconnect_attempt, backoff
    try:
        logger.info("连接WebSocket服务器...")
        async with websockets.connect(uri) as websocket:
            logger.info("WebSocket连接成功")
            reconnect_attempt, backoff = 0, INITIAL_BACKOFF
            
            process = subprocess.Popen(
                ['python', mcp_script],
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                encoding='utf-8',
                text=True
            )
            logger.info(f"启动 {mcp_script} 进程")
            
            await asyncio.gather(
                pipe_websocket_to_process(websocket, process),
                pipe_process_to_websocket(process, websocket),
                pipe_process_stderr_to_terminal(process)
            )
    except websockets.exceptions.ConnectionClosed as e:
        logger.error(f"WebSocket连接关闭: {e}")
        raise
    except Exception as e:
        logger.error(f"连接错误: {e}")
        raise
    finally:
        if 'process' in locals():
            logger.info(f"终止 {mcp_script} 进程")
            try:
                process.terminate()
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                process.kill()

async def pipe_websocket_to_process(websocket, process):
    """WebSocket到进程的管道"""
    try:
        while True:
            message = await websocket.recv()
            logger.debug(f"<< {message[:120]}...")
            process.stdin.write(message + '\n')
            process.stdin.flush()
    except Exception as e:
        logger.error(f"WebSocket到进程错误: {e}")
        raise
    finally:
        if not process.stdin.closed:
            process.stdin.close()

async def pipe_process_to_websocket(process, websocket):
    """进程到WebSocket的管道"""
    try:
        while True:
            data = await asyncio.get_event_loop().run_in_executor(None, process.stdout.readline)
            if not data:
                break
                
            logger.debug(f">> {data[:120]}...")
            await handle_device_commands(data, websocket)
    except Exception as e:
        logger.error(f"进程到WebSocket错误: {e}")
        raise

async def handle_device_commands(data, websocket):
    """处理设备控制命令"""
    try:
        json_str = json.loads(data)
        if json_str['id'] > 1 and json_str.get('result', {}):
            text = json.loads(json_str['result']['content'][0]['text'])
            
            if text['success']:
                cmd_result = text['result']
                
                # 风扇控制
                if "开启" in cmd_result: pin1.write_digital(1)
                if "关闭" in cmd_result: pin1.write_digital(0)
                
                # RGB灯环控制
                if "点亮" in cmd_result: np1.range_color(0, 15, 0xFF0000)
                if "熄灭" in cmd_result: np1.clear()
                if "设置颜色" in cmd_result: handle_color(cmd_result)
                if "设置亮度" in cmd_result: handle_brightness(cmd_result)
                
        await websocket.send(data)
    except Exception as e:
        logger.error(f"命令处理错误: {e}")

def handle_color(cmd_result):
    """处理颜色设置"""
    color_str = cmd_result.split("为")[-1].strip()
    color_map = {
        "红色": 0xFF0000, "绿色": 0x00FF00, "蓝色": 0x0000FF,
        "黄色": 0xFFFF00, "紫色": 0x800080, "青色": 0x00FFFF, "白色": 0xFFFFFF
    }
    np1.range_color(0, 15, color_map.get(color_str, 0xFF0000))

def handle_brightness(cmd_result):
    """处理亮度设置"""
    brightness_str = cmd_result.split("为")[-1].strip()
    try:
        brightness = max(0, min(int(brightness_str), 255))
        np1.brightness(brightness)
    except ValueError:
        logger.error(f"无效亮度值: {brightness_str}")

async def pipe_process_stderr_to_terminal(process):
    """进程错误输出到终端"""
    try:
        while True:
            data = await asyncio.get_event_loop().run_in_executor(None, process.stderr.readline)
            if not data: break
            sys.stderr.write(data)
            sys.stderr.flush()
    except Exception as e:
        logger.error(f"错误输出处理错误: {e}")

def signal_handler(sig, frame):
    """中断信号处理"""
    logger.info("收到中断信号,正在关闭...")
    sys.exit(0)

if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler)
    mcp_script = "move.py"
    endpoint_url = "wss://api.xiaozhi.me/mcp/?token=eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VySWQiOjIxNjQzNywiYWdlbnRJZCI6MTQ5OTAyLCJlbmRwb2ludElkIjoiYWdlbnRfMTQ5OTAyIiwicHVycG9zZSI6Im1jcC1lbmRwb2ludCIsImlhdCI6MTc1MTU0NjIzOH0.ha8DI8RcXNCWntx22plEAtxd2ovVLEFS7H8V7TpED-6FfqYKcjQewWIFlEVfIjoE9UJNTQoytxdlQ5hJ4IsDHg"
    
    try:
        asyncio.run(connect_with_retry(endpoint_url))
    except KeyboardInterrupt:
        logger.info("用户中断程序")
    except Exception as e:
        logger.error(f"程序执行错误: {e}")

【小结】

1、挺好玩的项目,欢迎大家一起来玩,改一改,可以实现更多功能。

2、K10小智的最新版本能控制板载RGB灯的颜色和亮度,在实操时小智有时会搞混,要表达清晰任务要求,其它低版本小智就没有这种担心了。

例如用乐动掌控的小智,控制效果如下面视频:

评论

user-avatar