18511580320 发表于 2022-10-26 09:32:10

MAVLink系列——实战三:UDP实现MAVLink接收与发送 ...

MAVLink 实战三:UDP实现MAVLink接收与发送

参考

MAVLink GitHub源码地址:https://github.com/mavlink/mavlink
MAVLink官方文档:https://mavlink.io/en/messages/common.html
本节内容

通过前面两个实战,我们实现了简单的MAVLink消息的接收与发送,然而实际运行时收发应该是一体的,为了更好的工程化应用,接下来整合接收与发送代码,合并为一个类!
所有代码都在gitee上:https://gitee.com/wujinDrone/mavlink_tutorial#https://www.yuque.com/u21969096/cez3q8
MAVAPI类框架

为了工程化应用,我们希望将MAVLink通信相关封装为一个类,提供API接口供其他人使用。首先我们先构思一下框架,新建一个test_drone_udp.py文件,其主要内容有:

[*]初始化中实例化底层通信类和MAVLink类作为成员变量,这样才能够通过该成员变量进行消息收发;
[*]需要有一个循环,一直接收并解析消息;
[*]最好有一个循环,一直发送心跳包;
[*]其他必要的功能函数,如发送无人机解锁/上锁指令、起飞指令的实现,以便作为API接口函数供外部调用。
这样,一个最简单的代码框架如下:
#!/usr/bin/env python3
import time
import socket
from mavlink import *

class DeviceUdp(object):
    def __init__(self, local_addr, target_addr=None):
      pass
    def write(self, buf):
      pass
    def read(self, size):
      pass
class MAVAPI(object):
    """
    """
    def __init__(self, local_addr, target_addr):
      # 必要的成员变量
      self.sys_id = 254
      self.cmp_id = 1
      # 实例化UDP底层通信类
      self.dev = DeviceUdp(local_addr, target_addr)
      # 实例化MAVLink通信类
      self.mav = MAVLink(self.dev, self.sys_id, self.cmp_id)

    def th_receive_fun(self):
      """
      一直接收数据并解析
      """
      while True:
            pass
    def th_send_heartbeat_fun(self):
      """
      以一定频率发送心跳包
      """
      while True:
            print("Send Heartbeat")
            time.sleep(1)
    def arm_disarm(self):
      """
      发送解锁上锁指令
      """
    def takeoff(self):
      """
      发送起飞指令
      """
      pass
    def exit(self):
      """
      退出,关闭UDP通信
      """
      self.mav.close()其中DeviceUDP类前面实战中已经实现,代码如下:
class DeviceUdp(object):
    """
    """
    def __init__(self, local_addr, target_addr=None):
      """
      local_port: local port to receive data
      For example, target_addr = ('192.168.4.1', 18750)
      """
      self.dev         = None
      self.local_addr= local_addr
      self.target_addr = target_addr

      self.dev = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
      self.dev.bind(self.local_addr)

    def write(self, buf):
      send_callback = self.dev.sendto(buf, self.target_addr)
    def read(self, size):
      buf, source_addr = self.dev.recvfrom(size)
      return buf
    def close(self):
      try:
            self.dev.close()
      except Exception as e:
            print(e)通过线程实现循环

消息接收和解析循环

通过启动线程,注册回调函数,在函数中通过while循环实现消息的解析!
class MAVAPI(object):
    """
    """
    def __init__(self, local_addr, target_addr):
      # 各类线程实例,并开启线程
      self.th_msg_receive    = threading.Thread(daemon=True, target=self.th_receive_fun)
      self.th_msg_receive.start()
    def th_receive_fun(self):
      """
      线程回调函数:一直接收数据并解析
      """
      while True:
            data = self.dev.read(256)
            if len(data)>0:
                # 收到数据,解析mavlink消息
                try:
                  self.cur_mav_msg = self.mav.parse_char(data)
                except Exception as e:
                  continue
                if isinstance(self.cur_mav_msg, MAVLink_message):
                  pass

                if isinstance(self.cur_mav_msg, MAVLink_heartbeat_message):
                  print(self.cur_mav_msg)
                  print("INFO: sys={}, cmp={}".format(self.cur_mav_msg.get_srcSystem(), self.cur_mav_msg.get_srcComponent()))
                  print("INFO: type={}, autopilot={}".format(self.cur_mav_msg.type, self.cur_mav_msg.autopilot))

                if isinstance(self.cur_mav_msg, MAVLink_attitude_message):
                  print("INFO: roll={:.3f}, pitch={:.3f}, yaw={:.3f}".format(self.cur_mav_msg.roll, self.cur_mav_msg.pitch, self.cur_mav_msg.yaw))
                  self.roll_NED= self.cur_mav_msg.roll
                  self.pitch_NED = self.cur_mav_msg.pitch
                  self.yaw_NED   = self.cur_mav_msg.yaw心跳包发送循环

