回到首页 返回首页
回到顶部 回到顶部
返回上一页 返回上一页

基于二哈识图2的电动车入梯检测与联动控制系统(持续完善中) 简单

头像 豆爸 2026.02.07 16 0

据统计,近年来超半数电动车火灾发生于室内、楼道充电过程,空间狭小密闭,火焰可在30秒内迅速蔓延,释放的有毒气体足以在1分钟内致人窒息,严重威胁居民生命安全。

dc7a992c426b45b4ba724829656b331f.jpeg~tplv-a9rns2rl98-image_pre_watermark_1_6b.png

传统依赖人力看守、张贴标语的管理方式存在监管盲区、响应滞后、成本高昂等痛点,难以实现全天候有效阻截。

一、系统概述

本系统旨在从源头进行实时识别与干预,解决电动车进入电梯带来的安全隐患。

系统以行空板M10为核心控制器,搭载二哈识图2运行AI模型,对电梯轿厢进行实时监测。

一旦识别到电动车等违规目标,系统将立即执行本地联动(声光报警、控制电梯开门)并拍照取证,同时将事件信息(含时间、照片、类别)自动上报至云端SIoT服务器,形成“识别-干预-记录”的完整管理闭环。

二、系统设计

为实现上述功能,系统采用经典的分层边缘计算架构,将任务清晰划分为感知、控制和云端三个层级,确保高实时性、高可靠性与易维护性。

deepseek_mermaid_20260207_130997.png

感知层(前端识别):由二哈识图2构成,作为系统的“眼睛”,负责实时采集视频流并进行AI推理,精准识别电动车等目标。

控制层(中枢处理):由行空板M10构成,作为系统的“大脑”。它通过I2C总线接收感知层的识别结果,执行核心逻辑判断。一旦确认违规,则立即调度本级资源:通过GPIO口控制继电器实现电梯开门锁定,调用Audio模块播放告警语音,并驱动USB摄像头抓拍现场照片。最后,它将所有信息整合为结构化JSON数据。

云端层(数据服务):即SIoT物联网平台,作为系统的“档案库”。控制层通过Wi-Fi和MQTT协议,将带有时间戳、设备ID、Base64照片和中文类别标签的JSON数据包上报至此。

三、硬件选型与简介

材料清单

硬件介绍

(1)行空板M10

详细信息,见官方WIKI:

https://www.unihiker.com.cn/wiki/m10/jianjie

(2)二哈识图2

5cbf19aa5a99fd3a4c58ce5e02516d83.png

详细信息,见官方WIKI:https://wiki.dfrobot.com.cn/_SKU_SEN0638_Gravity_HUSKYLENS_2_AI_Camera_Vision_Sensor

(3)USB摄像头

downloaded-image.jpg

(4)继电器

fKffcEy47.jpeg

详细信息,见官方WIKI:

https://wiki.dfrobot.com.cn/_SKU_DFR0017_%E6%95%B0%E5%AD%97%E7%BB%A7%E7%94%B5%E5%99%A8%E6%A8%A1%E5%9D%97

(5)USB免驱3W小喇叭

四、硬件接线图

07b2087b6c20505fe30c4c22bc89620c.jpg

五、系统工作流程

系统上电后,进入持续监控循环,其核心工作流程与状态转换如下图所示,清晰展示了从“监控”到“报警联动”再到“恢复”的完整决策过程:

deepseek_mermaid_20260207_943434.png

六、项目实施步骤

步骤1 数据集准备与处理

(1)数据集准备。

 

收集并标注包含'motorcycle'等目标的数据集。我下载到数据集是PASCAL VOC格式,包含三类'person', 'bicycle', 'motorcycle'目标标注。

6a23b6bbc546d2f31c39a094565d3b1d.png

数据集核心结构为:

  • Annotations:XML 格式标注文件(记录目标边界框与类别)
  • JPEGImages:原始图片文件
  • train.txt:训练集文件名列表
  • val.txt: 验证集文件名列表
  • label_list.txt:类别定义文件(含三类目标名称)
  •  

(2)转换数据集。

 

  • 编辑并运行 Python 转换脚本(已优化绝对路径配置,含文件存在性校验),将 PASCAL VOC 格式(XML)转换为 YOLO 格式(txt 标注)。
  •  

代码
import os
import shutil
import xml.etree.ElementTree as ET

