回到首页 返回首页
回到顶部 回到顶部
返回上一页 返回上一页
best-icon

行空板M10扩展板校园监控车 简单

头像 Sunny_zhu 2025.06.18 60 1

一.创作背景

目前市面上主流的监控设备大多为定点监控,固定位置的摄像头视野有限,存在物理遮挡或视角限制,无法覆盖所有区域,导致存在许多安全隐患。因此,本方案计划采用行空板小车搭配摄像头的移动监控模式,方便对一些监控死角进行近距离全方位的检测,排查监控死角可能带来的安全隐患,防患于未然。

 

二.作品简介

本作品的主要功能是:通过遥控端遥控行空板小车的移动,并经由行空板小车实现拍照,再调用人工智能大模型实现对拍照画面的分析解读,并将分析结果通过物联网回传到遥控端,通过遥控端的扬声器进行播报。

 

三.硬件连接

遥控端:行空板、无源音箱小喇叭、手机充电宝(用于给行空板供电)

移动端(小车):行空板、摄像头、电机IO扩展板、金手指扩展板、800mAh电池扩展板、车轮2个、万向轮1个、TT直流减速电机2个(带PH2.0端子线)、车身框架

 

硬件连接示意图如下:

连线示意图.jpg

 

 

四.软件准备

编程软件:mind+ V1.8.1 RC2.0

移动端用到的库文件:

移动端用到的库文件.jpg

遥控端用到的库文件:

遥控端用到的库文件.jpg
遥控端用户库.jpg

 

 

五.制作过程

1.两块行空板的网络连接:两块行空板通过wifi接入同一局域网中。将移动端设置为SIOT服务器。具体的网络信息详见下图:

行空板网络信息示意图.jpg

2.程序设计

(1)移动端程序如下:

主程序如下:

主程序.jpg

主程序完成的主要任务是:MQTT初始化、摄像头初始化、设置移动端行空板初始画面、然后通过变量take_photo_requested、show_live_video循环读取拍照请求及摄像头状态,如果接收到拍照请求,则摄像头开始拍照,并保存图像,然后启动线程1;如果没有接收到拍照请求,同时摄像头空闲,则行空板显示初始画面。

 

事件回调函数(用于定义接收到遥控端指令后,行空板小车执行具体的任务)具体程序如下:

事件回调函数.jpg

截图不是很完整,缺少右转和停止的相关代码,具体同前进后退指令,就不赘述了。

 

线程1程序如下:

线程1.jpg

线程1主要执行的任务是:进行图像理解,并把图像理解的结果回传到SIOT物联网平台,并通过SIOT传回遥控端。

 

这里为图像理解设置线程的原因是:图像理解可能耗时较长,如果放置在事件回调函数中拍照后直接执行有可能阻塞MQTT回调。

 

 

下面展示每个自定义函数的具体代码:

小车运动状态的定义:

小车运动状态的定义.jpg
停止.jpg

摄像头初始化程序如下:

摄像头初始化.jpg

MQTT初始化程序如下:

MQTT初始化.jpg

 

图像理解函数定义如下:

图像理解.jpg

这段程序主要用于调用讯飞开放平台图像理解的API。其中api_key、appid、api_secret需要在讯飞开放平台注册后申请试用。有使用次数和时间限制,使用时需注意。导入的pic_recognition.py文件详见附件。

 

 

(2)遥控端主程序如下:

遥控端主程序.jpg

这段程序主要用于进行MQTT初始化设置、设置遥控端行空板界面的控制按钮。

 

遥控端回调函数的动作指令如下所示:

遥控端回调函数.jpg

这几段程序主要用于按下按钮时,向SIOT发送小车运动/拍照的指令。

 

下面这段程序用于接收SIOT传回的图像识别结果,并进行语音播报。

图像理解语音播报.jpg

3.外观设计及行空板的界面设计

(1)行空板界面设计

行空板界面设计.jpg

(2)小车外观设计

智能车外观设计.jpg

4.作品演示

遥控车展示:https://www.bilibili.com/video/BV1NXNjzSEcz/

图像识别展示:https://www.bilibili.com/video/BV1sQNjznEmL/

 

 

六.总结与展望

 

不足之处:由于时间关系,小车安装不够精细,左轮电机螺丝松动,导致转弯略困难;摄像头初始化偶尔会失败,暂时还没有找到原因,还请各路高手略微指点一二。

 

未来的拓展方向:

1.目前小车还需要接收遥控指令才能运动,未来希望小车能够自动规划路线,实现无人驾驶。

2.移动端能够分析视频录像,并将分析结果上传到遥控端。

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

