行空板M10扩展板 —— 小智MCP(基础测试)
随着小智的出现,让大语言模型交互触手可及,又随着小智MCP的应用,使硬件更为智能,DIY的方式更加丰富。对于M10这套电机及电池扩展版,非常轻巧方便,我想让他成为一辆智能车的控制系统,接入小智,让他成为一辆受控的智能RC车。但在这之前,我们要做一些简单测试,后面再做进一步深入。
实现过程有些复杂,我们先直接上基础测试的视频来看一下效果。
首先,我们先做一个设备自检,确保设备都能正常运行:
确保设备没问题,然后接入小智,用小智控制硬件设备:
那么,这是如何实现的呢?我们先说一下整体框架:嘉立创ESP32实战派作为上位机,行空板作为下位机,经过小智的理解和MCP的控制,能够实现自然语言精准控制接入的硬件设备。以下是整个实现过程:

接下来,我们来实操一下,先把基础测试打通,后续搭建成智能小车,再进一步探索.你需要准备如下工作:
1、接收端:
首先准备一套M10轻便套装,M10开发板+电池扩展版+电机扩展版,采购链接为https://www.dfrobot.com.cn/goods-4127.html

2、控制端:
可以是电脑的pyxiaozhi,可以是手机的app,也可以是各种智能派。还有虾哥开源的小智ESP32,具体可到小智官方github的项目去了解 https://github.com/78/xiaozhi-esp32/tree/main

我这次选用的是嘉立创的实战派,刷好对应的固件:
通过网盘分享的文件:v1.4.6_lichuang-dev.zip
链接: https://pan.baidu.com/s/1dNOaRMxORnXE3IErOomtTg?pwd=LCKF
提取码: LCKF

