查看: 728|回复: 0

MAVLink系列——实战三:UDP实现MAVLink接收与发送 ...

[复制链接]

37

主题

862

帖子

1710

积分

金牌飞友

Rank: 6Rank: 6

积分
1710
飞币
847
注册时间
2017-8-16
发表于 2022-10-26 09:32:10 | 显示全部楼层 |阅读模式
MAVLink 实战三:UDP实现MAVLink接收与发送

参考

MAVLink GitHub源码地址:https://github.com/mavlink/mavlink
MAVLink官方文档:https://mavlink.io/en/messages/common.html
本节内容

通过前面两个实战,我们实现了简单的MAVLink消息的接收与发送,然而实际运行时收发应该是一体的,为了更好的工程化应用,接下来整合接收与发送代码,合并为一个类!
所有代码都在gitee上:https://gitee.com/wujinDrone/mavlink_tutorial#https://www.yuque.com/u21969096/cez3q8
MAVAPI类框架

为了工程化应用,我们希望将MAVLink通信相关封装为一个类,提供API接口供其他人使用。首先我们先构思一下框架,新建一个test_drone_udp.py文件,其主要内容有:

  • 初始化中实例化底层通信类和MAVLink类作为成员变量,这样才能够通过该成员变量进行消息收发;
  • 需要有一个循环,一直接收并解析消息;
  • 最好有一个循环,一直发送心跳包;
  • 其他必要的功能函数,如发送无人机解锁/上锁指令、起飞指令的实现,以便作为API接口函数供外部调用。
这样,一个最简单的代码框架如下:
#!/usr/bin/env python3
import time
import socket
from mavlink import *

class DeviceUdp(object):
    def __init__(self, local_addr, target_addr=None):
        pass
    def write(self, buf):
        pass
    def read(self, size):
        pass
class MAVAPI(object):
    """
    """
    def __init__(self, local_addr, target_addr):
        # 必要的成员变量
        self.sys_id = 254
        self.cmp_id = 1
        # 实例化UDP底层通信类
        self.dev = DeviceUdp(local_addr, target_addr)
        # 实例化MAVLink通信类
        self.mav = MAVLink(self.dev, self.sys_id, self.cmp_id)

    def th_receive_fun(self):
        """
        一直接收数据并解析
        """
        while True:
            pass
    def th_send_heartbeat_fun(self):
        """
        以一定频率发送心跳包
        """
        while True:
            print("Send Heartbeat")
            time.sleep(1)
    def arm_disarm(self):
        """
        发送解锁上锁指令
        """
    def takeoff(self):
        """
        发送起飞指令
        """
        pass
    def exit(self):
        """
        退出,关闭UDP通信
        """
        self.mav.close()其中DeviceUDP类前面实战中已经实现,代码如下:
class DeviceUdp(object):
    """
    """
    def __init__(self, local_addr, target_addr=None):
        """
        local_port: local port to receive data
        For example, target_addr = ('192.168.4.1', 18750)
        """
        self.dev         = None
        self.local_addr  = local_addr
        self.target_addr = target_addr

        self.dev = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.dev.bind(self.local_addr)

    def write(self, buf):
        send_callback = self.dev.sendto(buf, self.target_addr)
    def read(self, size):
        buf, source_addr = self.dev.recvfrom(size)
        return buf
    def close(self):
        try:
            self.dev.close()
        except Exception as e:
            print(e)通过线程实现循环

消息接收和解析循环

通过启动线程,注册回调函数,在函数中通过while循环实现消息的解析!
class MAVAPI(object):
    """
    """
    def __init__(self, local_addr, target_addr):
        # 各类线程实例,并开启线程
        self.th_msg_receive    = threading.Thread(daemon=True, target=self.th_receive_fun)
        self.th_msg_receive.start()
    def th_receive_fun(self):
        """
        线程回调函数:一直接收数据并解析
        """
        while True:
            data = self.dev.read(256)
            if len(data)>0:
                # 收到数据,解析mavlink消息
                try:
                    self.cur_mav_msg = self.mav.parse_char(data)
                except Exception as e:
                    continue
                if isinstance(self.cur_mav_msg, MAVLink_message):
                    pass

                if isinstance(self.cur_mav_msg, MAVLink_heartbeat_message):
                    print(self.cur_mav_msg)
                    print("INFO: sys={}, cmp={}".format(self.cur_mav_msg.get_srcSystem(), self.cur_mav_msg.get_srcComponent()))
                    print("INFO: type={}, autopilot={}".format(self.cur_mav_msg.type, self.cur_mav_msg.autopilot))

                if isinstance(self.cur_mav_msg, MAVLink_attitude_message):
                    print("INFO: roll={:.3f}, pitch={:.3f}, yaw={:.3f}".format(self.cur_mav_msg.roll, self.cur_mav_msg.pitch, self.cur_mav_msg.yaw))
                    self.roll_NED  = self.cur_mav_msg.roll
                    self.pitch_NED = self.cur_mav_msg.pitch
                    self.yaw_NED   = self.cur_mav_msg.yaw心跳包发送循环

