回到首页 返回首页
回到顶部 回到顶部
返回上一页 返回上一页

乐动小智+M10智能小助手(MCP) 简单

头像 rzyzzxw 2025.07.07 27 0

7.7

【项目目标】

前面两个任务,都在学习小智AI和M10(MCP),在这个任务中,我将尝试用乐动小智和行空板M10(MCP)组成智能家居助手,控制RGB灯环的亮灭,小风扇的启停,同时尝试获取dht22传感器实时数据。

36dd0f6ec7425d8aec448d91c31a16f.jpg

材料清单

  • 乐动掌控2.0小智 X1
  • 行空板M10 X1
  • M10扩展板组合 X1
  • 小风扇、灯环 X2
  • dht22温湿度传感器 X1

步骤1 乐动掌控2.0安装小智AI

1、下载安装mpython 0.8.7,连接乐动掌控,选择硬件。

2、刷小智固件。

32313f14a2e079968b02769f44d7938.png

3、给小智配网。

9696fedb9d77a78c96e3d958f06f1fd.jpg

WiFi连接成功后,屏幕上会出现小智平台网址和“设备码”。

4、设置小智AI。

登录https://xiaozhi.me小智机器人官网,进入“控制台”,添加小智机器人。

添加小智机器人,需要点击“新建智能体”菜单,新建智能体,例如起名“乐动掌控”。

image.png

配置角色:

image.png

管理设备--“添加设备”按钮,在弹出的对话框中填写设备码,设备码为屏幕上的数字,确定。

这样,乐动掌控2.0成功变身为“小智聊天机器人”,可以畅快的与之对话了。

聊天后就可使用声纹识别功能,添加说话人了。

62383fa3063ec536cbe9c473c0cc8f1.png

挺简单的。

获取MCP接入点,复制接入点地址备用。

8f8fa2f5915a3051f50841ae2b75c6d.png

测试了下,乐动掌控2.0的摄像头目前没有加入互动,期待早日实现拍照识物功能。

步骤2 行空板M10上安装pyenv环境和Python 3.12.7

这一步已在前面项目中按照官方教程完成,记录在:M10仰望小车(MCP)的学习与尝试

https://makelogapi.dfrobot.com.cn/api/project/handleShareUrl?pid=165_317714

步骤3 硬件连接

551859efc616260d7017e110abc38aa.png

步骤4 M10程序编写

1、mcp服务文件,move.py。

功能描述:控制M10风扇开启、关闭,风量调节,灯光点亮、熄灭,亮度调整,获取实时温度、湿度。

代码
"""
真实硬件版本:MCP服务器设备控制工具
版本: 0.3.0
"""

import sys
import logging
import time
from typing import Dict
from mcp.server.fastmcp import FastMCP
from pinpong.board import Board, Pin, DHT22, NeoPixel

# 配置日志系统
def setup_logger() -> logging.Logger:
    logger = logging.getLogger('MoveServer')
    logger.setLevel(logging.INFO)
    
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    
    file_handler = logging.FileHandler('move_server.log')
    file_handler.setLevel(logging.DEBUG)
    
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    console_handler.setFormatter(formatter)
    file_handler.setFormatter(formatter)
    
    logger.addHandler(console_handler)
    logger.addHandler(file_handler)
    
    return logger

# 修复Windows控制台UTF-8编码问题
if sys.platform == 'win32':
    try:
        sys.stderr.reconfigure(encoding='utf-8')
        sys.stdout.reconfigure(encoding='utf-8')
    except AttributeError:
        import io
        sys.stderr = io.TextIOWrapper(sys.stderr.detach(), encoding='utf-8')
        sys.stdout = io.TextIOWrapper(sys.stdout.detach(), encoding='utf-8')

# 创建日志记录器
logger = setup_logger()

# 创建MCP服务器
mcp = FastMCP("MoveServer")

