【写在前面】
这个帖子记录的是小智AI和两个灯环炫酷灯效的事,灯效和种类虽不是很多,可是两个灯环可以分别控制,分别用不同的灯效,还是很不错的哦。


材料清单
- M10 X1
- M10电池扩展板组合 X1
- RGB灯环 X2
- 乐动小智 X1
步骤1 行空板M10上安装pyenv环境和Python 3.12.7及MCP库安装
步骤2 小智聊天机器人配置
步骤3 M10程序编写
硬件与依赖
硬件:Unihiker开发板 + NeoPixel灯环(24灯珠,P23引脚,16灯珠,P21引脚)
关键库:
pinpong(硬件控制)
websockets(网络通信)
asyncio(异步任务调度)
简化控制逻辑,确保灯效流畅切换;模块化函数便于扩展效果;异步架构保障实时响应。适合物联网教学及智能硬件开发场景。
mcp服务文件,move.py。
功能描述:控制灯环点亮、熄灭,各种灯效。
# move.py
from mcp.server.fastmcp import FastMCP
import logging
# 配置日志
logger = logging.getLogger('MoveServer')
mcp = FastMCP("MoveServer")
# 简化工具描述
@mcp.tool()
def set_light_rainbow(ring: str) -> dict:
"""设置灯环为彩虹渐变色。ring: 灯环标识('P21'或'P23')"""
return {"success": True, "result": f"设置{ring}灯环为彩虹渐变色"}
@mcp.tool()
def close_light(ring: str) -> dict:
"""熄灭灯环。ring: 灯环标识('P21'或'P23')"""
return {"success": True, "result": f"熄灭{ring}灯环"}
@mcp.tool()
def set_light_blink(ring: str, color: str) -> dict:
"""设置灯环闪烁效果。ring: 灯环标识('P21'或'P23'),color: 颜色(如:红色)"""
return {"success": True, "result": f"设置{ring}灯环{color}闪烁"}
@mcp.tool()
def set_light_breathe(ring: str, color: str) -> dict:
"""设置灯环呼吸效果。ring: 灯环标识('P21'或'P23'),color: 颜色(如:蓝色)"""
return {"success": True, "result": f"设置{ring}灯环{color}呼吸"}
@mcp.tool()
def set_police_light() -> dict:
"""设置警车灯效(P21红灯闪烁,P23蓝灯闪烁)"""
return {"success": True, "result": "设置警车灯效"}
@mcp.tool()
def set_running_lights(ring: str, color: str) -> dict:
"""设置跑马灯效果。ring: 灯环标识('P21'或'P23'),color: 颜色(如:黄色)"""
return {"success": True, "result": f"设置{ring}灯环{color}跑马灯"}
@mcp.tool()
def set_fire_effect(ring: str) -> dict:
"""设置火焰效果。ring: 灯环标识('P21'或'P23')"""
return {"success": True, "result": f"设置{ring}灯环火焰效果"}
# 启动服务器
if __name__ == "__main__":
mcp.run(transport="stdio")
小智AI管道文件mcp_pipe.py。填入自己的MCP接入点地址。
功能描述:连接到 MCP 服务器,并通过 WebSocket 端点将输入和输出与指定的 Python 脚本进行管道通信。
双灯环独立控制:
P21引脚:16灯环
P23引脚:24灯环
支持分别控制每个灯环
灯效模式:
闪烁效果:指定颜色闪烁
呼吸效果:平滑的亮度变化
警车灯效:P21红灯与P23蓝灯交替闪烁
跑马灯:动态流动的光点
火焰效果:逼真的火焰模拟
彩虹旋转:保留原有功能
命令处理增强:
自动识别命令中的灯环标识(P21/P23)
支持中文颜色名称(红/绿/蓝/黄/紫/青/白/橙/粉)
自动停止之前的灯效再启动新效果
状态管理:
跟踪每个灯环的当前效果
优雅停止所有任务
使用示例
现在您可以通过小智发送以下类型的命令:
基础控制:
"点亮P21灯环"
"熄灭P23灯环"
"设置P21灯环颜色为蓝色"
"设置P23灯环亮度为150"
特效控制:
"设置P21灯环彩虹渐变色"
"设置P23灯环红色闪烁"
"设置P21灯环绿色呼吸"
"设置警车灯效"
"设置P21灯环橙色跑马灯"
"设置P23灯环火焰效果"
# mcp_pipe.py
from unihiker import GUI
import asyncio
import websockets
import subprocess
import logging
import signal
import sys
import random
import math
import json # 添加缺失的json模块导入
from pinpong.board import Board, Pin, NeoPixel
from collections import defaultdict
# 初始化硬件
Board().begin()
u_gui = GUI()
# 初始化两个灯环
pin21 = Pin(Pin.P21)
np21 = NeoPixel(pin21, 16) # P21引脚接16灯环
pin23 = Pin(Pin.P23)
np23 = NeoPixel(pin23, 24) # P23引脚接24灯环
# 设置默认亮度
for np in [np21, np23]:
np.brightness(100)
# 全局变量
tasks = defaultdict(list) # 存储每个灯环的任务
# 初始化UI
u_gui.draw_image(image="back.png", x=0, y=0)
u_gui.draw_text(text="M10RGB助手", x=10, y=5, font_size=25, color="#FF00FF")
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('MCP_PIPE')
# 重连设置
INITIAL_BACKOFF, MAX_BACKOFF = 1, 600
reconnect_attempt, backoff = 0, INITIAL_BACKOFF
# 灯效函数
async def rainbow_rotate(np, speed=0.1):
"""彩虹色循环旋转效果"""
np.rainbow(0, np.num - 1, 1, 360)
try:
while True:
np.rotate(1)
await asyncio.sleep(speed)
except asyncio.CancelledError:
logger.info(f"{np}彩虹旋转中断")
async def blink(np, color, interval=0.5):
"""闪烁效果"""
try:
while True:
np.range_color(0, np.num, color)
await asyncio.sleep(interval)
np.clear()
await asyncio.sleep(interval)
except asyncio.CancelledError:
logger.info(f"{np}闪烁中断")
async def breathe(np, color, interval=0.03, min_brightness=10, max_brightness=100):
"""呼吸灯效果"""
try:
np.range_color(0, np.num, color)
current_brightness = min_brightness
direction = 1
while True:
np.brightness(current_brightness)
await asyncio.sleep(interval)
current_brightness += direction * 2
if current_brightness >= max_brightness:
current_brightness = max_brightness
direction = -1
elif current_brightness <= min_brightness:
current_brightness = min_brightness
direction = 1
except asyncio.CancelledError:
logger.info(f"{np}呼吸灯中断")
async def police_flash(np1, np2, interval=0.2):
"""警车灯效(红蓝交替闪烁)"""
try:
while True:
np1.range_color(0, np1.num, 0xFF0000) # 红色
np2.range_color(0, np2.num, 0x0000FF) # 蓝色
await asyncio.sleep(interval)
np1.clear()
np2.clear()
await asyncio.sleep(interval)
except asyncio.CancelledError:
logger.info("警车灯效中断")
async def running_lights(np, color, interval=0.1):
"""跑马灯效果"""
try:
np.clear()
while True:
for i in range(np.num):
np[i] = color
if i > 0:
np[i-1] = (color >> 1) & 0x7F7F7F # 半亮
if i > 1:
np[i-2] = 0 # 熄灭
await asyncio.sleep(interval)
except asyncio.CancelledError:
logger.info(f"{np}跑马灯中断")
async def fire_effect(np, cooling=55, sparking=120, interval=0.05):
"""火焰效果"""
try:
heat = [0] * np.num
while True:
# 冷却
for i in range(np.num):
heat[i] = max(0, heat[i] - random.randint(0, cooling // 10))
# 传播热量
for i in range(np.num - 3, 0, -1):
heat[i] = (heat[i-1] + heat[i-2] + heat[i-2]) // 3
# 添加火花
if random.randint(0, 255) < sparking:
y = random.randint(0, 7)
heat[y] = min(255, heat[y] + random.randint(160, 255))
# 转换为颜色
for i in range(np.num):
# 热色映射 (黑->红->黄->白)
t = heat[i]
if t < 85:
r = t * 3
g = 0
b = 0
elif t < 170:
r = 255
g = (t - 85) * 3
b = 0
else:
r = 255
g = 255
b = (t - 170) * 3
np[i] = (min(255, r) << 16) | (min(255, g) << 8) | min(255, b)
await asyncio.sleep(interval)
except asyncio.CancelledError:
logger.info(f"{np}火焰效果中断")
def stop_ring_effects(ring_id):
"""停止指定灯环的所有效果"""
if ring_id in tasks:
for task in tasks[ring_id]:
if not task.done():
task.cancel()
tasks[ring_id] = []
def get_color_by_name(color_str):
"""根据名称获取颜色值"""
color_map = {
"红色": 0xFF0000, "绿色": 0x00FF00, "蓝色": 0x0000FF,
"黄色": 0xFFFF00, "紫色": 0x800080, "青色": 0x00FFFF,
"白色": 0xFFFFFF, "橙色": 0xFFA500, "粉色": 0xFFC0CB
}
return color_map.get(color_str, 0xFF0000) # 默认为红色
async def connect_with_retry(uri):
"""带重连机制的WebSocket连接"""
global reconnect_attempt, backoff
while True:
try:
if reconnect_attempt > 0:
wait_time = min(backoff * (1 + random.random() * 0.1), MAX_BACKOFF)
logger.info(f"等待 {wait_time:.2f}秒后重试...")
await asyncio.sleep(wait_time)
await connect_to_server(uri)
except Exception as e:
reconnect_attempt += 1
backoff = min(backoff * 2, MAX_BACKOFF)
logger.warning(f"连接关闭 (尝试: {reconnect_attempt}): {e}")
async def connect_to_server(uri):
"""连接到WebSocket服务器"""
global reconnect_attempt, backoff
try:
logger.info("连接WebSocket服务器...")
async with websockets.connect(uri) as websocket:
logger.info("WebSocket连接成功")
reconnect_attempt, backoff = 0, INITIAL_BACKOFF
process = subprocess.Popen(
['python', 'move.py'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding='utf-8',
text=True
)
logger.info(f"启动 move.py 进程")
await asyncio.gather(
pipe_websocket_to_process(websocket, process),
pipe_process_to_websocket(process, websocket),
pipe_process_stderr_to_terminal(process)
)
except websockets.exceptions.ConnectionClosed as e:
logger.error(f"WebSocket连接关闭: {e}")
raise
except Exception as e:
logger.error(f"连接错误: {e}")
raise
finally:
if 'process' in locals():
logger.info(f"终止 move.py 进程")
try:
process.terminate()
process.wait(timeout=5)
except subprocess.TimeoutExpired:
process.kill()
async def pipe_websocket_to_process(websocket, process):
"""WebSocket到进程的管道"""
try:
while True:
message = await websocket.recv()
logger.debug(f"<< {message[:120]}...")
process.stdin.write(message + '\n')
process.stdin.flush()
except Exception as e:
logger.error(f"WebSocket到进程错误: {e}")
raise
finally:
if not process.stdin.closed:
process.stdin.close()
async def pipe_process_to_websocket(process, websocket):
"""进程到WebSocket的管道"""
try:
while True:
data = await asyncio.get_event_loop().run_in_executor(None, process.stdout.readline)
if not data:
break
logger.debug(f">> {data[:120]}...")
await handle_device_commands(data, websocket)
except Exception as e:
logger.error(f"进程到WebSocket错误: {e}")
raise
async def handle_device_commands(data, websocket):
"""处理设备控制命令"""
try:
json_data = json.loads(data)
if json_data['id'] > 1 and json_data.get('result', {}):
content = json_data['result']['content'][0]['text']
text_data = json.loads(content)
if text_data['success']:
cmd_result = text_data['result']
logger.info(f"收到命令: {cmd_result}")
# 确定操作的灯环
ring_name = "P21"
if "P23" in cmd_result or "24" in cmd_result:
ring_name = "P23"
# 停止之前的灯效
stop_ring_effects(ring_name)
# 处理命令
np = np21 if ring_name == "P21" else np23
if "彩虹" in cmd_result or "渐变色" in cmd_result:
task = asyncio.create_task(rainbow_rotate(np))
tasks[ring_name].append(task)
logger.info(f"{ring_name}灯环启动彩虹旋转效果")
elif "熄灭" in cmd_result:
np.clear()
logger.info(f"{ring_name}灯环已熄灭")
elif "闪烁" in cmd_result:
# 尝试提取颜色名称
color_str = "红色" # 默认红色
for color_name in ["红色", "绿色", "蓝色", "黄色", "紫色", "青色", "白色", "橙色", "粉色"]:
if color_name in cmd_result:
color_str = color_name
break
color = get_color_by_name(color_str)
task = asyncio.create_task(blink(np, color))
tasks[ring_name].append(task)
logger.info(f"{ring_name}灯环启动{color_str}闪烁效果")
elif "呼吸" in cmd_result:
# 尝试提取颜色名称
color_str = "红色" # 默认红色
for color_name in ["红色", "绿色", "蓝色", "黄色", "紫色", "青色", "白色", "橙色", "粉色"]:
if color_name in cmd_result:
color_str = color_name
break
color = get_color_by_name(color_str)
task = asyncio.create_task(breathe(np, color))
tasks[ring_name].append(task)
logger.info(f"{ring_name}灯环启动{color_str}呼吸效果")
elif "警车" in cmd_result:
task1 = asyncio.create_task(blink(np21, 0xFF0000))
task2 = asyncio.create_task(blink(np23, 0x0000FF, interval=0.4))
tasks["P21"].append(task1)
tasks["P23"].append(task2)
logger.info("启动警车灯效")
elif "跑马灯" in cmd_result:
# 尝试提取颜色名称
color_str = "红色" # 默认红色
for color_name in ["红色", "绿色", "蓝色", "黄色", "紫色", "青色", "白色", "橙色", "粉色"]:
if color_name in cmd_result:
color_str = color_name
break
color = get_color_by_name(color_str)
task = asyncio.create_task(running_lights(np, color))
tasks[ring_name].append(task)
logger.info(f"{ring_name}灯环启动{color_str}跑马灯效果")
elif "火焰" in cmd_result:
task = asyncio.create_task(fire_effect(np))
tasks[ring_name].append(task)
logger.info(f"{ring_name}灯环启动火焰效果")
await websocket.send(data)
except Exception as e:
logger.error(f"命令处理错误: {e}")
async def pipe_process_stderr_to_terminal(process):
"""进程错误输出到终端"""
try:
while True:
data = await asyncio.get_event_loop().run_in_executor(None, process.stderr.readline)
if not data: break
sys.stderr.write(data)
sys.stderr.flush()
except Exception as e:
logger.error(f"错误输出处理错误: {e}")
def signal_handler(sig, frame):
"""中断信号处理"""
logger.info("收到中断信号,正在关闭...")
# 停止所有灯效
for ring_id in list(tasks.keys()):
for task in tasks[ring_id]:
if not task.done():
task.cancel()
tasks[ring_id] = []
# 熄灭所有灯环
np21.clear()
np23.clear()
sys.exit(0)
if __name__ == "__main__":
signal.signal(signal.SIGINT, signal_handler)
endpoint_url = "wss://api.xiaozhi.me/mcp/?token=eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VySWQiOjIxNjQzNywiYWdlbnRJZCI6NDYwNjExLCJlbmRwb2ludElkIjoiYWdlbnRfNDYwNjExIiwicHVycG9zZSI6Im1jcC1lbmRwb2ludCIsImlhdCI6MTc1MjU0NjM1OH0.7y6CVkzE2dqILqBBoImu0qenKuPhMy27Yi-95lCawmXU_tSGaHcF5f8k6Vbp6kWYLjwVe6v2M-0TBOeoRVxzvQ"
try:
asyncio.run(connect_with_retry(endpoint_url))
except KeyboardInterrupt:
logger.info("用户中断程序")
except Exception as e:
logger.error(f"程序执行错误: {e}")
在Mind+中运行mcp_pipe.py。
等待成功连接服务器,期间可能有红字报连不上服务器,耐心等待(有时快,有时慢)。
用乐动小智和M10(MCP)互动控制灯光。
本项目代码由DeepSeek协助完成,致谢。
评论