# ===================== 1. 配置参数(无需修改,适配你的数据集结构) =====================
XML_DIR = "Annotations"               # VOC标注目录
IMG_DIR = "JPEGImages"                # 图片目录
LABEL_LIST_PATH = "label_list.txt"    # 类别文件
TRAIN_LIST_PATH = "train.txt"    # 训练集列表
VAL_LIST_PATH = "val.txt"        # 验证集列表
# YOLO输出目录(自动创建)
YOLO_ROOT = "yolo_dataset"
YOLO_IMG_TRAIN = os.path.join(YOLO_ROOT, "images", "train")
YOLO_IMG_VAL = os.path.join(YOLO_ROOT, "images", "val")
YOLO_LBL_TRAIN = os.path.join(YOLO_ROOT, "labels", "train")
YOLO_LBL_VAL = os.path.join(YOLO_ROOT, "labels", "val")

# ===================== 2. 初始化(创建目录+读取类别映射) =====================
# 创建YOLO目录结构
for dir_path in [YOLO_IMG_TRAIN, YOLO_IMG_VAL, YOLO_LBL_TRAIN, YOLO_LBL_VAL]:
    os.makedirs(dir_path, exist_ok=True)

# 读取类别并建立ID映射(person=0, bicycle=1, motorcycle=2)
with open(LABEL_LIST_PATH, "r", encoding="utf-8") as f:
    class_names = [line.strip() for line in f if line.strip()]
class_id_map = {name: idx for idx, name in enumerate(class_names)}
print(f"类别ID映射:{class_id_map}")

# ===================== 3. 核心函数:XML转YOLO TXT =====================
def xml_to_yolo(xml_path, img_width, img_height):
    """解析XML,返回YOLO格式的标注内容(归一化坐标)"""
    tree = ET.parse(xml_path)
    root = tree.getroot()
    yolo_lines = []
    
    for obj in root.findall("object"):
        # 获取类别ID(跳过未定义类别)
        obj_name = obj.find("name").text.strip()
        if obj_name not in class_id_map:
            print(f"警告:跳过未定义类别 {obj_name}(XML文件:{xml_path})")
            continue
        class_id = class_id_map[obj_name]
        
        # 解析VOC边界框(xmin, ymin, xmax, ymax)
        bndbox = obj.find("bndbox")
        xmin = int(bndbox.find("xmin").text)
        ymin = int(bndbox.find("ymin").text)
        xmax = int(bndbox.find("xmax").text)
        ymax = int(bndbox.find("ymax").text)
        
        # 转换为YOLO格式(归一化中心坐标+宽高)
        center_x = (xmin + xmax) / (2 * img_width)
        center_y = (ymin + ymax) / (2 * img_height)
        width = (xmax - xmin) / img_width
        height = (ymax - ymin) / img_height
        
        # 保留6位小数,拼接成一行
        yolo_lines.append(f"{class_id} {center_x:.6f} {center_y:.6f} {width:.6f} {height:.6f}\n")
    return yolo_lines

# ===================== 4. 批量处理训练集/验证集 =====================
def process_split(split_list_path, yolo_img_dir, yolo_lbl_dir):
    """根据数据集列表,批量复制图片+生成YOLO标注"""
    with open(split_list_path, "r", encoding="utf-8") as f:
        # 读取所有文件名(无后缀)
        base_names = [line.strip() for line in f if line.strip()]
    
    for base_name in base_names:
        # 1. 复制图片到YOLO images目录
        img_src = os.path.join(IMG_DIR, f"{base_name}.jpg")
        img_dst = os.path.join(yolo_img_dir, f"{base_name}.jpg")
        if not os.path.exists(img_src):
            print(f"警告:图片不存在 {img_src},跳过")
            continue
        shutil.copyfile(img_src, img_dst)
        
        # 2. 解析XML生成YOLO TXT标注
        xml_src = os.path.join(XML_DIR, f"{base_name}.xml")
        lbl_dst = os.path.join(yolo_lbl_dir, f"{base_name}.txt")
        if not os.path.exists(xml_src):
            print(f"警告:标注不存在 {xml_src},跳过")
            continue
        
        # 获取图片宽高(用于坐标归一化)
        img = ET.parse(xml_src).find("size")
        img_w = int(img.find("width").text)
        img_h = int(img.find("height").text)
        
        # 转换并写入YOLO标注
        yolo_content = xml_to_yolo(xml_src, img_w, img_h)
        with open(lbl_dst, "w", encoding="utf-8") as f:
            f.writelines(yolo_content)