3、服务端:
还需要一台电脑,跑DIY的小智MCP服务,把这个项目下载到本地
https://github.com/78/mcp-calculator/tree/main
除了MCP服务,还要自己跑一个UDP转发的服务,把小智的消息转发给行空板,后续可以把他简化掉。
4、测试设备清单:
a)LED灯
b)TT 马达
c)SG90 舵机
d)USB摄像头
最后直接上干货,烧录固件或者运行代码,让体统能够正常运行起来:
1、控制端:
嘉立创ESP32S3实战派烧录好固件,配好网,在xiaozhi.me注册好,并获取MCP站点信息。
2、服务端1:
定义小智控制设备的功能实现,把下载好的MCP中的calculator.py进行修改,增加行空板M10的接入设备控制功能,替换掉原来的文件:
from mcp.server.fastmcp import FastMCP
import sys
import logging
import socket
import json
logger = logging.getLogger('RC_Car')
# Fix UTF-8 encoding for Windows console
if sys.platform == 'win32':
sys.stderr.reconfigure(encoding='utf-8')
sys.stdout.reconfigure(encoding='utf-8')
# UDP Broadcast configuration
UDP_BROADCAST_PORT = 8089
UDP_BROADCAST_ADDR = '255.255.255.255'
def setup_udp_socket():
"""Set up a UDP socket for broadcasting"""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
return sock
def broadcast_result(result_data):
"""Broadcast the result via UDP"""
try:
sock = setup_udp_socket()
message = json.dumps(result_data).encode('utf-8')
sock.sendto(message, (UDP_BROADCAST_ADDR, UDP_BROADCAST_PORT))
sock.close()
logger.info(f"Result broadcasted: {result_data}")
except Exception as e:
logger.error(f"Failed to broadcast result: {e}")
# Create an MCP server
mcp = FastMCP("RC_Car")
# LED Control Commands
@mcp.tool()
def led_on() -> dict:
"""
Turn on the LED.
Returns:
Dictionary with operation status and details
"""
logger.info("Turning on LED")
result_data = {
"success": True,
"action": "led_on"
}
broadcast_result(result_data)
return result_data
@mcp.tool()
def led_off() -> dict:
"""
Turn off the LED.
Returns:
Dictionary with operation status and details
"""
logger.info("Turning off LED")
result_data = {
"success": True,
"action": "led_off"
}
broadcast_result(result_data)
return result_data
# Motor Control Commands
@mcp.tool()
def motorA_forward(speed: int) -> dict:
"""
Move motor A forward with specified speed.
Args:
speed: Speed value (0-1000)
Returns:
Dictionary with operation status and details
"""
logger.info(f"Moving motor A forward at speed {speed}")
result_data = {
"success": True,
"action": "motorA_forward",
"speed": speed
}
broadcast_result(result_data)
return result_data
@mcp.tool()
def motorA_reverse(speed: int) -> dict:
"""
Move motor A in reverse with specified speed.
Args:
speed: Speed value (0-1000)
Returns:
Dictionary with operation status and details
"""
logger.info(f"Moving motor A in reverse at speed {speed}")
result_data = {
"success": True,
"action": "motorA_reverse",
"speed": speed
}
broadcast_result(result_data)
return result_data
@mcp.tool()
def motorB_forward(speed: int) -> dict:
"""
Move motor B forward with specified speed.
Args:
speed: Speed value (0-1000)
Returns:
Dictionary with operation status and details
"""
logger.info(f"Moving motor B forward at speed {speed}")
result_data = {
"success": True,
"action": "motorB_forward",
"speed": speed
}
broadcast_result(result_data)
return result_data
@mcp.tool()
def motorB_reverse(speed: int) -> dict:
"""
Move motor B in reverse with specified speed.
Args:
speed: Speed value (0-1000)
Returns:
Dictionary with operation status and details
"""
logger.info(f"Moving motor B in reverse at speed {speed}")
result_data = {
"success": True,
"action": "motorB_reverse",
"speed": speed
}
broadcast_result(result_data)
return result_data
# Servo Control Command
@mcp.tool()
def servo_angle(angle: int) -> dict:
"""
Set servo to specified angle.
Args:
angle: Angle value (0-180)
Returns:
Dictionary with operation status and details
"""
logger.info(f"Setting servo angle to {angle}")
result_data = {
"success": True,
"action": "servo_angle",
"angle": angle
}
broadcast_result(result_data)
return result_data
# Neopixel Control Command
@mcp.tool()
def neopixel_color(red: int, green: int, blue: int) -> dict:
"""
Set Neopixel color using RGB values.
Args:
red: Red value (0-255)
green: Green value (0-255)
blue: Blue value (0-255)
Returns:
Dictionary with operation status and details
"""
logger.info(f"Setting Neopixel color to R:{red} G:{green} B:{blue}")
result_data = {
"success": True,
"action": "neopixel_color",
"color": [red, green, blue]
}
broadcast_result(result_data)
return result_data
# Display Commands
@mcp.tool()
def show_time() -> dict:
"""
Display current time on screen.
Returns:
Dictionary with operation status and details
"""
logger.info("Displaying current time")
result_data = {
"success": True,
"action": "show_time"
}
broadcast_result(result_data)
return result_data
@mcp.tool()
def show_clock() -> dict:
"""
Display analog clock on screen.
Returns:
Dictionary with operation status and details
"""
logger.info("Displaying analog clock")
result_data = {
"success": True,
"action": "show_clock"
}
broadcast_result(result_data)
return result_data
@mcp.tool()
def show_image(image_path: str) -> dict:
"""
Display specified image on screen.
Args:
image_path: Path to image file
Returns:
Dictionary with operation status and details
"""
logger.info(f"Displaying image: {image_path}")
result_data = {
"success": True,
"action": "show_image",
"image_path": image_path
}
broadcast_result(result_data)
return result_data
@mcp.tool()
def draw_arrow(direction: str) -> dict:
"""
Draw arrow in specified direction.
Args:
direction: Arrow direction ("up", "down", "left", "right")
Returns:
Dictionary with operation status and details
"""
valid_directions = ["up", "down", "left", "right"]
if direction.lower() not in valid_directions:
raise ValueError(f"Invalid direction '{direction}'. Must be one of {valid_directions}")
logger.info(f"Drawing arrow pointing {direction}")
result_data = {
"success": True,
"action": "draw_arrow",
"direction": direction
}
broadcast_result(result_data)
return result_data
@mcp.tool()
def draw_polygon(points: list) -> dict:
"""
Draw polygon with specified points.
Args:
points: List of (x,y) coordinate tuples
Returns:
Dictionary with operation status and details
"""
logger.info(f"Drawing polygon with points: {points}")
result_data = {
"success": True,
"action": "draw_polygon",
"points": points
}
broadcast_result(result_data)
return result_data
# Camera Commands (take_photo already exists)
@mcp.tool()
def video_stream() -> dict:
"""
Start video streaming from camera.
Returns:
Dictionary with operation status and details
"""
logger.info("Starting video stream")
result_data = {
"success": True,
"action": "video_stream"
}
broadcast_result(result_data)
return result_data
# Buzzer Control Command
@mcp.tool()
def buzzer_tone(frequency: int, duration: int) -> dict:
"""
Play buzzer tone with specified frequency and duration.
Args:
frequency: Tone frequency in Hz
duration: Duration in seconds
Returns:
Dictionary with operation status and details
"""
logger.info(f"Playing buzzer tone at {frequency}Hz for {duration}ms")
result_data = {
"success": True,
"action": "buzzer_tone",
"frequency": frequency,
"duration": duration
}
broadcast_result(result_data)
return result_data
# IR Signal Command
@mcp.tool()
def ir_send(code: str) -> dict:
"""
Send IR signal with specified code.
Args:
code: IR code in hexadecimal format
Returns:
Dictionary with operation status and details
"""
logger.info(f"Sending IR signal: {code}")
result_data = {
"success": True,
"action": "ir_send",
"code": code
}
broadcast_result(result_data)
return result_data
# Original RC Car Control Functions (kept from previous version)
@mcp.tool()
def move_forward(distance: float) -> dict:
"""Move the RC car forward by a specified distance."""
logger.info(f"Moving forward {distance} meters")
result_data = {
"success": True,
"action": "move_forward",
"distance": distance
}
broadcast_result(result_data)
return result_data
@mcp.tool()
def move_backward(distance: float) -> dict:
"""Move the RC car backward by a specified distance."""
logger.info(f"Moving backward {distance} meters")
result_data = {
"success": True,
"action": "move_backward",
"distance": distance
}
broadcast_result(result_data)
return result_data
@mcp.tool()
def turn_left(angle: float) -> dict:
"""Turn the RC car left by a specified angle."""
logger.info(f"Turning left {angle} degrees")
result_data = {
"success": True,
"action": "turn_left",
"angle": angle
}
broadcast_result(result_data)
return result_data
@mcp.tool()
def turn_right(angle: float) -> dict:
"""Turn the RC car right by a specified angle."""
logger.info(f"Turning right {angle} degrees")
result_data = {
"success": True,
"action": "turn_right",
"angle": angle
}
broadcast_result(result_data)
return result_data
@mcp.tool()
def turn_signal(direction: str) -> dict:
"""Activate the turn signal (left or right) on the RC car."""
valid_directions = ["left", "right"]
if direction.lower() not in valid_directions:
raise ValueError(f"Invalid direction '{direction}'. Must be one of {valid_directions}")
logger.info(f"Activating {direction} turn signal")
result_data = {
"success": True,
"action": "turn_signal",
"direction": direction
}
broadcast_result(result_data)
return result_data
@mcp.tool()
def follow_line() -> dict:
"""Start the line-following mode on the RC car."""
logger.info("Starting line-following mode")
result_data = {
"success": True,
"action": "follow_line"
}
broadcast_result(result_data)
return result_data
@mcp.tool()
def track_object() -> dict:
"""Start object tracking mode on the RC car."""
logger.info("Starting object tracking mode")
result_data = {
"success": True,
"action": "track_object"
}
broadcast_result(result_data)
return result_data
@mcp.tool()
def get_current_direction() -> dict:
"""Get the current direction of the RC car."""
logger.info("Getting current direction")
direction = "north" # Placeholder value
result_data = {
"success": True,
"action": "get_current_direction",
"direction": direction
}
broadcast_result(result_data)
return result_data
@mcp.tool()
def get_current_temperature() -> dict:
"""Get the current temperature reading from the RC car's sensor."""
logger.info("Getting current temperature")
temperature = 25.0 # Placeholder value
result_data = {
"success": True,
"action": "get_current_temperature",
"temperature": temperature
}
broadcast_result(result_data)
return result_data
@mcp.tool()
def get_current_light_intensity() -> dict:
"""Get the current light intensity reading from the RC car's sensor."""
logger.info("Getting current light intensity")
light_intensity = 500 # Placeholder value
result_data = {
"success": True,
"action": "get_current_light_intensity",
"light_intensity": light_intensity
}
broadcast_result(result_data)
return result_data
@mcp.tool()
def draw_shape(shape: str) -> dict:
"""Draw a specified shape using the RC car's movement."""
valid_shapes = ["circle", "square", "triangle"]
if shape.lower() not in valid_shapes:
raise ValueError(f"Invalid shape '{shape}'. Must be one of {valid_shapes}")
logger.info(f"Drawing shape: {shape}")
result_data = {
"success": True,
"action": "draw_shape",
"shape": shape
}
broadcast_result(result_data)
return result_data
@mcp.tool()
def turn_on_turn_signal(direction: str) -> dict:
"""Turn on the turn signal (left or right) on the RC car."""
valid_directions = ["left", "right"]
if direction.lower() not in valid_directions:
raise ValueError(f"Invalid direction '{direction}'. Must be one of {valid_directions}")
logger.info(f"Turning on {direction} turn signal")
result_data = {
"success": True,
"action": "turn_on_turn_signal",
"direction": direction
}
broadcast_result(result_data)
return result_data
@mcp.tool()
def turn_on_color_light(color: str) -> dict:
"""Turn on the color light on the RC car."""
valid_colors = ["red", "rainbow"]
if color.lower() not in valid_colors:
raise ValueError(f"Invalid color '{color}'. Must be one of {valid_colors}")
logger.info(f"Turning on {color} color light")
result_data = {
"success": True,
"action": "turn_on_color_light",
"color": color
}
broadcast_result(result_data)
return result_data
# Start the server
if __name__ == "__main__":
mcp.run(transport="stdio")
3、服务端2:
负责转发MCP生成的指令,让行空板M10做成相应的动作。文件名为:电脑UDP转发小智控制.py
import socket
import json
import time
def send_command(command):
udp_ip = "192.168.31.134"
udp_port = 5005
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.sendto(command.encode('utf-8'), (udp_ip, udp_port))
print(f"已发送指令:{command}")
sock.close()
def parse_and_send(json_message):
try:
data = json.loads(json_message)
if not data.get("success", False):
return
action = data["action"]
if action == "led_on":
send_command("led_on")
elif action == "led_off":
send_command("led_off")
elif action == "motorA_forward":
speed = data.get("speed", 0)
send_command(f"motorA_forward:{speed}")
elif action == "motorA_reverse":
speed = data.get("speed", 0)
send_command(f"motorA_reverse:{speed}")
elif action == "motorB_forward":
speed = data.get("speed", 0)
send_command(f"motorB_forward:{speed}")
elif action == "motorB_reverse":
speed = data.get("speed", 0)
send_command(f"motorB_reverse:{speed}")
elif action == "servo_angle":
angle = data.get("angle", 90)
send_command(f"servo_angle:{angle}")
elif action == "neopixel_color":
color = data.get("color", [0, 0, 0])
send_command(f"neopixel_color:{','.join(map(str, color))}")
elif action == "show_time":
send_command("show_time")
elif action == "show_clock":
send_command("show_clock")
elif action == "show_image":
image_path = data.get("image_path", "")
send_command(f"show_image:{image_path}")
elif action == "draw_arrow":
direction = data.get("direction", "up")
send_command(f"draw_arrow:{direction}")
elif action == "draw_polygon":
points = data.get("points", [])
points_str = "[" + ",".join([f"({x},{y})" for x, y in points]) + "]"
send_command(f"draw_polygon:{points_str}")
elif action == "buzzer_tone":
frequency = data.get("frequency", 440)
duration = data.get("duration", 500)
send_command(f"buzzer_tone:{frequency},{duration}")
elif action == "ir_send":
code = data.get("code", "0x0")
send_command(f"ir_send:{code}")
except json.JSONDecodeError:
print("Invalid JSON message")
except KeyError as e:
print(f"Missing key in message: {e}")
def udp_receive_and_forward():
# 创建一个UDP套接字
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 绑定地址和端口
host = '0.0.0.0' # 绑定到所有可用的网络接口
port = 8089 # 监听的端口号
sock.bind((host, port))
print(f"UDP服务器启动,正在监听端口 {port}...")
while True:
# 接收数据
data, addr = sock.recvfrom(1024) # 缓冲区大小为1024字节
log_message = data.decode('utf-8') # 将接收到的数据解码为字符串
print(f"收到原始消息: {log_message}")
# 解析并转发消息
parse_and_send(log_message)
if __name__ == "__main__":
udp_receive_and_forward()
4、执行端:
最后就是行空板上需要运行的程序,实现多种设备的控制,把几个设备的驱动的示例进行融合,然后做成一个设备自检的程序,再把这个程序启用UDP接口,以便接入上位机的指令。实现基础测试的代码,文件名为:行空UDPAPI.py:
import socket
import threading
from pinpong.board import Board, Pin, NeoPixel, Servo, IRRemote, IRRecv
from pinpong.extension.unihiker import *
import cv2
from PIL import Image
from unihiker import GUI
import time
import datetime
class ResourceManager:
def __init__(self):
Board().begin()
self.led = Pin(Pin.P25, Pin.OUT)
self.pwm8 = Pin(Pin.P8, Pin.PWM)
self.pwm16 = Pin(Pin.P16, Pin.PWM)
self.dir5 = Pin(Pin.P5, Pin.OUT)
self.dir6 = Pin(Pin.P6, Pin.OUT)
self.servo = Servo(Pin(Pin.D0))
self.np = NeoPixel(Pin(Pin.P13), 2)
self.gui = GUI()
self.vd = cv2.VideoCapture(0)
self.clk = self.gui.draw_clock(x=120, y=160, r=80, h=10, m=8, s=0, color="#0000FF")
self.buzzer = IRRemote(Pin(Pin.P15)) # 假设蜂鸣器连接到P15
self.ir_recv = IRRecv(Pin(Pin.P14), self.ir_event) # 假设红外接收器连接到P14
self.udp_ip = "127.0.0.1"
self.udp_port = 5005
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.bind((self.udp_ip, self.udp_port))
self.running = True
threading.Thread(target=self.udp_listener).start()
def udp_listener(self):
while self.running:
data, addr = self.sock.recvfrom(1024)
command = data.decode('utf-8')
print(f"Received command: {command}")
self.handle_command(command)
def handle_command(self, command):
if command == "led_on":
self.led.write_digital(1)
elif command == "led_off":
self.led.write_digital(0)
elif command.startswith("motorA_forward:"):
speed = int(command.split(":")[1])
self.motorA_forward(speed)
elif command.startswith("motorA_reverse:"):
speed = int(command.split(":")[1])
self.motorA_reverse(speed)
elif command.startswith("motorB_forward:"):
speed = int(command.split(":")[1])
self.motorB_forward(speed)
elif command.startswith("motorB_reverse:"):
speed = int(command.split(":")[1])
self.motorB_reverse(speed)
elif command.startswith("servo_angle:"):
angle = int(command.split(":")[1])
self.servo.write_angle(angle)
elif command.startswith("neopixel_color:"):
color = tuple(map(int, command.split(":")[1].split(",")))
self.neopixel_color(color)
elif command == "show_time":
self.show_time()
elif command == "show_clock":
self.show_clock()
elif command == "take_photo":
self.take_photo()
elif command == "video_stream":
self.video_stream()
elif command.startswith("buzzer_tone:"):
frequency, duration = map(int, command.split(":")[1].split(","))
self.buzzer_tone(frequency, duration)
elif command.startswith("ir_send:"):
code = int(command.split(":")[1], 16)
self.ir_send(code)
def motorA_forward(self, speed):
self.pwm8.write_analog(speed)
self.dir5.write_digital(1)
def motorA_reverse(self, speed):
self.pwm8.write_analog(speed)
self.dir5.write_digital(0)
def motorB_forward(self, speed):
self.pwm16.write_analog(speed)
self.dir6.write_digital(1)
def motorB_reverse(self, speed):
self.pwm16.write_analog(speed)
self.dir6.write_digital(0)
def neopixel_color(self, color):
self.np.range_color(0, 2, color)
def show_time(self):
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
self.gui.clear()
self.gui.draw_text(x=120, y=160, text=current_time, origin="center", color="red", font_size=25)
def show_clock(self):
self.clk.config(
h=datetime.datetime.now().hour,
m=datetime.datetime.now().minute,
s=datetime.datetime.now().second
)
def take_photo(self):
ret, img = self.vd.read()
if ret:
img = cv2.resize(img, (240, 240))
img = cv2.flip(img, 0)
img = cv2.rotate(img, cv2.ROTATE_180)
img = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
self.gui.draw_image(image=img, x=0, y=0)
def video_stream(self):
ret, img = self.vd.read()
if ret:
img = cv2.resize(img, (240, 240))
img = cv2.flip(img, 0)
img = cv2.rotate(img, cv2.ROTATE_180)
img = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
self.gui.draw_image(image=img, x=0, y=0)
def buzzer_tone(self, frequency, duration):
self.buzzer.tone(frequency, duration)
def ir_send(self, code):
self.buzzer.send(code)
def ir_event(self, data):
print(f"Received IR code: {data}")
def stop(self):
self.running = False
self.sock.close()
self.vd.release()
if __name__ == "__main__":
manager = ResourceManager()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
manager.stop()
接下来,启动系统:
步骤1 启用小智MCP服务:python mcp_pipe.py calculator.py
步骤2 启用UDP转发服务:电脑UDP转发小智控制.py
步骤3 运行行空板程序:行空UDPAPI.py
步骤4 唤醒小智,用自然语言控制你的设备吧!
总结:
这个测试系统,测试了整个链路的通讯和设备的控制,主要有舵机,电机,LED灯,2812灯,蜂鸣器,红外发射,屏幕等,具备了这个能力后,后面准备做成RC_CAR的形态,再去执行指定的任务,应该是可行的。但目前测试下来会有指令丢包的情况,不知道哪个环节的问题,UDP转发肯定可以简化,后面有新进展再做分享。
评论