回到首页 返回首页
回到顶部 回到顶部
返回上一页 返回上一页

基于AI的校园气象台 中等

头像 市一中 2025.06.27 15 0

基于AI的校园气象台

作品集成云雀气象仪实时采集风向、风速、温度、湿度、气压等基础数据;行空板搭载双芯片模块:监测室内外光强并同步显示气象数据,定位模块通过GPS精准定位经纬度;可视化面板动态呈现七大类深度分析结果(含气候特征、突变检验、适生作物推荐等),核心数据经DeepSeek AI模型处理生成决策建议。系统将硬件传感、定位技术与人工智能深度融合,为校园提供实时气象监测、历史规律分析和未来预测服务,兼具科学性与实用性,是中学生创新实践与气象科普教育的典范。

创意说明 

1. 三层架构创新:  

  感知层(云雀+光强/GPS传感器)→ 控制层(芯控板双芯片处理)→ 应用层(DeepSeek AI分析+可视化面板)  

2. AI深度赋能:  

   利用DeepSeek模型实现从基础监测到农业推荐(如经济作物优化)、健康防护(热指数评估)、生态分析(植被生产力)的跨越;  

3. 多时空维度融合:  

   -实时数据(GPS定位/紫外线指数) + 历史检验(1990-2025年气候突变) + 预测服务(ECMWF/CMA未来7天预报);  

4. 科普沉浸式体验:  

   -可视化面板将专业气候参数(如边界层特征、土壤参数)转化为图表与互动建议,降低中学生理解门槛。

三、创作目的与方法  

1、目的:  

- 构建校园级“监测-分析-决策”气象闭环系统,服务科学课程与课外实践;  

- 通过AI简化专业气象分析流程,助力新疆特色农业与生态保护科普教育;  

- 培养学生“硬件搭建+数据分析+跨学科应用”的综合能力。  

2、方法:  

1. 硬件组网:  

   - 云雀气象仪采集环境参数 → 芯控板芯片一处理光强/显示数据 → 芯片二输出经纬度定位;  

2. AI模型部署:  

   - 本地气象数据 + 公开历史数据集 → 输入DeepSeek模型执行七大分析模块;  

3. 交互设计:  

   - 可视化面板分区块展示实时数据、气候分析图表及图文建议(如适生作物种植方案)。

 

 微信图片_20250627192519.jpg

 

代码
#  -*- coding: UTF-8 -*-

# MindPlus
# Python
import sys
sys.path.append("/root/mindplus/.lib/thirdExtension/nick-base64-thirdex")
sys.path.append("/root/mindplus/.lib/thirdExtension/mengchangfeng-deepseek-thirdex")
sys.path.append("/root/mindplus/.lib/thirdExtension/liliang-gravitygnss-thirdex")
sys.path.append("/root/mindplus/.lib/thirdExtension/liliang-yunque-thirdex")
import cv2
import siot
import time
from unihiker import GUI
from unihiker import Audio
from pinpong.board import Board
from DFRobot_Atmospherlum import *
from pinpong.board import Board,Pin
from pinpong.extension.unihiker import *
import base64
from io import BytesIO
from PIL import Image
import requests
import json
from pathlib import Path
import base64
import sys
sys.path.append("/root/mindplus/.lib/thirdExtension/liliang-gravitygnss-thirdex")
from DFRobot_GNSS import *



class LLMChat:
    def __init__(self):
        base_url = "https://api.deepseek.com".rstrip('/')
        if base_url.endswith('/chat/completions'):
            self.api_url = base_url
        else:
            self.api_url = f"{base_url}/chat/completions"
        self.api_key = "sk-0d83cef0324642be8d19ad73cc9ec2f8"
        self.headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}"
        }
        self.messages = [{"role": "user", "content": "回答简短,不要出现emoji表情"}]
        self.model = "deepseek-chat"
    def dfrobot_chat_image(self, image_path):
        try:
            with open(image_path, "rb") as image_file:
                base64_image = base64.b64encode(image_file.read()).decode('utf-8')

            image_content = {
                "type": "image_url",
                "image_url": {
                    "url": f"data:image/png;base64,{base64_image}"
                }
            }
            self.messages.append({
                "role": "user",
                "content": [image_content]
            })
        except Exception as e:
            print("图片处理失败: {str(e)}")

    def dfrobot_chat(self, user_input):

        self.messages.append({"role": "user", "content": user_input})

        payload = {
            "model": self.model,
            "messages": self.messages
        }
        try:
            response = requests.post(
                url=self.api_url,
                headers=self.headers,
                data=json.dumps(payload)
            )
            if response.status_code == 200:
                response_data = response.json()
                ai_response = response_data['choices'][0]['message']['content']
                self.messages.append({"role": "assistant", "content": ai_response})
                return ai_response
            else:
                return f"Error: {response.status_code} - {response.text}"
        except Exception as e:
            return f"Exception: {str(e)}"


