回到首页 返回首页
回到顶部 回到顶部
返回上一页 返回上一页

K10小智+M10智能”家“助手(MCP) 简单

头像 rzyzzxw 2025.07.05 72 0

7.5

【项目说明】

连续两天,都在学习云天老师的小智和M10MCP的教程,这个“智能家”小助手的项目算是一个课后习题,来巩固所学知识。

项目中有小风扇的开启和关闭,灯环的点亮和熄灭,获取环境温度和湿度,做为一个小练习还是蛮好的,下面是作业完成情况。

d975e81c043a468a66c5abd4cbf4c58.jpgd6f10a441ba7c7d296019798950c4db.jpg

材料清单

  • M10行空板 X1
  • M10电池扩展板组合 X1
  • RGB灯环(16灯) X1
  • 儿童小风扇 X1
  • 锂电池盒(风扇供电) X1
  • 数字继电器 X1
  • K10行空板(安装小智) X1
  • 锂电池(K10供电) X1
  • USB线 X1
  • DHT22温湿度传感器 X1

步骤1 行空板M10上安装pyenv环境和Python 3.12.7

这一步已在上个项目中按照官方教程完成,记录在:M10仰望小车(MCP)的学习与尝试

https://makelogapi.dfrobot.com.cn/api/project/handleShareUrl?pid=165_317714

image.png

同时感谢云天老师的教程和指导。

步骤2 K10小智聊天机器人配置

K10刷小智固件见官方教程。

链接上一帖子中都有。

89cd2768c183395d160be2152b3a7e1.png
0883719fdee0dade12586c7378045c1.png

我的仰望小车和智能家共用同一个角色配置,用了同一个MCP接入点地址。

步骤3 硬件连接

硬件接线简图

551859efc616260d7017e110abc38aa.png

实物图

4960fcd2fcb7cd2bc9bf87554a163cb.jpg

步骤4 M10程序编写

1、编写mcp服务文件,move.py。

新建文件move.py,复制云天老师写的代码,修改为智能家的控制内容。

功能描述:控制M10风扇开启、关闭,灯环点亮、熄灭。

代码
# move.py
from mcp.server.fastmcp import FastMCP
import sys
import logging

#配置日志
logger = logging.getLogger('MoveServer')

# 修复Windows控制台UTF-8编码问题
if sys.platform == 'win32':
    sys.stderr.reconfigure(encoding='utf-8')
    sys.stdout.reconfigure(encoding='utf-8')

# 创建MCP服务器
mcp = FastMCP("MoveServer")

#======================= 设备控制工具 =======================
#添加一个工具
@mcp.tool()
def open_fan() -> dict:
    """
    控制风扇开启。   
    该函数将设置P21引脚为高电平,控制接在该引脚上的继电器闭合电源开关,使风扇开启。   
    Returns:
        dict: 返回一个字典,包含操作结果。
    """
    result = "风扇开启"
    logger.info(result)
    return {"success": True, "result": result}
#添加一个工具
@mcp.tool()
def close_fan() -> dict:
    """
    控制风扇关闭。   
    该函数将设置P21引脚为低电平,控制接在该引脚上的继电器断开电源开关,使风扇关闭。   
    Returns:
        dict: 返回一个字典,包含操作结果。
    """
    result = "风扇关闭"
    logger.info(result)
    return {"success": True, "result": result}
#添加一个工具
@mcp.tool()
def open_light() -> dict:
    """
    控制RGB灯环点亮。   
    该函数将设置P23引脚RGB灯环点亮,设置为红色,亮度100。   
    Returns:
        dict: 返回一个字典,包含操作结果。
    """
    result = "灯环已点亮"
    logger.info(result)
    return {"success": True, "result": result}
#添加一个工具
@mcp.tool()
def close_light() -> dict:
    """
    控RGB灯环熄灭。   
    该函数将设置P23引脚RGB灯环熄灭。   
    Returns:
        dict: 返回一个字典,包含操作结果。
    """
    result = "灯环已熄灭"
    logger.info(result)
    return {"success": True, "result": result}

# 启动服务器
if __name__ == "__main__":
    mcp.run(transport="stdio")

这个代码文件放到DeepSeek中优化,得到如下代码,增加了获取实时温度、湿度。。

代码
"""
真实硬件版本:MCP服务器设备控制工具
版本: 0.3.0
"""