class MAVAPI(object):
    """
    """
    def __init__(self, local_addr, target_addr):
        # 各类线程实例,并开启线程
        self.th_msg_receive    = threading.Thread(daemon=True, target=self.th_receive_fun)
        self.th_send_heartbeat = threading.Thread(daemon=True, target=self.th_send_heartbeat_fun)
        self.th_msg_receive.start()
    def th_send_heartbeat_fun(self):
        """
        线程回调函数:以一定频率发送心跳包
        """
        while True:
            ### send as a onboard computer or GCS
            #msg = MAVLink_heartbeat_message(type=MAV_TYPE_GCS, autopilot=MAV_AUTOPILOT_INVALID, base_mode=0, custom_mode=0,
            #                                system_status=MAV_STATE_ACTIVE, mavlink_version=3)
            ### send as a drone simulator
            msg = MAVLink_heartbeat_message(type=MAV_TYPE_QUADROTOR, autopilot=MAV_AUTOPILOT_PX4, base_mode=MAV_MODE_PREFLIGHT, custom_mode=0x60000,
                                            system_status=MAV_STATE_ACTIVE, mavlink_version=3)

            self.mav.send(msg)
            time.sleep(1)发送控制指令

解锁/上锁

class MAVAPI(object):
    """
    """
    def __init__(self, local_addr, target_addr):
        # 必要的成员变量
        self.tgt_system_id    = 1
        self.tgt_component_id = 1
    def arm_disarm(self, arm):
        """
        arm:
            - False: disarm
            - True: arm
        """
        arm = 1 if arm else 0
        msg_cmd = MAVLink_command_long_message(self.tgt_system_id, self.tgt_component_id, MAV_CMD_COMPONENT_ARM_DISARM, 0, arm, 0, 0, 0, 0, 0, 0)
        self.mav.send(msg_cmd)起飞/降落

class MAVAPI(object):
    """
    """
    def takeoff_land(self, cmd='takeoff'):
        """
        发送起飞指令,通过MAVLink SET_MODE(#11)消息实现原地起飞
        """
        base_mode   = 129
        if cmd == 'takeoff':
            custom_mode = 0x2040000
        elif cmd == 'land':
            custom_mode = 0x6040000

        msg = MAVLink_set_mode_message(self.tgt_system_id, base_mode=base_mode, custom_mode=custom_mode)
        self.mav.send(msg)测试

与GCS进行通信测试

启动QGC

如果QGC软件放在桌面上,通过如下命令启动
$ cd Desktop
$ ./QGroundControl.AppImage
注意:如果QGC存放路径不是Desktop,则需要进入具体路径后启动即可。
启动UDP测试程序

添加一段测试代码:
def test_gcs():
    local_addr  = ('', 14540)
    target_addr = ('127.0.0.1', 14550)
    mavapi = MAVAPI(local_addr, target_addr)
if __name__ == '__main__':
    test_gcs()
    while True:
        time.sleep(1)运行:
$ python3 test_drone_udp.py会看到UDP端会一直打印QGC发送的相关信息,进入QGC->Analyze Tools->MAVLink Inspector,可以看到UDP发送的heartbeat消息!
PX4 Simulator的通信测试

启动PX4 Simulator

这里我们使用PX4 gazebo仿真,需要下载PX4仓库(PX4-Autopilot)源码,具体配置方法可以参考官网:
进入PX4-Autopilot,运行如下命令启动:
$ make px4_sitl_default gazebo启动UDP测试程序

首先添加一段UDP测试程序,主要是调用mavapi中的解锁、起飞等控制接口,实现对无人机控制,如下:
def test_px4_sim():
    local_addr  = ('', 14540)
    target_addr = ('127.0.0.1', 14580)
    mavapi = MAVAPI(local_addr, target_addr)

    time.sleep(3)
    mavapi.arm_disarm(True)
    time.sleep(2)
    mavapi.takeoff_land('takeoff')
    time.sleep(5)
    mavapi.takeoff_land('land')

if __name__ == '__main__':
    test_px4_sim()
注意这里用到的端口号!如果端口设置错误,无法与仿真器通信。
运行:
$ python3 test_drone_udp.py会看到UDP端一直打印无人机姿态信息,gazebo仿真环境中无人机能够解锁、起飞、降落、上锁!
您需要登录后才可以回帖 登录 | 加入联盟

本版积分规则

快速回复 返回顶部 返回列表