这次选择的主题是:气象科学与大气污染探究 ,当时就是想体验一把云雀,一冲动就选择了主题一,针对主题一直在思考,最终选择了测试场景为车库。设计思维导图如下:
车库中会有大量的汽车驶入、使出,油车产生有机性挥发物、CO2等有害气体。立体车库相对环境比较封闭,会形成有害气体的堆积,使用空气质量传感器对车库内的空气质量,及时发现对人体有害的情况,进行车库通风;通过P2.5等粉尘的数据记录,及时发现车库粉尘浓度。
车库出入口经常会产生狭管效应,促使风力变大,对车辆行驶造成安全隐患,借助云雀测风力及风向,提醒出入库车辆小心驾驶。
电动车自燃现象时有发生,借助火焰传感器、烟雾传感器及时发现火焰和烟雾,提前进行灭火,提高车库的安全度。
车库密闭的环境比较黑暗,所以需要灯光照明,为了节约能源采用体感照明系统,通过人体红外热释能传感器感应是否有人存在,如果有人灯开启,无人关闭。
通过摄像头观察车库的情况,实现远程控制车库门的开启和关闭。
欲先攻其事必先利器,准备传感器和车库模型、3D打印几辆小汽车,场景如下图。
1.规划连接口,传感器连接口如上图。
2.按照先设计好分三步将ENS160 空气质量传感器安装到云雀的肚子里,如下图。
3.由于行空板的线有点短只能在附近安装相关传感器。
下面来个特写。
4.接下来设计一下topic,用于获取相关信息。
5.设计人工智能部分,除了显示空气中挥发性有机化合物和CO2浓度数值,还要给管理人员建议,除了设计topic还要进行程序设计。
挥发性有机化合物
CO2浓度
6.检测火情,早发现,早处理,通过采用火焰传感器和烟雾传感器配合的方式实现。火焰传感器获取情况及时报警。
查看烟雾数值,因为点了蚊香,所以烟雾数值有点高,请忽略。
7.及时观察车库口风力、风向等信息,提前预防狭管效应。感谢大神的帮助,要不真不知道云雀的关键词,搞了好久才获取到数值。
8.借助摄像头远程控制车库内的灯光及闸机、报警系统,及时发现车库中的问题,报警促使人们及时疏散。
这次专门为摄像头增加了一个进程,图像很流畅,虽然有丢包现象,但是很好了。
9.为了起到灯光节能作用,采用人体感应识别的方式控制灯。
10.下面分享一下源程序开头初始化设置及全部代码
# -*- coding: UTF-8 -*-
# MindPlus
# Python
import base64
from io import BytesIO
from PIL import Image
from pinpong.extension.unihiker import *
from pinpong.board import Board,Pin
from DFRobot_Atmospherlum import *
from dfrobot_rp2040_sci import *
from pinpong.board import Board
from pinpong.board import Servo
from unihiker import Audio
from unihiker import GUI
import siot
import time
import cv2
# 事件回调函数
def u_thread2_function():
while True:
if ((p_p24_in.read_digital()==True) == True):
p_p21_out=Pin(Pin.P21, Pin.OUT)
p_p21_out.write_digital(1)
time.sleep(5)
else:
p_p21_out=Pin(Pin.P21, Pin.OUT)
p_p21_out.write_digital(0)
def u_thread1_function():
while True:
if vd.grab():
ret, grab = vd.read()
siot.publish(topic="siot/camera", data=base642base64(grab))
else:
print("没有下一帧")
def on_message_callback(client, userdata, msg):
if (msg.payload.decode() == (str("on"))):
p_p21_out=Pin(Pin.P21, Pin.OUT)
p_p21_out.write_digital(1)
elif (msg.payload.decode() == (str("off"))):
p_p21_out=Pin(Pin.P21, Pin.OUT)
p_p21_out.write_digital(0)
elif (msg.payload.decode() == (str("don"))):
servo1.write_angle(180)
elif (msg.payload.decode() == (str("doff"))):
servo1.write_angle(90)
elif (msg.payload.decode() == (str("moff"))):
buzzer.stop()
elif (msg.payload.decode() == (str("mon"))):
buzzer.play(buzzer.BADDY,buzzer.ForeverInBackground)
elif (msg.payload.decode() == (str("dangerous"))):
buzzer.play(buzzer.FUNK,buzzer.ForeverInBackground)
Board().begin()
u_gui=GUI()
siot.init(client_id="",server="192.168.24.72",port=1883,user="siot",password="dfrobot")
u_audio = Audio()
siot.connect()
siot.loop()
p_p22_analog=Pin(Pin.P22, Pin.ANALOG)
p_p24_in=Pin(Pin.P24, Pin.IN)
siot.set_callback(on_message_callback)
yunque_i2c = DFRobot_Atmospherlum_I2C(0x42)
while (yunque_i2c.begin() != 0):
print("yunque_i2c initialize failed!!")
time.sleep(1)
print("Sensor initialize success!!")
yunque_i2c.set_local_time()
time.sleep(1)
servo1 = Servo(Pin((Pin.P23)))
FengXiang = { "E":"东","S":"南","W":"西","N":"北","NE":"东北","SE":"东南","SW":"西南","NW":"西北"}
def frame2base64(frame):
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
img = Image.fromarray(frame) #将每一帧转为Image
output_buffer = BytesIO() #创建一个BytesIO
img.save(output_buffer, format='JPEG') #写入output_buffer
byte_data = output_buffer.getvalue() #在内存中读取
base64_data = base64.b64encode(byte_data) #转为BASE64
return base64_data #转码成功 返回base64编码
def base642base64(frame):
data=str('data:image/png;base64,')
base64data = str(frame2base64(frame))
framedata = base64data[2:(len(base64data)-1)]
base642base64_data = data + str(framedata)
return base642base64_data
servo1.write_angle(0)
vd = cv2.VideoCapture()
vd.open(0)
while not (vd.isOpened()):
print("摄像头初始化中……")
u_gui.draw_image(image="123.jpg",x=0,y=0)
SCI1 = DFRobot_RP2040_SCI_IIC(addr=0x21)
while SCI1.begin() != 0:
print("Initialization Sensor Universal Adapter Board failed.")
time.sleep(1)
print("Initialization Sensor Universal Adapter Board done.")
siot.getsubscribe(topic="siot/车库数据传输")
my_variable = { "True":"是","False":"否"}
thread1=u_gui.start_thread(u_thread1_function)
thread2=u_gui.start_thread(u_thread2_function)
while True:
print((str("apple") + str((str("apple") + str((yunque_i2c.get_value("Speed")))))))
siot.publish_save(topic="siot/sound", data=u_audio.sound_level())
siot.publish_save(topic="siot/光线数值", data=light.read())
siot.publish_save(topic="siot/humi", data=SCI1.get_value1(SCI1.ePort3,"Humi_Air"))
siot.publish_save(topic="siot/温度", data=SCI1.get_value1(SCI1.ePort3,"Temp_Air"))
siot.publish_save(topic="siot/pm2.5", data=(str((str(SCI1.get_value1(SCI1.ePort2,"PM1.0")) + str(","))) + str((str((str(SCI1.get_value1(SCI1.ePort2,"PM2.5")) + str(","))) + str(SCI1.get_value1(SCI1.ePort2,"PM10"))))))
siot.publish_save(topic="siot/y空气质量", data=(yunque_i2c.get_value("AQI")))
siot.publish_save(topic="siot/y湿度", data=(yunque_i2c.get_value("Humi")))
siot.publish_save(topic="siot/y风速", data=(yunque_i2c.get_value("speed")))
siot.publish_save(topic="siot/y风向", data=(FengXiang[(yunque_i2c.get_value("dir"))]))
siot.publish_save(topic="siot/y温度", data=(yunque_i2c.get_value("Temp")))
siot.publish_save(topic="siot/y气压", data=(yunque_i2c.get_value("Pressure")))
siot.publish_save(topic="siot/烟雾", data=p_p22_analog.read_analog())
siot.publish_save(topic="siot/体感", data=(my_variable[(str((p_p24_in.read_digital()==True)))]))
CO2 = (int(float((yunque_i2c.get_value("ECO2")))))
if ((CO2 > 400) and (CO2 <= 600)):
siot.publish_save(topic="siot/CO2浓度", data="非常好")
siot.publish_save(topic="siot/CO2建议", data="浓度不超标")
elif ((CO2 > 600) and (CO2 <= 800)):
siot.publish_save(topic="siot/CO2浓度", data="好")
siot.publish_save(topic="siot/CO2建议", data="保持正常")
elif ((CO2 > 800) and (CO2 <= 1000)):
siot.publish_save(topic="siot/CO2浓度", data="一般")
siot.publish_save(topic="siot/CO2建议", data="可以通风")
elif ((CO2 > 1000) and (CO2 <= 1500)):
siot.publish_save(topic="siot/CO2浓度", data="差")
siot.publish_save(topic="siot/CO2建议", data="室内空气污染开启通风")
else:
siot.publish_save(topic="siot/CO2浓度", data="严重")
siot.publish_save(topic="siot/CO2建议", data="室内空气污染严重,开启通风")
if ((int(float(SCI1.get_value1(SCI1.ePort1,"Analog")))) > 2000):
siot.publish_save(topic="siot/火焰", data="请注意!有火情,抓紧灭火")
else:
siot.publish_save(topic="siot/火焰", data="一切正常")
TVOC = (int(float((yunque_i2c.get_value("TVOC")))))
siot.publish_save(topic="siot/TVOC", data=(yunque_i2c.get_value("TVOC")))
if (TVOC > 6000):
siot.publish_save(topic="siot/TVOC建议", data="会引发人体头疼等神经问题")
elif ((TVOC > 750) and (TVOC <= 6000)):
siot.publish_save(topic="siot/TVOC建议", data="会引发人急躁不安和头疼")
elif ((TVOC > 50) and (TVOC <= 750)):
siot.publish_save(topic="siot/TVOC建议", data="会让人急躁不安、不舒服")
elif (50 > TVOC):
siot.publish_save(topic="siot/TVOC建议", data="正常环境")
time.sleep(1)
这一阶段的挑战让我用上了云雀,果真大有乾坤,期待有更好的设备进入中小学教育,让孩子在科学探究中成长。
三春牛-创客2024.02.14
赞赞赞!
三春牛-创客2024.02.14
不错不错
hacker_2023.08.19
666