import sys
import logging
import time
from typing import Dict
from mcp.server.fastmcp import FastMCP
from pinpong.board import Board, Pin, DHT22, NeoPixel

# 配置日志系统
def setup_logger() -> logging.Logger:
    logger = logging.getLogger('MoveServer')
    logger.setLevel(logging.INFO)
    
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    
    file_handler = logging.FileHandler('move_server.log')
    file_handler.setLevel(logging.DEBUG)
    
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    console_handler.setFormatter(formatter)
    file_handler.setFormatter(formatter)
    
    logger.addHandler(console_handler)
    logger.addHandler(file_handler)
    
    return logger

# 修复Windows控制台UTF-8编码问题
if sys.platform == 'win32':
    try:
        sys.stderr.reconfigure(encoding='utf-8')
        sys.stdout.reconfigure(encoding='utf-8')
    except AttributeError:
        import io
        sys.stderr = io.TextIOWrapper(sys.stderr.detach(), encoding='utf-8')
        sys.stdout = io.TextIOWrapper(sys.stdout.detach(), encoding='utf-8')

# 创建日志记录器
logger = setup_logger()

# 创建MCP服务器
mcp = FastMCP("MoveServer")

# ======================= 硬件配置 =======================
HARDWARE_CONFIG = {
    "FAN_PIN": 21,      # 风扇控制引脚
    "SENSOR_PIN": 22,   # 温湿度传感器引脚
    "LIGHT_PIN": 23,    # 灯带控制引脚
    "NUM_LEDS": 16      # LED数量
}

# ======================= 设备控制工具 =======================

class HardwareController:
    """真实硬件操作类"""
    
    def __init__(self, config: dict):
        """初始化真实硬件"""
        try:
            # 初始化开发板
            Board().begin()
            
            # 初始化风扇控制引脚
            self.fan_pin = Pin(config["FAN_PIN"], Pin.OUT)
            
            # 初始化温湿度传感器
            self.dht_sensor = DHT22(Pin(config["SENSOR_PIN"]))
            
            # 初始化灯带
            self.light_pin = NeoPixel(Pin(config["LIGHT_PIN"]), config["NUM_LEDS"])
            self.light_pin.brightness(100)
            
            logger.info("真实硬件初始化成功")
        except Exception as e:
            logger.critical(f"硬件初始化失败: {e}")
            raise

    def control_fan(self, state: bool) -> str:
        try:
            self.fan_pin.write_digital(1 if state else 0)
            return f"风扇已{'开启' if state else '关闭'}"
        except Exception as e:
            logger.error(f"风扇控制失败: {e}")
            return f"风扇控制失败: {str(e)}"
    
    def control_light(self, state: bool) -> str:
        try:
            if state:
                self.light_pin.range_color(0, self.light_pin.num, 0xFF0000)  # 红色
            else:
                self.light_pin.clear()
            return f"灯带已{'点亮' if state else '熄灭'}"
        except Exception as e:
            logger.error(f"灯带控制失败: {e}")
            return f"灯带控制失败: {str(e)}"
    
    def get_temperature(self) -> str:
        """获取真实温度数据"""
        try:
            # 尝试最多3次读取,防止偶尔读取失败
            for _ in range(3):
                try:
                    temp = self.dht_sensor.temp_c()
                    return f"当前温度: {temp}℃"
                except:
                    time.sleep(0.2)  # 短暂延迟后重试
            return "温度读取失败"
        except Exception as e:
            logger.error(f"温度读取失败: {e}")
            return f"温度读取失败: {str(e)}"
    
    def get_humidity(self) -> str:
        """获取真实湿度数据"""
        try:
            for _ in range(3):
                try:
                    humidity = self.dht_sensor.humidity()
                    return f"当前湿度: {humidity}%"
                except:
                    time.sleep(0.2)
            return "湿度读取失败"
        except Exception as e:
            logger.error(f"湿度读取失败: {e}")
            return f"湿度读取失败: {str(e)}"

def tool_response(success: bool, result: str) -> Dict[str, object]:
    return {
        "success": success,
        "result": result
    }

# 创建硬件控制器实例
try:
    hardware_controller = HardwareController(HARDWARE_CONFIG)
except Exception as e:
    logger.critical("硬件初始化失败,服务器无法启动")
    sys.exit(1)

