回到首页 返回首页
回到顶部 回到顶部
返回上一页 返回上一页

用“小智”控制行空板小车 中等

头像 qzs1982 2025.06.30 14 0

一、创作背景

在人工智能技术蓬勃发展的今天,人机交互正逐渐走向自然化与智能化。作为信息科技教师,我希望带领学生探索AI与硬件的融合应用,让抽象的技术概念在真实场景中“动”起来。本次作品以“小智聊天机器人控制的行空板小车”为主题,将自然语言处理与嵌入式开发相结合。学生只需用语音发出指令,AI便能理解并转化为控制信号,驱动小车完成前进、后退、打开车灯、关闭车灯等任务。

我们期待通过这个项目,展现“AI+硬件”在教育中的无限可能——让机器听懂人言,让思维照亮现实。

二、作品简介

本作品是通过mind+图形化编程软件的python模式实现编程控制,通过与小智AI对话进而控制车辆的前进、后退、停止、开灯、关灯等任务。

三、硬件清单

1.行空板M10     1个

2.行空板M10扩展板组合      1个

3.TT电机、麦克纳姆轮    2个

4.红色led模块   1个

5.耳麦(含麦克风) 1个

四、制作过程

1.硬件组装

2.程序编写

步骤1 组装行空板M10+行空板M10扩展板组合

步骤2 组装TT电机和麦克纳姆轮,共2个

步骤3 将TT电机连接至行空板M10扩展板的M1与M2插口

步骤4 将红色LED模块连接至扩展板的P1引脚

步骤5 参考论坛中的有关行空板M10、小智文章,结合大模型deepseek辅助编程调试

材料清单

  • 行空板M10 X1 链接
  • 行空板M10扩展板组合 X1 链接
  • TT电机 X2 链接
  • Gravity: 数字食人鱼红色LED发光模块 X1 链接
代码
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import json
import time
import requests
import paho.mqtt.client as mqtt
import threading
import pyaudio
import opuslib
import socket
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
import logging
from unihiker import GUI
from pinpong.board import Pin, Board, Servo
from pinpong.board import NeoPixel
import webrtcvad

# 初始化语音活动检测
vad = webrtcvad.Vad(3)

# 初始化行空板硬件接口
Board().begin()
gui = GUI()
gui.clear()

#按键引脚设置
p_p24_in = Pin(Pin.P24, Pin.IN)

pin1 = Pin(Pin.D13)
np1 = NeoPixel(pin1,2)#这里的2代表什么
np1.brightness(128)

np1.clear()
led = Pin(Pin.P1, Pin.OUT)  # 根据实际LED连接的GPIO修改

m1_p5_out=Pin(Pin.P5, Pin.OUT)#拓展板M1电机的配置P5控制方向
m1_p8_pwm=Pin(Pin.P8, Pin.PWM)#拓展板M1电机的配置P8控制速度
m2_p6_out=Pin(Pin.P6, Pin.OUT)#拓展板M2电机的配置P6控制方向
m2_p16_pwm=Pin(Pin.P16, Pin.PWM)#拓展板M2电机的配置P16控制速度



def control_led(state):
    """控制LED灯状态
    :param state: True开灯, False关灯
    """
    if state:
        led.write_digital(1)  # 开灯
        print("LED灯已打开")
    else:
        led.write_digital(0)  # 关灯
        print("LED灯已关闭")

def set_motor_speed(speed=0, direction=1):
    """
    设置小车马达速度和方向
    
    参数:
        speed (int): 速度值,范围0-1023
        direction (int): 方向,1表示前进,0表示后退
    """
    # 设置方向
    # 假设方向引脚高电平为前进,低电平为后退
    m1_p5_out.write_digital(direction)
    m2_p6_out.write_digital(direction)
    
    # 设置PWM占空比 (0-1023对应0%-100%)
    m1_p8_pwm.write_analog(speed)
    m2_p16_pwm.write_analog(speed)


# 显示内容设置
fontSize = 20
max_lines = 16
max_chars = 8

# 图形界面元素
status_label = gui.draw_text(x=80, y=10, text='初始化中...', color='red')
log_text = gui.draw_text(x=10, y=100, text='', font_size=fontSize, color='blue')
emotion = gui.draw_text(x=110, y=50, text='', font_size=fontSize, color='green')

