回到首页 返回首页
回到顶部 回到顶部
返回上一页 返回上一页

二哈识图2+小智ai制作学生心理疏导员 简单

头像 _深蓝_ 2025.12.20 52 0

一、【项目背景】

    中小学阶段是青少年心理发展的关键期,学业压力、同伴关系、亲子沟通等问题易引发焦虑、低落、烦躁等负面情绪,而校园内专业心理疏导资源有限,学生往往因羞于表达错过及时干预。基于此,我们结合二哈识图 2 AI 视觉传感器、Arduino UNO 主控板与小智 AI,依托 FastMCP 协议打造「学生心理疏导员」。该设备部署于校园心理角 / 班级角落,为学生提供低成本、无压力的即时心理支持,填补校园心理关怀的即时性缺口。

图片.png

二、【项目核心目标】

  1. 1.基于二哈识图 2 的表情识别能力,精准识别学生常见情绪(焦虑、低落、开心、烦躁、委屈等);
  2. 2.依托 FastMCP 协议打通二哈识图 2、UNO 与小智 AI 的通信链路,实现情绪数据实时传输;
  3. 3.支持学生主动触发深度对话,小智 AI 可引导学生表达情绪原因,提供可落地的调节建议。
  4.  

三、【技术栈与硬件清单】

1. 硬件清单

材料清单

FrWcLXTYRp2EA4SdvHOHpnpJlGQW.jpg
FsaHqYp7tAFlLLJqIoanEPErWJ6F.jpg
Fq295n7xdAWVr1xXsr-6c0LHrbVJ.jpg

2. 技术栈

  • 通信协议:FastMCP(Model Context Protocol)—— 打通 UNO、二哈识图 2 与小智 AI 的核心协议,实现跨设备数据标准化传输;
  • 视觉识别:二哈识图 2 内置表情识别算法(无需额外训练,支持自定义情绪阈值校准);
  • 语音交互:小智 AI 原生语音引擎(支持青少年友好的语音合成、中文对话解析);
  • 数据传输:串口通信(二哈识图 2→UNO、UNO→ESP32)、WebSocket(FastMCP 客户端→小智 AI 云端)。
  •  

四、【核心功能实现】

 

1. FastMCP Server 扩展:情绪识别与疏导工具

基于小智 AI 开源代码扩展 FastMCP Server,新增 2 个核心工具函数,实现「读取串口列表」「读取串口数据」,核心代码逻辑(工具注册)如下:

 

代码
from fastmcp import FastMCP
import sys
import serial
import serial.tools.list_ports
from typing import Optional, Dict, Any
from datetime import datetime
import threading
import time
import os

DEFAULT_PORT = "COM13"          # 默认串口
DEFAULT_BAUDRATE = 9600         # 默认波特率
DEFAULT_TIMEOUT = 2.0           # 串口超时时间
AUTO_READ_INTERVAL = 1.0        # 自动读取间隔(秒)
SHOW_BANNER = True              # 是否显示FastMCP启动横幅
# =======================================================================

# 修复Windows CMD编码问题(解决中文乱码)
if sys.platform == 'win32':
    os.system("chcp 65001 >nul")  # 切换CMD编码为UTF-8
    sys.stdout.reconfigure(encoding='utf-8', line_buffering=True)
    sys.stderr.reconfigure(encoding='utf-8', line_buffering=True)

# 全局状态变量(用于可视化执行状态)
global_status = {
    "server_running": False,
    "serial_connected": False,
    "last_read_time": None,
    "last_data": "",
    "error_count": 0,
    "total_read_count": 0
}