@mcp.tool()
def open_fan() -> Dict[str, object]:
    logger.info("执行风扇开启命令")
    result = hardware_controller.control_fan(True)
    logger.info(result)
    return tool_response("失败" not in result, result)

@mcp.tool()
def close_fan() -> Dict[str, object]:
    logger.info("执行风扇关闭命令")
    result = hardware_controller.control_fan(False)
    logger.info(result)
    return tool_response("失败" not in result, result)

@mcp.tool()
def open_light() -> Dict[str, object]:
    logger.info("执行灯带开启命令")
    result = hardware_controller.control_light(True)
    logger.info(result)
    return tool_response("失败" not in result, result)

@mcp.tool()
def close_light() -> Dict[str, object]:
    logger.info("执行灯带关闭命令")
    result = hardware_controller.control_light(False)
    logger.info(result)
    return tool_response("失败" not in result, result)

@mcp.tool()
def get_temp() -> Dict[str, object]:
    logger.info("读取温度数据")
    result = hardware_controller.get_temperature()
    logger.info(result)
    return tool_response("失败" not in result, result)

@mcp.tool()
def get_humidity() -> Dict[str, object]:
    logger.info("读取湿度数据")
    result = hardware_controller.get_humidity()
    logger.info(result)
    return tool_response("失败" not in result, result)

# 启动服务器
if __name__ == "__main__":
    try:
        logger.info("启动MCP服务器...")
        logger.info(f"硬件配置: {HARDWARE_CONFIG}")
        mcp.run(transport="stdio")
        logger.info("MCP服务器已停止")
    except Exception as e:
        logger.critical(f"服务器运行失败: {e}")
        sys.exit(1)

修改小智AI管道文件mcp_pipe.py。

新建文件mcp_pipe.py,复制云天老师代码,并做控制动作方面的修改,填入自己的MCP接入点地址。

功能描述:连接到 MCP 服务器,并通过 WebSocket 端点将输入和输出与指定的 Python 脚本进行管道通信。

加入了开机画面。

back.png

代码
"""
This script is used to connect to the MCP server and pipe the input and output to the websocket endpoint.
Version: 0.1.0

Usage:

export MCP_ENDPOINT=
python mcp_pipe.py 

"""

#  -*- coding: UTF-8 -*-
 
# MindPlus
# Python
from unihiker import GUI
import asyncio
import websockets
import subprocess
import logging
import os
import signal
import sys
import random
from dotenv import load_dotenv
from pinpong.board import Pin
from pinpong.board import Board
from pinpong.board import Board,Pin
from pinpong.board import DHT22
from pinpong.board import NeoPixel
import json

Board().begin()
u_gui=GUI()
pin1 = Pin(Pin.P21, Pin.OUT)
pin2 = Pin(Pin.P22, Pin.IN)
pin3 = Pin(Pin.P23, Pin.PWM)
dht1 = DHT22(pin2)
np1 = NeoPixel(pin3,16)
np1.brightness(128)
np1.brightness(100)

u_gui.draw_image(image="back.png",x=0,y=0)
u_gui.draw_text(text="M10智能助手",x=10,y=20,font_size=25, color="#FF00FF")

# 设置日志记录器
# Load environment variables from .env file
#load_dotenv()

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('MCP_PIPE')

# Reconnection settings
INITIAL_BACKOFF = 1  # Initial wait time in seconds
MAX_BACKOFF = 600  # Maximum wait time in seconds
reconnect_attempt = 0
backoff = INITIAL_BACKOFF

async def connect_with_retry(uri):
    """Connect to WebSocket server with retry mechanism"""
    global reconnect_attempt, backoff
    while True:  # Infinite reconnection
        try:
            if reconnect_attempt > 0:
                wait_time = backoff * (1 + random.random() * 0.1)  # Add some random jitter
                logger.info(f"Waiting {wait_time:.2f} seconds before reconnection attempt {reconnect_attempt}...")
                await asyncio.sleep(wait_time)

            # Attempt to connect
            await connect_to_server(uri)

        except Exception as e:
            reconnect_attempt += 1
            logger.warning(f"Connection closed (attempt: {reconnect_attempt}): {e}")            
            # Calculate wait time for next reconnection (exponential backoff)
            backoff = min(backoff * 2, MAX_BACKOFF)

