回到首页 返回首页
回到顶部 回到顶部
返回上一页 返回上一页

行空板M10扩展板 —— 小智MCP(基础测试) 简单

头像 sky007 2025.06.25 35 0

行空板M10扩展板 —— 小智MCP(基础测试)

 

随着小智的出现,让大语言模型交互触手可及,又随着小智MCP的应用,使硬件更为智能,DIY的方式更加丰富。对于M10这套电机及电池扩展版,非常轻巧方便,我想让他成为一辆智能车的控制系统,接入小智,让他成为一辆受控的智能RC车。但在这之前,我们要做一些简单测试,后面再做进一步深入。

 

实现过程有些复杂,我们先直接上基础测试的视频来看一下效果。

首先,我们先做一个设备自检,确保设备都能正常运行:

确保设备没问题,然后接入小智,用小智控制硬件设备:

那么,这是如何实现的呢?我们先说一下整体框架:嘉立创ESP32实战派作为上位机,行空板作为下位机,经过小智的理解和MCP的控制,能够实现自然语言精准控制接入的硬件设备。以下是整个实现过程:

whiteboard_exported_image (2).png

接下来,我们来实操一下,先把基础测试打通,后续搭建成智能小车,再进一步探索.你需要准备如下工作:

1、接收端:

首先准备一套M10轻便套装,M10开发板+电池扩展版+电机扩展版,采购链接为https://www.dfrobot.com.cn/goods-4127.html

 

 

2、控制端:

可以是电脑的pyxiaozhi,可以是手机的app,也可以是各种智能派。还有虾哥开源的小智ESP32,具体可到小智官方github的项目去了解 https://github.com/78/xiaozhi-esp32/tree/main

 

image.png

我这次选用的是嘉立创的实战派,刷好对应的固件:

 

通过网盘分享的文件:v1.4.6_lichuang-dev.zip

链接: https://pan.baidu.com/s/1dNOaRMxORnXE3IErOomtTg?pwd=LCKF

提取码: LCKF

image.png

3、服务端:

还需要一台电脑,跑DIY的小智MCP服务,把这个项目下载到本地

https://github.com/78/mcp-calculator/tree/main

 

除了MCP服务,还要自己跑一个UDP转发的服务,把小智的消息转发给行空板,后续可以把他简化掉。

 

4、测试设备清单:

a)LED灯

b)TT 马达

c)SG90 舵机

d)USB摄像头

 

最后直接上干货,烧录固件或者运行代码,让体统能够正常运行起来:

1、控制端:

嘉立创ESP32S3实战派烧录好固件,配好网,在xiaozhi.me注册好,并获取MCP站点信息。

2、服务端1:

定义小智控制设备的功能实现,把下载好的MCP中的calculator.py进行修改,增加行空板M10的接入设备控制功能,替换掉原来的文件:

代码
from mcp.server.fastmcp import FastMCP
import sys
import logging
import socket
import json

logger = logging.getLogger('RC_Car')

# Fix UTF-8 encoding for Windows console
if sys.platform == 'win32':
    sys.stderr.reconfigure(encoding='utf-8')
    sys.stdout.reconfigure(encoding='utf-8')

# UDP Broadcast configuration
UDP_BROADCAST_PORT = 8089
UDP_BROADCAST_ADDR = '255.255.255.255'

def setup_udp_socket():
    """Set up a UDP socket for broadcasting"""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    return sock

def broadcast_result(result_data):
    """Broadcast the result via UDP"""
    try:
        sock = setup_udp_socket()
        message = json.dumps(result_data).encode('utf-8')
        sock.sendto(message, (UDP_BROADCAST_ADDR, UDP_BROADCAST_PORT))
        sock.close()
        logger.info(f"Result broadcasted: {result_data}")
    except Exception as e:
        logger.error(f"Failed to broadcast result: {e}")

# Create an MCP server
mcp = FastMCP("RC_Car")

# LED Control Commands
@mcp.tool()
def led_on() -> dict:
    """
    Turn on the LED.
    
    Returns:
        Dictionary with operation status and details
    """
    logger.info("Turning on LED")
    result_data = {
        "success": True,
        "action": "led_on"
    }
    broadcast_result(result_data)
    return result_data

