7.7
【项目目标】
前面两个任务,都在学习小智AI和M10(MCP),在这个任务中,我将尝试用乐动小智和行空板M10(MCP)组成智能家居助手,控制RGB灯环的亮灭,小风扇的启停,同时尝试获取dht22传感器实时数据。

材料清单
- 乐动掌控2.0小智 X1
- 行空板M10 X1
- M10扩展板组合 X1
- 小风扇、灯环 X2
- dht22温湿度传感器 X1
步骤1 乐动掌控2.0安装小智AI
1、下载安装mpython 0.8.7,连接乐动掌控,选择硬件。
2、刷小智固件。

3、给小智配网。

WiFi连接成功后,屏幕上会出现小智平台网址和“设备码”。
4、设置小智AI。
登录https://xiaozhi.me小智机器人官网,进入“控制台”,添加小智机器人。
添加小智机器人,需要点击“新建智能体”菜单,新建智能体,例如起名“乐动掌控”。

配置角色:

管理设备--“添加设备”按钮,在弹出的对话框中填写设备码,设备码为屏幕上的数字,确定。
这样,乐动掌控2.0成功变身为“小智聊天机器人”,可以畅快的与之对话了。
聊天后就可使用声纹识别功能,添加说话人了。

挺简单的。
获取MCP接入点,复制接入点地址备用。