async def connect_to_server(uri):
    """Connect to WebSocket server and establish bidirectional communication with `mcp_script`"""
    global reconnect_attempt, backoff
    try:
        logger.info(f"Connecting to WebSocket server...")
        async with websockets.connect(uri) as websocket:
            logger.info(f"Successfully connected to WebSocket server")

            # Reset reconnection counter if connection closes normally
            reconnect_attempt = 0
            backoff = INITIAL_BACKOFF

            # Start mcp_script process
            process = subprocess.Popen(
                ['python', mcp_script],
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                encoding='utf-8',
                text=True  # Use text mode
            )
            logger.info(f"Started {mcp_script} process")

            # Create two tasks: read from WebSocket and write to process, read from process and write to WebSocket
            await asyncio.gather(
                pipe_websocket_to_process(websocket, process),
                pipe_process_to_websocket(process, websocket),
                pipe_process_stderr_to_terminal(process)
            )
    except websockets.exceptions.ConnectionClosed as e:
        logger.error(f"WebSocket connection closed: {e}")
        raise  # Re-throw exception to trigger reconnection
    except Exception as e:
        logger.error(f"Connection error: {e}")
        raise  # Re-throw exception
    finally:
        # Ensure the child process is properly terminated
        if 'process' in locals():
            logger.info(f"Terminating {mcp_script} process")
            try:
                process.terminate()
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                process.kill()
            logger.info(f"{mcp_script} process terminated")

async def pipe_websocket_to_process(websocket, process):
    """Read data from WebSocket and write to process stdin"""
    try:
        while True:
            # Read message from WebSocket
            message = await websocket.recv()
            logger.debug(f"<< {message[:120]}...")

            # Write to process stdin (in text mode)
            if isinstance(message, bytes):
                message = message.decode('utf-8')
            process.stdin.write(message + '\n')
            process.stdin.flush()
    except Exception as e:
        logger.error(f"Error in WebSocket to process pipe: {e}")
        raise  # Re-throw exception to trigger reconnection
    finally:
        # Close process stdin
        if not process.stdin.closed:
            process.stdin.close()

async def pipe_process_to_websocket(process, websocket):
    """Read data from process stdout and send to WebSocket"""
    try:
        while True:
            # Read data from process stdout
            data = await asyncio.get_event_loop().run_in_executor(
                None, process.stdout.readline
            )

            if not data:  # If no data, the process may have ended
                logger.info("Process has ended output")
                break

            # Send data to WebSocket
            logger.debug(f">> {data[:120]}...")
            print(data)
            # 解析 JSON 字符串
            json_str = json.loads(data)
            print(json_str['id'])

            if json_str['id']>1:
              print(json_str['id'])
              if json_str.get('result', {}):

                text=json.loads(json_str['result']['content'][0]['text'])
                if text['success']:
                   print(text['result'])
                   L=480
                   if "开启" in text['result']:
                             print("******************************")
                             pin1.write_digital(1)                            
                   if "关闭" in text['result']:
                             print("******************************")
                             pin1.write_digital(0)                            
                   if "点亮" in text['result']:
                             print("******************************")
                             np1.range_color(0,15,0xFF0000) 
                   if "熄灭" in text['result']:
                             print("******************************")
                             np1.clear()                   
            # In text mode, data is already a string, no need to decode
            await websocket.send(data)
    except Exception as e:
        logger.error(f"Error in process to WebSocket pipe: {e}")
        raise  # Re-throw exception to trigger reconnection

async def pipe_process_stderr_to_terminal(process):
    """Read data from process stderr and print to terminal"""
    try:
        while True:
            # Read data from process stderr
            data = await asyncio.get_event_loop().run_in_executor(
                None, process.stderr.readline
            )

            if not data:  # If no data, the process may have ended
                logger.info("Process has ended stderr output")
                break

            # Print stderr data to terminal (in text mode, data is already a string)
            sys.stderr.write(data)
            sys.stderr.flush()
    except Exception as e:
        logger.error(f"Error in process stderr pipe: {e}")
        raise  # Re-throw exception to trigger reconnection

def signal_handler(sig, frame):
    """Handle interrupt signals"""
    logger.info("Received interrupt signal, shutting down...")
    sys.exit(0)