# 配置信息
OTA_VERSION_URL = 'https://api.tenclass.net/xiaozhi/ota/'
MAC_ADDR = '00:e0:4c:b8:98:08'  # 请替换为实际MAC地址

# 全局变量
mqtt_info = {}
aes_opus_info = {
    "type": "hello", 
    "version": 3, 
    "transport": "udp",
    "udp": {
        "server": "120.24.160.13", 
        "port": 8884, 
        "encryption": "aes-128-ctr",
        "key": "263094c3aa28cb42f3965a1020cb21a7", 
        "nonce": "01000000ccba9720b4bc268100000000"
    },
    "audio_params": {
        "format": "opus", 
        "sample_rate": 24000, 
        "channels": 1, 
        "frame_duration": 60
    },
    "session_id": None
}

local_sequence = 0
listen_state = None
tts_state = None
key_state = None
audio = None
vaddata = None
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
conn_state = False
recv_audio_thread = threading.Thread()
send_audio_thread = threading.Thread()
anjian_thread = threading.Thread()
mqttc = None
speekstate = 0
speekstoptime = 0

def get_ota_version():
    """获取OTA版本和MQTT配置信息"""
    global mqtt_info
    header = {
        'Device-Id': MAC_ADDR,
        'Content-Type': 'application/json'
    }
    post_data = {
        "flash_size": 16777216,
        "minimum_free_heap_size": 8318916,
        "mac_address": MAC_ADDR,
        "chip_model_name": "esp32s3",
        "chip_info": {
            "model": 9,
            "cores": 2,
            "revision": 2,
            "features": 18
        },
        "application": {
            "name": "xiaozhi",
            "version": "0.9.9",
            "compile_time": "Jan 22 2025T20:40:23Z",
            "idf_version": "v5.3.2-dirty",
            "elf_sha256": "22986216df095587c42f8aeb06b239781c68ad8df80321e260556da7fcf5f522"
        },
        "partition_table": [
            {"label": "nvs", "type": 1, "subtype": 2, "address": 36864, "size": 16384},
            {"label": "otadata", "type": 1, "subtype": 0, "address": 53248, "size": 8192},
            {"label": "phy_init", "type": 1, "subtype": 1, "address": 61440, "size": 4096},
            {"label": "model", "type": 1, "subtype": 130, "address": 65536, "size": 983040},
            {"label": "storage", "type": 1, "subtype": 130, "address": 1048576, "size": 1048576},
            {"label": "factory", "type": 0, "subtype": 0, "address": 2097152, "size": 4194304},
            {"label": "ota_0", "type": 0, "subtype": 16, "address": 6291456, "size": 4194304},
            {"label": "ota_1", "type": 0, "subtype": 17, "address": 10485760, "size": 4194304}
        ],
        "ota": {"label": "factory"},
        "board": {
            "type": "bread-compact-wifi",
            "ssid": "mzy",
            "rssi": -58,
            "channel": 6,
            "ip": "192.168.124.38",
            "mac": "cc:ba:97:20:b4:bc"
        }
    }

    try:
        response = requests.post(OTA_VERSION_URL, headers=header, data=json.dumps(post_data))
        logging.info(f"get version: {response}")
        mqtt_info = response.json()['mqtt']
    except Exception as e:
        logging.error(f"获取OTA版本失败: {e}")
        # 使用默认MQTT配置作为后备
        mqtt_info = {
            "endpoint": "post-cn-apg3xckag01.mqtt.aliyuncs.com",
            "client_id": f"GID_test@@@{MAC_ADDR.replace(':', '_')}",
            "username": "Signature|LTAI5tF8J3CrdWmRiuTjxHbF|post-cn-apg3xckag01",
            "password": "0mrkMFELXKyelhuYy2FpGDeCigU=",
            "publish_topic": "device-server",
            "subscribe_topic": "devices"
        }

# 加密解密函数
def aes_ctr_encrypt(key, nonce, plaintext):
    """AES-CTR加密"""
    cipher = Cipher(algorithms.AES(key), modes.CTR(nonce), backend=default_backend())
    encryptor = cipher.encryptor()
    return encryptor.update(plaintext) + encryptor.finalize()

def aes_ctr_decrypt(key, nonce, ciphertext):
    """AES-CTR解密"""
    cipher = Cipher(algorithms.AES(key), modes.CTR(nonce), backend=default_backend())
    decryptor = cipher.decryptor()
    plaintext = decryptor.update(ciphertext) + decryptor.finalize()
    return plaintext