# ======================= 硬件配置 =======================
HARDWARE_CONFIG = {
    "FAN_PIN": 21,      # 风扇控制引脚
    "SENSOR_PIN": 22,   # 温湿度传感器引脚
    "LIGHT_PIN": 23,    # 灯带控制引脚
    "NUM_LEDS": 16      # LED数量
}

# ======================= 设备控制工具 =======================

class HardwareController:
    """真实硬件操作类"""
    
    def __init__(self, config: dict):
        """初始化真实硬件"""
        try:
            # 初始化开发板
            Board().begin()
            
            # 初始化风扇控制引脚
            self.fan_pin = Pin(config["FAN_PIN"], Pin.OUT)
            
            # 初始化温湿度传感器
            self.dht_sensor = DHT22(Pin(config["SENSOR_PIN"]))
            
            # 初始化灯带
            self.light_pin = NeoPixel(Pin(config["LIGHT_PIN"]), config["NUM_LEDS"])
            self.light_pin.brightness(100)
            
            logger.info("真实硬件初始化成功")
        except Exception as e:
            logger.critical(f"硬件初始化失败: {e}")
            raise

    def control_fan(self, state: bool) -> str:
        try:
            self.fan_pin.write_digital(1 if state else 0)
            return f"风扇已{'开启' if state else '关闭'}"
        except Exception as e:
            logger.error(f"风扇控制失败: {e}")
            return f"风扇控制失败: {str(e)}"
    
    def control_light(self, state: bool) -> str:
        try:
            if state:
                self.light_pin.range_color(0, self.light_pin.num, 0xFF0000)  # 红色
            else:
                self.light_pin.clear()
            return f"灯带已{'点亮' if state else '熄灭'}"
        except Exception as e:
            logger.error(f"灯带控制失败: {e}")
            return f"灯带控制失败: {str(e)}"
    
    def get_temperature(self) -> str:
        """获取真实温度数据"""
        try:
            # 尝试最多3次读取,防止偶尔读取失败
            for _ in range(3):
                try:
                    temp = self.dht_sensor.temp_c()
                    return f"当前温度: {temp}℃"
                except:
                    time.sleep(0.2)  # 短暂延迟后重试
            return "温度读取失败"
        except Exception as e:
            logger.error(f"温度读取失败: {e}")
            return f"温度读取失败: {str(e)}"
    
    def get_humidity(self) -> str:
        """获取真实湿度数据"""
        try:
            for _ in range(3):
                try:
                    humidity = self.dht_sensor.humidity()
                    return f"当前湿度: {humidity}%"
                except:
                    time.sleep(0.2)
            return "湿度读取失败"
        except Exception as e:
            logger.error(f"湿度读取失败: {e}")
            return f"湿度读取失败: {str(e)}"

def tool_response(success: bool, result: str) -> Dict[str, object]:
    return {
        "success": success,
        "result": result
    }

# 创建硬件控制器实例
try:
    hardware_controller = HardwareController(HARDWARE_CONFIG)
except Exception as e:
    logger.critical("硬件初始化失败,服务器无法启动")
    sys.exit(1)

@mcp.tool()
def open_fan() -> Dict[str, object]:
    logger.info("执行风扇开启命令")
    result = hardware_controller.control_fan(True)
    logger.info(result)
    return tool_response("失败" not in result, result)

@mcp.tool()
def close_fan() -> Dict[str, object]:
    logger.info("执行风扇关闭命令")
    result = hardware_controller.control_fan(False)
    logger.info(result)
    return tool_response("失败" not in result, result)

@mcp.tool()
def open_light() -> Dict[str, object]:
    logger.info("执行灯带开启命令")
    result = hardware_controller.control_light(True)
    logger.info(result)
    return tool_response("失败" not in result, result)

@mcp.tool()
def close_light() -> Dict[str, object]:
    logger.info("执行灯带关闭命令")
    result = hardware_controller.control_light(False)
    logger.info(result)
    return tool_response("失败" not in result, result)