if __name__ == "__main__":
    # Register signal handler
    signal.signal(signal.SIGINT, signal_handler)

    # mcp_script
    #if len(sys.argv) < 2:
    #    logger.error("Usage: mcp_pipe.py ")
    #    sys.exit(1)

    mcp_script = "move.py"

    # Get token from environment variable or command line arguments
    #endpoint_url = os.environ.get('MCP_ENDPOINT')
    endpoint_url="wss://api.xiaozhi.me/mcp/?token=eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VySWQiOjIxNjQzNywiYWdlbnRJZCI6MTQ5OTAyLCJlbmRwb2ludElkIjoiYWdlbnRfMTQ5OTAyIiwicHVycG9zZSI6Im1jcC1lbmRwb2ludCIsImlhdCI6MTc1MTU0NjIzOH0.ha8DI8RcXNCWntx22plEAtxd2ovVLEFS7H8V7TpED-6FfqYKcjQewWIFlEVfIjoE9UJNTQoytxdlQ5hJ4IsDHg"
    if not endpoint_url:
        logger.error("Please set the `MCP_ENDPOINT` environment variable")
        sys.exit(1)

    # Start main loop
    try:
        asyncio.run(connect_with_retry(endpoint_url))
    except KeyboardInterrupt:
        logger.info("Program interrupted by user")
    except Exception as e:
        logger.error(f"Program execution error: {e}")

跑通后放在DeepSeek中优化,得到如下代码。

这个文件配合上一组文件中下面的DS优化版使用。

[小贴士]

教程中放了4个代码文件,上move.pyt和上mcp_pipe.py两个文件一组配合使用,只能控制开关和风扇开关。

下move.pyt和下mcp_pipe.py两个DeepSeek优化版文件一组配合使用,增加了获取温湿度传感器实时读数的功能,从文件风格和内容中可看出来,请不要混合,混合不能运行。

MCP接入地址要换成自己小智的接入点地址。

代码
"""
优化版本:连接MCP服务器并通过WebSocket传输输入输出
版本: 0.2.0

改进内容:
1. 模块化设计 - 功能拆分为独立函数
2. 增强错误处理 - 添加更完善的异常处理
3. 硬件初始化封装 - 集中管理硬件配置
4. JSON处理优化 - 简化解析逻辑
5. 资源管理改进 - 确保正确释放所有资源
6. 日志系统优化 - 提供更详细的错误信息

用法:
export MCP_ENDPOINT=
python mcp_pipe.py
"""

# -*- coding: UTF-8 -*-
import asyncio
import json
import logging
import signal
import sys
import random
import subprocess
from unihiker import GUI
import websockets
from pinpong.board import Board, Pin, DHT22, NeoPixel

# 全局常量
INITIAL_BACKOFF = 1  # 初始重连等待时间(秒)
MAX_BACKOFF = 600    # 最大重连等待时间(秒)
MCP_SCRIPT = "move.py"  # MCP脚本名称

# 配置日志系统
def setup_logger():
    """配置并返回日志记录器"""
    logger = logging.getLogger('MCP_PIPE_OPT')
    logger.setLevel(logging.INFO)
    
    # 创建控制台处理器
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    
    # 创建文件处理器
    file_handler = logging.FileHandler('mcp_pipe.log')
    file_handler.setLevel(logging.DEBUG)
    
    # 创建日志格式
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    console_handler.setFormatter(formatter)
    file_handler.setFormatter(formatter)
    
    # 添加处理器到日志记录器
    logger.addHandler(console_handler)
    logger.addHandler(file_handler)
    
    return logger

logger = setup_logger()

# 硬件初始化
def init_hardware():
    """初始化所有硬件设备并返回对象"""
    try:
        # 初始化开发板
        Board().begin()
        
        # 初始化GUI
        gui = GUI()
        gui.draw_image(image="back.png", x=0, y=0)
        gui.draw_text(
            text="M10智能助手", 
            x=10, 
            y=20, 
            font_size=25, 
            color="#FF00FF"
        )

        # 初始化引脚
        pin1 = Pin(Pin.P21, Pin.OUT)
        pin2 = Pin(Pin.P22, Pin.IN)
        pin3 = Pin(Pin.P23, Pin.PWM)
        
        # 初始化传感器
        dht = DHT22(pin2)
        
        # 初始化NeoPixel
        np = NeoPixel(pin3, 16)
        np.brightness(100)
        
        logger.info("硬件初始化成功")
        return {
            'gui': gui,
            'pin1': pin1,
            'pin2': pin2,
            'pin3': pin3,
            'dht': dht,
            'np': np
        }
    
    except Exception as e:
        logger.error(f"硬件初始化失败: {e}")
        raise