代码
import _thread as thread
import base64
import hashlib
import hmac
import json
from urllib.parse import urlparse
import ssl
from datetime import datetime
from time import mktime
from urllib.parse import urlencode
from wsgiref.handlers import format_date_time
import websocket
from websocket import WebSocketApp
import sys
import os

appid = "XXX"  
api_secret = "XXX"
api_key = "XXX"

imageunderstanding_url = "wss://spark-api.cn-huabei-1.xf-yun.com/v2.1/image"

class Ws_Param:
    def __init__(self, APPID, APIKey, APISecret, imageunderstanding_url):
        self.APPID = APPID
        self.APIKey = APIKey
        self.APISecret = APISecret
        self.host = urlparse(imageunderstanding_url).netloc
        self.path = urlparse(imageunderstanding_url).path
        self.ImageUnderstanding_url = imageunderstanding_url

    def create_url(self):
        now = datetime.now()
        date = format_date_time(mktime(now.timetuple()))
        signature_origin = "host: " + self.host + "\n"
        signature_origin += "date: " + date + "\n"
        signature_origin += "GET " + self.path + " HTTP/1.1"
        signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'),
                                 digestmod=hashlib.sha256).digest()
        signature_sha_base64 = base64.b64encode(signature_sha).decode(encoding='utf-8')
        authorization_origin = f'api_key="{self.APIKey}", algorithm="hmac-sha256", headers="host date request-line", signature="{signature_sha_base64}"'
        authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')
        v = {"authorization": authorization, "date": date, "host": self.host}
        return self.ImageUnderstanding_url + '?' + urlencode(v)

def on_error(ws, error):
    print("### error:", error)
    ws.is_closed = True

def on_close(ws, *args):
    print("Connection closed")
    ws.is_closed = True

def on_open(ws):
    def run(*args):
        # 使用当前问题构建消息
        messages = [
            {
                "role": "user",
                "content": base64.b64encode(ws.imagedata).decode('utf-8'),
                "content_type": "image",
                "image_param": {"image_type": "PNG"}
            },
            {
                "role": "user",
                "content": ws.current_question,
                "content_type": "text"
            }
        ]
        data = json.dumps(gen_params(appid=ws.appid, messages=messages))
        ws.send(data)
    thread.start_new_thread(run, ())

def on_message(ws, message):
    data = json.loads(message)
    code = data['header']['code']
    if code != 0:
        print(f'Error: {code}, {data}')
        ws.close()
    else:
        choices = data["payload"]["choices"]
        status = choices["status"]
        content = choices["text"][0]["content"]
        print(content, end="")
        ws.answer += content
        if status == 2:
            print()  
            ws.close()

def gen_params(appid, messages):
    return {
        "header": {"app_id": appid},
        "parameter": {
            "chat": {
                "domain": "imagev3",
                "temperature": 0.5,
                "top_k": 4,
                "max_tokens": 2028,
                "auditing": "default"
            }
        },
        "payload": {"message": {"text": messages}}
    }

def main(appid, api_key, api_secret, imageunderstanding_url, question, image_path):
    # 读取图像数据
    try:
        if not os.path.exists(image_path):
            print(f"错误:图片文件 {image_path} 不存在")
            return f"识别失败:图片文件不存在 - {image_path}"
            
        with open(image_path, 'rb') as f:
            imagedata = f.read()
    except Exception as e:
        print(f"读取图片文件失败: {str(e)}")
        return f"识别失败:无法读取图片 - {str(e)}"
    
    wsParam = Ws_Param(appid, api_key, api_secret, imageunderstanding_url)
    wsUrl = wsParam.create_url()
    
    ws = websocket.WebSocketApp(
        wsUrl,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
        on_open=on_open
    )
    
    # 添加自定义属性
    ws.appid = appid
    ws.current_question = question
    ws.imagedata = imagedata  # 传递图像数据
    ws.answer = ""
    ws.is_closed = False  # 跟踪连接状态
    
    print("Sending request...")
    ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
    
    # 确保连接关闭
    if not ws.is_closed:
        ws.close()
    
    return ws.answer

if __name__ == '__main__':
    # 示例调用
    image_path = "Mind+.png"
    question = "请说出这张照片中有哪些内容?"
    
    print(f"Question: {question}")
    print("Answer:", end="")
    
    answer = main(
        appid=appid,
        api_key=api_key,
        api_secret=api_secret,
        imageunderstanding_url=imageunderstanding_url,
        question=question,
        image_path=image_path
    )
    
    print("\nAPI调用完成,程序结束")

附件

评论

user-avatar
  • 寸进

    寸进2025.06.24

    优秀的案例赞

    0