@mcp.tool()
def led_off() -> dict:
    """
    Turn off the LED.
    
    Returns:
        Dictionary with operation status and details
    """
    logger.info("Turning off LED")
    result_data = {
        "success": True,
        "action": "led_off"
    }
    broadcast_result(result_data)
    return result_data

# Motor Control Commands
@mcp.tool()
def motorA_forward(speed: int) -> dict:
    """
    Move motor A forward with specified speed.
    
    Args:
        speed: Speed value (0-1000)
    
    Returns:
        Dictionary with operation status and details
    """
    logger.info(f"Moving motor A forward at speed {speed}")
    result_data = {
        "success": True,
        "action": "motorA_forward",
        "speed": speed
    }
    broadcast_result(result_data)
    return result_data

@mcp.tool()
def motorA_reverse(speed: int) -> dict:
    """
    Move motor A in reverse with specified speed.
    
    Args:
        speed: Speed value (0-1000)
    
    Returns:
        Dictionary with operation status and details
    """
    logger.info(f"Moving motor A in reverse at speed {speed}")
    result_data = {
        "success": True,
        "action": "motorA_reverse",
        "speed": speed
    }
    broadcast_result(result_data)
    return result_data

@mcp.tool()
def motorB_forward(speed: int) -> dict:
    """
    Move motor B forward with specified speed.
    
    Args:
        speed: Speed value (0-1000)
    
    Returns:
        Dictionary with operation status and details
    """
    logger.info(f"Moving motor B forward at speed {speed}")
    result_data = {
        "success": True,
        "action": "motorB_forward",
        "speed": speed
    }
    broadcast_result(result_data)
    return result_data

@mcp.tool()
def motorB_reverse(speed: int) -> dict:
    """
    Move motor B in reverse with specified speed.
    
    Args:
        speed: Speed value (0-1000)
    
    Returns:
        Dictionary with operation status and details
    """
    logger.info(f"Moving motor B in reverse at speed {speed}")
    result_data = {
        "success": True,
        "action": "motorB_reverse",
        "speed": speed
    }
    broadcast_result(result_data)
    return result_data

# Servo Control Command
@mcp.tool()
def servo_angle(angle: int) -> dict:
    """
    Set servo to specified angle.
    
    Args:
        angle: Angle value (0-180)
    
    Returns:
        Dictionary with operation status and details
    """
    logger.info(f"Setting servo angle to {angle}")
    result_data = {
        "success": True,
        "action": "servo_angle",
        "angle": angle
    }
    broadcast_result(result_data)
    return result_data

# Neopixel Control Command
@mcp.tool()
def neopixel_color(red: int, green: int, blue: int) -> dict:
    """
    Set Neopixel color using RGB values.
    
    Args:
        red: Red value (0-255)
        green: Green value (0-255)
        blue: Blue value (0-255)
    
    Returns:
        Dictionary with operation status and details
    """
    logger.info(f"Setting Neopixel color to R:{red} G:{green} B:{blue}")
    result_data = {
        "success": True,
        "action": "neopixel_color",
        "color": [red, green, blue]
    }
    broadcast_result(result_data)
    return result_data

# Display Commands
@mcp.tool()
def show_time() -> dict:
    """
    Display current time on screen.
    
    Returns:
        Dictionary with operation status and details
    """
    logger.info("Displaying current time")
    result_data = {
        "success": True,
        "action": "show_time"
    }
    broadcast_result(result_data)
    return result_data

@mcp.tool()
def show_clock() -> dict:
    """
    Display analog clock on screen.
    
    Returns:
        Dictionary with operation status and details
    """
    logger.info("Displaying analog clock")
    result_data = {
        "success": True,
        "action": "show_clock"
    }
    broadcast_result(result_data)
    return result_data

@mcp.tool()
def show_image(image_path: str) -> dict:
    """
    Display specified image on screen.
    
    Args:
        image_path: Path to image file
    
    Returns:
        Dictionary with operation status and details
    """
    logger.info(f"Displaying image: {image_path}")
    result_data = {
        "success": True,
        "action": "show_image",
        "image_path": image_path
    }
    broadcast_result(result_data)
    return result_data

