据统计,近年来超半数电动车火灾发生于室内、楼道充电过程,空间狭小密闭,火焰可在30秒内迅速蔓延,释放的有毒气体足以在1分钟内致人窒息,严重威胁居民生命安全。

传统依赖人力看守、张贴标语的管理方式存在监管盲区、响应滞后、成本高昂等痛点,难以实现全天候有效阻截。
一、系统概述
本系统旨在从源头进行实时识别与干预,解决电动车进入电梯带来的安全隐患。
系统以行空板M10为核心控制器,搭载二哈识图2运行AI模型,对电梯轿厢进行实时监测。
一旦识别到电动车等违规目标,系统将立即执行本地联动(声光报警、控制电梯开门)并拍照取证,同时将事件信息(含时间、照片、类别)自动上报至云端SIoT服务器,形成“识别-干预-记录”的完整管理闭环。
二、系统设计
为实现上述功能,系统采用经典的分层边缘计算架构,将任务清晰划分为感知、控制和云端三个层级,确保高实时性、高可靠性与易维护性。

感知层(前端识别):由二哈识图2构成,作为系统的“眼睛”,负责实时采集视频流并进行AI推理,精准识别电动车等目标。
控制层(中枢处理):由行空板M10构成,作为系统的“大脑”。它通过I2C总线接收感知层的识别结果,执行核心逻辑判断。一旦确认违规,则立即调度本级资源:通过GPIO口控制继电器实现电梯开门锁定,调用Audio模块播放告警语音,并驱动USB摄像头抓拍现场照片。最后,它将所有信息整合为结构化JSON数据。
云端层(数据服务):即SIoT物联网平台,作为系统的“档案库”。控制层通过Wi-Fi和MQTT协议,将带有时间戳、设备ID、Base64照片和中文类别标签的JSON数据包上报至此。
三、硬件选型与简介
硬件介绍
(1)行空板M10

详细信息,见官方WIKI:
https://www.unihiker.com.cn/wiki/m10/jianjie
(2)二哈识图2

详细信息,见官方WIKI:https://wiki.dfrobot.com.cn/_SKU_SEN0638_Gravity_HUSKYLENS_2_AI_Camera_Vision_Sensor
(3)USB摄像头

(4)继电器

详细信息,见官方WIKI:
(5)USB免驱3W小喇叭

四、硬件接线图

五、系统工作流程
系统上电后,进入持续监控循环,其核心工作流程与状态转换如下图所示,清晰展示了从“监控”到“报警联动”再到“恢复”的完整决策过程:

六、项目实施步骤
步骤1 数据集准备与处理
(1)数据集准备。
收集并标注包含'motorcycle'等目标的数据集。我下载到数据集是PASCAL VOC格式,包含三类'person', 'bicycle', 'motorcycle'目标标注。