# 处理硬件控制命令
def handle_hardware_command(command: str, hardware: dict):
    """
    根据命令执行硬件操作
    
    参数:
        command: 接收到的命令字符串
        hardware: 包含所有硬件对象的字典
    """
    try:
        if "开启" in command:
            hardware['pin1'].write_digital(1)
            logger.info("设备已开启")
        elif "关闭" in command:
            hardware['pin1'].write_digital(0)
            logger.info("设备已关闭")
        elif "点亮" in command:
            hardware['np'].range_color(0, 15, 0xFF0000)
            logger.info("灯带已点亮")
        elif "熄灭" in command:
            hardware['np'].clear()
            logger.info("灯带已熄灭")
        elif "温度" in command:
            temp = hardware['dht'].temp_c()
            logger.info(f"当前温度: {temp}℃")
        elif "湿度" in command:
            humidity = hardware['dht'].humidity()
            logger.info(f"当前湿度: {humidity}%")
    except Exception as e:
        logger.error(f"执行硬件命令时出错: {command} - {e}")

# 处理JSON数据
def process_json_data(data: str, hardware: dict):
    """
    处理JSON数据并执行相应的硬件操作
    
    参数:
        data: JSON格式的字符串数据
        hardware: 包含所有硬件对象的字典
    """
    try:
        json_data = json.loads(data)
        message_id = json_data.get('id', 0)
        
        # 只处理ID大于1的消息
        if message_id <= 1:
            return
        
        result = json_data.get('result', {})
        if not result:
            return
        
        # 提取内容文本
        contents = result.get('content', [])
        if not contents:
            return
        
        first_content = contents[0]
        text_content = first_content.get('text', '')
        
        if not text_content:
            return
        
        # 尝试解析内部JSON
        try:
            inner_data = json.loads(text_content)
            if inner_data.get('success', False):
                command_result = inner_data.get('result', '')
                if command_result:
                    handle_hardware_command(command_result, hardware)
        except json.JSONDecodeError:
            # 如果不是JSON,直接作为命令处理
            handle_hardware_command(text_content, hardware)
    
    except Exception as e:
        logger.error(f"处理JSON数据时出错: {e}\n原始数据: {data[:200]}")

# WebSocket到进程的管道
async def pipe_websocket_to_process(websocket, process):
    """从WebSocket读取数据并写入进程的stdin"""
    try:
        while True:
            message = await websocket.recv()
            logger.debug(f"<< 收到消息: {message[:120]}...")
            
            if isinstance(message, bytes):
                message = message.decode('utf-8')
            
            try:
                process.stdin.write(message + '\n')
                process.stdin.flush()
            except BrokenPipeError:
                logger.error("进程stdin管道已断开")
                break
    
    except websockets.exceptions.ConnectionClosed:
        logger.warning("WebSocket连接已关闭")
    except Exception as e:
        logger.error(f"WebSocket到进程管道错误: {e}")
    finally:
        if not process.stdin.closed:
            process.stdin.close()

# 进程到WebSocket的管道
async def pipe_process_to_websocket(process, websocket, hardware):
    """从进程的stdout读取数据并发送到WebSocket"""
    try:
        while True:
            data = await asyncio.get_event_loop().run_in_executor(
                None, process.stdout.readline
            )
            
            if not data:
                logger.info("进程输出结束")
                break
                
            logger.debug(f">> 发送消息: {data[:120]}...")
            
            # 处理数据并控制硬件
            process_json_data(data, hardware)
            
            # 发送到WebSocket
            await websocket.send(data)
    
    except Exception as e:
        logger.error(f"进程到WebSocket管道错误: {e}")

# 进程stderr到终端的管道
async def pipe_process_stderr_to_terminal(process):
    """从进程的stderr读取数据并打印到终端"""
    try:
        while True:
            err_data = await asyncio.get_event_loop().run_in_executor(
                None, process.stderr.readline
            )
            
            if not err_data:
                logger.info("进程错误输出结束")
                break
                
            sys.stderr.write(err_data)
            sys.stderr.flush()
    
    except Exception as e:
        logger.error(f"进程stderr管道错误: {e}")

