回到首页 返回首页
回到顶部 回到顶部
返回上一页 返回上一页

二哈识图2药品查询系统 简单

头像 Sunny_zhu 2025.12.16 30 0

背景介绍:随着我国人口老龄化的进程加剧,“居家养老”已经成为大部分老年人的首选。老年人大多面临着“多病共存、多重用药”的健康挑战。然而由于老年人记忆力减退,导致容易误服、漏服、重复用药等用药安全的事件发生。因此,开发一套针对老年人设计的“药品查询系统”势在必行。本项目开发了一个操作简便的以文字识别、语音交互为主的药品查询系统,旨在帮助老年人快速查询药品相关信息(功效、副作用、注意事项、禁忌事项等),从而助力于为老年人构建有尊严的居家养老环境。

系统的核心功能: 老年用户只需将药品包装上的药品名称文字区域对准 HuskyLens 2,系统将自动识别并触发查询,然后通过语音播报、文字显示等多种方式将查询内容反馈给老年用户。

系统的工作流程:

第一阶段:系统启动与传感器初始化

1.行空板开机,自动运行主程序。

2.程序初始化。行空板通过I2C协议与Huskylens 2建立通信;AI摄像头将工作模式设置为OCR模式,然后学习药盒上的药品名称;行空板加载主界面,提示用户要将药盒上的药品名称对准AI摄像头。

第二阶段:智能识别

1.将药品包装上的药品名称区域对准摄像头。

2.AI摄像头会进行光学字符识别,并将识别的文本发送给行空板。

3.行空板对识别到的药品名称信息进行有效性检查。通过才进入下一步,否则系统提示“未识别到有效的药品名称”。

第三阶段:数据查询

1.行空板接收来自Huskylens 2的文字识别结果。

2.调用讯飞星火大模型API查询药品相关信息。

3.执行查询,返回查询结果。

第四阶段:结果展示与交互

1.信息显示:行空板将查询结果(名称、功效、用法、禁忌、有效期等)在触摸屏上以清晰美观的界面展示。

2.语音播报:语音播报药品名称、用法、注意事项。

具体的流程图如下:

流程图.png

器材清单:

1.行空板M10:用于接收AI摄像头采集到的文本数据以及调用星火大模型获取相关的药品信息,并通过语音和文字输出查询结果。(购买链接:https://www.dfrobot.com.cn/goods-4237.html)

行空板M10.png

2.二哈识图2 AI视觉传感器(HuskyLens 2):采用OCR模式识别药品包装上的名称信息,并将识别结果传给行空板。(购买链接:https://www.dfrobot.com.cn/goods-4198.html)

二哈识图2 AI视觉传感器.png

3.USB喇叭:用于进行语音播报

硬件连接图:

硬件连接图.png

软件与服务:Mind+ (V1.8.1 RC3.0)、讯飞开放平台大模型Spark Pro V3.1

项目实施步骤:

1. 将二哈识图2调整到OCR模式,用它对要识别的药品名称进行学习,学习方法如下:对准要学习的文字,调整HUSKYLENS 2“看”的角度,使屏幕中间的“十”字光标位于白色框内,然后按下HUSKYLENS 2右上角A键,进行学习。学习完成后,如识别到已学习的文字,屏幕将以彩色方框框出该文字,并在上方显示“name: IDx 90%” 。

2.选择二哈识图2的通信方式为I2C方式。操作方法:点击“系统设置”->协议种类->选择I2C通讯模式。

通信协议.png

3.如上图所示进行硬件连接。

4.软件及相关服务的准备:

(1)大模型的API调用需要官方授权,需要购买或领用tokens。

(2)打开mind+ V1.8.1 RC3.0,并切换到python模式。在官方库中添加M10、代码生成器、讯飞语音。

官方库.png

在用户库中添加二哈识图2 AI摄像头。

用户库.png

然后返回编程主界面,点击连接远程终端,选择“默认-10.1.2.3”连接行空板M10。

行空板连接.png

5.程序编写如下所示:

主程序1:

主程序1.png

程序段1主要是导入库文件、全局变量赋初值及讯飞语音、行空板初始画面、二哈识图2摄像头的初始化。spark_drug_query库是自定义的库文件,主要用于调用讯飞认知大模型的接口程序。

主程序2:

主程序2.png
主程序3.png

程序段2用于让AI摄像头检测药品包装上的药品名称,验证药品名称的有效性,然后将合规的药品名称作为参数,调用药品信息查询函数进行药品信息的相关查询。并通过行空板屏幕画面、扬声器播报等方式向用户提供相应的查询结果。

下面是主程序中用到的自定义函数:

药品有效性检查.png

这段程序主要检查AI摄像头返回的字符串是否有效。如果返回字符串长度小于2或者为全数字,或者AI摄像头的返回值为-1,则判定识别到的药品名称无效。

下面这段是药品信息查询函数的定义:

药品信息查询1.png
药品信息查询2.png

其主要功能是调用讯飞大模型接口实现对药品信息的相关查询,查询结果存放在变量result中。程序段用deepseek进行了优化,deepseek提出的优化建议是:1.增加查询冷却时间,防止用户在短时间内频繁提交相同查询,避免产生不必要的API调用费用;2.增加缓存机制,将高频或已查询过的药品信息存储在本地缓存,后续相同查询可直接返回缓存结果,这里的缓存内容设置了1小时有效期,超出1小时,需要重新查询,从而获取最新数据。

下面这个函数用于在行空板屏幕上显示药品信息的查询结果:

显示查询结果.png

下面这个函数用于查询结束后,将行空板画面重置到初始化界面。

用户界面重置.png

spark_drug_query.py 库文件(详见附录)主要实现了用户发送药品查询请求,通过websocket通信与讯飞星火API建立连接,接收并处理API的返回值等功能。程序段来源于讯飞平台提供的调用API示例程序,并使用deepseek进行了程序优化,采用了线程锁,确保同一时间只有一个线程使用WebSocket连接,同时确保每个查询的结果独立、完整。

项目反思:

1.用二哈2对药品包装进行药名识别,准确率更高的方法其实是采用条形码模式,然后再访问在线API获取相关药品信息。用OCR方式进行药名识别,正确率很难达到100%,如果药名中有生僻字或者药品包装盒图案复杂,识别正确率就会偏低,识别结果存在乱码,还需要对识别结果进行过滤、纠错。更好的做法是将条码识别和OCR模式结合,如果药品包装没有条码,再使用OCR模式进行识别。不过这样的话,程序就比较复杂了,今后有机会再对程序进行优化升级。

2.为了提升用户体验,项目中调用讯飞的语音识别模型进行语音识别与播报,但语音反馈信息存在一定的延时,程序还需要进一步优化。

材料清单

  • X
代码
# spark_drug_query.py(添加完整线程锁)

import json
import base64
import hashlib
import hmac
import ssl
import threading
import time
from datetime import datetime
from time import mktime
from wsgiref.handlers import format_date_time
from urllib.parse import urlencode, urlparse

import websocket


class SparkDrugQuery:
    """讯飞星火大模型药物查询类 - 线程安全版本"""
    
    def __init__(self, appid, api_key, api_secret, spark_url=None, domain=None):
        """
        初始化药物查询模块
        
        Args:
            appid: 讯飞APPID
            api_key: API Key
            api_secret: API Secret
            spark_url: 星火API地址(可选,默认v3.1)
            domain: 模型域名(可选,默认generalv3.1)
        """
        self.appid = appid
        self.api_key = api_key
        self.api_secret = api_secret
        self.spark_url = spark_url or "wss://spark-api.xf-yun.com/v3.1/chat"
        self.domain = domain or "generalv3"
        
        # WebSocket相关属性
        self.ws = None
        
        # ✅ 使用锁保护所有共享变量
        self.lock = threading.Lock()
        self.full_response = []
        self.query_complete = False
        self.error = None
        self.current_query = ""
        
    def _create_url(self):
        """生成认证URL(内部方法)"""
        host = urlparse(self.spark_url).netloc
        path = urlparse(self.spark_url).path
        date = format_date_time(mktime(datetime.now().timetuple()))
        
        # 生成签名
        signature_origin = f"host: {host}\ndate: {date}\nGET {path} HTTP/1.1"
        signature_sha = hmac.new(
            self.api_secret.encode('utf-8'),
            signature_origin.encode('utf-8'),
            digestmod=hashlib.sha256
        ).digest()
        signature_sha_base64 = base64.b64encode(signature_sha).decode('utf-8')
        
        # 生成authorization
        authorization_origin = f'api_key="{self.api_key}", algorithm="hmac-sha256", headers="host date request-line", signature="{signature_sha_base64}"'
        authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode('utf-8')
        
        # 组合参数
        v = {
            "authorization": authorization,
            "date": date,
            "host": host
        }
        
        # 生成完整URL
        url = f"{self.spark_url}?{urlencode(v)}"
        return url
    
    def _on_message(self, ws, message):
        """WebSocket消息回调(内部方法)"""
        try:
            data = json.loads(message)
            code = data['header']['code']
            
            if code != 0:
                # ✅ 使用锁保护共享变量
                with self.lock:
                    self.error = f"API错误 {code}: {data['header'].get('message', '未知错误')}"
                    self.query_complete = True
                print(f"❌ {self.error}")
                ws.close()
                return  # ✅ 提前返回
            
            choices = data["payload"]["choices"]
            status = choices["status"]
            content = choices["text"][0]["content"]
            
            # ✅ 使用锁保护共享变量
            with self.lock:
                self.full_response.append(content)
            
            print(content, end='', flush=True)
            
            if status == 2:
                print()  # 换行
                # ✅ 使用锁保护共享变量
                with self.lock:
                    self.query_complete = True
                ws.close()
                
        except Exception as e:
            # ✅ 使用锁保护共享变量
            with self.lock:
                self.error = f"消息解析错误: {e}"
                self.query_complete = True
            print(f"❌ {self.error}")
            ws.close()
    
    def _on_error(self, ws, error):
        """WebSocket错误回调(内部方法)"""
        # ✅ 使用锁保护共享变量
        with self.lock:
            self.error = f"连接错误: {error}"
            self.query_complete = True
        print(f"❌ {self.error}")
    
    def _on_close(self, ws, close_status_code, close_msg):
        """WebSocket关闭回调(内部方法)"""
        # ✅ 使用锁保护共享变量
        with self.lock:
            self.query_complete = True
    
    def _on_open(self, ws):
        """WebSocket连接建立回调(内部方法)"""
        print("✅ WebSocket连接成功,正在查询...")
        
        # ✅ 使用锁保护当前查询
        with self.lock:
            query_to_send = self.current_query
        
        # 发送查询请求
        data = json.dumps({
            "header": {
                "app_id": self.appid,
                "uid": "drug_query_user"
            },
            "parameter": {
                "chat": {
                    "domain": self.domain,
                    "temperature": 0.5,
                    "max_tokens": 1024,
                    "auditing": "default",
                }
            },
            "payload": {
                "message": {
                    "text": [{"role": "user", "content": query_to_send}]
                }
            }
        })
        
        try:
            ws.send(data)
        except Exception as e:
            # ✅ 使用锁保护共享变量
            with self.lock:
                self.error = f"发送请求失败: {e}"
                self.query_complete = True
            print(f"❌ 发送请求失败: {e}")
    
    def query_drug_info(self, drug_name, query_type="all", timeout=20):
        """
        查询药物信息 - 线程安全版本
        
        Args:
            drug_name: 药物名称
            query_type: 查询类型
            timeout: 超时时间(秒)
            
        Returns:
            str: 查询结果,如果失败返回None
        """
        # 构建查询语句
        query_templates = {
            "all": f"请详细说明药物'{drug_name}'的以下信息:\n1. 主要功效\n2. 常见副作用\n3. 注意事项\n4. 用法用量\n请用简洁中文回答。",
            "simple": f"请用50-100字简要说明药物'{drug_name}'的主要功效、用法和注意事项,用中文回答。",
        }
        
        if query_type in query_templates:
            self.current_query = query_templates[query_type]
        else:
            self.current_query = f"查询药物'{drug_name}'的相关信息,用中文回答。"
        
        # ✅ 使用锁重置所有共享状态
        with self.lock:
            self.full_response = []
            self.query_complete = False
            self.error = None
        
        print(f"🔍 查询药物: {drug_name}")
        print(f"📋 查询类型: {query_type}")
        
        try:
            # 创建WebSocket连接
            ws_url = self._create_url()
            print(f"🔗 连接URL生成成功")
            
            # 关闭详细日志
            websocket.enableTrace(False)
            
            self.ws = websocket.WebSocketApp(
                ws_url,
                on_message=self._on_message,
                on_error=self._on_error,
                on_close=self._on_close,
                on_open=self._on_open
            )
            
            # 运行WebSocket(带超时)
            def run_websocket():
                try:
                    self.ws.run_forever(
                        sslopt={
                            "cert_reqs": ssl.CERT_NONE,
                            "check_hostname": False
                        }
                    )
                except Exception as e:
                    # ✅ 使用锁保护共享变量
                    with self.lock:
                        if not self.error:
                            self.error = f"WebSocket运行错误: {e}"
                        self.query_complete = True
            
            # 启动WebSocket线程
            ws_thread = threading.Thread(target=run_websocket)
            ws_thread.daemon = True
            ws_thread.start()
            
            # 等待查询完成或超时
            start_time = time.time()
            
            while True:
                # ✅ 使用锁检查状态
                with self.lock:
                    if self.query_complete:
                        break
                
                if time.time() - start_time > timeout:
                    print(f"\n⏰ 查询超时 ({timeout}秒)")
                    if self.ws:
                        self.ws.close()
                    break
                
                time.sleep(0.1)
            
            # 等待线程结束
            ws_thread.join(timeout=2)
            
            # ✅ 使用锁获取最终结果
            with self.lock:
                if self.error:
                    print(f"\n❌ 查询失败: {self.error}")
                    return None
                elif self.full_response:
                    result = ''.join(self.full_response)
                    print(f"\n✅ 查询成功,字符数: {len(result)}")
                    return result
                else:
                    print("\n⚠️ 未获取到响应")
                    return None
                
        except KeyboardInterrupt:
            print("\n\n⏹️ 用户中断查询")
            if self.ws:
                self.ws.close()
            return None
        except Exception as e:
            print(f"\n❌ 查询异常: {e}")
            return None
    


class ThreadSafeSparkQuery:
    """完全线程安全的查询包装器"""
    
    def __init__(self, appid, api_key, api_secret):
        self.query_lock = threading.Lock()  # 实例级别的锁
        self.spark_query = SparkDrugQuery(appid, api_key, api_secret)
    
    def query(self, drug_name, query_type="simple", timeout=15):
        """
        线程安全的查询方法
        
        使用实例级别的锁确保同一时间只有一个查询在进行
        """
        with self.query_lock:
            print(f"🔒 获取查询锁,开始查询: {drug_name}")
            result = self.spark_query.query_drug_info(drug_name, query_type, timeout)
            print(f"🔓 释放查询锁")
            return result

评论

user-avatar