@mcp.tool()
def get_temp() -> Dict[str, object]:
    logger.info("读取温度数据")
    result = hardware_controller.get_temperature()
    logger.info(result)
    return tool_response("失败" not in result, result)

@mcp.tool()
def get_humidity() -> Dict[str, object]:
    logger.info("读取湿度数据")
    result = hardware_controller.get_humidity()
    logger.info(result)
    return tool_response("失败" not in result, result)

# 启动服务器
if __name__ == "__main__":
    try:
        logger.info("启动MCP服务器...")
        logger.info(f"硬件配置: {HARDWARE_CONFIG}")
        mcp.run(transport="stdio")
        logger.info("MCP服务器已停止")
    except Exception as e:
        logger.critical(f"服务器运行失败: {e}")
        sys.exit(1)

修改小智AI管道文件mcp_pipe.py。

新建文件mcp_pipe.py,填入自己的MCP接入点地址。

功能描述:连接到 MCP 服务器,并通过 WebSocket 端点将输入和输出与指定的 Python 脚本进行管道通信。

加入了开机画面。

back.png

代码
"""
优化版本:连接MCP服务器并通过WebSocket传输输入输出
版本: 0.2.0

用法:
export MCP_ENDPOINT=
python mcp_pipe.py
"""

# -*- coding: UTF-8 -*-
import asyncio
import json
import logging
import signal
import sys
import random
import subprocess
from unihiker import GUI
import websockets
from pinpong.board import Board, Pin, DHT22, NeoPixel

# 全局常量
INITIAL_BACKOFF = 1  # 初始重连等待时间(秒)
MAX_BACKOFF = 600    # 最大重连等待时间(秒)
MCP_SCRIPT = "move.py"  # MCP脚本名称

# 配置日志系统
def setup_logger():
    """配置并返回日志记录器"""
    logger = logging.getLogger('MCP_PIPE_OPT')
    logger.setLevel(logging.INFO)
    
    # 创建控制台处理器
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    
    # 创建文件处理器
    file_handler = logging.FileHandler('mcp_pipe.log')
    file_handler.setLevel(logging.DEBUG)
    
    # 创建日志格式
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    console_handler.setFormatter(formatter)
    file_handler.setFormatter(formatter)
    
    # 添加处理器到日志记录器
    logger.addHandler(console_handler)
    logger.addHandler(file_handler)
    
    return logger

logger = setup_logger()

# 硬件初始化
def init_hardware():
    """初始化所有硬件设备并返回对象"""
    try:
        # 初始化开发板
        Board().begin()
        
        # 初始化GUI
        gui = GUI()
        gui.draw_image(image="back.png", x=0, y=0)
        gui.draw_text(
            text="M10智能助手", 
            x=10, 
            y=20, 
            font_size=25, 
            color="#FF00FF"
        )

        # 初始化引脚
        pin1 = Pin(Pin.P21, Pin.OUT)
        pin2 = Pin(Pin.P22, Pin.IN)
        pin3 = Pin(Pin.P23, Pin.PWM)
        
        # 初始化传感器
        dht = DHT22(pin2)
        
        # 初始化NeoPixel
        np = NeoPixel(pin3, 16)
        np.brightness(100)
        
        logger.info("硬件初始化成功")
        return {
            'gui': gui,
            'pin1': pin1,
            'pin2': pin2,
            'pin3': pin3,
            'dht': dht,
            'np': np
        }
    
    except Exception as e:
        logger.error(f"硬件初始化失败: {e}")
        raise