# 执行训练集处理
print("正在处理训练集...")
process_split(TRAIN_LIST_PATH, YOLO_IMG_TRAIN, YOLO_LBL_TRAIN)
# 执行验证集处理
print("正在处理验证集...")
process_split(VAL_LIST_PATH, YOLO_IMG_VAL, YOLO_LBL_VAL)

print(f"\n转换完成!YOLO数据集保存至:{YOLO_ROOT}")
print("YOLO目录结构:")
print(f"- {YOLO_ROOT}/images/train:训练集图片")
print(f"- {YOLO_ROOT}/images/val:验证集图片")
print(f"- {YOLO_ROOT}/labels/train:训练集YOLO标注(.txt)")
print(f"- {YOLO_ROOT}/labels/val:验证集YOLO标注(.txt)")

python代码执行后,会生成标注好的YOLO格式的数据集位于yolo_dataset目录下。

转换后自动生成标准 YOLO 数据集目录yolo_dataset,结构如下:

9ED0FA2C-D1F8-415d-8D7F-6364D6D04C14.png

(3)生成 YOLO 训练配置文件(.yaml)。

在yolo_dataset目录下,新建data.yaml文件,内容如下。

D19055AD-EA0A-489b-BEFA-C38282553173.png

(4)压缩数据集。

将yolo_dataset目录下的images文件夹、labels文件夹、data.yaml文件一起压缩为yolo_dataset.zip。

25c5404b7afccde538735bb4b40a3eea.png

步骤2 模型训练与部署(基于 Mind+ V2)

(1)模型训练。

打开Mind+ V2,进入“模型训练”->“目标识别”,选择“上传”。

image.png

选择“有标注数据(YOLO格式)”,选择刚才压缩得到的yoyo_dataset.zip文件上传。

8d308feabf31bc9853fd013c6cd551ca.png

点击“训练模型”,开始训练。

8ed16032e7111c9284f7b70a5fb29ef7.png

训练完成。

b41afdcbc9d6829490abf93f067813f1.png

(2)模型校验。

输入选择“文件”,点击“点击上传文件”,选择yolo_dataset\images\val下图片,进行模型校验。

9c631fde81cbb48756c286c94631f17d.png

图片能正常标记出人、电动车等,说明校验成功。

b188960b2cb51d0d5237260110ea8255.png

(3)模型部署至二哈识图 2

点击“部署至二哈识图2”,选择应用图标,设置应用名称、标题名称。

ddec14222ec796311f1a8fb366bb8416.png

开始“模型转换”,直到“模型转换成功”。

24a91d464f77d3a4a864fad07ca94ca7.png

点击“下载到本地电脑”,选择路径进行保存。

bf4d9d65a59b21c841a6658223fd8f3e.png

将下载的模型,赋值到二哈识图2的 Huskylens>storage > installation_package目录下。

eeb4b34943537f4d9ef9b3ee9a8e050e.png

(4)模型安装。

点击二哈识图2屏幕,选择“模型安装”,“本地安装”。

lQDPKHr7ST4lExfNCCDNEhCw7waC86LUzb4JVfIt_rkdAA_4624_2080.jpg

模型安装成功。

lQDPJxW4f6w055fNCCDNEhCwJvn2liFZjdkJVfI2L1nFAA_4624_2080.jpg

在屏幕上选择“电动车检测”模型,即可使用。

步骤3 SIoT 服务器安装与配置

(1)服务器启动。


解压下载的SIoT V2,双击start SIoT.bat即可启动新版SIoT,

bfb85324462fadb1f94fcd05280aa1db.png


(2)弹出的“Windows安全中心警报”对话框,勾选专用网络和公用网络,否则外部设备可能无法访问。

559a141f0cdaa126db77c1eb2ec8aa76.png


(3)出现下图所示弹窗,siot服务器运行正常。

7B956CD8-DEE4-4b67-91C5-254080E1CCD4.png


(2)服务器登录

在浏览器地址栏输入http://127.0.1:8080或者http://本机ip地址:8080,输入用户名siot,密码dfrobot登录。

18A06A46-98CE-40a7-9542-E207AFC9BB0B.png


(3)新建MQTT主题。

点击“新建主题(Topic)”,新建elevator_project主题。

1E0C8691-C837-4d47-B4C9-EC8A11888C25.png

