【项目缘起】
处暑,是二十四节气之第十四个节气。
斗指戊(西南方);太阳黄经达150°;
于每年公历8月22-24日交节。
处暑,即为“出暑”,是炎热离开的意思。
在学习大圣老师的MCP相关文章后,我深受启发。
分享运用AI模型"小智"与行空板M10进行智能交互
——《小智控制RGB灯柱(MCP)》项目。
《小智控制RGB灯柱(MCP)》项目需要以下三步
1.上位机乐动掌控刷入ESP32固件,配置AI小智角色
2.下位机行空板M10作为刷入V0.4.1系统,配置MCP关键库
3.行空板M10与灯带连接,编写相关程序
项目可以实现:
1.实时数据的高效处理,
2.智能化任务执行,
3.动态灯光效果控制

材料清单
- 行空板M10 X1
- 乐动掌控3.0 X1
- RGB灯带 X1
步骤1 上位机——将小智刷入乐动掌控中
1.打开软件mPython中设置主控板乐动掌控3.0


2.烧录固件



步骤2 上位机——配置乐动掌控小智角色
步骤3 行空板M10作为下位机——固件刷入
步骤4 行空板M10作为下位机——配网
步骤5 行空板M10作为下位机——调整Python3.12.7
步骤6 行空板M10作为下位机——MCP互动的关键库
1.在终端,使用pip install mcp,安装mcp库

2. 安装websockets 库:pip install websockets

