7.21
【写在前面】
自从学习了云天老师的M10(MCP)课程后,就开始了写作业模式,其实水平也没有提高很多,就是又菜又爱玩那种。
今天继续玩灯环,灯环的各种炫酷灯效,比如彩虹旋转、闪烁、呼吸、跑马灯、火焰,除了彩虹旋转和火焰,其它的灯效还可以随时改变颜色,加上小智AI参与互动,更加有趣可爱。


材料清单
- 行空板M10 X1
- M10电池扩展板组合 X1
- 24灯RGB灯环 X1
步骤1 行空板M10上安装pyenv环境、Python 3.12.7和mcp库
步骤2 配置小智AI
步骤3 M10程序编写
步骤1和2在前面帖子中有详述,这里不再赘述。
1、mcp服务文件,move.py。
功能描述:控制行空板M10 引脚P23 中24RGB灯环的五种灯效。
# move.py (最终简洁版)
from mcp.server.fastmcp import FastMCP
import logging
# 配置日志
logger = logging.getLogger('MoveServer')
mcp = FastMCP("MoveServer")
# 设备控制工具
@mcp.tool()
def set_light_rainbow() -> dict:
"""设置灯环为彩虹渐变色"""
return {"success": True, "result": "设置灯环为彩虹渐变色"}
@mcp.tool()
def close_light() -> dict:
"""熄灭灯环"""
return {"success": True, "result": "熄灭灯环"}
@mcp.tool()
def set_light_blink(color: str) -> dict:
"""设置灯环闪烁效果。color: 颜色(如:红色)"""
return {"success": True, "result": f"设置灯环{color}闪烁"}
@mcp.tool()
def set_light_breathe(color: str) -> dict:
"""设置灯环呼吸效果。color: 颜色(如:蓝色)"""
return {"success": True, "result": f"设置灯环{color}呼吸"}
@mcp.tool()
def set_running_lights(color: str) -> dict:
"""设置跑马灯效果。color: 颜色(如:黄色)"""
return {"success": True, "result": f"设置灯环{color}跑马灯"}
@mcp.tool()
def set_fire_effect() -> dict:
"""设置火焰效果"""
return {"success": True, "result": "设置灯环火焰效果"}
# 启动服务器
if __name__ == "__main__":
mcp.run(transport="stdio")
小智AI管道文件mcp_pipe.py。
新建文件mcp_pipe.py,修改代码,注意填入自己的MCP接入点地址。
功能描述:连接到 MCP 服务器,并通过 WebSocket 端点将输入和输出与指定的 Python 脚本进行管道通信。
# mcp_pipe.py (最终简洁版)
from unihiker import GUI
import asyncio
import websockets
import subprocess
import logging
import signal
import sys
import random
import math
import json
from pinpong.board import Board, Pin, NeoPixel
# 初始化硬件
Board().begin()
u_gui = GUI()
# 初始化单个灯环 (P23引脚, 24灯)
pin23 = Pin(Pin.P23)
np = NeoPixel(pin23, 24)
np.brightness(100) # 设置默认亮度
# 全局变量
current_effect = None # 当前运行的效果任务
# 初始化UI
u_gui.draw_image(image="back.png", x=0, y=0)
u_gui.draw_text(text="M10RGB助手", x=10, y=5, font_size=25, color="#FF00FF")
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('MCP_PIPE')
# 重连设置
INITIAL_BACKOFF, MAX_BACKOFF = 1, 600
reconnect_attempt, backoff = 0, INITIAL_BACKOFF
# ===================== 核心灯效函数 =====================
async def rainbow_rotate():
"""彩虹色循环旋转效果"""
try:
np.rainbow(0, np.num - 1, 1, 360)
while True:
np.rotate(1)
await asyncio.sleep(0.1)
except asyncio.CancelledError:
logger.info("彩虹旋转中断")
async def blink(color):
"""闪烁效果"""
try:
while True:
np.range_color(0, np.num, color)
await asyncio.sleep(0.5)
np.clear()
await asyncio.sleep(0.5)
except asyncio.CancelledError:
logger.info("闪烁中断")
async def breathe(color):
"""呼吸灯效果"""
try:
np.range_color(0, np.num, color)
current_brightness = 20
direction = 1
while True:
np.brightness(current_brightness)
await asyncio.sleep(0.03)
current_brightness += direction * 2
if current_brightness >= 100:
current_brightness = 100
direction = -1
elif current_brightness <= 20:
current_brightness = 20
direction = 1
except asyncio.CancelledError:
logger.info("呼吸灯中断")
async def running_lights(color):
"""跑马灯效果"""
try:
np.clear()
while True:
for i in range(np.num):
np[i] = color
if i > 0:
np[i-1] = (color >> 1) & 0x7F7F7F # 半亮
if i > 1:
np[i-2] = 0 # 熄灭
await asyncio.sleep(0.1)
except asyncio.CancelledError:
logger.info("跑马灯中断")
async def fire_effect():
"""火焰效果"""
try:
heat = [0] * np.num
while True:
# 冷却
for i in range(np.num):
heat[i] = max(0, heat[i] - random.randint(0, 5))
# 传播热量
for i in range(np.num - 3, 0, -1):
heat[i] = (heat[i-1] + heat[i-2] + heat[i-2]) // 3
# 添加火花
if random.randint(0, 255) < 120:
y = random.randint(0, 7)
heat[y] = min(255, heat[y] + random.randint(160, 255))
# 转换为颜色
for i in range(np.num):
# 热色映射 (黑->红->黄->白)
t = heat[i]
if t < 85:
r = t * 3
g = 0
b = 0
elif t < 170:
r = 255
g = (t - 85) * 3
b = 0
else:
r = 255
g = 255
b = (t - 170) * 3
np[i] = (min(255, r) << 16) | (min(255, g) << 8) | min(255, b)
await asyncio.sleep(0.05)
except asyncio.CancelledError:
logger.info("火焰效果中断")
# ===================== 实用工具函数 =====================
def stop_current_effect():
"""停止当前效果"""
global current_effect
if current_effect and not current_effect.done():
current_effect.cancel()
current_effect = None
def get_color_by_name(color_str):
"""根据名称获取颜色值"""
color_map = {
"红色": 0xFF0000, "绿色": 0x00FF00, "蓝色": 0x0000FF,
"黄色": 0xFFFF00, "紫色": 0x800080, "青色": 0x00FFFF,
"白色": 0xFFFFFF, "橙色": 0xFFA500, "粉色": 0xFFC0CB
}
return color_map.get(color_str, 0xFF0000) # 默认为红色
# ===================== 网络通信部分 =====================
async def connect_with_retry(uri):
"""带重连机制的WebSocket连接"""
global reconnect_attempt, backoff
while True:
try:
if reconnect_attempt > 0:
wait_time = min(backoff * (1 + random.random() * 0.1), MAX_BACKOFF)
logger.info(f"等待 {wait_time:.2f}秒后重试...")
await asyncio.sleep(wait_time)
await connect_to_server(uri)
except Exception as e:
reconnect_attempt += 1
backoff = min(backoff * 2, MAX_BACKOFF)
logger.warning(f"连接关闭 (尝试: {reconnect_attempt}): {e}")
async def connect_to_server(uri):
"""连接到WebSocket服务器"""
global reconnect_attempt, backoff
try:
logger.info("连接WebSocket服务器...")
async with websockets.connect(uri) as websocket:
logger.info("WebSocket连接成功")
reconnect_attempt, backoff = 0, INITIAL_BACKOFF
process = subprocess.Popen(
['python', 'move.py'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding='utf-8',
text=True
)
logger.info(f"启动 move.py 进程")
await asyncio.gather(
pipe_websocket_to_process(websocket, process),
pipe_process_to_websocket(process, websocket),
pipe_process_stderr_to_terminal(process)
)
except websockets.exceptions.ConnectionClosed as e:
logger.error(f"WebSocket连接关闭: {e}")
raise
except Exception as e:
logger.error(f"连接错误: {e}")
raise
finally:
if 'process' in locals():
logger.info(f"终止 move.py 进程")
try:
process.terminate()
process.wait(timeout=5)
except subprocess.TimeoutExpired:
process.kill()
async def pipe_websocket_to_process(websocket, process):
"""WebSocket到进程的管道"""
try:
while True:
message = await websocket.recv()
logger.debug(f"<< {message[:120]}...")
process.stdin.write(message + '\n')
process.stdin.flush()
except Exception as e:
logger.error(f"WebSocket到进程错误: {e}")
raise
finally:
if not process.stdin.closed:
process.stdin.close()
async def pipe_process_to_websocket(process, websocket):
"""进程到WebSocket的管道"""
try:
while True:
data = await asyncio.get_event_loop().run_in_executor(None, process.stdout.readline)
if not data:
break
logger.debug(f">> {data[:120]}...")
await handle_device_commands(data, websocket)
except Exception as e:
logger.error(f"进程到WebSocket错误: {e}")
raise
async def handle_device_commands(data, websocket):
"""处理设备控制命令"""
global current_effect
try:
json_data = json.loads(data)
if json_data['id'] > 1 and json_data.get('result', {}):
content = json_data['result']['content'][0]['text']
text_data = json.loads(content)
if text_data['success']:
cmd_result = text_data['result']
logger.info(f"收到命令: {cmd_result}")
# 停止之前的灯效
stop_current_effect()
# 处理命令
if "彩虹" in cmd_result or "渐变色" in cmd_result:
current_effect = asyncio.create_task(rainbow_rotate())
logger.info("启动彩虹旋转效果")
elif "熄灭" in cmd_result:
np.clear()
logger.info("灯环已熄灭")
elif "闪烁" in cmd_result:
# 尝试提取颜色名称
color_str = "红色" # 默认红色
for color_name in ["红色", "绿色", "蓝色", "黄色", "紫色", "青色",
"白色", "橙色", "粉色"]:
if color_name in cmd_result:
color_str = color_name
break
color = get_color_by_name(color_str)
current_effect = asyncio.create_task(blink(color))
logger.info(f"启动{color_str}闪烁效果")
elif "呼吸" in cmd_result:
# 尝试提取颜色名称
color_str = "红色" # 默认红色
for color_name in ["红色", "绿色", "蓝色", "黄色", "紫色", "青色",
"白色", "橙色", "粉色"]:
if color_name in cmd_result:
color_str = color_name
break
color = get_color_by_name(color_str)
current_effect = asyncio.create_task(breathe(color))
logger.info(f"启动{color_str}呼吸效果")
elif "跑马灯" in cmd_result:
# 尝试提取颜色名称
color_str = "红色" # 默认红色
for color_name in ["红色", "绿色", "蓝色", "黄色", "紫色", "青色",
"白色", "橙色", "粉色"]:
if color_name in cmd_result:
color_str = color_name
break
color = get_color_by_name(color_str)
current_effect = asyncio.create_task(running_lights(color))
logger.info(f"启动{color_str}跑马灯效果")
elif "火焰" in cmd_result:
current_effect = asyncio.create_task(fire_effect())
logger.info("启动火焰效果")
await websocket.send(data)
except json.JSONDecodeError as je:
logger.error(f"JSON解析错误: {je}, 原始数据: {data}")
except KeyError as ke:
logger.error(f"缺少必要字段: {ke}, 原始数据: {data}")
except Exception as e:
logger.error(f"命令处理错误: {e}")
async def pipe_process_stderr_to_terminal(process):
"""进程错误输出到终端"""
try:
while True:
data = await asyncio.get_event_loop().run_in_executor(None, process.stderr.readline)
if not data: break
sys.stderr.write(data)
sys.stderr.flush()
except Exception as e:
logger.error(f"错误输出处理错误: {e}")
def signal_handler(sig, frame):
"""中断信号处理"""
logger.info("收到中断信号,正在关闭...")
# 停止当前灯效
stop_current_effect()
# 熄灭灯环
np.clear()
sys.exit(0)
if __name__ == "__main__":
signal.signal(signal.SIGINT, signal_handler)
endpoint_url = "wss://api.xiaozhi.me/mcp/?token=eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VySWQiOjIxNjQzNywiYWdlbnRJZCI6NDYwNjExLCJlbmRwb2ludElkIjoiYWdlbnRfNDYwNjExIiwicHVycG9zZSI6Im1jcC1lbmRwb2ludCIsImlhdCI6MTc1MjU0NjM1OH0.7y6CVkzE2dqILqBBoImu0qenKuPhMy27Yi-95lCawmXU_tSGaHcF5f8k6Vbp6kWYLjwVe6v2M-0TBOeoRVxzvQ"
try:
asyncio.run(connect_with_retry(endpoint_url))
except KeyboardInterrupt:
logger.info("用户中断程序")
except Exception as e:
logger.error(f"程序执行错误: {e}")
在Mind+中运行mcp_pipe.py。
等待成功连接服务器,期间可能有红字报连不上服务器,耐心等待(有时快,有时慢)。
用乐动小智和M10(MCP)互动控制灯光。
【小结】
极简设计:5个核心灯效:彩虹旋转、闪烁、呼吸、跑马灯、火焰
稳定可靠:所有灯效参数固定,避免意外错误,简化错误处理逻辑,优化资源管理
易用性强:语音命令保持简单直观,开箱即用,无需复杂配置
性能优化:减少内存占用,降低CPU使用率,提高系统响应速度
使用指南:
# 基础控制:熄灭灯环
# 特效控制:
设置灯环为彩虹渐变色
设置灯环红色闪烁
设置灯环蓝色呼吸
设置灯环黄色跑马灯
设置灯环火焰效果
本项目代码由DeepSeek协助完成,特此致谢。
评论