bot = LLMChat()

# 自定义函数
def FengXiangPanDuan():
    if ((str((yunque_i2c.get_value("dir")))).find("W")!=-1):
        siot.publish_save(topic="siot/fengxiang", data=(str("西")))
    if ((str((yunque_i2c.get_value("dir")))).find("S")!=-1):
        siot.publish_save(topic="siot/fengxiang", data=(str("南")))
    if ((str((yunque_i2c.get_value("dir")))).find("E")!=-1):
        siot.publish_save(topic="siot/fengxiang", data=(str("东")))
    if ((str((yunque_i2c.get_value("dir")))).find("N")!=-1):
        siot.publish_save(topic="siot/fengxiang", data=(str("北")))
    if (((str((yunque_i2c.get_value("dir")))).find("S")!=-1) and ((str((yunque_i2c.get_value("dir")))).find("E")!=-1)):
        siot.publish_save(topic="siot/fengxiang", data=(str("东南")))
    if (((str((yunque_i2c.get_value("dir")))).find("S")!=-1) and ((str((yunque_i2c.get_value("dir")))).find("W")!=-1)):
        siot.publish_save(topic="siot/fengxiang", data=(str("西南")))
    if (((str((yunque_i2c.get_value("dir")))).find("N")!=-1) and ((str((yunque_i2c.get_value("dir")))).find("E")!=-1)):
        siot.publish_save(topic="siot/fengxiang", data=(str("东北")))
    if (((str((yunque_i2c.get_value("dir")))).find("N")!=-1) and ((str((yunque_i2c.get_value("dir")))).find("W")!=-1)):
        siot.publish_save(topic="siot/fengxiang", data=(str("西北")))

# 事件回调函数
def on_buttona_click_callback():
    siot.publish_save(topic="siot/dk", data=bot.dfrobot_chat((str("这是用云雀气象仪和gnss模块测出来的经纬度和天气数据,经度76.150447,纬度39.702306这是第一次测量的数据经度在76这个范围内,纬度在39这个范围内,gnss测不出来你就用这个数据,经纬度必须要准确,你先在根据这些数据帮我判断这是哪个地区气候信息,然后跟这个地区前几年的气候信息比较跟我说有哪些变化,还有在这个气候信息的前提条件下给我说在这个地区可以种植哪些农作物,水果,植物可以合适等等,然后再跟我说这个地区的气候特点,然后出门要注意什么,什么时候出门最合适,以及给我预测往来7天内的准确气候信息,比如下雨,下雪,刮风,晴天,多云这类的,最前面要加上准确的年月日和现在给你提供的风速,风向,温度,湿度等等这些数据也加上,必须要详细的分析每一个问题以及数据") + str((str((str((str((str((str((str((str((str((yunque_i2c.get_value("Humi"))) + str((yunque_i2c.get_value("dir"))))) + str((yunque_i2c.get_value("Pressure"))))) + str((yunque_i2c.get_value("Temp"))))) + str((yunque_i2c.get_value("speed"))))) + str((str(gnss_lat_lon.latitude_degree))))) + str((str(gnss_lat_lon.lonitude_degree))))) + str((str(gnss_utc.year)+'/'+str(gnss_utc.month)+'/'+str(gnss_utc.date))))) + str((yunque_i2c.get_unit("speed"))))))))


u_gui=GUI()
Board().begin()
siot.init(client_id="6792698134135706",server="192.168.1.105",port=1883,user="siot",password="dfrobot")
u_audio = Audio()
u_gui.on_a_click(on_buttona_click_callback)
siot.connect()
siot.loop()
gnss = DFRobot_GNSS_I2C (bus=0, addr=0x20)
while (gnss.begin() == False):
    print("Sensor initialize failed!!")
    time.sleep(1)
gnss.enable_power()
gnss.set_gnss(GPS_BeiDou_GLONASS)
gnss.rgb_on()
gnss_utc = struct_utc_tim()
gnss_lat_lon = struct_lat_lon()

yunque_i2c = DFRobot_Atmospherlum_I2C(0x42)
while (yunque_i2c.begin() != 0):
    print("yunque_i2c initialize failed!!")
    time.sleep(1)
print("Sensor initialize success!!")
yunque_i2c.set_local_time()
time.sleep(1)


def frame2base64(frame):
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    img = Image.fromarray(frame) #将每一帧转为Image
    output_buffer = BytesIO() #创建一个BytesIO
    img.save(output_buffer, format='JPEG') #写入output_buffer
    byte_data = output_buffer.getvalue() #在内存中读取
    base64_data = base64.b64encode(byte_data) #转为BASE64
    return base64_data #转码成功 返回base64编码