步骤7 程序测试
1.管道文件mcp_pipe.py
说明:RGB灯带接入M10扩展P4口.
# mcp_pipe.py
from unihiker import GUI
import asyncio
import websockets
import subprocess
import logging
import signal
import sys
import random
import math
import json
from pinpong.board import Board, Pin, NeoPixel
# 初始化硬件
Board().begin()
u_gui = GUI()
# 初始化单个灯带 (P4引脚, 7灯)
pin4 = Pin(Pin.P4)
np = NeoPixel(pin4, 7)
np.brightness(100) # 设置默认亮度
# 全局变量
current_effect = None # 当前运行的效果任务
# 初始化UI
#u_gui.draw_image(image="back.png", x=0, y=0)
u_gui.draw_text(text="小智控制RGB灯柱", x=10, y=5, font_size=20, color="#000000")
u_gui.draw_emoji(emoji="Smile",x=0,y=40,duration=0.2)
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('MCP_PIPE')
# 重连设置
INITIAL_BACKOFF, MAX_BACKOFF = 1, 600
reconnect_attempt, backoff = 0, INITIAL_BACKOFF
# ===================== 核心灯效函数 =====================
async def rainbow_rotate():
"""彩虹色循环旋转效果"""
try:
np.rainbow(0, np.num - 1, 1, 360)
while True:
np.rotate(1)
await asyncio.sleep(0.1)
except asyncio.CancelledError:
logger.info("彩虹旋转中断")
async def blink(color):
"""闪烁效果"""
try:
while True:
np.range_color(0, np.num, color)
await asyncio.sleep(0.5)
np.clear()
await asyncio.sleep(0.5)
except asyncio.CancelledError:
logger.info("闪烁中断")
async def breathe(color):
"""呼吸灯效果"""
try:
np.range_color(0, np.num, color)
current_brightness = 20
direction = 1
while True:
np.brightness(current_brightness)
await asyncio.sleep(0.03)
current_brightness += direction * 2
if current_brightness >= 100:
current_brightness = 100
direction = -1
elif current_brightness <= 20:
current_brightness = 20
direction = 1
except asyncio.CancelledError:
logger.info("呼吸灯中断")
async def running_lights(color):
"""跑马灯效果"""
try:
np.clear()
while True:
for i in range(np.num):
np[i] = color
if i > 0:
np[i-1] = (color >> 1) & 0x7F7F7F # 半亮
if i > 1:
np[i-2] = 0 # 熄灭
await asyncio.sleep(0.1)
except asyncio.CancelledError:
logger.info("跑马灯中断")
async def fire_effect():
"""火焰效果"""
try:
heat = [0] * np.num
while True:
# 冷却
for i in range(np.num):
heat[i] = max(0, heat[i] - random.randint(0, 5))
# 传播热量
for i in range(np.num - 3, 0, -1):
heat[i] = (heat[i-1] + heat[i-2] + heat[i-2]) // 3
# 添加火花
if random.randint(0, 255) < 120:
y = random.randint(0, 7)
heat[y] = min(255, heat[y] + random.randint(160, 255))
# 转换为颜色
for i in range(np.num):
# 热色映射 (黑->红->黄->白)
t = heat[i]
if t < 85:
r = t * 3
g = 0
b = 0
elif t < 170:
r = 255
g = (t - 85) * 3
b = 0
else:
r = 255
g = 255
b = (t - 170) * 3
np[i] = (min(255, r) << 16) | (min(255, g) << 8) | min(255, b)
await asyncio.sleep(0.05)
except asyncio.CancelledError:
logger.info("火焰效果中断")
# ===================== 实用工具函数 =====================
def stop_current_effect():
"""停止当前效果"""
global current_effect
if current_effect and not current_effect.done():
current_effect.cancel()
current_effect = None
def get_color_by_name(color_str):
"""根据名称获取颜色值"""
color_map = {
"红色": 0xFF0000, "绿色": 0x00FF00, "蓝色": 0x0000FF,
"黄色": 0xFFFF00, "紫色": 0x800080, "青色": 0x00FFFF,
"白色": 0xFFFFFF, "橙色": 0xFFA500, "粉色": 0xFFC0CB
}
return color_map.get(color_str, 0xFF0000) # 默认为红色
# ===================== 网络通信部分 =====================
async def connect_with_retry(uri):
"""带重连机制的WebSocket连接"""
global reconnect_attempt, backoff
while True:
try:
if reconnect_attempt > 0:
wait_time = min(backoff * (1 + random.random() * 0.1), MAX_BACKOFF)
logger.info(f"等待 {wait_time:.2f}秒后重试...")
await asyncio.sleep(wait_time)
await connect_to_server(uri)
except Exception as e:
reconnect_attempt += 1
backoff = min(backoff * 2, MAX_BACKOFF)
logger.warning(f"连接关闭 (尝试: {reconnect_attempt}): {e}")
async def connect_to_server(uri):
"""连接到WebSocket服务器"""
global reconnect_attempt, backoff
try:
logger.info("连接WebSocket服务器...")
async with websockets.connect(uri) as websocket:
logger.info("WebSocket连接成功")
reconnect_attempt, backoff = 0, INITIAL_BACKOFF
process = subprocess.Popen(
['python', 'move.py'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding='utf-8',
text=True
)
logger.info(f"启动 move.py 进程")
await asyncio.gather(
pipe_websocket_to_process(websocket, process),
pipe_process_to_websocket(process, websocket),
pipe_process_stderr_to_terminal(process)
)
except websockets.exceptions.ConnectionClosed as e:
logger.error(f"WebSocket连接关闭: {e}")
raise
except Exception as e:
logger.error(f"连接错误: {e}")
raise
finally:
if 'process' in locals():
logger.info(f"终止 move.py 进程")
try:
process.terminate()
process.wait(timeout=5)
except subprocess.TimeoutExpired:
process.kill()
async def pipe_websocket_to_process(websocket, process):
"""WebSocket到进程的管道"""
try:
while True:
message = await websocket.recv()
logger.debug(f"<< {message[:120]}...")
process.stdin.write(message + '\n')
process.stdin.flush()
except Exception as e:
logger.error(f"WebSocket到进程错误: {e}")
raise
finally:
if not process.stdin.closed:
process.stdin.close()
async def pipe_process_to_websocket(process, websocket):
"""进程到WebSocket的管道"""
try:
while True:
data = await asyncio.get_event_loop().run_in_executor(None, process.stdout.readline)
if not data:
break
logger.debug(f">> {data[:120]}...")
await handle_device_commands(data, websocket)
except Exception as e:
logger.error(f"进程到WebSocket错误: {e}")
raise
async def handle_device_commands(data, websocket):
"""处理设备控制命令"""
global current_effect
try:
json_data = json.loads(data)
if json_data['id'] > 1 and json_data.get('result', {}):
content = json_data['result']['content'][0]['text']
text_data = json.loads(content)
if text_data['success']:
cmd_result = text_data['result']
logger.info(f"收到命令: {cmd_result}")
# 停止之前的灯效
stop_current_effect()
# 处理命令
if "彩虹" in cmd_result or "渐变色" in cmd_result:
current_effect = asyncio.create_task(rainbow_rotate())
logger.info("启动彩虹旋转效果")
elif "熄灭" in cmd_result:
np.clear()
logger.info("灯环已熄灭")
elif "闪烁" in cmd_result:
# 尝试提取颜色名称
color_str = "红色" # 默认红色
for color_name in ["红色", "绿色", "蓝色", "黄色", "紫色", "青色",
"白色", "橙色", "粉色"]:
if color_name in cmd_result:
color_str = color_name
break
color = get_color_by_name(color_str)
current_effect = asyncio.create_task(blink(color))
logger.info(f"启动{color_str}闪烁效果")
elif "呼吸" in cmd_result:
# 尝试提取颜色名称
color_str = "红色" # 默认红色
for color_name in ["红色", "绿色", "蓝色", "黄色", "紫色", "青色",
"白色", "橙色", "粉色"]:
if color_name in cmd_result:
color_str = color_name
break
color = get_color_by_name(color_str)
current_effect = asyncio.create_task(breathe(color))
logger.info(f"启动{color_str}呼吸效果")
elif "跑马灯" in cmd_result:
# 尝试提取颜色名称
color_str = "红色" # 默认红色
for color_name in ["红色", "绿色", "蓝色", "黄色", "紫色", "青色",
"白色", "橙色", "粉色"]:
if color_name in cmd_result:
color_str = color_name
break
color = get_color_by_name(color_str)
current_effect = asyncio.create_task(running_lights(color))
logger.info(f"启动{color_str}跑马灯效果")
elif "火焰" in cmd_result:
current_effect = asyncio.create_task(fire_effect())
logger.info("启动火焰效果")
await websocket.send(data)
except json.JSONDecodeError as je:
logger.error(f"JSON解析错误: {je}, 原始数据: {data}")
except KeyError as ke:
logger.error(f"缺少必要字段: {ke}, 原始数据: {data}")
except Exception as e:
logger.error(f"命令处理错误: {e}")
async def pipe_process_stderr_to_terminal(process):
"""进程错误输出到终端"""
try:
while True:
data = await asyncio.get_event_loop().run_in_executor(None, process.stderr.readline)
if not data: break
sys.stderr.write(data)
sys.stderr.flush()
except Exception as e:
logger.error(f"错误输出处理错误: {e}")
def signal_handler(sig, frame):
"""中断信号处理"""
logger.info("收到中断信号,正在关闭...")
# 停止当前灯效
stop_current_effect()
# 熄灭灯环
np.clear()
sys.exit(0)
if __name__ == "__main__":
signal.signal(signal.SIGINT, signal_handler)
endpoint_url = "wss://api.xiaozhi.me/mcp/?token=============================="
try:
asyncio.run(connect_with_retry(endpoint_url))
except KeyboardInterrupt:
logger.info("用户中断程序")
except Exception as e:
logger.error(f"程序执行错误: {e}")
2.服务文件move.py
# move.py
from mcp.server.fastmcp import FastMCP
import logging
# 配置日志
logger = logging.getLogger('MoveServer')
mcp = FastMCP("MoveServer")
# 设备控制工具
@mcp.tool()
def set_light_rainbow() -> dict:
"""设置灯环为彩虹渐变色"""
return {"success": True, "result": "设置灯环为彩虹渐变色"}
@mcp.tool()
def close_light() -> dict:
"""熄灭灯环"""
return {"success": True, "result": "熄灭灯环"}
@mcp.tool()
def set_light_blink(color: str) -> dict:
"""设置灯环闪烁效果。color: 颜色(如:红色)"""
return {"success": True, "result": f"设置灯环{color}闪烁"}
@mcp.tool()
def set_light_breathe(color: str) -> dict:
"""设置灯环呼吸效果。color: 颜色(如:蓝色)"""
return {"success": True, "result": f"设置灯环{color}呼吸"}
@mcp.tool()
def set_running_lights(color: str) -> dict:
"""设置跑马灯效果。color: 颜色(如:黄色)"""
return {"success": True, "result": f"设置灯环{color}跑马灯"}
@mcp.tool()
def set_fire_effect() -> dict:
"""设置火焰效果"""
return {"success": True, "result": "设置灯环火焰效果"}
# 启动服务器
if __name__ == "__main__":
mcp.run(transport="stdio")
三、知识扩展:
MCP
Model Context Protocol (MCP,模型上下文协议) 是一种专为 AI 模型与外部系统交互 设计的开放标准协议,2024 年由 Anthropic 等公司牵头提出,2025 年逐步被主流 AI 平台采纳,旨在提供统一的接口,使大语言模型(LLM)如 ChatGPT、Claude 等能动态访问数据库、API 或企业工具,实现更智能的实时数据处理和任务执行。
rzegkly2025.08.20
考虑上位机乐动掌控需要访问小智网站,如果出现交互问题的时候,我们可以重启乐动掌控,实现智能交互了! 下位机M10必须切换python3.12.7,否则mcp库无法安装!