7.15
[写在前面]
在5月份时,我曾经做过一个hello world墙花的小项目,在夜色中的窗边,它发出七彩炫丽的光芒。

当小智AI遇到它,现代和传统将会碰擦出什么样子的火花。

[目标预设]
1、行空板M10+电池扩展板组合给hello world墙花供电。
2、用小智AI通过MCP控制hello world墙花RGB灯带色彩和亮度的变化。
材料清单
- M10+电池扩展板组合 X1
- hello world墙花RGB灯板 X1
- 充电宝 X1
- 乐动小智 X1
步骤1 准备hello world RGB 灯板
步骤2 行空板M10上安装pyenv环境、Python 3.12.7和mcp库
步骤3 配置小智AI
步骤4 M10程序编写
1、mcp服务文件,move.py。
功能描述:控制行空板M10 引脚P23 RGB灯光点亮、熄灭,亮度、颜色调整,特别增加彩虹渐变色流水灯效。
# move.py
# MCP服务器设备控制工具
# 简化版
from mcp.server.fastmcp import FastMCP
import logging
# 配置日志
logger = logging.getLogger('MoveServer')
mcp = FastMCP("MoveServer")
# 设备控制工具
@mcp.tool()
def set_light_rainbow() -> dict:
"""点亮灯环(彩虹色),行空板M10引脚P23RGB灯环设为彩虹渐变色。"""
return {"success": True, "result": "灯环设为彩虹渐变色"}
@mcp.tool()
def open_light() -> dict:
"""点亮灯环(红色),行空板M10引脚P23RGB灯环点亮。"""
return {"success": True, "result": "灯环已点亮"}
@mcp.tool()
def close_light() -> dict:
"""熄灭灯环,行空板M10引脚P23RGB灯环熄灭。"""
return {"success": True, "result": "灯环已熄灭"}
@mcp.tool()
def set_light_color(color: str) -> dict:
"""设置行空板M10引脚P23RGB灯环颜色,支持: 红色、绿色,蓝色,黄色,紫色,青色,白色"""
return {"success": True, "result": f"设置颜色为{color}"}
@mcp.tool()
def set_light_brightness(brightness: int) -> dict:
"""设置行空板M10引脚P23RGB灯环亮度(0-255)"""
return {"success": True, "result": f"设置亮度为{brightness}"}
# 启动服务器
if __name__ == "__main__":
mcp.run(transport="stdio")
小智AI管道文件mcp_pipe.py。
新建文件mcp_pipe.py,修改代码,注意填入自己的MCP接入点地址。
功能描述:连接到 MCP 服务器,并通过 WebSocket 端点将输入和输出与指定的 Python 脚本进行管道通信。
加入了开机画面。