系统自动生成对应的MQTT主题:siot/elevator_project。

A72792F8-E730-4054-82E5-D2F2290BC5D5.png

步骤4 Mind+ Python 模式用户库安装与程序编写

(1)软件准备。

在电脑安装Mind+软件(V1.8.2或以上),并切换至 “Python模式”。

40E7F711-0066-4806-A941-72365C686F80.png

(2)官方库安装。

在“扩展”面板中,依次选择官方库 中“行空板M10”、“MQTT-py”、“OpenCV”库,进行安装。

35D9DDB5-56AA-4ef9-BDEA-65C1B8369EFC.png

(3)用户库安装。

在用户库中搜索 “HuskyLens 2”,选择进行安装。

61DCDEFC-049D-40de-A65C-EE5ED9FFC9D3.png

同样的方法,搜索“base64”用户库,选择进行安装。

A2690A93-A4B5-4886-94E7-C02DF56BFBA7.png

(4)基础功能程序编写。

screenshots-电动车入梯检测.mp-1770439808793.png

七、完整的程序

在上面图形化程序的基础上继续完善功能:

代码
from unihiker import GUI, Audio
import time
import json
import cv2
import base64
import random
from datetime import datetime
from dfrobot_huskylensv2 import *
from pinpong.board import Board, Pin
import siot

# ---------- 1. 初始化硬件 ----------
Board().begin()
u_gui = GUI()
u_audio = Audio()  # 初始化音频播放模块

# 继电器控制引脚初始化
RELAY_PIN = Pin(Pin.P22, Pin.OUT)
RELAY_PIN.write_digital(0)
print("[INFO] 继电器初始化完成,状态:断开")

# ---------- 2. 初始化二哈识图2 (I2C) ----------
huskylens = HuskylensV2_I2C()
huskylens.knock()
ALGORITHM_CUSTOM_MODEL = 129  # 自训练模型算法ID
huskylens.switchAlgorithm(ALGORITHM_CUSTOM_MODEL)
print("[INFO] 二哈识图2初始化完成,自训练模型已加载。")

# ---------- 3. 初始化USB摄像头 ----------
cap = cv2.VideoCapture(0)
if cap.isOpened():
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
    print("[INFO] USB摄像头初始化成功。")
else:
    print("[ERROR] 摄像头初始化失败!")
    cap = None

# ---------- 4. MQTT与标签配置 ----------
SIOT_SERVER = "127.0.0.1"  # 改为实际SIoT服务器IP
SIOT_PORT = 1883
SIOT_USER = "siot"
SIOT_PASSWORD = "dfrobot"
DEVICE_ID = "siot"
PROJECT_NAME = "elevator_project"
TOPIC = f"{DEVICE_ID}/{PROJECT_NAME}"

# 标签中英文映射(匹配你的新标签)
LABEL_MAP = {
    'person': '人',
    'bicycle': '自行车',
    'motorcycle': '电动车',
}

# ---------- 5. 屏幕显示初始化 ----------
title = u_gui.draw_text(text="电梯安全联动监控系统", x=10, y=10, font_size=16, color="#0000FF")
status_label = u_gui.draw_text(text="状态:初始化...", x=10, y=50, font_size=12)
relay_label = u_gui.draw_text(text="继电器:断开", x=10, y=80, font_size=12)
count_label = u_gui.draw_text(text="检测目标:0", x=10, y=110, font_size=12)
last_event_label = u_gui.draw_text(text="最后事件:无", x=10, y=140, font_size=10, color="#666666")

# ---------- 6. 连接SIoT服务器 ----------
def connect_siot():
    try:
        client_id = f"m10_{random.randint(10000, 99999)}"
        siot.init(client_id=client_id, server=SIOT_SERVER, port=SIOT_PORT, 
                  user=SIOT_USER, password=SIOT_PASSWORD)
        siot.connect()
        siot.loop()
        print(f"[INFO] SIoT连接成功: {SIOT_SERVER}")
        status_label.config(text="状态:已连接,监控中")
        status_label.config(color="#008000")
        return True
    except Exception as e:
        print(f"[ERROR] SIoT连接失败: {e}")
        status_label.config(text="状态:网络连接失败")
        status_label.config(color="red")
        return False

siot_connected = connect_siot()