def base642base64(frame):
    data=str('data:image/png;base64,')
    base64data = str(frame2base64(frame))
    framedata = base64data[2:(len(base64data)-1)]
    base642base64_data = data + str(framedata)
    return base642base64_data
vd = cv2.VideoCapture()
vd.open(0)
while not (vd.isOpened()):
    pass
vd.set(cv2.CAP_PROP_FRAME_WIDTH,240)
vd.set(cv2.CAP_PROP_FRAME_HEIGHT,320)
时间=u_gui.draw_text(text="",x=0,y=0,font_size=12, color="#0000FF")
风向=u_gui.draw_text(text="",x=0,y=20,font_size=15, color="#FF0000")
风速=u_gui.draw_text(text="",x=0,y=40,font_size=15, color="#0000FF")
温度=u_gui.draw_text(text="",x=0,y=60,font_size=15, color="#FF0000")
湿度=u_gui.draw_text(text="",x=0,y=80,font_size=15, color="#0000FF")
气压=u_gui.draw_text(text="",x=0,y=100,font_size=15, color="#FF0000")
声音=u_gui.draw_text(text="",x=0,y=120,font_size=15, color="#0000FF")
光线=u_gui.draw_text(text="",x=0,y=140,font_size=15, color="#FF0000")
经纬度=u_gui.draw_text(text="",x=0,y=160,font_size=15, color="#FF0000")
时间=u_gui.draw_text(text="",x=0,y=180,font_size=12, color="#000000")
while True:
    if vd.grab():
        ret, grab = vd.read()
    while True:
        siot.publish_save(topic="siot/tuxiang", data=base642base64(grab))
        time.sleep(1)
        while True:
            FengXiangPanDuan()
            gnss_utc = gnss.get_date()
            gnss_utc = gnss.get_utc()
            gnss_lat_lon = gnss.get_lat()
            gnss_lat_lon = gnss.get_lon()
            gnss_alt = gnss.get_alt()
            gnss_cog = gnss.get_cog()
            gnss_sog = gnss.get_sog()
            gnss_num = gnss.get_num_sta_used()
            gnss_star = gnss.get_gnss_mode()
            经纬度.config(text=(str((str(gnss_lat_lon.lonitude_degree))) + str((str(",") + str((str(gnss_lat_lon.latitude_degree)))))))
            siot.publish_save(topic="siot/jwdu", data=(str((str(gnss_lat_lon.lonitude_degree))) + str((str(",") + str((str(gnss_lat_lon.latitude_degree)))))))
            时间.config(text=(str((str(gnss_utc.year)+'/'+str(gnss_utc.month)+'/'+str(gnss_utc.date))) + str((str(gnss_utc.year)+'/'+str(gnss_utc.month)+'/'+str(gnss_utc.date)))))
            siot.publish_save(topic="siot/shijian", data=(str((str(gnss_utc.year)+'/'+str(gnss_utc.month)+'/'+str(gnss_utc.date))) + str((str(gnss_utc.year)+'/'+str(gnss_utc.month)+'/'+str(gnss_utc.date)))))
            声音.config(text=u_audio.sound_level())
            siot.publish_save(topic="siot/shengyin", data=u_audio.sound_level())
            光线.config(text=light.read())
            siot.publish_save(topic="siot/guangxian", data=light.read())
            ShiJian = (yunque_i2c.get_time_stamp())
            FengSu = (str((yunque_i2c.get_value("speed"))) + str((yunque_i2c.get_unit("speed"))))
            siot.publish_save(topic="siot/fengsu", data=(yunque_i2c.get_value("speed")))
            WenDu = (str((yunque_i2c.get_value("Temp"))) + str((yunque_i2c.get_unit("Temp"))))
            siot.publish_save(topic="siot/wendu", data=(yunque_i2c.get_value("Temp")))
            ShiDu = (str((yunque_i2c.get_value("Humi"))) + str((yunque_i2c.get_unit("Humi"))))
            siot.publish_save(topic="siot/shidu", data=(yunque_i2c.get_value("Humi")))
            QiYa = (str((yunque_i2c.get_value("Pressure"))) + str((yunque_i2c.get_unit("Pressure"))))
            siot.publish_save(topic="siot/qiya", data=(yunque_i2c.get_value("Pressure")))
            siot.publish_save(topic="siot/fengxiang", data=(yunque_i2c.get_value("dir")))
            时间.config(text=(str("时间") + str(ShiJian)))
            风速.config(text=(str("风速") + str(FengSu)))
            风向.config(text=(str("风向") + str((yunque_i2c.get_value("dir")))))
            温度.config(text=(str("温度") + str(WenDu)))
            湿度.config(text=(str("湿度") + str(ShiDu)))
            气压.config(text=(str("气压") + str(QiYa)))
            time.sleep(120)
            print("发送成功")

评论

user-avatar