@mcp.tool()
def draw_arrow(direction: str) -> dict:
    """
    Draw arrow in specified direction.
    
    Args:
        direction: Arrow direction ("up", "down", "left", "right")
    
    Returns:
        Dictionary with operation status and details
    """
    valid_directions = ["up", "down", "left", "right"]
    if direction.lower() not in valid_directions:
        raise ValueError(f"Invalid direction '{direction}'. Must be one of {valid_directions}")
    
    logger.info(f"Drawing arrow pointing {direction}")
    result_data = {
        "success": True,
        "action": "draw_arrow",
        "direction": direction
    }
    broadcast_result(result_data)
    return result_data

@mcp.tool()
def draw_polygon(points: list) -> dict:
    """
    Draw polygon with specified points.
    
    Args:
        points: List of (x,y) coordinate tuples
    
    Returns:
        Dictionary with operation status and details
    """
    logger.info(f"Drawing polygon with points: {points}")
    result_data = {
        "success": True,
        "action": "draw_polygon",
        "points": points
    }
    broadcast_result(result_data)
    return result_data

# Camera Commands (take_photo already exists)
@mcp.tool()
def video_stream() -> dict:
    """
    Start video streaming from camera.
    
    Returns:
        Dictionary with operation status and details
    """
    logger.info("Starting video stream")
    result_data = {
        "success": True,
        "action": "video_stream"
    }
    broadcast_result(result_data)
    return result_data

# Buzzer Control Command
@mcp.tool()
def buzzer_tone(frequency: int, duration: int) -> dict:
    """
    Play buzzer tone with specified frequency and duration.
    
    Args:
        frequency: Tone frequency in Hz
        duration: Duration in seconds
    
    Returns:
        Dictionary with operation status and details
    """
    logger.info(f"Playing buzzer tone at {frequency}Hz for {duration}ms")
    result_data = {
        "success": True,
        "action": "buzzer_tone",
        "frequency": frequency,
        "duration": duration
    }
    broadcast_result(result_data)
    return result_data

# IR Signal Command
@mcp.tool()
def ir_send(code: str) -> dict:
    """
    Send IR signal with specified code.
    
    Args:
        code: IR code in hexadecimal format
    
    Returns:
        Dictionary with operation status and details
    """
    logger.info(f"Sending IR signal: {code}")
    result_data = {
        "success": True,
        "action": "ir_send",
        "code": code
    }
    broadcast_result(result_data)
    return result_data

# Original RC Car Control Functions (kept from previous version)
@mcp.tool()
def move_forward(distance: float) -> dict:
    """Move the RC car forward by a specified distance."""
    logger.info(f"Moving forward {distance} meters")
    result_data = {
        "success": True,
        "action": "move_forward",
        "distance": distance
    }
    broadcast_result(result_data)
    return result_data

@mcp.tool()
def move_backward(distance: float) -> dict:
    """Move the RC car backward by a specified distance."""
    logger.info(f"Moving backward {distance} meters")
    result_data = {
        "success": True,
        "action": "move_backward",
        "distance": distance
    }
    broadcast_result(result_data)
    return result_data

@mcp.tool()
def turn_left(angle: float) -> dict:
    """Turn the RC car left by a specified angle."""
    logger.info(f"Turning left {angle} degrees")
    result_data = {
        "success": True,
        "action": "turn_left",
        "angle": angle
    }
    broadcast_result(result_data)
    return result_data

@mcp.tool()
def turn_right(angle: float) -> dict:
    """Turn the RC car right by a specified angle."""
    logger.info(f"Turning right {angle} degrees")
    result_data = {
        "success": True,
        "action": "turn_right",
        "angle": angle
    }
    broadcast_result(result_data)
    return result_data

@mcp.tool()
def turn_signal(direction: str) -> dict:
    """Activate the turn signal (left or right) on the RC car."""
    valid_directions = ["left", "right"]
    if direction.lower() not in valid_directions:
        raise ValueError(f"Invalid direction '{direction}'. Must be one of {valid_directions}")
    
    logger.info(f"Activating {direction} turn signal")
    result_data = {
        "success": True,
        "action": "turn_signal",
        "direction": direction
    }
    broadcast_result(result_data)
    return result_data