# 处理硬件控制命令
def handle_hardware_command(command: str, hardware: dict):
    """
    根据命令执行硬件操作
    
    参数:
        command: 接收到的命令字符串
        hardware: 包含所有硬件对象的字典
    """
    try:
        if "开启" in command:
            hardware['pin1'].write_digital(1)
            logger.info("设备已开启")
        elif "关闭" in command:
            hardware['pin1'].write_digital(0)
            logger.info("设备已关闭")
        elif "点亮" in command:
            hardware['np'].range_color(0, 15, 0xFF0000)
            logger.info("灯带已点亮")
        elif "熄灭" in command:
            hardware['np'].clear()
            logger.info("灯带已熄灭")
        elif "温度" in command:
            temp = hardware['dht'].temp_c()
            logger.info(f"当前温度: {temp}℃")
        elif "湿度" in command:
            humidity = hardware['dht'].humidity()
            logger.info(f"当前湿度: {humidity}%")
    except Exception as e:
        logger.error(f"执行硬件命令时出错: {command} - {e}")

# 处理JSON数据
def process_json_data(data: str, hardware: dict):
    """
    处理JSON数据并执行相应的硬件操作
    
    参数:
        data: JSON格式的字符串数据
        hardware: 包含所有硬件对象的字典
    """
    try:
        json_data = json.loads(data)
        message_id = json_data.get('id', 0)
        
        # 只处理ID大于1的消息
        if message_id <= 1:
            return
        
        result = json_data.get('result', {})
        if not result:
            return
        
        # 提取内容文本
        contents = result.get('content', [])
        if not contents:
            return
        
        first_content = contents[0]
        text_content = first_content.get('text', '')
        
        if not text_content:
            return
        
        # 尝试解析内部JSON
        try:
            inner_data = json.loads(text_content)
            if inner_data.get('success', False):
                command_result = inner_data.get('result', '')
                if command_result:
                    handle_hardware_command(command_result, hardware)
        except json.JSONDecodeError:
            # 如果不是JSON,直接作为命令处理
            handle_hardware_command(text_content, hardware)
    
    except Exception as e:
        logger.error(f"处理JSON数据时出错: {e}\n原始数据: {data[:200]}")

# WebSocket到进程的管道
async def pipe_websocket_to_process(websocket, process):
    """从WebSocket读取数据并写入进程的stdin"""
    try:
        while True:
            message = await websocket.recv()
            logger.debug(f"<< 收到消息: {message[:120]}...")
            
            if isinstance(message, bytes):
                message = message.decode('utf-8')
            
            try:
                process.stdin.write(message + '\n')
                process.stdin.flush()
            except BrokenPipeError:
                logger.error("进程stdin管道已断开")
                break
    
    except websockets.exceptions.ConnectionClosed:
        logger.warning("WebSocket连接已关闭")
    except Exception as e:
        logger.error(f"WebSocket到进程管道错误: {e}")
    finally:
        if not process.stdin.closed:
            process.stdin.close()

# 进程到WebSocket的管道
async def pipe_process_to_websocket(process, websocket, hardware):
    """从进程的stdout读取数据并发送到WebSocket"""
    try:
        while True:
            data = await asyncio.get_event_loop().run_in_executor(
                None, process.stdout.readline
            )
            
            if not data:
                logger.info("进程输出结束")
                break
                
            logger.debug(f">> 发送消息: {data[:120]}...")
            
            # 处理数据并控制硬件
            process_json_data(data, hardware)
            
            # 发送到WebSocket
            await websocket.send(data)
    
    except Exception as e:
        logger.error(f"进程到WebSocket管道错误: {e}")

# 进程stderr到终端的管道
async def pipe_process_stderr_to_terminal(process):
    """从进程的stderr读取数据并打印到终端"""
    try:
        while True:
            err_data = await asyncio.get_event_loop().run_in_executor(
                None, process.stderr.readline
            )
            
            if not err_data:
                logger.info("进程错误输出结束")
                break
                
            sys.stderr.write(err_data)
            sys.stderr.flush()
    
    except Exception as e:
        logger.error(f"进程stderr管道错误: {e}")