数据集核心结构为:
- Annotations:XML 格式标注文件(记录目标边界框与类别)
- JPEGImages:原始图片文件
- train.txt:训练集文件名列表
- val.txt: 验证集文件名列表
- label_list.txt:类别定义文件(含三类目标名称)
(2)转换数据集。
- 编辑并运行 Python 转换脚本(已优化绝对路径配置,含文件存在性校验),将 PASCAL VOC 格式(XML)转换为 YOLO 格式(txt 标注)。
import os
import shutil
import xml.etree.ElementTree as ET
# ===================== 1. 配置参数(无需修改,适配你的数据集结构) =====================
XML_DIR = "Annotations" # VOC标注目录
IMG_DIR = "JPEGImages" # 图片目录
LABEL_LIST_PATH = "label_list.txt" # 类别文件
TRAIN_LIST_PATH = "train.txt" # 训练集列表
VAL_LIST_PATH = "val.txt" # 验证集列表
# YOLO输出目录(自动创建)
YOLO_ROOT = "yolo_dataset"
YOLO_IMG_TRAIN = os.path.join(YOLO_ROOT, "images", "train")
YOLO_IMG_VAL = os.path.join(YOLO_ROOT, "images", "val")
YOLO_LBL_TRAIN = os.path.join(YOLO_ROOT, "labels", "train")
YOLO_LBL_VAL = os.path.join(YOLO_ROOT, "labels", "val")
# ===================== 2. 初始化(创建目录+读取类别映射) =====================
# 创建YOLO目录结构
for dir_path in [YOLO_IMG_TRAIN, YOLO_IMG_VAL, YOLO_LBL_TRAIN, YOLO_LBL_VAL]:
os.makedirs(dir_path, exist_ok=True)
# 读取类别并建立ID映射(person=0, bicycle=1, motorcycle=2)
with open(LABEL_LIST_PATH, "r", encoding="utf-8") as f:
class_names = [line.strip() for line in f if line.strip()]
class_id_map = {name: idx for idx, name in enumerate(class_names)}
print(f"类别ID映射:{class_id_map}")
# ===================== 3. 核心函数:XML转YOLO TXT =====================
def xml_to_yolo(xml_path, img_width, img_height):
"""解析XML,返回YOLO格式的标注内容(归一化坐标)"""
tree = ET.parse(xml_path)
root = tree.getroot()
yolo_lines = []
for obj in root.findall("object"):
# 获取类别ID(跳过未定义类别)
obj_name = obj.find("name").text.strip()
if obj_name not in class_id_map:
print(f"警告:跳过未定义类别 {obj_name}(XML文件:{xml_path})")
continue
class_id = class_id_map[obj_name]
# 解析VOC边界框(xmin, ymin, xmax, ymax)
bndbox = obj.find("bndbox")
xmin = int(bndbox.find("xmin").text)
ymin = int(bndbox.find("ymin").text)
xmax = int(bndbox.find("xmax").text)
ymax = int(bndbox.find("ymax").text)
# 转换为YOLO格式(归一化中心坐标+宽高)
center_x = (xmin + xmax) / (2 * img_width)
center_y = (ymin + ymax) / (2 * img_height)
width = (xmax - xmin) / img_width
height = (ymax - ymin) / img_height
# 保留6位小数,拼接成一行
yolo_lines.append(f"{class_id} {center_x:.6f} {center_y:.6f} {width:.6f} {height:.6f}\n")
return yolo_lines
# ===================== 4. 批量处理训练集/验证集 =====================
def process_split(split_list_path, yolo_img_dir, yolo_lbl_dir):
"""根据数据集列表,批量复制图片+生成YOLO标注"""
with open(split_list_path, "r", encoding="utf-8") as f:
# 读取所有文件名(无后缀)
base_names = [line.strip() for line in f if line.strip()]
for base_name in base_names:
# 1. 复制图片到YOLO images目录
img_src = os.path.join(IMG_DIR, f"{base_name}.jpg")
img_dst = os.path.join(yolo_img_dir, f"{base_name}.jpg")
if not os.path.exists(img_src):
print(f"警告:图片不存在 {img_src},跳过")
continue
shutil.copyfile(img_src, img_dst)
# 2. 解析XML生成YOLO TXT标注
xml_src = os.path.join(XML_DIR, f"{base_name}.xml")
lbl_dst = os.path.join(yolo_lbl_dir, f"{base_name}.txt")
if not os.path.exists(xml_src):
print(f"警告:标注不存在 {xml_src},跳过")
continue
# 获取图片宽高(用于坐标归一化)
img = ET.parse(xml_src).find("size")
img_w = int(img.find("width").text)
img_h = int(img.find("height").text)
# 转换并写入YOLO标注
yolo_content = xml_to_yolo(xml_src, img_w, img_h)
with open(lbl_dst, "w", encoding="utf-8") as f:
f.writelines(yolo_content)
# 执行训练集处理
print("正在处理训练集...")
process_split(TRAIN_LIST_PATH, YOLO_IMG_TRAIN, YOLO_LBL_TRAIN)
# 执行验证集处理
print("正在处理验证集...")
process_split(VAL_LIST_PATH, YOLO_IMG_VAL, YOLO_LBL_VAL)
print(f"\n转换完成!YOLO数据集保存至:{YOLO_ROOT}")
print("YOLO目录结构:")
print(f"- {YOLO_ROOT}/images/train:训练集图片")
print(f"- {YOLO_ROOT}/images/val:验证集图片")
print(f"- {YOLO_ROOT}/labels/train:训练集YOLO标注(.txt)")
print(f"- {YOLO_ROOT}/labels/val:验证集YOLO标注(.txt)")python代码执行后,会生成标注好的YOLO格式的数据集位于yolo_dataset目录下。
转换后自动生成标准 YOLO 数据集目录yolo_dataset,结构如下:

(3)生成 YOLO 训练配置文件(.yaml)。
在yolo_dataset目录下,新建data.yaml文件,内容如下。

(4)压缩数据集。
将yolo_dataset目录下的images文件夹、labels文件夹、data.yaml文件一起压缩为yolo_dataset.zip。

步骤2 模型训练与部署(基于 Mind+ V2)
(1)模型训练。
打开Mind+ V2,进入“模型训练”->“目标识别”,选择“上传”。

选择“有标注数据(YOLO格式)”,选择刚才压缩得到的yoyo_dataset.zip文件上传。

点击“训练模型”,开始训练。

训练完成。