@mcp.tool()
def follow_line() -> dict:
    """Start the line-following mode on the RC car."""
    logger.info("Starting line-following mode")
    result_data = {
        "success": True,
        "action": "follow_line"
    }
    broadcast_result(result_data)
    return result_data

@mcp.tool()
def track_object() -> dict:
    """Start object tracking mode on the RC car."""
    logger.info("Starting object tracking mode")
    result_data = {
        "success": True,
        "action": "track_object"
    }
    broadcast_result(result_data)
    return result_data

@mcp.tool()
def get_current_direction() -> dict:
    """Get the current direction of the RC car."""
    logger.info("Getting current direction")
    direction = "north"  # Placeholder value
    result_data = {
        "success": True,
        "action": "get_current_direction",
        "direction": direction
    }
    broadcast_result(result_data)
    return result_data

@mcp.tool()
def get_current_temperature() -> dict:
    """Get the current temperature reading from the RC car's sensor."""
    logger.info("Getting current temperature")
    temperature = 25.0  # Placeholder value
    result_data = {
        "success": True,
        "action": "get_current_temperature",
        "temperature": temperature
    }
    broadcast_result(result_data)
    return result_data

@mcp.tool()
def get_current_light_intensity() -> dict:
    """Get the current light intensity reading from the RC car's sensor."""
    logger.info("Getting current light intensity")
    light_intensity = 500  # Placeholder value
    result_data = {
        "success": True,
        "action": "get_current_light_intensity",
        "light_intensity": light_intensity
    }
    broadcast_result(result_data)
    return result_data

@mcp.tool()
def draw_shape(shape: str) -> dict:
    """Draw a specified shape using the RC car's movement."""
    valid_shapes = ["circle", "square", "triangle"]
    if shape.lower() not in valid_shapes:
        raise ValueError(f"Invalid shape '{shape}'. Must be one of {valid_shapes}")
    
    logger.info(f"Drawing shape: {shape}")
    result_data = {
        "success": True,
        "action": "draw_shape",
        "shape": shape
    }
    broadcast_result(result_data)
    return result_data

@mcp.tool()
def turn_on_turn_signal(direction: str) -> dict:
    """Turn on the turn signal (left or right) on the RC car."""
    valid_directions = ["left", "right"]
    if direction.lower() not in valid_directions:
        raise ValueError(f"Invalid direction '{direction}'. Must be one of {valid_directions}")
    
    logger.info(f"Turning on {direction} turn signal")
    result_data = {
        "success": True,
        "action": "turn_on_turn_signal",
        "direction": direction
    }
    broadcast_result(result_data)
    return result_data

@mcp.tool()
def turn_on_color_light(color: str) -> dict:
    """Turn on the color light on the RC car."""
    valid_colors = ["red", "rainbow"]
    if color.lower() not in valid_colors:
        raise ValueError(f"Invalid color '{color}'. Must be one of {valid_colors}")
    
    logger.info(f"Turning on {color} color light")
    result_data = {
        "success": True,
        "action": "turn_on_color_light",
        "color": color
    }
    broadcast_result(result_data)
    return result_data

# Start the server
if __name__ == "__main__":
    mcp.run(transport="stdio")

3、服务端2:

负责转发MCP生成的指令,让行空板M10做成相应的动作。文件名为:电脑UDP转发小智控制.py

代码
import socket
import json
import time

def send_command(command):
    udp_ip = "192.168.31.134"
    udp_port = 5005
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.sendto(command.encode('utf-8'), (udp_ip, udp_port))
    print(f"已发送指令:{command}")
    sock.close()