# 连接WebSocket服务器
async def connect_to_server(uri, hardware):
    """连接到WebSocket服务器并建立与MCP脚本的双向通信"""
    reconnect_attempt = 0
    backoff = INITIAL_BACKOFF
    
    while True:
        try:
            if reconnect_attempt > 0:
                wait_time = min(backoff * (1 + random.random() * 0.1), MAX_BACKOFF)
                logger.info(f"等待 {wait_time:.2f} 秒后尝试重新连接 #{reconnect_attempt}...")
                await asyncio.sleep(wait_time)
            
            logger.info(f"尝试连接到WebSocket服务器: {uri[:60]}...")
            
            async with websockets.connect(uri) as websocket:
                logger.info("成功连接到WebSocket服务器")
                
                # 重置重连计数器
                reconnect_attempt = 0
                backoff = INITIAL_BACKOFF
                
                # 启动MCP脚本进程
                process = subprocess.Popen(
                    ['python', MCP_SCRIPT],
                    stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    encoding='utf-8',
                    text=True
                )
                logger.info(f"已启动 {MCP_SCRIPT} 进程")
                
                # 创建通信任务
                try:
                    await asyncio.gather(
                        pipe_websocket_to_process(websocket, process),
                        pipe_process_to_websocket(process, websocket, hardware),
                        pipe_process_stderr_to_terminal(process)
                    )
                except Exception as e:
                    logger.error(f"通信任务出错: {e}")
                finally:
                    # 确保子进程正确终止
                    terminate_process(process)
        
        except websockets.exceptions.ConnectionClosed as e:
            reconnect_attempt += 1
            logger.warning(f"WebSocket连接关闭: {e}")
            backoff = min(backoff * 2, MAX_BACKOFF)
        except Exception as e:
            reconnect_attempt += 1
            logger.error(f"连接错误: {e}")
            backoff = min(backoff * 2, MAX_BACKOFF)

# 终止进程
def terminate_process(process):
    """安全终止子进程"""
    if process.poll() is None:  # 检查进程是否仍在运行
        logger.info(f"终止 {MCP_SCRIPT} 进程")
        try:
            process.terminate()
            process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            logger.warning("进程未响应,强制终止")
            process.kill()
        logger.info(f"{MCP_SCRIPT} 进程已终止")

# 信号处理
def signal_handler(sig, frame):
    """处理中断信号"""
    logger.info("接收到中断信号,正在关闭...")
    sys.exit(0)

# 主函数
async def main():
    """主异步函数"""
    # 注册信号处理
    signal.signal(signal.SIGINT, signal_handler)
    
    # 初始化硬件
    try:
        hardware = init_hardware()
    except Exception as e:
        logger.critical(f"硬件初始化失败,程序终止: {e}")
        return
    
    # WebSocket端点URL
    endpoint_url = (
        "wss://api.xiaozhi.me/mcp/?token="
        "eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9."
        "eyJ1c2VySWQiOjIxNjQzNywiYWdlbnRJZCI6MTQ5OTAyLCJlbmRwb2ludElkIjoiYWdlbnRfMTQ5OTAyIiwicHVycG9zZSI6Im1jcC1lbmRwb2ludCIsImlhdCI6MTc1MTU0NjIzOH0."
        "ha8DI8RcXNCWntx22plEAtxd2ovVLEFS7H8V7TpED-6FfqYKcjQewWIFlEVfIjoE9UJNTQoytxdlQ5hJ4IsDHg"
    )
    
    # 启动主循环
    try:
        await connect_to_server(endpoint_url, hardware)
    except KeyboardInterrupt:
        logger.info("用户中断程序")
    except Exception as e:
        logger.error(f"程序执行错误: {e}")

if __name__ == "__main__":
    asyncio.run(main())

在mind+中运行mcp_pipe.py。

image.png

稍等一会,等待成功连接服务器。

8f8fa2f5915a3051f50841ae2b75c6d.png

可以用乐动小智和M10智能家(MCP)互动了。

小结:

代码小白,本作业是复制上一作业的代码,只是换了一个小智AI硬件,换了接入点地址而已。

欢迎拍砖。

评论

user-avatar