测试了下,乐动掌控2.0的摄像头目前没有加入互动,期待早日实现拍照识物功能。
步骤2 行空板M10上安装pyenv环境和Python 3.12.7
这一步已在前面项目中按照官方教程完成,记录在:M10仰望小车(MCP)的学习与尝试
https://makelogapi.dfrobot.com.cn/api/project/handleShareUrl?pid=165_317714
步骤3 硬件连接
步骤4 M10程序编写
1、mcp服务文件,move.py。
功能描述:控制M10风扇开启、关闭,风量调节,灯光点亮、熄灭,亮度调整,获取实时温度、湿度。
"""
真实硬件版本:MCP服务器设备控制工具
版本: 0.3.0
"""
import sys
import logging
import time
from typing import Dict
from mcp.server.fastmcp import FastMCP
from pinpong.board import Board, Pin, DHT22, NeoPixel
# 配置日志系统
def setup_logger() -> logging.Logger:
logger = logging.getLogger('MoveServer')
logger.setLevel(logging.INFO)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
file_handler = logging.FileHandler('move_server.log')
file_handler.setLevel(logging.DEBUG)
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)
logger.addHandler(console_handler)
logger.addHandler(file_handler)
return logger
# 修复Windows控制台UTF-8编码问题
if sys.platform == 'win32':
try:
sys.stderr.reconfigure(encoding='utf-8')
sys.stdout.reconfigure(encoding='utf-8')
except AttributeError:
import io
sys.stderr = io.TextIOWrapper(sys.stderr.detach(), encoding='utf-8')
sys.stdout = io.TextIOWrapper(sys.stdout.detach(), encoding='utf-8')
# 创建日志记录器
logger = setup_logger()
# 创建MCP服务器
mcp = FastMCP("MoveServer")
# ======================= 硬件配置 =======================
HARDWARE_CONFIG = {
"FAN_PIN": 21, # 风扇控制引脚
"SENSOR_PIN": 22, # 温湿度传感器引脚
"LIGHT_PIN": 23, # 灯带控制引脚
"NUM_LEDS": 16 # LED数量
}
# ======================= 设备控制工具 =======================
class HardwareController:
"""真实硬件操作类"""
def __init__(self, config: dict):
"""初始化真实硬件"""
try:
# 初始化开发板
Board().begin()
# 初始化风扇控制引脚
self.fan_pin = Pin(config["FAN_PIN"], Pin.OUT)
# 初始化温湿度传感器
self.dht_sensor = DHT22(Pin(config["SENSOR_PIN"]))
# 初始化灯带
self.light_pin = NeoPixel(Pin(config["LIGHT_PIN"]), config["NUM_LEDS"])
self.light_pin.brightness(100)
logger.info("真实硬件初始化成功")
except Exception as e:
logger.critical(f"硬件初始化失败: {e}")
raise
def control_fan(self, state: bool) -> str:
try:
self.fan_pin.write_digital(1 if state else 0)
return f"风扇已{'开启' if state else '关闭'}"
except Exception as e:
logger.error(f"风扇控制失败: {e}")
return f"风扇控制失败: {str(e)}"
def control_light(self, state: bool) -> str:
try:
if state:
self.light_pin.range_color(0, self.light_pin.num, 0xFF0000) # 红色
else:
self.light_pin.clear()
return f"灯带已{'点亮' if state else '熄灭'}"
except Exception as e:
logger.error(f"灯带控制失败: {e}")
return f"灯带控制失败: {str(e)}"
def get_temperature(self) -> str:
"""获取真实温度数据"""
try:
# 尝试最多3次读取,防止偶尔读取失败
for _ in range(3):
try:
temp = self.dht_sensor.temp_c()
return f"当前温度: {temp}℃"
except:
time.sleep(0.2) # 短暂延迟后重试
return "温度读取失败"
except Exception as e:
logger.error(f"温度读取失败: {e}")
return f"温度读取失败: {str(e)}"
def get_humidity(self) -> str:
"""获取真实湿度数据"""
try:
for _ in range(3):
try:
humidity = self.dht_sensor.humidity()
return f"当前湿度: {humidity}%"
except:
time.sleep(0.2)
return "湿度读取失败"
except Exception as e:
logger.error(f"湿度读取失败: {e}")
return f"湿度读取失败: {str(e)}"
def tool_response(success: bool, result: str) -> Dict[str, object]:
return {
"success": success,
"result": result
}
# 创建硬件控制器实例
try:
hardware_controller = HardwareController(HARDWARE_CONFIG)
except Exception as e:
logger.critical("硬件初始化失败,服务器无法启动")
sys.exit(1)
@mcp.tool()
def open_fan() -> Dict[str, object]:
logger.info("执行风扇开启命令")
result = hardware_controller.control_fan(True)
logger.info(result)
return tool_response("失败" not in result, result)
@mcp.tool()
def close_fan() -> Dict[str, object]:
logger.info("执行风扇关闭命令")
result = hardware_controller.control_fan(False)
logger.info(result)
return tool_response("失败" not in result, result)
@mcp.tool()
def open_light() -> Dict[str, object]:
logger.info("执行灯带开启命令")
result = hardware_controller.control_light(True)
logger.info(result)
return tool_response("失败" not in result, result)
@mcp.tool()
def close_light() -> Dict[str, object]:
logger.info("执行灯带关闭命令")
result = hardware_controller.control_light(False)
logger.info(result)
return tool_response("失败" not in result, result)
@mcp.tool()
def get_temp() -> Dict[str, object]:
logger.info("读取温度数据")
result = hardware_controller.get_temperature()
logger.info(result)
return tool_response("失败" not in result, result)
@mcp.tool()
def get_humidity() -> Dict[str, object]:
logger.info("读取湿度数据")
result = hardware_controller.get_humidity()
logger.info(result)
return tool_response("失败" not in result, result)
# 启动服务器
if __name__ == "__main__":
try:
logger.info("启动MCP服务器...")
logger.info(f"硬件配置: {HARDWARE_CONFIG}")
mcp.run(transport="stdio")
logger.info("MCP服务器已停止")
except Exception as e:
logger.critical(f"服务器运行失败: {e}")
sys.exit(1)
修改小智AI管道文件mcp_pipe.py。
新建文件mcp_pipe.py,填入自己的MCP接入点地址。
功能描述:连接到 MCP 服务器,并通过 WebSocket 端点将输入和输出与指定的 Python 脚本进行管道通信。
加入了开机画面。
"""
优化版本:连接MCP服务器并通过WebSocket传输输入输出
版本: 0.2.0
用法:
export MCP_ENDPOINT=
python mcp_pipe.py
"""
# -*- coding: UTF-8 -*-
import asyncio
import json
import logging
import signal
import sys
import random
import subprocess
from unihiker import GUI
import websockets
from pinpong.board import Board, Pin, DHT22, NeoPixel
# 全局常量
INITIAL_BACKOFF = 1 # 初始重连等待时间(秒)
MAX_BACKOFF = 600 # 最大重连等待时间(秒)
MCP_SCRIPT = "move.py" # MCP脚本名称
# 配置日志系统
def setup_logger():
"""配置并返回日志记录器"""
logger = logging.getLogger('MCP_PIPE_OPT')
logger.setLevel(logging.INFO)
# 创建控制台处理器
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# 创建文件处理器
file_handler = logging.FileHandler('mcp_pipe.log')
file_handler.setLevel(logging.DEBUG)
# 创建日志格式
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)
# 添加处理器到日志记录器
logger.addHandler(console_handler)
logger.addHandler(file_handler)
return logger
logger = setup_logger()
# 硬件初始化
def init_hardware():
"""初始化所有硬件设备并返回对象"""
try:
# 初始化开发板
Board().begin()
# 初始化GUI
gui = GUI()
gui.draw_image(image="back.png", x=0, y=0)
gui.draw_text(
text="M10智能助手",
x=10,
y=20,
font_size=25,
color="#FF00FF"
)
# 初始化引脚
pin1 = Pin(Pin.P21, Pin.OUT)
pin2 = Pin(Pin.P22, Pin.IN)
pin3 = Pin(Pin.P23, Pin.PWM)
# 初始化传感器
dht = DHT22(pin2)
# 初始化NeoPixel
np = NeoPixel(pin3, 16)
np.brightness(100)
logger.info("硬件初始化成功")
return {
'gui': gui,
'pin1': pin1,
'pin2': pin2,
'pin3': pin3,
'dht': dht,
'np': np
}
except Exception as e:
logger.error(f"硬件初始化失败: {e}")
raise
# 处理硬件控制命令
def handle_hardware_command(command: str, hardware: dict):
"""
根据命令执行硬件操作
参数:
command: 接收到的命令字符串
hardware: 包含所有硬件对象的字典
"""
try:
if "开启" in command:
hardware['pin1'].write_digital(1)
logger.info("设备已开启")
elif "关闭" in command:
hardware['pin1'].write_digital(0)
logger.info("设备已关闭")
elif "点亮" in command:
hardware['np'].range_color(0, 15, 0xFF0000)
logger.info("灯带已点亮")
elif "熄灭" in command:
hardware['np'].clear()
logger.info("灯带已熄灭")
elif "温度" in command:
temp = hardware['dht'].temp_c()
logger.info(f"当前温度: {temp}℃")
elif "湿度" in command:
humidity = hardware['dht'].humidity()
logger.info(f"当前湿度: {humidity}%")
except Exception as e:
logger.error(f"执行硬件命令时出错: {command} - {e}")
# 处理JSON数据
def process_json_data(data: str, hardware: dict):
"""
处理JSON数据并执行相应的硬件操作
参数:
data: JSON格式的字符串数据
hardware: 包含所有硬件对象的字典
"""
try:
json_data = json.loads(data)
message_id = json_data.get('id', 0)
# 只处理ID大于1的消息
if message_id <= 1:
return
result = json_data.get('result', {})
if not result:
return
# 提取内容文本
contents = result.get('content', [])
if not contents:
return
first_content = contents[0]
text_content = first_content.get('text', '')
if not text_content:
return
# 尝试解析内部JSON
try:
inner_data = json.loads(text_content)
if inner_data.get('success', False):
command_result = inner_data.get('result', '')
if command_result:
handle_hardware_command(command_result, hardware)
except json.JSONDecodeError:
# 如果不是JSON,直接作为命令处理
handle_hardware_command(text_content, hardware)
except Exception as e:
logger.error(f"处理JSON数据时出错: {e}\n原始数据: {data[:200]}")
# WebSocket到进程的管道
async def pipe_websocket_to_process(websocket, process):
"""从WebSocket读取数据并写入进程的stdin"""
try:
while True:
message = await websocket.recv()
logger.debug(f"<< 收到消息: {message[:120]}...")
if isinstance(message, bytes):
message = message.decode('utf-8')
try:
process.stdin.write(message + '\n')
process.stdin.flush()
except BrokenPipeError:
logger.error("进程stdin管道已断开")
break
except websockets.exceptions.ConnectionClosed:
logger.warning("WebSocket连接已关闭")
except Exception as e:
logger.error(f"WebSocket到进程管道错误: {e}")
finally:
if not process.stdin.closed:
process.stdin.close()
# 进程到WebSocket的管道
async def pipe_process_to_websocket(process, websocket, hardware):
"""从进程的stdout读取数据并发送到WebSocket"""
try:
while True:
data = await asyncio.get_event_loop().run_in_executor(
None, process.stdout.readline
)
if not data:
logger.info("进程输出结束")
break
logger.debug(f">> 发送消息: {data[:120]}...")
# 处理数据并控制硬件
process_json_data(data, hardware)
# 发送到WebSocket
await websocket.send(data)
except Exception as e:
logger.error(f"进程到WebSocket管道错误: {e}")
# 进程stderr到终端的管道
async def pipe_process_stderr_to_terminal(process):
"""从进程的stderr读取数据并打印到终端"""
try:
while True:
err_data = await asyncio.get_event_loop().run_in_executor(
None, process.stderr.readline
)
if not err_data:
logger.info("进程错误输出结束")
break
sys.stderr.write(err_data)
sys.stderr.flush()
except Exception as e:
logger.error(f"进程stderr管道错误: {e}")
# 连接WebSocket服务器
async def connect_to_server(uri, hardware):
"""连接到WebSocket服务器并建立与MCP脚本的双向通信"""
reconnect_attempt = 0
backoff = INITIAL_BACKOFF
while True:
try:
if reconnect_attempt > 0:
wait_time = min(backoff * (1 + random.random() * 0.1), MAX_BACKOFF)
logger.info(f"等待 {wait_time:.2f} 秒后尝试重新连接 #{reconnect_attempt}...")
await asyncio.sleep(wait_time)
logger.info(f"尝试连接到WebSocket服务器: {uri[:60]}...")
async with websockets.connect(uri) as websocket:
logger.info("成功连接到WebSocket服务器")
# 重置重连计数器
reconnect_attempt = 0
backoff = INITIAL_BACKOFF
# 启动MCP脚本进程
process = subprocess.Popen(
['python', MCP_SCRIPT],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding='utf-8',
text=True
)
logger.info(f"已启动 {MCP_SCRIPT} 进程")
# 创建通信任务
try:
await asyncio.gather(
pipe_websocket_to_process(websocket, process),
pipe_process_to_websocket(process, websocket, hardware),
pipe_process_stderr_to_terminal(process)
)
except Exception as e:
logger.error(f"通信任务出错: {e}")
finally:
# 确保子进程正确终止
terminate_process(process)
except websockets.exceptions.ConnectionClosed as e:
reconnect_attempt += 1
logger.warning(f"WebSocket连接关闭: {e}")
backoff = min(backoff * 2, MAX_BACKOFF)
except Exception as e:
reconnect_attempt += 1
logger.error(f"连接错误: {e}")
backoff = min(backoff * 2, MAX_BACKOFF)
# 终止进程
def terminate_process(process):
"""安全终止子进程"""
if process.poll() is None: # 检查进程是否仍在运行
logger.info(f"终止 {MCP_SCRIPT} 进程")
try:
process.terminate()
process.wait(timeout=5)
except subprocess.TimeoutExpired:
logger.warning("进程未响应,强制终止")
process.kill()
logger.info(f"{MCP_SCRIPT} 进程已终止")
# 信号处理
def signal_handler(sig, frame):
"""处理中断信号"""
logger.info("接收到中断信号,正在关闭...")
sys.exit(0)
# 主函数
async def main():
"""主异步函数"""
# 注册信号处理
signal.signal(signal.SIGINT, signal_handler)
# 初始化硬件
try:
hardware = init_hardware()
except Exception as e:
logger.critical(f"硬件初始化失败,程序终止: {e}")
return
# WebSocket端点URL
endpoint_url = (
"wss://api.xiaozhi.me/mcp/?token="
"eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9."
"eyJ1c2VySWQiOjIxNjQzNywiYWdlbnRJZCI6MTQ5OTAyLCJlbmRwb2ludElkIjoiYWdlbnRfMTQ5OTAyIiwicHVycG9zZSI6Im1jcC1lbmRwb2ludCIsImlhdCI6MTc1MTU0NjIzOH0."
"ha8DI8RcXNCWntx22plEAtxd2ovVLEFS7H8V7TpED-6FfqYKcjQewWIFlEVfIjoE9UJNTQoytxdlQ5hJ4IsDHg"
)
# 启动主循环
try:
await connect_to_server(endpoint_url, hardware)
except KeyboardInterrupt:
logger.info("用户中断程序")
except Exception as e:
logger.error(f"程序执行错误: {e}")
if __name__ == "__main__":
asyncio.run(main())
在mind+中运行mcp_pipe.py。

稍等一会,等待成功连接服务器。

可以用乐动小智和M10智能家(MCP)互动了。
小结:
代码小白,本作业是复制上一作业的代码,只是换了一个小智AI硬件,换了接入点地址而已。
欢迎拍砖。
评论