def parse_and_send(json_message):
    try:
        data = json.loads(json_message)
        if not data.get("success", False):
            return
            
        action = data["action"]
        
        if action == "led_on":
            send_command("led_on")
        elif action == "led_off":
            send_command("led_off")
        elif action == "motorA_forward":
            speed = data.get("speed", 0)
            send_command(f"motorA_forward:{speed}")
        elif action == "motorA_reverse":
            speed = data.get("speed", 0)
            send_command(f"motorA_reverse:{speed}")
        elif action == "motorB_forward":
            speed = data.get("speed", 0)
            send_command(f"motorB_forward:{speed}")
        elif action == "motorB_reverse":
            speed = data.get("speed", 0)
            send_command(f"motorB_reverse:{speed}")
        elif action == "servo_angle":
            angle = data.get("angle", 90)
            send_command(f"servo_angle:{angle}")
        elif action == "neopixel_color":
            color = data.get("color", [0, 0, 0])
            send_command(f"neopixel_color:{','.join(map(str, color))}")
        elif action == "show_time":
            send_command("show_time")
        elif action == "show_clock":
            send_command("show_clock")
        elif action == "show_image":
            image_path = data.get("image_path", "")
            send_command(f"show_image:{image_path}")
        elif action == "draw_arrow":
            direction = data.get("direction", "up")
            send_command(f"draw_arrow:{direction}")
        elif action == "draw_polygon":
            points = data.get("points", [])
            points_str = "[" + ",".join([f"({x},{y})" for x, y in points]) + "]"
            send_command(f"draw_polygon:{points_str}")
        elif action == "buzzer_tone":
            frequency = data.get("frequency", 440)
            duration = data.get("duration", 500)
            send_command(f"buzzer_tone:{frequency},{duration}")
        elif action == "ir_send":
            code = data.get("code", "0x0")
            send_command(f"ir_send:{code}")
            
    except json.JSONDecodeError:
        print("Invalid JSON message")
    except KeyError as e:
        print(f"Missing key in message: {e}")

def udp_receive_and_forward():
    # 创建一个UDP套接字
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    
    # 绑定地址和端口
    host = '0.0.0.0'  # 绑定到所有可用的网络接口
    port = 8089      # 监听的端口号
    sock.bind((host, port))
    
    print(f"UDP服务器启动,正在监听端口 {port}...")

    while True:
        # 接收数据
        data, addr = sock.recvfrom(1024)  # 缓冲区大小为1024字节
        log_message = data.decode('utf-8')  # 将接收到的数据解码为字符串
        print(f"收到原始消息: {log_message}")
        
        # 解析并转发消息
        parse_and_send(log_message)

if __name__ == "__main__":
    udp_receive_and_forward()

4、执行端:

最后就是行空板上需要运行的程序,实现多种设备的控制,把几个设备的驱动的示例进行融合,然后做成一个设备自检的程序,再把这个程序启用UDP接口,以便接入上位机的指令。实现基础测试的代码,文件名为:行空UDPAPI.py:

代码
import socket
import threading
from pinpong.board import Board, Pin, NeoPixel, Servo, IRRemote, IRRecv
from pinpong.extension.unihiker import *
import cv2
from PIL import Image
from unihiker import GUI
import time
import datetime