# 音频处理函数
def send_audio():
    """发送音频数据的线程函数"""
    global aes_opus_info, udp_socket, local_sequence, listen_state, audio, vaddata
    
    key = aes_opus_info['udp']['key']
    nonce = aes_opus_info['udp']['nonce']
    server_ip = aes_opus_info['udp']['server']
    server_port = aes_opus_info['udp']['port']
    
    # 初始化Opus编码器
    encoder = opuslib.Encoder(16000, 1, opuslib.APPLICATION_AUDIO)
    
    # 打开麦克风流
    mic = audio.open(format=pyaudio.paInt16, channels=1, rate=16000, 
                    input=True, frames_per_buffer=480)
    
    try:
        while True:
            if listen_state == "stop":
                time.sleep(0.1)
                continue
            
            # 读取音频数据
            vaddata = mic.read(480)
            data = vaddata + mic.read(480)
            
            # 编码音频数据
            encoded_data = encoder.encode(data, 960)
            
            # 更新nonce并加密数据
            local_sequence += 1
            new_nonce = nonce[0:4] + format(len(encoded_data), '04x') + nonce[8:24] + format(local_sequence, '08x')
            encrypt_encoded_data = aes_ctr_encrypt(bytes.fromhex(key), bytes.fromhex(new_nonce), bytes(encoded_data))
            data = bytes.fromhex(new_nonce) + encrypt_encoded_data
            
            # 发送数据
            udp_socket.sendto(data, (server_ip, server_port))
            
    except Exception as e:
        logging.error(f"发送音频错误: {e}")
    finally:
        logging.info("发送音频线程退出")
        local_sequence = 0
        if mic:
            mic.stop_stream()
            mic.close()

def recv_audio():
    """接收音频数据的线程函数"""
    global aes_opus_info, udp_socket, audio, speekstoptime
    
    key = aes_opus_info['udp']['key']
    nonce = aes_opus_info['udp']['nonce']
    sample_rate = aes_opus_info['audio_params']['sample_rate']
    frame_duration = aes_opus_info['audio_params']['frame_duration']
    frame_num = int(frame_duration / (1000 / sample_rate))
    
    # 初始化Opus解码器
    decoder = opuslib.Decoder(sample_rate, 1)
    
    # 打开扬声器流
    spk = audio.open(format=pyaudio.paInt16, channels=1, rate=sample_rate, 
                    output=True, frames_per_buffer=frame_num)
    
    try:
        while True:
            data, server = udp_socket.recvfrom(4096)
            
            # 分离nonce和加密数据
            split_encrypt_encoded_data_nonce = data[:16]
            split_encrypt_encoded_data = data[16:]
            
            # 解密数据
            decrypt_data = aes_ctr_decrypt(
                bytes.fromhex(key),
                split_encrypt_encoded_data_nonce,
                split_encrypt_encoded_data
            )
            
            # 解码并播放音频
            spk.write(decoder.decode(decrypt_data, frame_num))
            speekstoptime = time.time()
            
    except Exception as e:
        logging.error(f"接收音频错误: {e}")
    finally:
        logging.info("接收音频线程退出")
        if spk:
            spk.stop_stream()
            spk.close()

# 文本处理函数
def wrap_hanzi(text, first_line_width=5, other_line_width=16):
    """格式化文本显示,第一行和其他行宽度不同"""
    lines = []
    
    # 处理第一行
    if len(text) > first_line_width:
        lines.append(text[:first_line_width])
        remaining_text = text[first_line_width:]
    else:
        lines.append(text)
        remaining_text = ""
    
    # 处理后续行
    for i in range(0, len(remaining_text), other_line_width):
        lines.append(remaining_text[i:i + other_line_width])
    
    return "\n".join(lines)

def get_ascii_emotion(emotion):
    """根据情绪返回对应的ASCII表情"""
    emotions = {
        "happy": ":)",
        "sad": ":(",
        "winking": ";)",
        "surprised": ":O",
        "angry": ">:((",
        "laughing": ":D",
        "cool": "B-)",
        "crying": ":'(",
        "shy": "^_^",
        "thinking": ":|",
        "love": "<3",
        "sleepy": "-.-",
        "neutral": ":|",
        "excited": ":D",
        "confused": ":S"
    }
    return emotions.get(emotion, ":(")  # 默认表情