# 连接WebSocket服务器
async def connect_to_server(uri, hardware):
    """连接到WebSocket服务器并建立与MCP脚本的双向通信"""
    reconnect_attempt = 0
    backoff = INITIAL_BACKOFF
    
    while True:
        try:
            if reconnect_attempt > 0:
                wait_time = min(backoff * (1 + random.random() * 0.1), MAX_BACKOFF)
                logger.info(f"等待 {wait_time:.2f} 秒后尝试重新连接 #{reconnect_attempt}...")
                await asyncio.sleep(wait_time)
            
            logger.info(f"尝试连接到WebSocket服务器: {uri[:60]}...")
            
            async with websockets.connect(uri) as websocket:
                logger.info("成功连接到WebSocket服务器")
                
                # 重置重连计数器
                reconnect_attempt = 0
                backoff = INITIAL_BACKOFF
                
                # 启动MCP脚本进程
                process = subprocess.Popen(
                    ['python', MCP_SCRIPT],
                    stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    encoding='utf-8',
                    text=True
                )
                logger.info(f"已启动 {MCP_SCRIPT} 进程")
                
                # 创建通信任务
                try:
                    await asyncio.gather(
                        pipe_websocket_to_process(websocket, process),
                        pipe_process_to_websocket(process, websocket, hardware),
                        pipe_process_stderr_to_terminal(process)
                    )
                except Exception as e:
                    logger.error(f"通信任务出错: {e}")
                finally:
                    # 确保子进程正确终止
                    terminate_process(process)
        
        except websockets.exceptions.ConnectionClosed as e:
            reconnect_attempt += 1
            logger.warning(f"WebSocket连接关闭: {e}")
            backoff = min(backoff * 2, MAX_BACKOFF)
        except Exception as e:
            reconnect_attempt += 1
            logger.error(f"连接错误: {e}")
            backoff = min(backoff * 2, MAX_BACKOFF)

# 终止进程
def terminate_process(process):
    """安全终止子进程"""
    if process.poll() is None:  # 检查进程是否仍在运行
        logger.info(f"终止 {MCP_SCRIPT} 进程")
        try:
            process.terminate()
            process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            logger.warning("进程未响应,强制终止")
            process.kill()
        logger.info(f"{MCP_SCRIPT} 进程已终止")

# 信号处理
def signal_handler(sig, frame):
    """处理中断信号"""
    logger.info("接收到中断信号,正在关闭...")
    sys.exit(0)

# 主函数
async def main():
    """主异步函数"""
    # 注册信号处理
    signal.signal(signal.SIGINT, signal_handler)
    
    # 初始化硬件
    try:
        hardware = init_hardware()
    except Exception as e:
        logger.critical(f"硬件初始化失败,程序终止: {e}")
        return
    
    # WebSocket端点URL
    endpoint_url = (
        "wss://api.xiaozhi.me/mcp/?token="
        "eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9."
        "eyJ1c2VySWQiOjIxNjQzNywiYWdlbnRJZCI6MTQ5OTAyLCJlbmRwb2ludElkIjoiYWdlbnRfMTQ5OTAyIiwicHVycG9zZSI6Im1jcC1lbmRwb2ludCIsImlhdCI6MTc1MTU0NjIzOH0."
        "ha8DI8RcXNCWntx22plEAtxd2ovVLEFS7H8V7TpED-6FfqYKcjQewWIFlEVfIjoE9UJNTQoytxdlQ5hJ4IsDHg"
    )
    
    # 启动主循环
    try:
        await connect_to_server(endpoint_url, hardware)
    except KeyboardInterrupt:
        logger.info("用户中断程序")
    except Exception as e:
        logger.error(f"程序执行错误: {e}")

if __name__ == "__main__":
    asyncio.run(main())

在mind+中点击运行。

image.png

 

稍等一会,成功连接服务器。

5d1af2b3dc8d533da08a4cf0580a7d8.png

此时接入点刷新后是这样的:在线,工具可用。

image.png

下面就可以用K10小智和M10智能家(MCP)互动了。

d6f10a441ba7c7d296019798950c4db.jpg

小结:

1、在实操过程中,硬件操作的代码(小白)先用图形化写好再自动生成,在修改move.py和mcp_pipe.py时参考。

2、在本项目中参考云天老师代码顺利完成灯环和小风扇的控制,但是实时读取dht22的温度和湿度数据的代码不会写,这部分交给DeepSeeK优化完成。

评论

user-avatar