# mcp_pipe.py
# 连接MCP服务器并通过WebSocket传输输入输出
# 简化版
from unihiker import GUI
import asyncio
import websockets
import subprocess
import logging
import signal
import sys
import random
from pinpong.board import Board, Pin, NeoPixel
import json
# 初始化硬件
Board().begin()
u_gui = GUI()
pin1 = Pin(Pin.P21)
np1 = NeoPixel(pin1,84)
np1.brightness(100)
# 全局变量
rotating = False
rotate_task = None
# 初始化UI
u_gui.draw_image(image="back.png", x=0, y=0)
u_gui.draw_text(text="M10RGB助手", x=10, y=5, font_size=25, color="#FF00FF")
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('MCP_PIPE')
# 重连设置
INITIAL_BACKOFF, MAX_BACKOFF = 1, 600
reconnect_attempt, backoff = 0, INITIAL_BACKOFF
# 彩虹旋转函数
async def rainbow_rotate():
"""彩虹色循环旋转效果"""
global rotating
rotating = True
try:
while rotating:
np1.rotate(1)
await asyncio.sleep(0.1) # 控制旋转速度
except asyncio.CancelledError:
logger.info("彩虹旋转中断")
finally:
rotating = False
async def connect_with_retry(uri):
"""带重连机制的WebSocket连接"""
global reconnect_attempt, backoff
while True:
try:
if reconnect_attempt > 0:
wait_time = min(backoff * (1 + random.random() * 0.1), MAX_BACKOFF)
logger.info(f"等待 {wait_time:.2f}秒后重试...")
await asyncio.sleep(wait_time)
await connect_to_server(uri)
except Exception as e:
reconnect_attempt += 1
backoff = min(backoff * 2, MAX_BACKOFF)
logger.warning(f"连接关闭 (尝试: {reconnect_attempt}): {e}")
async def connect_to_server(uri):
"""连接到WebSocket服务器"""
global reconnect_attempt, backoff
try:
logger.info("连接WebSocket服务器...")
async with websockets.connect(uri) as websocket:
logger.info("WebSocket连接成功")
reconnect_attempt, backoff = 0, INITIAL_BACKOFF
process = subprocess.Popen(
['python', mcp_script],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding='utf-8',
text=True
)
logger.info(f"启动 {mcp_script} 进程")
await asyncio.gather(
pipe_websocket_to_process(websocket, process),
pipe_process_to_websocket(process, websocket),
pipe_process_stderr_to_terminal(process)
)
except websockets.exceptions.ConnectionClosed as e:
logger.error(f"WebSocket连接关闭: {e}")
raise
except Exception as e:
logger.error(f"连接错误: {e}")
raise
finally:
if 'process' in locals():
logger.info(f"终止 {mcp_script} 进程")
try:
process.terminate()
process.wait(timeout=5)
except subprocess.TimeoutExpired:
process.kill()
async def pipe_websocket_to_process(websocket, process):
"""WebSocket到进程的管道"""
try:
while True:
message = await websocket.recv()
logger.debug(f"<< {message[:120]}...")
process.stdin.write(message + '\n')
process.stdin.flush()
except Exception as e:
logger.error(f"WebSocket到进程错误: {e}")
raise
finally:
if not process.stdin.closed:
process.stdin.close()
async def pipe_process_to_websocket(process, websocket):
"""进程到WebSocket的管道"""
try:
while True:
data = await asyncio.get_event_loop().run_in_executor(None, process.stdout.readline)
if not data:
break
logger.debug(f">> {data[:120]}...")
await handle_device_commands(data, websocket)
except Exception as e:
logger.error(f"进程到WebSocket错误: {e}")
raise
async def handle_device_commands(data, websocket):
"""处理设备控制命令"""
global rotate_task
try:
json_str = json.loads(data)
if json_str['id'] > 1 and json_str.get('result', {}):
text = json.loads(json_str['result']['content'][0]['text'])
if text['success']:
cmd_result = text['result']
# 先停止任何正在进行的旋转
if rotate_task and not rotate_task.done():
rotate_task.cancel()
try:
await rotate_task
except asyncio.CancelledError:
pass
rotate_task = None
# RGB灯环控制
if "点亮" in cmd_result:
np1.range_color(0, 83, 0xFF0000)
elif "彩虹" in cmd_result:
np1.rainbow(0, 83, 1, 360)
# 启动彩虹旋转任务
rotate_task = asyncio.create_task(rainbow_rotate())
elif "渐变色" in cmd_result:
np1.rainbow(0, 83, 1, 360)
# 启动彩虹旋转任务
rotate_task = asyncio.create_task(rainbow_rotate())
elif "熄灭" in cmd_result:
np1.clear()
elif "设置颜色" in cmd_result:
handle_color(cmd_result)
elif "设置亮度" in cmd_result:
handle_brightness(cmd_result)
await websocket.send(data)
except Exception as e:
logger.error(f"命令处理错误: {e}")
def handle_color(cmd_result):
"""处理颜色设置"""
color_str = cmd_result.split("为")[-1].strip()
color_map = {
"红色": 0xFF0000, "绿色": 0x00FF00, "蓝色": 0x0000FF,
"黄色": 0xFFFF00, "紫色": 0x800080, "青色": 0x00FFFF, "白色": 0xFFFFFF
}
np1.range_color(0, 83, color_map.get(color_str, 0xFF0000))
def handle_brightness(cmd_result):
"""处理亮度设置"""
brightness_str = cmd_result.split("为")[-1].strip()
try:
brightness = max(0, min(int(brightness_str), 255))
np1.brightness(brightness)
except ValueError:
logger.error(f"无效亮度值: {brightness_str}")
async def pipe_process_stderr_to_terminal(process):
"""进程错误输出到终端"""
try:
while True:
data = await asyncio.get_event_loop().run_in_executor(None, process.stderr.readline)
if not data: break
sys.stderr.write(data)
sys.stderr.flush()
except Exception as e:
logger.error(f"错误输出处理错误: {e}")
def signal_handler(sig, frame):
"""中断信号处理"""
logger.info("收到中断信号,正在关闭...")
sys.exit(0)
if __name__ == "__main__":
signal.signal(signal.SIGINT, signal_handler)
mcp_script = "move.py"
endpoint_url = "wss://api.xiaozhi.me/mcp/?token=eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VySWQiOjIxNjQzNywiYWdlbnRJZCI6NDYwNjExLCJlbmRwb2ludElkIjoiYWdlbnRfNDYwNjExIiwicHVycG9zZSI6Im1jcC1lbmRwb2ludCIsImlhdCI6MTc1MjU0NjM1OH0.7y6CVkzE2dqILqBBoImu0qenKuPhMy27Yi-95lCawmXU_tSGaHcF5f8k6Vbp6kWYLjwVe6v2M-0TBOeoRVxzvQ"
try:
asyncio.run(connect_with_retry(endpoint_url))
except KeyboardInterrupt:
logger.info("用户中断程序")
except Exception as e:
logger.error(f"程序执行错误: {e}")
【关于程序】
工作原理:
当收到"渐变色"命令时,创建异步旋转任务
任务在后台以100ms间隔循环旋转灯环
当收到其他命令(点亮/熄灭/设置颜色/设置亮度)时:
取消正在进行的旋转任务
执行新命令的操作
旋转循环会自动退出
注意事项:
旋转速度调整:修改 await asyncio.sleep(0.1) 中的值:
值越小旋转越快(0.05秒更快)
值越大旋转越慢(0.2秒更慢)
硬件兼容性:确保 np1.rotate(1) 在硬件上正常工作:
参数 1 表示每次旋转1个LED
如果旋转方向不对,可尝试 -1 反向旋转
内存管理:任务取消后,Python的垃圾回收会处理任务对象,无需手动释放。
这种实现方式既保持了彩虹效果的流畅性,又能及时响应其他命令中断旋转,同时充分利用了异步编程的优势,不会阻塞主程序执行。
在Mind+中运行mcp_pipe.py。
稍等一会,等待成功连接服务器。
可以用乐动小智和M10(MCP)互动控制灯带了。
【小结】
1、这个用DeepSeek辅助完成的作业,重点是增加了RGB灯带彩虹渐变色灯效。
2、84颗灯,有点费电,电池扩展板的电池还是小了点,加了小充电宝支援一下。
3、程序先用灯环16灯写的,后来改hello world时代码中灯环没有修改成灯带,小智不知道不会自动改口的。
4、DeepSeek是个很好的老师,可以帮助解答问题,修改程序。
评论