(2)模型校验。
输入选择“文件”,点击“点击上传文件”,选择yolo_dataset\images\val下图片,进行模型校验。

图片能正常标记出人、电动车等,说明校验成功。

(3)模型部署至二哈识图 2
点击“部署至二哈识图2”,选择应用图标,设置应用名称、标题名称。

开始“模型转换”,直到“模型转换成功”。

点击“下载到本地电脑”,选择路径进行保存。

将下载的模型,赋值到二哈识图2的 Huskylens>storage > installation_package目录下。

(4)模型安装。
点击二哈识图2屏幕,选择“模型安装”,“本地安装”。

模型安装成功。

在屏幕上选择“电动车检测”模型,即可使用。
步骤3 SIoT 服务器安装与配置
(1)服务器启动。
解压下载的SIoT V2,双击start SIoT.bat即可启动新版SIoT,

(2)弹出的“Windows安全中心警报”对话框,勾选专用网络和公用网络,否则外部设备可能无法访问。

(3)出现下图所示弹窗,siot服务器运行正常。

(2)服务器登录
在浏览器地址栏输入http://127.0.1:8080或者http://本机ip地址:8080,输入用户名siot,密码dfrobot登录。

(3)新建MQTT主题。
点击“新建主题(Topic)”,新建elevator_project主题。

系统自动生成对应的MQTT主题:siot/elevator_project。

步骤4 Mind+ Python 模式用户库安装与程序编写
(1)软件准备。
在电脑安装Mind+软件(V1.8.2或以上),并切换至 “Python模式”。

(2)官方库安装。
在“扩展”面板中,依次选择官方库 中“行空板M10”、“MQTT-py”、“OpenCV”库,进行安装。

(3)用户库安装。
在用户库中搜索 “HuskyLens 2”,选择进行安装。

同样的方法,搜索“base64”用户库,选择进行安装。

(4)基础功能程序编写。