class MAVAPI(object):
    """
    """
    def __init__(self, local_addr, target_addr):
      # 各类线程实例,并开启线程
      self.th_msg_receive    = threading.Thread(daemon=True, target=self.th_receive_fun)
      self.th_send_heartbeat = threading.Thread(daemon=True, target=self.th_send_heartbeat_fun)
      self.th_msg_receive.start()
    def th_send_heartbeat_fun(self):
      """
      线程回调函数:以一定频率发送心跳包
      """
      while True:
            ### send as a onboard computer or GCS
            #msg = MAVLink_heartbeat_message(type=MAV_TYPE_GCS, autopilot=MAV_AUTOPILOT_INVALID, base_mode=0, custom_mode=0,
            #                              system_status=MAV_STATE_ACTIVE, mavlink_version=3)
            ### send as a drone simulator
            msg = MAVLink_heartbeat_message(type=MAV_TYPE_QUADROTOR, autopilot=MAV_AUTOPILOT_PX4, base_mode=MAV_MODE_PREFLIGHT, custom_mode=0x60000,
                                          system_status=MAV_STATE_ACTIVE, mavlink_version=3)

            self.mav.send(msg)
            time.sleep(1)发送控制指令

解锁/上锁

class MAVAPI(object):
    """
    """
    def __init__(self, local_addr, target_addr):
      # 必要的成员变量
      self.tgt_system_id    = 1
      self.tgt_component_id = 1
    def arm_disarm(self, arm):
      """
      arm:
            - False: disarm
            - True: arm
      """
      arm = 1 if arm else 0
      msg_cmd = MAVLink_command_long_message(self.tgt_system_id, self.tgt_component_id, MAV_CMD_COMPONENT_ARM_DISARM, 0, arm, 0, 0, 0, 0, 0, 0)
      self.mav.send(msg_cmd)起飞/降落

class MAVAPI(object):
    """
    """
    def takeoff_land(self, cmd='takeoff'):
      """
      发送起飞指令,通过MAVLink SET_MODE(#11)消息实现原地起飞
      """
      base_mode   = 129
      if cmd == 'takeoff':
            custom_mode = 0x2040000
      elif cmd == 'land':
            custom_mode = 0x6040000

      msg = MAVLink_set_mode_message(self.tgt_system_id, base_mode=base_mode, custom_mode=custom_mode)
      self.mav.send(msg)测试

与GCS进行通信测试

启动QGC

如果QGC软件放在桌面上,通过如下命令启动
$ cd Desktop
$ ./QGroundControl.AppImage 注意:如果QGC存放路径不是Desktop,则需要进入具体路径后启动即可。
启动UDP测试程序

添加一段测试代码:
def test_gcs():
    local_addr= ('', 14540)
    target_addr = ('127.0.0.1', 14550)
    mavapi = MAVAPI(local_addr, target_addr)
if __name__ == '__main__':
    test_gcs()
    while True:
      time.sleep(1)运行:
$ python3 test_drone_udp.py会看到UDP端会一直打印QGC发送的相关信息,进入QGC->Analyze Tools->MAVLink Inspector,可以看到UDP发送的heartbeat消息!
PX4 Simulator的通信测试

启动PX4 Simulator

这里我们使用PX4 gazebo仿真,需要下载PX4仓库(PX4-Autopilot)源码,具体配置方法可以参考官网:
进入PX4-Autopilot,运行如下命令启动:
$ make px4_sitl_default gazebo启动UDP测试程序

首先添加一段UDP测试程序,主要是调用mavapi中的解锁、起飞等控制接口,实现对无人机控制,如下:
def test_px4_sim():
    local_addr= ('', 14540)
    target_addr = ('127.0.0.1', 14580)
    mavapi = MAVAPI(local_addr, target_addr)

    time.sleep(3)
    mavapi.arm_disarm(True)
    time.sleep(2)
    mavapi.takeoff_land('takeoff')
    time.sleep(5)
    mavapi.takeoff_land('land')

if __name__ == '__main__':
    test_px4_sim() 注意这里用到的端口号!如果端口设置错误,无法与仿真器通信。
运行:
$ python3 test_drone_udp.py会看到UDP端一直打印无人机姿态信息,gazebo仿真环境中无人机能够解锁、起飞、降落、上锁!
页: [1]
查看完整版本: MAVLink系列——实战三:UDP实现MAVLink接收与发送 ...