class ResourceManager:
    def __init__(self):
        Board().begin()
        self.led = Pin(Pin.P25, Pin.OUT)
        self.pwm8 = Pin(Pin.P8, Pin.PWM)
        self.pwm16 = Pin(Pin.P16, Pin.PWM)
        self.dir5 = Pin(Pin.P5, Pin.OUT)
        self.dir6 = Pin(Pin.P6, Pin.OUT)
        self.servo = Servo(Pin(Pin.D0))
        self.np = NeoPixel(Pin(Pin.P13), 2)
        self.gui = GUI()
        self.vd = cv2.VideoCapture(0)
        self.clk = self.gui.draw_clock(x=120, y=160, r=80, h=10, m=8, s=0, color="#0000FF")
        self.buzzer = IRRemote(Pin(Pin.P15))  # 假设蜂鸣器连接到P15
        self.ir_recv = IRRecv(Pin(Pin.P14), self.ir_event)  # 假设红外接收器连接到P14

        self.udp_ip = "127.0.0.1"
        self.udp_port = 5005
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.bind((self.udp_ip, self.udp_port))
        self.running = True

        threading.Thread(target=self.udp_listener).start()

    def udp_listener(self):
        while self.running:
            data, addr = self.sock.recvfrom(1024)
            command = data.decode('utf-8')
            print(f"Received command: {command}")
            self.handle_command(command)

    def handle_command(self, command):
        if command == "led_on":
            self.led.write_digital(1)
        elif command == "led_off":
            self.led.write_digital(0)
        elif command.startswith("motorA_forward:"):
            speed = int(command.split(":")[1])
            self.motorA_forward(speed)
        elif command.startswith("motorA_reverse:"):
            speed = int(command.split(":")[1])
            self.motorA_reverse(speed)
        elif command.startswith("motorB_forward:"):
            speed = int(command.split(":")[1])
            self.motorB_forward(speed)
        elif command.startswith("motorB_reverse:"):
            speed = int(command.split(":")[1])
            self.motorB_reverse(speed)
        elif command.startswith("servo_angle:"):
            angle = int(command.split(":")[1])
            self.servo.write_angle(angle)
        elif command.startswith("neopixel_color:"):
            color = tuple(map(int, command.split(":")[1].split(",")))
            self.neopixel_color(color)
        elif command == "show_time":
            self.show_time()
        elif command == "show_clock":
            self.show_clock()
        elif command == "take_photo":
            self.take_photo()
        elif command == "video_stream":
            self.video_stream()
        elif command.startswith("buzzer_tone:"):
            frequency, duration = map(int, command.split(":")[1].split(","))
            self.buzzer_tone(frequency, duration)
        elif command.startswith("ir_send:"):
            code = int(command.split(":")[1], 16)
            self.ir_send(code)

    def motorA_forward(self, speed):
        self.pwm8.write_analog(speed)
        self.dir5.write_digital(1)

    def motorA_reverse(self, speed):
        self.pwm8.write_analog(speed)
        self.dir5.write_digital(0)

    def motorB_forward(self, speed):
        self.pwm16.write_analog(speed)
        self.dir6.write_digital(1)

    def motorB_reverse(self, speed):
        self.pwm16.write_analog(speed)
        self.dir6.write_digital(0)

    def neopixel_color(self, color):
        self.np.range_color(0, 2, color)

    def show_time(self):
        current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        self.gui.clear()
        self.gui.draw_text(x=120, y=160, text=current_time, origin="center", color="red", font_size=25)

    def show_clock(self):
        self.clk.config(
            h=datetime.datetime.now().hour,
            m=datetime.datetime.now().minute,
            s=datetime.datetime.now().second
        )

    def take_photo(self):
        ret, img = self.vd.read()
        if ret:
            img = cv2.resize(img, (240, 240))
            img = cv2.flip(img, 0)
            img = cv2.rotate(img, cv2.ROTATE_180)
            img = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
            self.gui.draw_image(image=img, x=0, y=0)

    def video_stream(self):
        ret, img = self.vd.read()
        if ret:
            img = cv2.resize(img, (240, 240))
            img = cv2.flip(img, 0)
            img = cv2.rotate(img, cv2.ROTATE_180)
            img = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
            self.gui.draw_image(image=img, x=0, y=0)

    def buzzer_tone(self, frequency, duration):
        self.buzzer.tone(frequency, duration)

    def ir_send(self, code):
        self.buzzer.send(code)

    def ir_event(self, data):
        print(f"Received IR code: {data}")

    def stop(self):
        self.running = False
        self.sock.close()
        self.vd.release()

if __name__ == "__main__":
    manager = ResourceManager()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        manager.stop()

接下来,启动系统:

步骤1 启用小智MCP服务:python mcp_pipe.py calculator.py

步骤2 启用UDP转发服务:电脑UDP转发小智控制.py

步骤3 运行行空板程序:行空UDPAPI.py

步骤4 唤醒小智,用自然语言控制你的设备吧!

总结:

这个测试系统,测试了整个链路的通讯和设备的控制,主要有舵机,电机,LED灯,2812灯,蜂鸣器,红外发射,屏幕等,具备了这个能力后,后面准备做成RC_CAR的形态,再去执行指定的任务,应该是可行的。但目前测试下来会有指令丢包的情况,不知道哪个环节的问题,UDP转发肯定可以简化,后面有新进展再做分享。

评论

user-avatar