七、完整的程序
在上面图形化程序的基础上继续完善功能:
from unihiker import GUI, Audio
import time
import json
import cv2
import base64
import random
from datetime import datetime
from dfrobot_huskylensv2 import *
from pinpong.board import Board, Pin
import siot
# ---------- 1. 初始化硬件 ----------
Board().begin()
u_gui = GUI()
u_audio = Audio() # 初始化音频播放模块
# 继电器控制引脚初始化
RELAY_PIN = Pin(Pin.P22, Pin.OUT)
RELAY_PIN.write_digital(0)
print("[INFO] 继电器初始化完成,状态:断开")
# ---------- 2. 初始化二哈识图2 (I2C) ----------
huskylens = HuskylensV2_I2C()
huskylens.knock()
ALGORITHM_CUSTOM_MODEL = 129 # 自训练模型算法ID
huskylens.switchAlgorithm(ALGORITHM_CUSTOM_MODEL)
print("[INFO] 二哈识图2初始化完成,自训练模型已加载。")
# ---------- 3. 初始化USB摄像头 ----------
cap = cv2.VideoCapture(0)
if cap.isOpened():
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
print("[INFO] USB摄像头初始化成功。")
else:
print("[ERROR] 摄像头初始化失败!")
cap = None
# ---------- 4. MQTT与标签配置 ----------
SIOT_SERVER = "127.0.0.1" # 改为实际SIoT服务器IP
SIOT_PORT = 1883
SIOT_USER = "siot"
SIOT_PASSWORD = "dfrobot"
DEVICE_ID = "siot"
PROJECT_NAME = "elevator_project"
TOPIC = f"{DEVICE_ID}/{PROJECT_NAME}"
# 标签中英文映射(匹配你的新标签)
LABEL_MAP = {
'person': '人',
'bicycle': '自行车',
'motorcycle': '电动车',
}
# ---------- 5. 屏幕显示初始化 ----------
title = u_gui.draw_text(text="电梯安全联动监控系统", x=10, y=10, font_size=16, color="#0000FF")
status_label = u_gui.draw_text(text="状态:初始化...", x=10, y=50, font_size=12)
relay_label = u_gui.draw_text(text="继电器:断开", x=10, y=80, font_size=12)
count_label = u_gui.draw_text(text="检测目标:0", x=10, y=110, font_size=12)
last_event_label = u_gui.draw_text(text="最后事件:无", x=10, y=140, font_size=10, color="#666666")
# ---------- 6. 连接SIoT服务器 ----------
def connect_siot():
try:
client_id = f"m10_{random.randint(10000, 99999)}"
siot.init(client_id=client_id, server=SIOT_SERVER, port=SIOT_PORT,
user=SIOT_USER, password=SIOT_PASSWORD)
siot.connect()
siot.loop()
print(f"[INFO] SIoT连接成功: {SIOT_SERVER}")
status_label.config(text="状态:已连接,监控中")
status_label.config(color="#008000")
return True
except Exception as e:
print(f"[ERROR] SIoT连接失败: {e}")
status_label.config(text="状态:网络连接失败")
status_label.config(color="red")
return False
siot_connected = connect_siot()
# ---------- 7. 核心功能函数 ----------
def set_relay(state):
"""控制继电器状态 (0:断开/正常, 1:闭合/开门)"""
RELAY_PIN.write_digital(state)
state_text = "闭合" if state else "断开"
color = "red" if state else "green"
relay_label.config(text=f"继电器:{state_text}")
relay_label.config(color=color)
print(f"[RELAY] 状态 -> {state_text}")
def play_alarm():
"""播放告警语音"""
try:
u_audio.start_play("/home/pi/record.wav") # 请确保语音文件路径正确
print("[AUDIO] 播放告警语音")
except Exception as e:
print(f"[ERROR] 播放语音失败: {e}")
def capture_photo():
"""捕获单张现场照片并返回Base64字符串"""
if cap and cap.isOpened():
ret, frame = cap.read()
if ret:
_, buffer = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
return base64.b64encode(buffer).decode('utf-8')
return ""
def send_target_data(target_obj, photo_base64, timestamp):
"""为单个目标生成并上报数据到SIoT"""
if not siot_connected:
return False
try:
label_en = target_obj.name
label_cn = LABEL_MAP.get(label_en, label_en)
payload = {
"time": timestamp,
"lng": 116.397129, # 模拟数据,实际项目可接GPS模块
"lat": 39.916527,
"photo": photo_base64,
"deviceId": "1号电梯",
"damageCategory": label_cn
}
siot.publish_save(topic=TOPIC, data=json.dumps(payload, ensure_ascii=False))
print(f"[SIoT] 上报成功: {label_cn}")
return True
except Exception as e:
print(f"[ERROR] 上报失败: {e}")
return False
# ---------- 8. 主状态循环 ----------
print("[INFO] 系统启动,进入监控状态...")
electric_vehicle_present = False
departure_timer = 0
DEPARTURE_CONFIRM_TIME = 3 # 电动车离开确认时间(秒)
last_trigger_time = 0
EVENT_COOLDOWN = 10 # 事件上报冷却时间(秒)
while True:
# 维护MQTT连接
if siot_connected:
siot.loop()
# 获取二哈识图2识别结果
huskylens.getResult(ALGORITHM_CUSTOM_MODEL)
if huskylens.available(ALGORITHM_CUSTOM_MODEL):
target_count = huskylens.getCachedResultNum(ALGORITHM_CUSTOM_MODEL)
count_label.config(text=f"检测目标:{target_count}")
current_ev_detected = False
detected_targets = []
# 遍历所有识别到的目标
for i in range(target_count):
target = huskylens.getCachedResultByIndex(ALGORITHM_CUSTOM_MODEL, i)
if target:
if target.name == 'motorcycle':
current_ev_detected = True
if target.name in LABEL_MAP:
detected_targets.append(target)
current_time = time.time()
# 状态1:检测到电动车进入
if current_ev_detected and not electric_vehicle_present:
print("[ALERT] 电动车进入!触发联动。")
electric_vehicle_present = True
departure_timer = 0
# 执行联动动作
play_alarm()
set_relay(1)
# 拍照并上报
if current_time - last_trigger_time > EVENT_COOLDOWN:
snapshot = capture_photo()
event_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
for target in detected_targets:
if target.name == 'motorcycle':
send_target_data(target, snapshot, event_time)
last_trigger_time = current_time
break
status_label.config(text="状态:电动车入梯,电梯已锁定")
status_label.config(color="red")
# 状态2:电动车持续存在
elif current_ev_detected and electric_vehicle_present:
departure_timer = 0 # 重置离开计时器
status_label.config(text="状态:电动车仍在电梯内")
# 状态3:电动车离开(检测中)
elif not current_ev_detected and electric_vehicle_present:
departure_timer += 0.1
if departure_timer >= DEPARTURE_CONFIRM_TIME:
print("[INFO] 电动车已离开,恢复电梯。")
electric_vehicle_present = False
set_relay(0)
status_label.config(text="状态:已恢复,监控中")
status_label.config(color="green")
time.sleep(2)
status_label.config(text="状态:监控中")
status_label.config(color="black")
time.sleep(0.1)
# 资源释放
if cap:
cap.release()
print("[INFO] 系统停止。")附件

返回首页
回到顶部


评论