7.22
【写在前面】
这个帖子记录的是小智AI和单个灯环炫酷灯效的事,相比上一个灯效的帖子中的内容,这算是一个增强版,增加了新的灯效。
基础灯效
彩虹旋转:循环渐变彩虹色
单色闪烁:支持红/绿/蓝等颜色闪烁
呼吸效果:固定亮度周期性渐变(30-100)
跑马灯:动态流动效果,当前灯珠高亮后续渐暗
火焰模拟:随机红/黄/橙色跳动
高级灯效
流星拖尾:可自定义颜色(默认橙色)
彩虹呼吸:呼吸叠加彩虹色周期切换
音乐同步:模拟频谱响应(蓝→红渐变)
控制特性
灯效序列:预设场景模式(如“派对模式”自动切换多个效果)
速度调节:1-10级动态效果速度
材料清单
- M10 X1
- M10扩展板 X1
- 乐动小智AI X1
- 24灯RGB灯环 X1
步骤1 行空板M10上安装pyenv环境和Python 3.12.7及MCP库安装
步骤2 小智聊天机器人配置
获取MCP接入点。

步骤3 M10程序编写
硬件与依赖
硬件:Unihiker开发板 + NeoPixel灯环(24灯珠,P23引脚)
关键库:
pinpong(硬件控制)
websockets(网络通信)
asyncio(异步任务调度)
设计目标:简化控制逻辑,确保灯效流畅切换;模块化函数便于扩展效果;异步架构保障实时响应。适合物联网教学及智能硬件开发场景。
mcp服务文件,move.py。
功能描述:控制灯环点亮、熄灭,各种灯效。
# move.py (移除亮度调节版)
from mcp.server.fastmcp import FastMCP
import logging
# 配置日志
logger = logging.getLogger('MoveServer')
mcp = FastMCP("MoveServer")
# 设备控制工具
@mcp.tool()
def set_light_rainbow() -> dict:
"""设置灯环为彩虹渐变色"""
return {"success": True, "result": "设置灯环为彩虹渐变色"}
@mcp.tool()
def close_light() -> dict:
"""熄灭灯环"""
return {"success": True, "result": "熄灭灯环"}
@mcp.tool()
def set_light_blink(color: str) -> dict:
"""设置灯环闪烁效果。color: 颜色(如:红色)"""
return {"success": True, "result": f"设置灯环{color}闪烁"}
@mcp.tool()
def set_light_breathe(color: str) -> dict:
"""设置灯环呼吸效果。color: 颜色(如:蓝色)"""
return {"success": True, "result": f"设置灯环{color}呼吸"}
@mcp.tool()
def set_running_lights(color: str) -> dict:
"""设置跑马灯效果。color: 颜色(如:黄色)"""
return {"success": True, "result": f"设置灯环{color}跑马灯"}
@mcp.tool()
def set_fire_effect() -> dict:
"""设置火焰效果"""
return {"success": True, "result": "设置灯环火焰效果"}
# 新增功能
@mcp.tool()
def set_meteor_effect(color: str = "橙色") -> dict:
"""设置流星效果。color: 流星颜色(默认:橙色)"""
return {"success": True, "result": f"设置{color}流星效果"}
@mcp.tool()
def set_rainbow_breathe() -> dict:
"""设置彩虹呼吸效果"""
return {"success": True, "result": "设置彩虹呼吸效果"}
@mcp.tool()
def set_music_sync() -> dict:
"""设置音乐同步效果"""
return {"success": True, "result": "设置音乐同步效果"}
@mcp.tool()
def set_effect_sequence() -> dict:
"""设置灯效组合序列"""
return {"success": True, "result": "设置灯效组合序列"}
@mcp.tool()
def set_speed(speed_level: int) -> dict:
"""设置灯效速度。speed_level: 速度级别 (1-10)"""
return {"success": True, "result": f"设置速度为{speed_level}级"}
# 启动服务器
if __name__ == "__main__":
mcp.run(transport="stdio")
小智AI管道文件mcp_pipe.py。填入自己的MCP接入点地址。
功能描述:连接到 MCP 服务器,并通过 WebSocket 端点将输入和输出与指定的 Python 脚本进行管道通信。
# mcp_pipe.py (增强版) - 亮度调节优化版
from unihiker import GUI
import asyncio
import websockets
import subprocess
import logging
import signal
import sys
import random
import math
import json
import time
from pinpong.board import Board, Pin, NeoPixel
# 初始化硬件
Board().begin()
u_gui = GUI()
# 初始化单个灯环 (P23引脚, 24灯)
pin23 = Pin(Pin.P23)
np = NeoPixel(pin23, 24)
np.brightness(100) # 设置默认亮度
# 全局变量
current_effect = None # 当前运行的效果任务
current_brightness = 100 # 当前亮度 (0-255)
effect_speed = 1.0 # 效果速度系数 (1.0为正常速度)
# 初始化UI
u_gui.draw_image(image="back.png", x=0, y=0)
u_gui.draw_text(text="M10RGB助手", x=10, y=5, font_size=25, color="#FF00FF")
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('MCP_PIPE')
# 重连设置
INITIAL_BACKOFF, MAX_BACKOFF = 1, 600
reconnect_attempt, backoff = 0, INITIAL_BACKOFF
# ===================== 基础灯效函数 =====================
async def rainbow_rotate():
"""彩虹色循环旋转效果"""
speed = 0.1 / effect_speed
np.rainbow(0, np.num - 1, 1, 360)
try:
while True:
np.brightness(current_brightness) # 应用当前亮度
np.rotate(1)
await asyncio.sleep(speed)
except asyncio.CancelledError:
logger.info("彩虹旋转中断")
async def blink(color):
"""闪烁效果"""
interval = 0.5 / effect_speed
try:
while True:
np.brightness(current_brightness) # 应用当前亮度
np.range_color(0, np.num, color)
await asyncio.sleep(interval)
np.clear()
await asyncio.sleep(interval)
except asyncio.CancelledError:
logger.info("闪烁中断")
async def breathe(color):
"""呼吸灯效果 - 已修复亮度响应问题"""
interval = 0.03 / effect_speed
try:
current_bright = max(20, current_brightness // 3) # 初始化为最小亮度
direction = 1
while True:
# 每次循环都重新获取当前的最大亮度和最小亮度
min_bright = max(20, current_brightness // 3)
max_bright = current_brightness
np.range_color(0, np.num, color)
np.brightness(current_bright)
await asyncio.sleep(interval)
current_bright += direction * 2
if current_bright >= max_bright:
current_bright = max_bright
direction = -1
elif current_bright <= min_bright:
current_bright = min_bright
direction = 1
except asyncio.CancelledError:
logger.info("呼吸灯中断")
async def running_lights(color):
"""跑马灯效果"""
interval = 0.1 / effect_speed
try:
np.clear()
while True:
np.brightness(current_brightness) # 应用当前亮度
for i in range(np.num):
np[i] = color
if i > 0:
np[i-1] = (color >> 1) & 0x7F7F7F # 半亮
if i > 1:
np[i-2] = 0 # 熄灭
await asyncio.sleep(interval)
except asyncio.CancelledError:
logger.info("跑马灯中断")
async def fire_effect():
"""火焰效果 - 已添加亮度支持"""
interval = 0.05 / effect_speed
try:
heat = [0] * np.num
while True:
# 冷却
for i in range(np.num):
heat[i] = max(0, heat[i] - random.randint(0, 5))
# 传播热量
for i in range(np.num - 3, 0, -1):
heat[i] = (heat[i-1] + heat[i-2] + heat[i-2]) // 3
# 添加火花
if random.randint(0, 255) < 120:
y = random.randint(0, 7)
heat[y] = min(255, heat[y] + random.randint(160, 255))
# 转换为颜色并应用亮度
brightness_factor = current_brightness / 255.0
for i in range(np.num):
# 热色映射 (黑->红->黄->白)
t = heat[i]
if t < 85:
r = t * 3
g = 0
b = 0
elif t < 170:
r = 255
g = (t - 85) * 3
b = 0
else:
r = 255
g = 255
b = (t - 170) * 3
# 应用亮度因子
r = int(r * brightness_factor)
g = int(g * brightness_factor)
b = int(b * brightness_factor)
np[i] = (min(255, r) << 16) | (min(255, g) << 8) | min(255, b)
await asyncio.sleep(interval)
except asyncio.CancelledError:
logger.info("火焰效果中断")
# ===================== 新增灯效函数 =====================
async def meteor_effect(color=0xFFA500, tail_length=10, decay=0.7):
"""流星效果 - 已添加亮度支持"""
interval = 0.05 / effect_speed
try:
while True:
# 创建新流星
meteor_pos = 0
meteor_brightness = 1.0
meteor_pixels = [0] * np.num
while meteor_pos < np.num + tail_length:
# 清除上一帧
np.clear()
# 绘制流星并应用全局亮度
brightness_factor = current_brightness / 255.0
for i in range(tail_length):
pos = meteor_pos - i
if 0 <= pos < np.num:
# 计算亮度衰减
brightness = meteor_brightness * (decay ** i) * brightness_factor
r = int(((color >> 16) & 0xFF) * brightness)
g = int(((color >> 8) & 0xFF) * brightness)
b = int((color & 0xFF) * brightness)
meteor_pixels[pos] = (r << 16) | (g << 8) | b
np[pos] = meteor_pixels[pos]
meteor_pos += 1
await asyncio.sleep(interval)
# 流星消失后添加一些星星
for _ in range(5):
np.brightness(current_brightness) # 应用当前亮度
for i in range(5):
pos = random.randint(0, np.num-1)
np[pos] = 0xFFFFFF
await asyncio.sleep(0.2)
np.clear()
await asyncio.sleep(0.1)
except asyncio.CancelledError:
logger.info("流星效果中断")
async def rainbow_breathe():
"""彩虹呼吸效果 - 已修复亮度响应问题"""
interval = 0.03 / effect_speed
hue = 0
try:
current_bright = max(20, current_brightness // 3) # 初始化为最小亮度
direction = 1
while True:
# 每次循环都重新计算最小和最大亮度
min_bright = max(20, current_brightness // 3)
max_bright = current_brightness
# 计算当前颜色
r, g, b = hsv_to_rgb(hue, 1.0, 1.0)
color = (int(r*255) << 16) | (int(g*255) << 8) | int(b*255)
# 设置所有灯珠
np.range_color(0, np.num, color)
np.brightness(current_bright)
# 更新亮度和色相
current_bright += direction * 2
if current_bright >= max_bright:
current_bright = max_bright
direction = -1
elif current_bright <= min_bright:
current_bright = min_bright
direction = 1
hue = (hue + 30) % 360 # 每次呼吸周期改变颜色
await asyncio.sleep(interval)
except asyncio.CancelledError:
logger.info("彩虹呼吸效果中断")
def hsv_to_rgb(h, s, v):
"""HSV转RGB颜色"""
h = h % 360
c = v * s
x = c * (1 - abs((h / 60) % 2 - 1))
m = v - c
if 0 <= h < 60:
r, g, b = c, x, 0
elif 60 <= h < 120:
r, g, b = x, c, 0
elif 120 <= h < 180:
r, g, b = 0, c, x
elif 180 <= h < 240:
r, g, b = 0, x, c
elif 240 <= h < 300:
r, g, b = x, 0, c
else: # 300 <= h < 360
r, g, b = c, 0, x
return r + m, g + m, b + m
async def music_sync_effect():
"""音乐同步效果 (模拟) - 已添加亮度支持"""
try:
# 创建频谱数据 (模拟音乐频谱)
spectrum = [0.1] * 8
decay = 0.9
while True:
# 随机添加"音乐节拍"
if random.random() < 0.3:
band = random.randint(0, 7)
spectrum[band] = min(1.0, spectrum[band] + random.uniform(0.2, 0.8))
# 将频谱映射到灯环并应用亮度
brightness_factor = current_brightness / 255.0
for i in range(np.num):
# 将灯珠分组对应频谱带
band_idx = i % 8
value = spectrum[band_idx]
# 计算颜色 (从蓝到红) 并应用亮度
r = int(255 * value * brightness_factor)
g = int(100 * value * brightness_factor)
b = int(255 * (1 - value) * brightness_factor)
np[i] = (r << 16) | (g << 8) | b
# 频谱衰减
for i in range(8):
spectrum[i] = max(0.1, spectrum[i] * decay)
await asyncio.sleep(0.05 / effect_speed)
except asyncio.CancelledError:
logger.info("音乐同步效果中断")
# ===================== 灯效组合序列 =====================
async def effect_sequence():
"""灯效组合序列"""
try:
# 序列1: 派对模式
sequences = {
"派对模式": [
(rainbow_rotate, 5.0),
(lambda: blink(0xFF0000), 2.0),
(lambda: blink(0x00FF00), 2.0),
(lambda: blink(0x0000FF), 2.0),
(meteor_effect, 8.0)
],
"浪漫氛围": [
(lambda: breathe(0xFF69B4), 4.0), # 粉色
(lambda: breathe(0x800080), 4.0), # 紫色
(rainbow_breathe, 8.0)
],
"火焰派对": [
(fire_effect, 6.0),
(lambda: meteor_effect(0xFF4500), 6.0), # 橙红色
(fire_effect, 6.0),
(lambda: meteor_effect(0xFFD700), 6.0) # 金色
],
"彩虹狂欢": [
(rainbow_rotate, 8.0),
(rainbow_breathe, 8.0),
(lambda: running_lights(0x00FF00), 4.0),
(lambda: running_lights(0xFF0000), 4.0),
(lambda: running_lights(0x0000FF), 4.0)
]
}
# 随机选择一个序列
seq_name, seq = random.choice(list(sequences.items()))
logger.info(f"开始灯效序列: {seq_name}")
# 执行序列
for effect_func, duration in seq:
stop_current_effect()
current_effect = asyncio.create_task(effect_func())
await asyncio.sleep(duration)
# 序列结束后恢复彩虹旋转
stop_current_effect()
current_effect = asyncio.create_task(rainbow_rotate())
except asyncio.CancelledError:
logger.info("灯效序列中断")
# ===================== 工具函数 =====================
def stop_current_effect():
"""停止当前效果"""
global current_effect
if current_effect and not current_effect.done():
current_effect.cancel()
current_effect = None
def get_color_by_name(color_str):
"""根据名称获取颜色值"""
color_map = {
"红色": 0xFF0000, "绿色": 0x00FF00, "蓝色": 0x0000FF,
"黄色": 0xFFFF00, "紫色": 0x800080, "青色": 0x00FFFF,
"白色": 0xFFFFFF, "橙色": 0xFFA500, "粉色": 0xFFC0CB,
"金色": 0xFFD700, "品红": 0xFF00FF, "青色": 0x00FFFF
}
return color_map.get(color_str, 0xFF0000) # 默认为红色
def set_brightness(level):
"""设置亮度 (0-100) - 优化亮度映射"""
global current_brightness
# 将0-100映射到20-255 (保留最低亮度)
mapped_level = max(0, min(100, int(level)))
current_brightness = max(20, min(255, int(mapped_level * 2.55)))
logger.info(f"亮度设置为: {mapped_level}% (实际亮度值: {current_brightness})")
np.brightness(current_brightness) # 立即应用亮度变化
def set_speed(speed_level):
"""设置速度 (1-10)"""
global effect_speed
# 1-10 映射到 0.5-2.0
speed_level = max(1, min(10, int(speed_level)))
effect_speed = 0.5 + (speed_level - 1) * 0.15
logger.info(f"速度设置为: {speed_level}级 (实际速度系数: {effect_speed:.2f})")
# ===================== 网络通信部分 =====================
async def connect_with_retry(uri):
"""带重连机制的WebSocket连接"""
global reconnect_attempt, backoff
while True:
try:
if reconnect_attempt > 0:
wait_time = min(backoff * (1 + random.random() * 0.1), MAX_BACKOFF)
logger.info(f"等待 {wait_time:.2f}秒后重试...")
await asyncio.sleep(wait_time)
await connect_to_server(uri)
except Exception as e:
reconnect_attempt += 1
backoff = min(backoff * 2, MAX_BACKOFF)
logger.warning(f"连接关闭 (尝试: {reconnect_attempt}): {e}")
async def connect_to_server(uri):
"""连接到WebSocket服务器"""
global reconnect_attempt, backoff
try:
logger.info("连接WebSocket服务器...")
async with websockets.connect(uri) as websocket:
logger.info("WebSocket连接成功")
reconnect_attempt, backoff = 0, INITIAL_BACKOFF
process = subprocess.Popen(
['python', 'move.py'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding='utf-8',
text=True
)
logger.info(f"启动 move.py 进程")
await asyncio.gather(
pipe_websocket_to_process(websocket, process),
pipe_process_to_websocket(process, websocket),
pipe_process_stderr_to_terminal(process)
)
except websockets.exceptions.ConnectionClosed as e:
logger.error(f"WebSocket连接关闭: {e}")
raise
except Exception as e:
logger.error(f"连接错误: {e}")
raise
finally:
if 'process' in locals():
logger.info(f"终止 move.py 进程")
try:
process.terminate()
process.wait(timeout=5)
except subprocess.TimeoutExpired:
process.kill()
async def pipe_websocket_to_process(websocket, process):
"""WebSocket到进程的管道"""
try:
while True:
message = await websocket.recv()
logger.debug(f"<< {message[:120]}...")
process.stdin.write(message + '\n')
process.stdin.flush()
except Exception as e:
logger.error(f"WebSocket到进程错误: {e}")
raise
finally:
if not process.stdin.closed:
process.stdin.close()
async def pipe_process_to_websocket(process, websocket):
"""进程到WebSocket的管道"""
try:
while True:
data = await asyncio.get_event_loop().run_in_executor(None, process.stdout.readline)
if not data:
break
logger.debug(f">> {data[:120]}...")
await handle_device_commands(data, websocket)
except Exception as e:
logger.error(f"进程到WebSocket错误: {e}")
raise
async def handle_device_commands(data, websocket):
"""处理设备控制命令"""
global current_effect
try:
json_data = json.loads(data)
if json_data['id'] > 1 and json_data.get('result', {}):
content = json_data['result']['content'][0]['text']
text_data = json.loads(content)
if text_data['success']:
cmd_result = text_data['result']
logger.info(f"收到命令: {cmd_result}")
# 停止之前的灯效
stop_current_effect()
# ========== 基础灯效控制 ==========
if "彩虹" in cmd_result or "渐变色" in cmd_result:
current_effect = asyncio.create_task(rainbow_rotate())
logger.info("启动彩虹旋转效果")
elif "熄灭" in cmd_result:
np.clear()
logger.info("灯环已熄灭")
elif "闪烁" in cmd_result:
# 尝试提取颜色名称
color_str = "红色" # 默认红色
for color_name in ["红色", "绿色", "蓝色", "黄色", "紫色", "青色",
"白色", "橙色", "粉色", "金色", "品红"]:
if color_name in cmd_result:
color_str = color_name
break
color = get_color_by_name(color_str)
current_effect = asyncio.create_task(blink(color))
logger.info(f"启动{color_str}闪烁效果")
elif "呼吸" in cmd_result:
# 尝试提取颜色名称
color_str = "红色" # 默认红色
for color_name in ["红色", "绿色", "蓝色", "黄色", "紫色", "青色",
"白色", "橙色", "粉色", "金色", "品红"]:
if color_name in cmd_result:
color_str = color_name
break
color = get_color_by_name(color_str)
current_effect = asyncio.create_task(breathe(color))
logger.info(f"启动{color_str}呼吸效果")
elif "跑马灯" in cmd_result:
# 尝试提取颜色名称
color_str = "红色" # 默认红色
for color_name in ["红色", "绿色", "蓝色", "黄色", "紫色", "青色",
"白色", "橙色", "粉色", "金色", "品红"]:
if color_name in cmd_result:
color_str = color_name
break
color = get_color_by_name(color_str)
current_effect = asyncio.create_task(running_lights(color))
logger.info(f"启动{color_str}跑马灯效果")
elif "火焰" in cmd_result:
current_effect = asyncio.create_task(fire_effect())
logger.info("启动火焰效果")
# ========== 新增灯效控制 ==========
elif "流星" in cmd_result:
# 尝试提取颜色名称
color_str = "橙色" # 默认橙色
for color_name in ["红色", "绿色", "蓝色", "黄色", "紫色", "青色",
"白色", "橙色", "粉色", "金色", "品红"]:
if color_name in cmd_result:
color_str = color_name
break
color = get_color_by_name(color_str)
current_effect = asyncio.create_task(meteor_effect(color))
logger.info(f"启动{color_str}流星效果")
elif "彩虹呼吸" in cmd_result:
current_effect = asyncio.create_task(rainbow_breathe())
logger.info("启动彩虹呼吸效果")
elif "音乐" in cmd_result or "节奏" in cmd_result:
current_effect = asyncio.create_task(music_sync_effect())
logger.info("启动音乐同步效果")
elif "序列" in cmd_result or "组合" in cmd_result:
current_effect = asyncio.create_task(effect_sequence())
logger.info("启动灯效组合序列")
# ========== 设置控制 ==========
elif "亮度" in cmd_result:
# 尝试提取亮度值 (0-100)
level = 70 # 默认亮度
for i in range(0, 101, 10):
if str(i) in cmd_result:
level = i
break
set_brightness(level)
elif "速度" in cmd_result:
# 尝试提取速度值 (1-10)
speed_level = 5 # 默认速度
for i in range(1, 11):
if str(i) in cmd_result:
speed_level = i
break
set_speed(speed_level)
await websocket.send(data)
except json.JSONDecodeError as je:
logger.error(f"JSON解析错误: {je}, 原始数据: {data}")
except KeyError as ke:
logger.error(f"缺少必要字段: {ke}, 原始数据: {data}")
except Exception as e:
logger.error(f"命令处理错误: {e}")
async def pipe_process_stderr_to_terminal(process):
"""进程错误输出到终端"""
try:
while True:
data = await asyncio.get_event_loop().run_in_executor(None, process.stderr.readline)
if not data: break
sys.stderr.write(data)
sys.stderr.flush()
except Exception as e:
logger.error(f"错误输出处理错误: {e}")
def signal_handler(sig, frame):
"""中断信号处理"""
logger.info("收到中断信号,正在关闭...")
# 停止当前灯效
stop_current_effect()
# 熄灭灯环
np.clear()
sys.exit(0)
if __name__ == "__main__":
signal.signal(signal.SIGINT, signal_handler)
endpoint_url = "wss://api.xiaozhi.me/mcp/?token=eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VySWQiOjIxNjQzNywiYWdlbnRJZCI6NDYwNjExLCJlbmRwb2ludElkIjoiYWdlbnRfNDYwNjExIiwicHVycG9zZSI6Im1jcC1lbmRwb2ludCIsImlhdCI6MTc1MzE1NjQ5MH0.v4laJkUfHNgsKY1NMnh56NEyNnULeEhnQa-YZX4gt_-afJZ_-G6S_07E80GkLiwmA9gKHqBiXq1Or-bJ0bVEiA"
try:
asyncio.run(connect_with_retry(endpoint_url))
except KeyboardInterrupt:
logger.info("用户中断程序")
except Exception as e:
logger.error(f"程序执行错误: {e}")
在Mind+中运行mcp_pipe.py。
等待成功连接服务器,期间可能有红字报连不上服务器,耐心等待(有时快,有时慢)。
用乐动小智和M10(MCP)互动控制灯光。
本项目代码由DeepSeek协助完成,致谢。
评论