# ---------- 7. 核心功能函数 ----------
def set_relay(state):
    """控制继电器状态 (0:断开/正常, 1:闭合/开门)"""
    RELAY_PIN.write_digital(state)
    state_text = "闭合" if state else "断开"
    color = "red" if state else "green"
    relay_label.config(text=f"继电器:{state_text}")
    relay_label.config(color=color)
    print(f"[RELAY] 状态 -> {state_text}")

def play_alarm():
    """播放告警语音"""
    try:
        u_audio.start_play("/home/pi/record.wav")  # 请确保语音文件路径正确
        print("[AUDIO] 播放告警语音")
    except Exception as e:
        print(f"[ERROR] 播放语音失败: {e}")

def capture_photo():
    """捕获单张现场照片并返回Base64字符串"""
    if cap and cap.isOpened():
        ret, frame = cap.read()
        if ret:
            _, buffer = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
            return base64.b64encode(buffer).decode('utf-8')
    return ""

def send_target_data(target_obj, photo_base64, timestamp):
    """为单个目标生成并上报数据到SIoT"""
    if not siot_connected:
        return False
    try:
        label_en = target_obj.name
        label_cn = LABEL_MAP.get(label_en, label_en)
        payload = {
            "time": timestamp,
            "lng": 116.397129,  # 模拟数据,实际项目可接GPS模块
            "lat": 39.916527,
            "photo": photo_base64,
            "deviceId": "1号电梯",
            "damageCategory": label_cn
        }
        siot.publish_save(topic=TOPIC, data=json.dumps(payload, ensure_ascii=False))
        print(f"[SIoT] 上报成功: {label_cn}")
        return True
    except Exception as e:
        print(f"[ERROR] 上报失败: {e}")
        return False

# ---------- 8. 主状态循环 ----------
print("[INFO] 系统启动,进入监控状态...")
electric_vehicle_present = False
departure_timer = 0
DEPARTURE_CONFIRM_TIME = 3  # 电动车离开确认时间(秒)
last_trigger_time = 0
EVENT_COOLDOWN = 10  # 事件上报冷却时间(秒)

while True:
    # 维护MQTT连接
    if siot_connected:
        siot.loop()
    
    # 获取二哈识图2识别结果
    huskylens.getResult(ALGORITHM_CUSTOM_MODEL)
    
    if huskylens.available(ALGORITHM_CUSTOM_MODEL):
        target_count = huskylens.getCachedResultNum(ALGORITHM_CUSTOM_MODEL)
        count_label.config(text=f"检测目标:{target_count}")
        current_ev_detected = False
        detected_targets = []
        
        # 遍历所有识别到的目标
        for i in range(target_count):
            target = huskylens.getCachedResultByIndex(ALGORITHM_CUSTOM_MODEL, i)
            if target:
                if target.name == 'motorcycle':
                    current_ev_detected = True
                if target.name in LABEL_MAP:
                    detected_targets.append(target)
        
        current_time = time.time()
        
        # 状态1:检测到电动车进入
        if current_ev_detected and not electric_vehicle_present:
            print("[ALERT] 电动车进入!触发联动。")
            electric_vehicle_present = True
            departure_timer = 0
            
            # 执行联动动作
            play_alarm()
            set_relay(1)
            
            # 拍照并上报
            if current_time - last_trigger_time > EVENT_COOLDOWN:
                snapshot = capture_photo()
                event_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                for target in detected_targets:
                    if target.name == 'motorcycle':
                        send_target_data(target, snapshot, event_time)
                        last_trigger_time = current_time
                        break
            
            status_label.config(text="状态:电动车入梯,电梯已锁定")
            status_label.config(color="red")
        
        # 状态2:电动车持续存在
        elif current_ev_detected and electric_vehicle_present:
            departure_timer = 0  # 重置离开计时器
            status_label.config(text="状态:电动车仍在电梯内")
        
        # 状态3:电动车离开(检测中)
        elif not current_ev_detected and electric_vehicle_present:
            departure_timer += 0.1
            if departure_timer >= DEPARTURE_CONFIRM_TIME:
                print("[INFO] 电动车已离开,恢复电梯。")
                electric_vehicle_present = False
                set_relay(0)
                status_label.config(text="状态:已恢复,监控中")
                status_label.config(color="green")
                time.sleep(2)
                status_label.config(text="状态:监控中")
                status_label.config(color="black")
    
    time.sleep(0.1)

# 资源释放
if cap:
    cap.release()
print("[INFO] 系统停止。")

评论

user-avatar