def print_status():
    """打印格式化的程序状态(CMD界面可视化)"""
    # 清屏(保持界面整洁)
    os.system("cls" if sys.platform == 'win32' else "clear")
    
    # 打印标题
    print("="*80)
    print("                    SerialReader - FastMCP 串口服务 (实时监控)")
    print("="*80)
    
    # 打印核心状态
    print(f"[服务状态] FastMCP服务器: {'✅ 运行中' if global_status['server_running'] else '❌ 未启动'}")
    print(f"[串口状态] {DEFAULT_PORT} 连接: {'✅ 已连接' if global_status['serial_connected'] else '❌ 未连接'}")
    print(f"[读取统计] 总读取次数: {global_status['total_read_count']} | 错误次数: {global_status['error_count']}")
    print(f"[最后读取] 时间: {global_status['last_read_time'] or '无'} | 数据: {global_status['last_data'] or '无'}")
    print("-"*80)
    print("[实时日志] (按 Ctrl+C 退出)")
    print("-"*80)

def print_log(level: str, message: str, show_in_status: bool = True):
    """
    打印带时间戳的日志(适配CMD可视化)
    :param level: 日志级别 (info/error/warn)
    :param message: 日志内容
    :param show_in_status: 是否在状态界面显示
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
    level_icon = {
        "info": "ℹ️",
        "error": "❌",
        "warn": "⚠️"
    }.get(level.lower(), "ℹ️")
    
    log_msg = f"{timestamp} {level_icon} {message}"
    
    # 如果需要在状态界面显示,先更新状态再打印
    if show_in_status:
        print_status()
        print(log_msg)
    else:
        # 仅打印日志(不刷新状态)
        print(log_msg)

def validate_serial_port(port: str) -> bool:
    """验证串口是否存在,并打印可用串口列表"""
    try:
        available_ports = [p.device for p in serial.tools.list_ports.comports()]
        print_log("info", f"检测到可用串口: {available_ports}", show_in_status=False)
        return port in available_ports
    except Exception as e:
        print_log("error", f"获取串口列表失败: {str(e)}")
        return False

def read_serial_port_core(port: str, baudrate: int, timeout: float) -> Dict[str, Any]:
    """串口读取核心逻辑(独立函数,便于复用)"""
    global global_status
    ser = None
    try:
        # 验证串口
        if not validate_serial_port(port):
            global_status["serial_connected"] = False
            global_status["error_count"] += 1
            return {
                "success": False,
                "error": f"串口{port}不存在,请检查设备连接"
            }
        
        # 打开串口
        ser = serial.Serial(
            port=port,
            baudrate=baudrate,
            timeout=timeout,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            bytesize=serial.EIGHTBITS,
            xonxoff=False,
            rtscts=False,
            dsrdtr=False
        )
        
        if not ser.is_open:
            ser.open()
        
        global_status["serial_connected"] = True
        print_log("info", f"成功连接串口 {port} (波特率: {baudrate})", show_in_status=False)
        
        # 清空缓冲区,避免读取历史数据
        ser.flushInput()
        time.sleep(0.1)
        
        # 读取数据(按行读取,自动处理换行符)
        raw_data = ser.readline()
        if not raw_data:
            return {
                "success": True,
                "result": "",
                "port_info": {"port": port, "baudrate": baudrate, "status": "connected"}
            }
        
        # 解码数据(兼容UTF-8/GBK)
        try:
            data = raw_data.decode('utf-8').strip()
        except:
            data = raw_data.decode('gbk', errors='ignore').strip()
        
        # 更新全局状态
        global_status["last_read_time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        global_status["last_data"] = data
        global_status["total_read_count"] += 1
        
        print_log("info", f"读取到串口数据: [{data}]", show_in_status=False)
        
        return {
            "success": True,
            "result": data,
            "port_info": {"port": port, "baudrate": baudrate, "status": "connected"}
        }
    
    except serial.SerialException as e:
        global_status["serial_connected"] = False
        global_status["error_count"] += 1
        error_msg = f"串口操作失败: {str(e)}"
        if "AccessDenied" in str(e):
            error_msg += " (串口被占用,请关闭Putty/其他串口工具)"
        print_log("error", error_msg)
        return {"success": False, "error": error_msg}
    
    except Exception as e:
        global_status["serial_connected"] = False
        global_status["error_count"] += 1
        error_msg = f"未知错误: {str(e)}"
        print_log("error", error_msg)
        return {"success": False, "error": error_msg}
    
    finally:
        if ser and ser.is_open:
            ser.close()
            print_log("info", f"已关闭串口 {port}", show_in_status=False)
            global_status["serial_connected"] = False

mcp = FastMCP("SerialReader")

@mcp.tool()
def read_serial_port(
    port: str = DEFAULT_PORT,
    baudrate: int = DEFAULT_BAUDRATE,
    timeout: float = DEFAULT_TIMEOUT,
    read_line: bool = True  # 保留兼容参数
) -> Dict[str, Any]:
    """
    FastMCP标准工具:读取指定串口数据
    :param port: 串口名称 (如COM13)
    :param baudrate: 波特率
    :param timeout: 读取超时时间
    :param read_line: 按行读取(固定为True,兼容旧调用)
    :return: 读取结果(含状态/数据/错误信息)
    """
    print_log("info", f"收到MCP调用请求 → 读取串口 {port} (波特率: {baudrate})", show_in_status=False)
    return read_serial_port_core(port, baudrate, timeout)

@mcp.tool()
def list_serial_ports() -> Dict[str, Any]:
    """FastMCP标准工具:列出所有可用串口"""
    try:
        ports = []
        for p in serial.tools.list_ports.comports():
            ports.append({
                "device": p.device,
                "name": p.name,
                "description": p.description,
                "manufacturer": p.manufacturer or "未知",
                "product": p.product or "未知"
            })
        print_log("info", f"MCP调用 → 列出可用串口: {[p['device'] for p in ports]}", show_in_status=False)
        return {
            "success": True,
            "result": ports
        }
    except Exception as e:
        error_msg = f"列出串口失败: {str(e)}"
        print_log("error", error_msg)
        return {"success": False, "error": error_msg}

@mcp.tool()
def get_service_status() -> Dict[str, Any]:
    print_log("info", "收到MCP调用请求 → 获取服务状态", show_in_status=False)
    return {
        "success": True,
        "result": {
            "server_running": global_status["server_running"],
            "serial_connected": global_status["serial_connected"],
            "last_read_time": global_status["last_read_time"],
            "last_data": global_status["last_data"],
            "error_count": global_status["error_count"],
            "total_read_count": global_status["total_read_count"],
            "default_config": {
                "port": DEFAULT_PORT,
                "baudrate": DEFAULT_BAUDRATE,
                "auto_read_interval": AUTO_READ_INTERVAL
            }
        }
    }

def auto_read_loop():
    """后台自动读取串口,持续更新状态"""
    print_log("info", f"启动自动读取线程 → 间隔 {AUTO_READ_INTERVAL} 秒", show_in_status=False)
    while global_status["server_running"]:
        try:
            read_serial_port_core(DEFAULT_PORT, DEFAULT_BAUDRATE, DEFAULT_TIMEOUT)
            time.sleep(AUTO_READ_INTERVAL)
        except KeyboardInterrupt:
            break
        except Exception as e:
            print_log("error", f"自动读取线程异常: {str(e)}")
            time.sleep(AUTO_READ_INTERVAL)

if __name__ == "__main__":
    try:
        global_status["server_running"] = True
        
        print_status()
        print_log("info", "启动SerialReader FastMCP服务器...", show_in_status=False)
        
        auto_read_thread = threading.Thread(target=auto_read_loop, daemon=True)
        auto_read_thread.start()
        
        mcp.run(
            transport="stdio",
            show_banner=SHOW_BANNER  
        )
        
    except KeyboardInterrupt:
        print_log("info", "收到退出信号,正在停止服务...", show_in_status=True)
    except Exception as e:
        print_log("error", f"服务器启动失败: {str(e)}", show_in_status=True)
    finally:
        global_status["server_running"] = False
        print_log("info", "SerialReader服务已停止", show_in_status=True)
        sys.exit(0)

2. 硬件通信链路搭建

(1)二哈识图 2 与 UNO 的连接

  • 二哈识图 2 通过 I2C 接口与 UNO 连接(SDA 接 A4、SCL 接 A5),供电 5V;
  • UNO 通过串口读取二哈识图 2 的表情识别数据;
  • 核心代码(UNO 端):
  • 图片.png
  • 代码如下:

代码
#include "DFRobot_HuskylensV2.h"
// 创建对象
HuskylensV2 huskylens;



// 主程序开始
void setup() {
	Wire.begin();
    while (!huskylens.begin(Wire)) {
        delay(100);
    }
	huskylens.switchAlgorithm(ALGORITHM_EMOTION_RECOGNITION);
    delay(5000);
	Serial.begin(9600);
}
void loop() {
	huskylens.getResult(ALGORITHM_EMOTION_RECOGNITION);

	if (huskylens.available(ALGORITHM_EMOTION_RECOGNITION)) {
		Serial.println(RET_ITEM_STR(huskylens.getCachedCenterResult(ALGORITHM_EMOTION_RECOGNITION), Result, name));
		delay(1000);
	}
}

(2)烧录小智,获取mcp接入点

行空板K10烧录小智 AI 客户端(xiaozhi-client)(具体请查看行空板K10烧录小智AI教程),配置 WiFi 并连接至校园局域网并获取 MCP 接入点;

打开注册的小智控制台,点击设备》配置角色》MCP设置》获取MCP接入点

  1. 图片.png
  2. 图片.png
  3. 图片.png
  4. 图片.png
  5.  
  6. (3)配置角色介绍
  7. 图片.png

我是{{assistant_name}},通过识别您的表情,疏解您的烦恼,排解您的忧虑,与您共同享受开心。请说命令:识别表情,让我看看你今天是否开心呢。

当收到串口情绪数据时:

   首先确认识别结果,并以自然方式提及 “通过串口读取到您当前的情绪为...”
   提供以下结构化心理疏导信息:
       情绪解析(3-4 个关键点,说明该情绪的正常性、常见触发场景、身体反应信号)
       即时舒缓方法(2-3 种简单易操作的调节方式,无需特殊工具)
       深度疏导建议(1-2 个可落地的心理调节步骤,适配青少年 / 普通人群)
       场景化适配(1-2 种对应场景的应对技巧,如学习 / 生活 / 社交场景)
   如果用户补充表达情绪原因、持续时间等信息,请据此调整疏导方向,针对性给出建议
   使用温和、共情、支持性的语气,适当使用温暖的表情符号(如🤍🌿💛)
   如情绪识别模糊或数据不明确,可补充询问 “您当前的感受更接近这种情绪吗?还是有其他不一样的感受呀?”

请避免:

   专业晦涩的心理学术语
   绝对化的情绪判断(如 “你肯定是因为 XX 才难过”)
   否定性表述(如 “别难过了”“这有什么好焦虑的”)
   过度夸大情绪影响或轻描淡写情绪重量

  1. (4)运行fastmcp服务
  2. 使用命令下载mcp示例代码,

git clone https://github.com/78/mcp-calculator

图片.png

 

修改或者加入串口读取数据服务,并修改mcp_config.json内容即可

打开win 10 命令提示符,

图片.png

安装必要的模块

pip install -r requirements.txt

加入MCP接入点

set MCP_ENDPOINT=<从控制台获取的您设备的mcp接入点>

图片.png

启动服务

python python mcp_pipe.py

出现如下标志说明服务启动成功:

图片.png

也可以使用powershell启动,与cmd命令提示窗口不同的是配置环境语句如下:

$env:MCP_ENDPOINT="<小智ai的MCP接入点>"

 

五、效果展示

 

aaf5bb72ee6755f00a0457481a83c2bf.jpg
2431197e32d213d1f3f4286d2586add0.jpg

 

  1.  
  2.  

评论

user-avatar