一、【项目背景】
中小学阶段是青少年心理发展的关键期,学业压力、同伴关系、亲子沟通等问题易引发焦虑、低落、烦躁等负面情绪,而校园内专业心理疏导资源有限,学生往往因羞于表达错过及时干预。基于此,我们结合二哈识图 2 AI 视觉传感器、Arduino UNO 主控板与小智 AI,依托 FastMCP 协议打造「学生心理疏导员」。该设备部署于校园心理角 / 班级角落,为学生提供低成本、无压力的即时心理支持,填补校园心理关怀的即时性缺口。

二、【项目核心目标】
- 1.基于二哈识图 2 的表情识别能力,精准识别学生常见情绪(焦虑、低落、开心、烦躁、委屈等);
- 2.依托 FastMCP 协议打通二哈识图 2、UNO 与小智 AI 的通信链路,实现情绪数据实时传输;
- 3.支持学生主动触发深度对话,小智 AI 可引导学生表达情绪原因,提供可落地的调节建议。
三、【技术栈与硬件清单】
1. 硬件清单



2. 技术栈
- 通信协议:FastMCP(Model Context Protocol)—— 打通 UNO、二哈识图 2 与小智 AI 的核心协议,实现跨设备数据标准化传输;
- 视觉识别:二哈识图 2 内置表情识别算法(无需额外训练,支持自定义情绪阈值校准);
- 语音交互:小智 AI 原生语音引擎(支持青少年友好的语音合成、中文对话解析);
- 数据传输:串口通信(二哈识图 2→UNO、UNO→ESP32)、WebSocket(FastMCP 客户端→小智 AI 云端)。
四、【核心功能实现】
1. FastMCP Server 扩展:情绪识别与疏导工具
基于小智 AI 开源代码扩展 FastMCP Server,新增 2 个核心工具函数,实现「读取串口列表」「读取串口数据」,核心代码逻辑(工具注册)如下:
from fastmcp import FastMCP
import sys
import serial
import serial.tools.list_ports
from typing import Optional, Dict, Any
from datetime import datetime
import threading
import time
import os
DEFAULT_PORT = "COM13" # 默认串口
DEFAULT_BAUDRATE = 9600 # 默认波特率
DEFAULT_TIMEOUT = 2.0 # 串口超时时间
AUTO_READ_INTERVAL = 1.0 # 自动读取间隔(秒)
SHOW_BANNER = True # 是否显示FastMCP启动横幅
# =======================================================================
# 修复Windows CMD编码问题(解决中文乱码)
if sys.platform == 'win32':
os.system("chcp 65001 >nul") # 切换CMD编码为UTF-8
sys.stdout.reconfigure(encoding='utf-8', line_buffering=True)
sys.stderr.reconfigure(encoding='utf-8', line_buffering=True)
# 全局状态变量(用于可视化执行状态)
global_status = {
"server_running": False,
"serial_connected": False,
"last_read_time": None,
"last_data": "",
"error_count": 0,
"total_read_count": 0
}
def print_status():
"""打印格式化的程序状态(CMD界面可视化)"""
# 清屏(保持界面整洁)
os.system("cls" if sys.platform == 'win32' else "clear")
# 打印标题
print("="*80)
print(" SerialReader - FastMCP 串口服务 (实时监控)")
print("="*80)
# 打印核心状态
print(f"[服务状态] FastMCP服务器: {'✅ 运行中' if global_status['server_running'] else '❌ 未启动'}")
print(f"[串口状态] {DEFAULT_PORT} 连接: {'✅ 已连接' if global_status['serial_connected'] else '❌ 未连接'}")
print(f"[读取统计] 总读取次数: {global_status['total_read_count']} | 错误次数: {global_status['error_count']}")
print(f"[最后读取] 时间: {global_status['last_read_time'] or '无'} | 数据: {global_status['last_data'] or '无'}")
print("-"*80)
print("[实时日志] (按 Ctrl+C 退出)")
print("-"*80)
def print_log(level: str, message: str, show_in_status: bool = True):
"""
打印带时间戳的日志(适配CMD可视化)
:param level: 日志级别 (info/error/warn)
:param message: 日志内容
:param show_in_status: 是否在状态界面显示
"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
level_icon = {
"info": "ℹ️",
"error": "❌",
"warn": "⚠️"
}.get(level.lower(), "ℹ️")
log_msg = f"{timestamp} {level_icon} {message}"
# 如果需要在状态界面显示,先更新状态再打印
if show_in_status:
print_status()
print(log_msg)
else:
# 仅打印日志(不刷新状态)
print(log_msg)
def validate_serial_port(port: str) -> bool:
"""验证串口是否存在,并打印可用串口列表"""
try:
available_ports = [p.device for p in serial.tools.list_ports.comports()]
print_log("info", f"检测到可用串口: {available_ports}", show_in_status=False)
return port in available_ports
except Exception as e:
print_log("error", f"获取串口列表失败: {str(e)}")
return False
def read_serial_port_core(port: str, baudrate: int, timeout: float) -> Dict[str, Any]:
"""串口读取核心逻辑(独立函数,便于复用)"""
global global_status
ser = None
try:
# 验证串口
if not validate_serial_port(port):
global_status["serial_connected"] = False
global_status["error_count"] += 1
return {
"success": False,
"error": f"串口{port}不存在,请检查设备连接"
}
# 打开串口
ser = serial.Serial(
port=port,
baudrate=baudrate,
timeout=timeout,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS,
xonxoff=False,
rtscts=False,
dsrdtr=False
)
if not ser.is_open:
ser.open()
global_status["serial_connected"] = True
print_log("info", f"成功连接串口 {port} (波特率: {baudrate})", show_in_status=False)
# 清空缓冲区,避免读取历史数据
ser.flushInput()
time.sleep(0.1)
# 读取数据(按行读取,自动处理换行符)
raw_data = ser.readline()
if not raw_data:
return {
"success": True,
"result": "",
"port_info": {"port": port, "baudrate": baudrate, "status": "connected"}
}
# 解码数据(兼容UTF-8/GBK)
try:
data = raw_data.decode('utf-8').strip()
except:
data = raw_data.decode('gbk', errors='ignore').strip()
# 更新全局状态
global_status["last_read_time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
global_status["last_data"] = data
global_status["total_read_count"] += 1
print_log("info", f"读取到串口数据: [{data}]", show_in_status=False)
return {
"success": True,
"result": data,
"port_info": {"port": port, "baudrate": baudrate, "status": "connected"}
}
except serial.SerialException as e:
global_status["serial_connected"] = False
global_status["error_count"] += 1
error_msg = f"串口操作失败: {str(e)}"
if "AccessDenied" in str(e):
error_msg += " (串口被占用,请关闭Putty/其他串口工具)"
print_log("error", error_msg)
return {"success": False, "error": error_msg}
except Exception as e:
global_status["serial_connected"] = False
global_status["error_count"] += 1
error_msg = f"未知错误: {str(e)}"
print_log("error", error_msg)
return {"success": False, "error": error_msg}
finally:
if ser and ser.is_open:
ser.close()
print_log("info", f"已关闭串口 {port}", show_in_status=False)
global_status["serial_connected"] = False
mcp = FastMCP("SerialReader")
@mcp.tool()
def read_serial_port(
port: str = DEFAULT_PORT,
baudrate: int = DEFAULT_BAUDRATE,
timeout: float = DEFAULT_TIMEOUT,
read_line: bool = True # 保留兼容参数
) -> Dict[str, Any]:
"""
FastMCP标准工具:读取指定串口数据
:param port: 串口名称 (如COM13)
:param baudrate: 波特率
:param timeout: 读取超时时间
:param read_line: 按行读取(固定为True,兼容旧调用)
:return: 读取结果(含状态/数据/错误信息)
"""
print_log("info", f"收到MCP调用请求 → 读取串口 {port} (波特率: {baudrate})", show_in_status=False)
return read_serial_port_core(port, baudrate, timeout)
@mcp.tool()
def list_serial_ports() -> Dict[str, Any]:
"""FastMCP标准工具:列出所有可用串口"""
try:
ports = []
for p in serial.tools.list_ports.comports():
ports.append({
"device": p.device,
"name": p.name,
"description": p.description,
"manufacturer": p.manufacturer or "未知",
"product": p.product or "未知"
})
print_log("info", f"MCP调用 → 列出可用串口: {[p['device'] for p in ports]}", show_in_status=False)
return {
"success": True,
"result": ports
}
except Exception as e:
error_msg = f"列出串口失败: {str(e)}"
print_log("error", error_msg)
return {"success": False, "error": error_msg}
@mcp.tool()
def get_service_status() -> Dict[str, Any]:
print_log("info", "收到MCP调用请求 → 获取服务状态", show_in_status=False)
return {
"success": True,
"result": {
"server_running": global_status["server_running"],
"serial_connected": global_status["serial_connected"],
"last_read_time": global_status["last_read_time"],
"last_data": global_status["last_data"],
"error_count": global_status["error_count"],
"total_read_count": global_status["total_read_count"],
"default_config": {
"port": DEFAULT_PORT,
"baudrate": DEFAULT_BAUDRATE,
"auto_read_interval": AUTO_READ_INTERVAL
}
}
}
def auto_read_loop():
"""后台自动读取串口,持续更新状态"""
print_log("info", f"启动自动读取线程 → 间隔 {AUTO_READ_INTERVAL} 秒", show_in_status=False)
while global_status["server_running"]:
try:
read_serial_port_core(DEFAULT_PORT, DEFAULT_BAUDRATE, DEFAULT_TIMEOUT)
time.sleep(AUTO_READ_INTERVAL)
except KeyboardInterrupt:
break
except Exception as e:
print_log("error", f"自动读取线程异常: {str(e)}")
time.sleep(AUTO_READ_INTERVAL)
if __name__ == "__main__":
try:
global_status["server_running"] = True
print_status()
print_log("info", "启动SerialReader FastMCP服务器...", show_in_status=False)
auto_read_thread = threading.Thread(target=auto_read_loop, daemon=True)
auto_read_thread.start()
mcp.run(
transport="stdio",
show_banner=SHOW_BANNER
)
except KeyboardInterrupt:
print_log("info", "收到退出信号,正在停止服务...", show_in_status=True)
except Exception as e:
print_log("error", f"服务器启动失败: {str(e)}", show_in_status=True)
finally:
global_status["server_running"] = False
print_log("info", "SerialReader服务已停止", show_in_status=True)
sys.exit(0)
2. 硬件通信链路搭建
(1)二哈识图 2 与 UNO 的连接
- 二哈识图 2 通过 I2C 接口与 UNO 连接(SDA 接 A4、SCL 接 A5),供电 5V;
- UNO 通过串口读取二哈识图 2 的表情识别数据;
- 核心代码(UNO 端):

- 代码如下:
#include "DFRobot_HuskylensV2.h"
// 创建对象
HuskylensV2 huskylens;
// 主程序开始
void setup() {
Wire.begin();
while (!huskylens.begin(Wire)) {
delay(100);
}
huskylens.switchAlgorithm(ALGORITHM_EMOTION_RECOGNITION);
delay(5000);
Serial.begin(9600);
}
void loop() {
huskylens.getResult(ALGORITHM_EMOTION_RECOGNITION);
if (huskylens.available(ALGORITHM_EMOTION_RECOGNITION)) {
Serial.println(RET_ITEM_STR(huskylens.getCachedCenterResult(ALGORITHM_EMOTION_RECOGNITION), Result, name));
delay(1000);
}
}
(2)烧录小智,获取mcp接入点
行空板K10烧录小智 AI 客户端(xiaozhi-client)(具体请查看行空板K10烧录小智AI教程),配置 WiFi 并连接至校园局域网并获取 MCP 接入点;
打开注册的小智控制台,点击设备》配置角色》MCP设置》获取MCP接入点




- (3)配置角色介绍

我是{{assistant_name}},通过识别您的表情,疏解您的烦恼,排解您的忧虑,与您共同享受开心。请说命令:识别表情,让我看看你今天是否开心呢。
当收到串口情绪数据时:
首先确认识别结果,并以自然方式提及 “通过串口读取到您当前的情绪为...”
提供以下结构化心理疏导信息:
情绪解析(3-4 个关键点,说明该情绪的正常性、常见触发场景、身体反应信号)
即时舒缓方法(2-3 种简单易操作的调节方式,无需特殊工具)
深度疏导建议(1-2 个可落地的心理调节步骤,适配青少年 / 普通人群)
场景化适配(1-2 种对应场景的应对技巧,如学习 / 生活 / 社交场景)
如果用户补充表达情绪原因、持续时间等信息,请据此调整疏导方向,针对性给出建议
使用温和、共情、支持性的语气,适当使用温暖的表情符号(如🤍🌿💛)
如情绪识别模糊或数据不明确,可补充询问 “您当前的感受更接近这种情绪吗?还是有其他不一样的感受呀?”
请避免:
专业晦涩的心理学术语
绝对化的情绪判断(如 “你肯定是因为 XX 才难过”)
否定性表述(如 “别难过了”“这有什么好焦虑的”)
过度夸大情绪影响或轻描淡写情绪重量
- (4)运行fastmcp服务
- 使用命令下载mcp示例代码,
git clone https://github.com/78/mcp-calculator

修改或者加入串口读取数据服务,并修改mcp_config.json内容即可
打开win 10 命令提示符,

安装必要的模块
pip install -r requirements.txt
加入MCP接入点
set MCP_ENDPOINT=<从控制台获取的您设备的mcp接入点>

启动服务
python python mcp_pipe.py
出现如下标志说明服务启动成功:

也可以使用powershell启动,与cmd命令提示窗口不同的是配置环境语句如下:
$env:MCP_ENDPOINT="<小智ai的MCP接入点>"
五、效果展示



返回首页
回到顶部








评论