一、创作背景
在人工智能技术蓬勃发展的今天,人机交互正逐渐走向自然化与智能化。作为信息科技教师,我希望带领学生探索AI与硬件的融合应用,让抽象的技术概念在真实场景中“动”起来。本次作品以“小智聊天机器人控制的行空板小车”为主题,将自然语言处理与嵌入式开发相结合。学生只需用语音发出指令,AI便能理解并转化为控制信号,驱动小车完成前进、后退、打开车灯、关闭车灯等任务。
我们期待通过这个项目,展现“AI+硬件”在教育中的无限可能——让机器听懂人言,让思维照亮现实。
二、作品简介
本作品是通过mind+图形化编程软件的python模式实现编程控制,通过与小智AI对话进而控制车辆的前进、后退、停止、开灯、关灯等任务。
三、硬件清单
1.行空板M10 1个
2.行空板M10扩展板组合 1个
3.TT电机、麦克纳姆轮 2个
4.红色led模块 1个
5.耳麦(含麦克风) 1个
四、制作过程
1.硬件组装
2.程序编写
步骤1 组装行空板M10+行空板M10扩展板组合
步骤2 组装TT电机和麦克纳姆轮,共2个
步骤3 将TT电机连接至行空板M10扩展板的M1与M2插口
步骤4 将红色LED模块连接至扩展板的P1引脚
步骤5 参考论坛中的有关行空板M10、小智文章,结合大模型deepseek辅助编程调试
代码
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import json
import time
import requests
import paho.mqtt.client as mqtt
import threading
import pyaudio
import opuslib
import socket
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
import logging
from unihiker import GUI
from pinpong.board import Pin, Board, Servo
from pinpong.board import NeoPixel
import webrtcvad
# 初始化语音活动检测
vad = webrtcvad.Vad(3)
# 初始化行空板硬件接口
Board().begin()
gui = GUI()
gui.clear()
#按键引脚设置
p_p24_in = Pin(Pin.P24, Pin.IN)
pin1 = Pin(Pin.D13)
np1 = NeoPixel(pin1,2)#这里的2代表什么
np1.brightness(128)
np1.clear()
led = Pin(Pin.P1, Pin.OUT) # 根据实际LED连接的GPIO修改
m1_p5_out=Pin(Pin.P5, Pin.OUT)#拓展板M1电机的配置P5控制方向
m1_p8_pwm=Pin(Pin.P8, Pin.PWM)#拓展板M1电机的配置P8控制速度
m2_p6_out=Pin(Pin.P6, Pin.OUT)#拓展板M2电机的配置P6控制方向
m2_p16_pwm=Pin(Pin.P16, Pin.PWM)#拓展板M2电机的配置P16控制速度
def control_led(state):
"""控制LED灯状态
:param state: True开灯, False关灯
"""
if state:
led.write_digital(1) # 开灯
print("LED灯已打开")
else:
led.write_digital(0) # 关灯
print("LED灯已关闭")
def set_motor_speed(speed=0, direction=1):
"""
设置小车马达速度和方向
参数:
speed (int): 速度值,范围0-1023
direction (int): 方向,1表示前进,0表示后退
"""
# 设置方向
# 假设方向引脚高电平为前进,低电平为后退
m1_p5_out.write_digital(direction)
m2_p6_out.write_digital(direction)
# 设置PWM占空比 (0-1023对应0%-100%)
m1_p8_pwm.write_analog(speed)
m2_p16_pwm.write_analog(speed)
# 显示内容设置
fontSize = 20
max_lines = 16
max_chars = 8
# 图形界面元素
status_label = gui.draw_text(x=80, y=10, text='初始化中...', color='red')
log_text = gui.draw_text(x=10, y=100, text='', font_size=fontSize, color='blue')
emotion = gui.draw_text(x=110, y=50, text='', font_size=fontSize, color='green')
# 配置信息
OTA_VERSION_URL = 'https://api.tenclass.net/xiaozhi/ota/'
MAC_ADDR = '00:e0:4c:b8:98:08' # 请替换为实际MAC地址
# 全局变量
mqtt_info = {}
aes_opus_info = {
"type": "hello",
"version": 3,
"transport": "udp",
"udp": {
"server": "120.24.160.13",
"port": 8884,
"encryption": "aes-128-ctr",
"key": "263094c3aa28cb42f3965a1020cb21a7",
"nonce": "01000000ccba9720b4bc268100000000"
},
"audio_params": {
"format": "opus",
"sample_rate": 24000,
"channels": 1,
"frame_duration": 60
},
"session_id": None
}
local_sequence = 0
listen_state = None
tts_state = None
key_state = None
audio = None
vaddata = None
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
conn_state = False
recv_audio_thread = threading.Thread()
send_audio_thread = threading.Thread()
anjian_thread = threading.Thread()
mqttc = None
speekstate = 0
speekstoptime = 0
def get_ota_version():
"""获取OTA版本和MQTT配置信息"""
global mqtt_info
header = {
'Device-Id': MAC_ADDR,
'Content-Type': 'application/json'
}
post_data = {
"flash_size": 16777216,
"minimum_free_heap_size": 8318916,
"mac_address": MAC_ADDR,
"chip_model_name": "esp32s3",
"chip_info": {
"model": 9,
"cores": 2,
"revision": 2,
"features": 18
},
"application": {
"name": "xiaozhi",
"version": "0.9.9",
"compile_time": "Jan 22 2025T20:40:23Z",
"idf_version": "v5.3.2-dirty",
"elf_sha256": "22986216df095587c42f8aeb06b239781c68ad8df80321e260556da7fcf5f522"
},
"partition_table": [
{"label": "nvs", "type": 1, "subtype": 2, "address": 36864, "size": 16384},
{"label": "otadata", "type": 1, "subtype": 0, "address": 53248, "size": 8192},
{"label": "phy_init", "type": 1, "subtype": 1, "address": 61440, "size": 4096},
{"label": "model", "type": 1, "subtype": 130, "address": 65536, "size": 983040},
{"label": "storage", "type": 1, "subtype": 130, "address": 1048576, "size": 1048576},
{"label": "factory", "type": 0, "subtype": 0, "address": 2097152, "size": 4194304},
{"label": "ota_0", "type": 0, "subtype": 16, "address": 6291456, "size": 4194304},
{"label": "ota_1", "type": 0, "subtype": 17, "address": 10485760, "size": 4194304}
],
"ota": {"label": "factory"},
"board": {
"type": "bread-compact-wifi",
"ssid": "mzy",
"rssi": -58,
"channel": 6,
"ip": "192.168.124.38",
"mac": "cc:ba:97:20:b4:bc"
}
}
try:
response = requests.post(OTA_VERSION_URL, headers=header, data=json.dumps(post_data))
logging.info(f"get version: {response}")
mqtt_info = response.json()['mqtt']
except Exception as e:
logging.error(f"获取OTA版本失败: {e}")
# 使用默认MQTT配置作为后备
mqtt_info = {
"endpoint": "post-cn-apg3xckag01.mqtt.aliyuncs.com",
"client_id": f"GID_test@@@{MAC_ADDR.replace(':', '_')}",
"username": "Signature|LTAI5tF8J3CrdWmRiuTjxHbF|post-cn-apg3xckag01",
"password": "0mrkMFELXKyelhuYy2FpGDeCigU=",
"publish_topic": "device-server",
"subscribe_topic": "devices"
}
# 加密解密函数
def aes_ctr_encrypt(key, nonce, plaintext):
"""AES-CTR加密"""
cipher = Cipher(algorithms.AES(key), modes.CTR(nonce), backend=default_backend())
encryptor = cipher.encryptor()
return encryptor.update(plaintext) + encryptor.finalize()
def aes_ctr_decrypt(key, nonce, ciphertext):
"""AES-CTR解密"""
cipher = Cipher(algorithms.AES(key), modes.CTR(nonce), backend=default_backend())
decryptor = cipher.decryptor()
plaintext = decryptor.update(ciphertext) + decryptor.finalize()
return plaintext
# 音频处理函数
def send_audio():
"""发送音频数据的线程函数"""
global aes_opus_info, udp_socket, local_sequence, listen_state, audio, vaddata
key = aes_opus_info['udp']['key']
nonce = aes_opus_info['udp']['nonce']
server_ip = aes_opus_info['udp']['server']
server_port = aes_opus_info['udp']['port']
# 初始化Opus编码器
encoder = opuslib.Encoder(16000, 1, opuslib.APPLICATION_AUDIO)
# 打开麦克风流
mic = audio.open(format=pyaudio.paInt16, channels=1, rate=16000,
input=True, frames_per_buffer=480)
try:
while True:
if listen_state == "stop":
time.sleep(0.1)
continue
# 读取音频数据
vaddata = mic.read(480)
data = vaddata + mic.read(480)
# 编码音频数据
encoded_data = encoder.encode(data, 960)
# 更新nonce并加密数据
local_sequence += 1
new_nonce = nonce[0:4] + format(len(encoded_data), '04x') + nonce[8:24] + format(local_sequence, '08x')
encrypt_encoded_data = aes_ctr_encrypt(bytes.fromhex(key), bytes.fromhex(new_nonce), bytes(encoded_data))
data = bytes.fromhex(new_nonce) + encrypt_encoded_data
# 发送数据
udp_socket.sendto(data, (server_ip, server_port))
except Exception as e:
logging.error(f"发送音频错误: {e}")
finally:
logging.info("发送音频线程退出")
local_sequence = 0
if mic:
mic.stop_stream()
mic.close()
def recv_audio():
"""接收音频数据的线程函数"""
global aes_opus_info, udp_socket, audio, speekstoptime
key = aes_opus_info['udp']['key']
nonce = aes_opus_info['udp']['nonce']
sample_rate = aes_opus_info['audio_params']['sample_rate']
frame_duration = aes_opus_info['audio_params']['frame_duration']
frame_num = int(frame_duration / (1000 / sample_rate))
# 初始化Opus解码器
decoder = opuslib.Decoder(sample_rate, 1)
# 打开扬声器流
spk = audio.open(format=pyaudio.paInt16, channels=1, rate=sample_rate,
output=True, frames_per_buffer=frame_num)
try:
while True:
data, server = udp_socket.recvfrom(4096)
# 分离nonce和加密数据
split_encrypt_encoded_data_nonce = data[:16]
split_encrypt_encoded_data = data[16:]
# 解密数据
decrypt_data = aes_ctr_decrypt(
bytes.fromhex(key),
split_encrypt_encoded_data_nonce,
split_encrypt_encoded_data
)
# 解码并播放音频
spk.write(decoder.decode(decrypt_data, frame_num))
speekstoptime = time.time()
except Exception as e:
logging.error(f"接收音频错误: {e}")
finally:
logging.info("接收音频线程退出")
if spk:
spk.stop_stream()
spk.close()
# 文本处理函数
def wrap_hanzi(text, first_line_width=5, other_line_width=16):
"""格式化文本显示,第一行和其他行宽度不同"""
lines = []
# 处理第一行
if len(text) > first_line_width:
lines.append(text[:first_line_width])
remaining_text = text[first_line_width:]
else:
lines.append(text)
remaining_text = ""
# 处理后续行
for i in range(0, len(remaining_text), other_line_width):
lines.append(remaining_text[i:i + other_line_width])
return "\n".join(lines)
def get_ascii_emotion(emotion):
"""根据情绪返回对应的ASCII表情"""
emotions = {
"happy": ":)",
"sad": ":(",
"winking": ";)",
"surprised": ":O",
"angry": ">:((",
"laughing": ":D",
"cool": "B-)",
"crying": ":'(",
"shy": "^_^",
"thinking": ":|",
"love": "<3",
"sleepy": "-.-",
"neutral": ":|",
"excited": ":D",
"confused": ":S"
}
return emotions.get(emotion, ":(") # 默认表情
# MQTT回调函数
def on_message(client, userdata, message):
"""MQTT消息回调"""
global aes_opus_info, udp_socket, tts_state, recv_audio_thread, send_audio_thread
global listen_state, speekstate, speekstoptime
try:
msg = json.loads(message.payload)
logging.info(f"收到消息: {msg}")
if msg['type'] == 'hello':
handle_hello_message(msg)
elif msg['type'] == 'tts':
handle_tts_message(msg)
elif msg['type'] == 'llm':
handle_llm_message(msg)
elif msg['type'] == 'stt':
handle_stt_message(msg)
elif msg['type'] == 'goodbye':
handle_goodbye_message(msg)
except Exception as e:
logging.error(f"处理消息错误: {e}")
def handle_hello_message(msg):
"""处理hello类型消息"""
global aes_opus_info, udp_socket, recv_audio_thread, send_audio_thread, listen_state
aes_opus_info = msg
udp_socket.connect((msg['udp']['server'], msg['udp']['port']))
# 启动接收音频线程
if not recv_audio_thread.is_alive():
recv_audio_thread = threading.Thread(target=recv_audio)
recv_audio_thread.start()
# 启动发送音频线程
if not send_audio_thread.is_alive():
send_audio_thread = threading.Thread(target=send_audio)
send_audio_thread.start()
# 发送开始聆听消息
if listen_state is None:
listen_state = "hello"
start_listen_msg = {
"session_id": aes_opus_info['session_id'],
"type": "listen",
"state": "start",
"mode": "manual"
}
push_mqtt_msg(start_listen_msg)
status_label.config(text="聆听中……")
def handle_tts_message(msg):
"""处理TTS类型消息"""
global tts_state, speekstate
tts_state = msg['state']
if msg['state'] == 'start':
status_label.config(text="讲话中……")
speekstate = 0
elif msg['state'] == 'stop':
status_label.config(text="就绪")
speekstate = 1
elif msg['state'] == 'sentence_start':
text = wrap_hanzi(msg['text'], 5, max_chars)
log_text.config(text="阿美: " + text)
# 新增LED控制逻辑
if "开灯" in msg['text']:
control_led(True)
np1.range_color(0, 1, 0x0000FF) # 同时控制NeoPixel
elif "关灯" in msg['text']:
control_led(False)
np1.clear()
# 新增M1,M2电机控制逻辑
if "前进" in msg['text']:
set_motor_speed(1023,1)#小车全速forward
elif "后退" in msg['text']:
set_motor_speed(1023,0)#小车全速backward
elif "停止" in msg['text']:
set_motor_speed(0,1)#小车stop
def handle_llm_message(msg):
"""处理LLM类型消息"""
ascii_emotion = get_ascii_emotion(msg['emotion'])
emotion.config(text=ascii_emotion)
def handle_stt_message(msg):
"""处理STT类型消息"""
text = wrap_hanzi(msg['text'], 5, max_chars)
log_text.config(text="我: " + text)
def handle_goodbye_message(msg):
"""处理goodbye类型消息"""
global aes_opus_info, listen_state, udp_socket
if udp_socket and msg['session_id'] == aes_opus_info['session_id']:
aes_opus_info['session_id'] = None
listen_state = None
log_text.config(text="")
status_label.config(text="休息中")
# 关闭UDP连接
udp_socket.close()
udp_socket = None
# 等待线程结束
for thread in (recv_audio_thread, send_audio_thread):
if thread and thread.is_alive():
thread.join(timeout=1)
def on_connect(client, userdata, flags, rc, properties=None):
"""MQTT连接回调"""
subscribe_topic = mqtt_info['subscribe_topic'].split("/")[0] + '/p2p/GID_test@@@' + MAC_ADDR.replace(':', '_')
client.subscribe(subscribe_topic)
status_label.config(text="就绪")
# MQTT消息推送
def push_mqtt_msg(message):
"""推送MQTT消息"""
try:
mqttc.publish(mqtt_info['publish_topic'], json.dumps(message))
except Exception as e:
logging.error(f"推送MQTT消息失败: {e}")
# 语音交互控制
def listen_start():
"""开始聆听"""
global key_state, udp_socket, aes_opus_info, listen_state, conn_state
if key_state == "press":
return
key_state = "press"
# 判断是否需要发送hello消息
if conn_state is False or aes_opus_info['session_id'] is None:
# 清理旧连接
if udp_socket:
udp_socket.close()
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
conn_state = True
# 发送hello消息,建立udp连接
hello_msg = {
"type": "hello",
"version": 3,
"transport": "udp",
"audio_params": {
"format": "opus",
"sample_rate": 16000,
"channels": 1,
"frame_duration": 60
}
}
push_mqtt_msg(hello_msg)
# 在播放状态下发送abort消息
if tts_state == "start" or tts_state == "entence_start":
push_mqtt_msg({"type": "abort"})
# 发送开始聆听消息
if aes_opus_info['session_id'] is not None:
msg = {
"session_id": aes_opus_info['session_id'],
"type": "listen",
"state": "start",
"mode": "manual"
}
push_mqtt_msg(msg)
def listen_stop():
"""停止聆听"""
global key_state
key_state = "release"
# 发送停止聆听消息
if aes_opus_info['session_id'] is not None:
msg = {
"session_id": aes_opus_info['session_id'],
"type": "listen",
"state": "stop"
}
push_mqtt_msg(msg)
# 按键检测线程
def anjian():
"""按键检测线程函数"""
global vaddata, speekstate, speekstoptime
bs = 0
last_voice_time = 0
while True:
if vaddata is not None:
# 检测语音活动
if vad.is_speech(vaddata, 16000) and speekstate == 1 and time.time() - speekstoptime > 1.5:
status_label.config(text="聆听中……")
bs = 1
listen_start()
last_voice_time = time.time()
elif time.time() - last_voice_time > 1.5 and bs == 1:
bs = 0
listen_stop()
else:
time.sleep(2)
bs = 1
listen_start()
last_voice_time = time.time()
# 主程序
def run():
"""主程序"""
global mqtt_info, mqttc, audio
# 初始化音频
audio = pyaudio.PyAudio()
# 获取MQTT配置
get_ota_version()
# 创建MQTT客户端
mqttc = mqtt.Client(
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
client_id=mqtt_info['client_id']
)
mqttc.username_pw_set(
username=mqtt_info['username'],
password=mqtt_info['password']
)
mqttc.tls_set(
ca_certs=None,
certfile=None,
keyfile=None,
cert_reqs=mqtt.ssl.CERT_REQUIRED,
tls_version=mqtt.ssl.PROTOCOL_TLS,
ciphers=None
)
mqttc.on_connect = on_connect
mqttc.on_message = on_message
# 连接MQTT服务器
try:
mqttc.connect(host=mqtt_info['endpoint'], port=8883)
except Exception as e:
logging.error(f"MQTT连接失败: {e}")
return
# 启动按键检测线程
anjian_thread = threading.Thread(target=anjian)
anjian_thread.daemon = True
anjian_thread.start()
# 启动MQTT循环
mqttc.loop_forever()
if __name__ == "__main__":
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
try:
control_led(True)#开灯
time.sleep(1)
control_led(False)#关灯
time.sleep(1)
set_motor_speed(1023, 1)#小车全速前进
time.sleep(3)
set_motor_speed(512, 1)#小车低速前进
time.sleep(3)
set_motor_speed(0, 1)#小车停止
time.sleep(3)
set_motor_speed(1023,0)#小车全速后退
time.sleep(3)
set_motor_speed(0, 1)#小车停止
time.sleep(1)
run()
except KeyboardInterrupt:
logging.info("程序被用户中断")
except Exception as e:
logging.error(f"程序运行错误: {e}")
finally:
# 清理资源
if audio:
audio.terminate()
if udp_socket:
udp_socket.close()
logging.info("程序退出")
评论