# MQTT回调函数
def on_message(client, userdata, message):
    """MQTT消息回调"""
    global aes_opus_info, udp_socket, tts_state, recv_audio_thread, send_audio_thread
    global listen_state, speekstate, speekstoptime
    
    try:
        msg = json.loads(message.payload)
        logging.info(f"收到消息: {msg}")
        
        if msg['type'] == 'hello':
            handle_hello_message(msg)
        elif msg['type'] == 'tts':
            handle_tts_message(msg)
        elif msg['type'] == 'llm':
            handle_llm_message(msg)
        elif msg['type'] == 'stt':
            handle_stt_message(msg)
        elif msg['type'] == 'goodbye':
            handle_goodbye_message(msg)
            
    except Exception as e:
        logging.error(f"处理消息错误: {e}")

def handle_hello_message(msg):
    """处理hello类型消息"""
    global aes_opus_info, udp_socket, recv_audio_thread, send_audio_thread, listen_state
    
    aes_opus_info = msg
    udp_socket.connect((msg['udp']['server'], msg['udp']['port']))
    
    # 启动接收音频线程
    if not recv_audio_thread.is_alive():
        recv_audio_thread = threading.Thread(target=recv_audio)
        recv_audio_thread.start()
    
    # 启动发送音频线程
    if not send_audio_thread.is_alive():
        send_audio_thread = threading.Thread(target=send_audio)
        send_audio_thread.start()
    
    # 发送开始聆听消息
    if listen_state is None:
        listen_state = "hello"
        start_listen_msg = {
            "session_id": aes_opus_info['session_id'],
            "type": "listen",
            "state": "start",
            "mode": "manual"
        }
        push_mqtt_msg(start_listen_msg)
        status_label.config(text="聆听中……")

def handle_tts_message(msg):
    """处理TTS类型消息"""
    global tts_state, speekstate
    
    tts_state = msg['state']
    
    if msg['state'] == 'start':
        status_label.config(text="讲话中……")
        speekstate = 0
    elif msg['state'] == 'stop':
        status_label.config(text="就绪")
        speekstate = 1
    elif msg['state'] == 'sentence_start':
        text = wrap_hanzi(msg['text'], 5, max_chars)
        log_text.config(text="阿美: " + text)
        
        # 新增LED控制逻辑
        if "开灯" in msg['text']:
            control_led(True)
            np1.range_color(0, 1, 0x0000FF)  # 同时控制NeoPixel
        elif "关灯" in msg['text']:
            control_led(False)
            np1.clear()
        # 新增M1,M2电机控制逻辑
        if "前进" in msg['text']:
            set_motor_speed(1023,1)#小车全速forward
        elif "后退" in msg['text']:
            set_motor_speed(1023,0)#小车全速backward
        elif "停止" in msg['text']:
            set_motor_speed(0,1)#小车stop
def handle_llm_message(msg):
    """处理LLM类型消息"""
    ascii_emotion = get_ascii_emotion(msg['emotion'])
    emotion.config(text=ascii_emotion)

def handle_stt_message(msg):
    """处理STT类型消息"""
    text = wrap_hanzi(msg['text'], 5, max_chars)
    log_text.config(text="我: " + text)

def handle_goodbye_message(msg):
    """处理goodbye类型消息"""
    global aes_opus_info, listen_state, udp_socket
    
    if udp_socket and msg['session_id'] == aes_opus_info['session_id']:
        aes_opus_info['session_id'] = None
        listen_state = None
        log_text.config(text="")
        status_label.config(text="休息中")
        
        # 关闭UDP连接
        udp_socket.close()
        udp_socket = None
        
        # 等待线程结束
        for thread in (recv_audio_thread, send_audio_thread):
            if thread and thread.is_alive():
                thread.join(timeout=1)

def on_connect(client, userdata, flags, rc, properties=None):
    """MQTT连接回调"""
    subscribe_topic = mqtt_info['subscribe_topic'].split("/")[0] + '/p2p/GID_test@@@' + MAC_ADDR.replace(':', '_')
    client.subscribe(subscribe_topic)
    status_label.config(text="就绪")

# MQTT消息推送
def push_mqtt_msg(message):
    """推送MQTT消息"""
    try:
        mqttc.publish(mqtt_info['publish_topic'], json.dumps(message))
    except Exception as e:
        logging.error(f"推送MQTT消息失败: {e}")

# 语音交互控制
def listen_start():
    """开始聆听"""
    global key_state, udp_socket, aes_opus_info, listen_state, conn_state
    
    if key_state == "press":
        return
    
    key_state = "press"
    
    # 判断是否需要发送hello消息
    if conn_state is False or aes_opus_info['session_id'] is None:
        # 清理旧连接
        if udp_socket:
            udp_socket.close()
            udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

        conn_state = True
        # 发送hello消息,建立udp连接
        hello_msg = {
            "type": "hello",
            "version": 3,
            "transport": "udp",
            "audio_params": {
                "format": "opus",
                "sample_rate": 16000,
                "channels": 1,
                "frame_duration": 60
            }
        }
        push_mqtt_msg(hello_msg)
    
    # 在播放状态下发送abort消息
    if tts_state == "start" or tts_state == "entence_start":
        push_mqtt_msg({"type": "abort"})
    
    # 发送开始聆听消息
    if aes_opus_info['session_id'] is not None:
        msg = {
            "session_id": aes_opus_info['session_id'],
            "type": "listen",
            "state": "start",
            "mode": "manual"
        }
        push_mqtt_msg(msg)

def listen_stop():
    """停止聆听"""
    global key_state
    
    key_state = "release"
    
    # 发送停止聆听消息
    if aes_opus_info['session_id'] is not None:
        msg = {
            "session_id": aes_opus_info['session_id'],
            "type": "listen",
            "state": "stop"
        }
        push_mqtt_msg(msg)

# 按键检测线程
def anjian():
    """按键检测线程函数"""
    global vaddata, speekstate, speekstoptime
    
    bs = 0
    last_voice_time = 0
    
    while True:
        if vaddata is not None:
            # 检测语音活动
            if vad.is_speech(vaddata, 16000) and speekstate == 1 and time.time() - speekstoptime > 1.5:
                status_label.config(text="聆听中……")
                bs = 1
                listen_start()
                last_voice_time = time.time()
            elif time.time() - last_voice_time > 1.5 and bs == 1:
                bs = 0
                listen_stop()
        else:
            time.sleep(2)
            bs = 1
            listen_start()
            last_voice_time = time.time()

# 主程序
def run():
    """主程序"""
    global mqtt_info, mqttc, audio
    
    # 初始化音频
    audio = pyaudio.PyAudio()
    
    # 获取MQTT配置
    get_ota_version()
    
    # 创建MQTT客户端
    mqttc = mqtt.Client(
        callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
        client_id=mqtt_info['client_id']
    )
    mqttc.username_pw_set(
        username=mqtt_info['username'],
        password=mqtt_info['password']
    )
    mqttc.tls_set(
        ca_certs=None,
        certfile=None,
        keyfile=None,
        cert_reqs=mqtt.ssl.CERT_REQUIRED,
        tls_version=mqtt.ssl.PROTOCOL_TLS,
        ciphers=None
    )
    mqttc.on_connect = on_connect
    mqttc.on_message = on_message
    
    # 连接MQTT服务器
    try:
        mqttc.connect(host=mqtt_info['endpoint'], port=8883)
    except Exception as e:
        logging.error(f"MQTT连接失败: {e}")
        return
    
    # 启动按键检测线程
    anjian_thread = threading.Thread(target=anjian)
    anjian_thread.daemon = True
    anjian_thread.start()
    
    # 启动MQTT循环
    mqttc.loop_forever()

if __name__ == "__main__":
    # 配置日志
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )
    
    try:
        control_led(True)#开灯
        time.sleep(1)
        control_led(False)#关灯
        time.sleep(1)
        set_motor_speed(1023, 1)#小车全速前进
        time.sleep(3)
        set_motor_speed(512, 1)#小车低速前进
        time.sleep(3)
        set_motor_speed(0, 1)#小车停止
        time.sleep(3)
        set_motor_speed(1023,0)#小车全速后退
        time.sleep(3)
        set_motor_speed(0, 1)#小车停止
        time.sleep(1)
        run()

    except KeyboardInterrupt:
        logging.info("程序被用户中断")
    except Exception as e:
        logging.error(f"程序运行错误: {e}")
    finally:
        # 清理资源
        if audio:
            audio.terminate()
        if udp_socket:
            udp_socket.close()
        logging.info("程序退出")

评论

user-avatar