|
MAVLink 实战三:UDP实现MAVLink接收与发送
参考
MAVLink GitHub源码地址:https://github.com/mavlink/mavlink
MAVLink官方文档:https://mavlink.io/en/messages/common.html
本节内容
通过前面两个实战,我们实现了简单的MAVLink消息的接收与发送,然而实际运行时收发应该是一体的,为了更好的工程化应用,接下来整合接收与发送代码,合并为一个类!
所有代码都在gitee上:https://gitee.com/wujinDrone/mavlink_tutorial#https://www.yuque.com/u21969096/cez3q8
MAVAPI类框架
为了工程化应用,我们希望将MAVLink通信相关封装为一个类,提供API接口供其他人使用。首先我们先构思一下框架,新建一个test_drone_udp.py文件,其主要内容有:
- 初始化中实例化底层通信类和MAVLink类作为成员变量,这样才能够通过该成员变量进行消息收发;
- 需要有一个循环,一直接收并解析消息;
- 最好有一个循环,一直发送心跳包;
- 其他必要的功能函数,如发送无人机解锁/上锁指令、起飞指令的实现,以便作为API接口函数供外部调用。
这样,一个最简单的代码框架如下:
#!/usr/bin/env python3
import time
import socket
from mavlink import *
class DeviceUdp(object):
def __init__(self, local_addr, target_addr=None):
pass
def write(self, buf):
pass
def read(self, size):
pass
class MAVAPI(object):
"""
"""
def __init__(self, local_addr, target_addr):
# 必要的成员变量
self.sys_id = 254
self.cmp_id = 1
# 实例化UDP底层通信类
self.dev = DeviceUdp(local_addr, target_addr)
# 实例化MAVLink通信类
self.mav = MAVLink(self.dev, self.sys_id, self.cmp_id)
def th_receive_fun(self):
"""
一直接收数据并解析
"""
while True:
pass
def th_send_heartbeat_fun(self):
"""
以一定频率发送心跳包
"""
while True:
print("Send Heartbeat")
time.sleep(1)
def arm_disarm(self):
"""
发送解锁上锁指令
"""
def takeoff(self):
"""
发送起飞指令
"""
pass
def exit(self):
"""
退出,关闭UDP通信
"""
self.mav.close()其中DeviceUDP类前面实战中已经实现,代码如下:
class DeviceUdp(object):
"""
"""
def __init__(self, local_addr, target_addr=None):
"""
local_port: local port to receive data
For example, target_addr = ('192.168.4.1', 18750)
"""
self.dev = None
self.local_addr = local_addr
self.target_addr = target_addr
self.dev = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.dev.bind(self.local_addr)
def write(self, buf):
send_callback = self.dev.sendto(buf, self.target_addr)
def read(self, size):
buf, source_addr = self.dev.recvfrom(size)
return buf
def close(self):
try:
self.dev.close()
except Exception as e:
print(e)通过线程实现循环
消息接收和解析循环
通过启动线程,注册回调函数,在函数中通过while循环实现消息的解析!
class MAVAPI(object):
"""
"""
def __init__(self, local_addr, target_addr):
# 各类线程实例,并开启线程
self.th_msg_receive = threading.Thread(daemon=True, target=self.th_receive_fun)
self.th_msg_receive.start()
def th_receive_fun(self):
"""
线程回调函数:一直接收数据并解析
"""
while True:
data = self.dev.read(256)
if len(data)>0:
# 收到数据,解析mavlink消息
try:
self.cur_mav_msg = self.mav.parse_char(data)
except Exception as e:
continue
if isinstance(self.cur_mav_msg, MAVLink_message):
pass
if isinstance(self.cur_mav_msg, MAVLink_heartbeat_message):
print(self.cur_mav_msg)
print("INFO: sys={}, cmp={}".format(self.cur_mav_msg.get_srcSystem(), self.cur_mav_msg.get_srcComponent()))
print("INFO: type={}, autopilot={}".format(self.cur_mav_msg.type, self.cur_mav_msg.autopilot))
if isinstance(self.cur_mav_msg, MAVLink_attitude_message):
print("INFO: roll={:.3f}, pitch={:.3f}, yaw={:.3f}".format(self.cur_mav_msg.roll, self.cur_mav_msg.pitch, self.cur_mav_msg.yaw))
self.roll_NED = self.cur_mav_msg.roll
self.pitch_NED = self.cur_mav_msg.pitch
self.yaw_NED = self.cur_mav_msg.yaw心跳包发送循环
class MAVAPI(object):
"""
"""
def __init__(self, local_addr, target_addr):
# 各类线程实例,并开启线程
self.th_msg_receive = threading.Thread(daemon=True, target=self.th_receive_fun)
self.th_send_heartbeat = threading.Thread(daemon=True, target=self.th_send_heartbeat_fun)
self.th_msg_receive.start()
def th_send_heartbeat_fun(self):
"""
线程回调函数:以一定频率发送心跳包
"""
while True:
### send as a onboard computer or GCS
#msg = MAVLink_heartbeat_message(type=MAV_TYPE_GCS, autopilot=MAV_AUTOPILOT_INVALID, base_mode=0, custom_mode=0,
# system_status=MAV_STATE_ACTIVE, mavlink_version=3)
### send as a drone simulator
msg = MAVLink_heartbeat_message(type=MAV_TYPE_QUADROTOR, autopilot=MAV_AUTOPILOT_PX4, base_mode=MAV_MODE_PREFLIGHT, custom_mode=0x60000,
system_status=MAV_STATE_ACTIVE, mavlink_version=3)
self.mav.send(msg)
time.sleep(1)发送控制指令
解锁/上锁
class MAVAPI(object):
"""
"""
def __init__(self, local_addr, target_addr):
# 必要的成员变量
self.tgt_system_id = 1
self.tgt_component_id = 1
def arm_disarm(self, arm):
"""
arm:
- False: disarm
- True: arm
"""
arm = 1 if arm else 0
msg_cmd = MAVLink_command_long_message(self.tgt_system_id, self.tgt_component_id, MAV_CMD_COMPONENT_ARM_DISARM, 0, arm, 0, 0, 0, 0, 0, 0)
self.mav.send(msg_cmd)起飞/降落
class MAVAPI(object):
"""
"""
def takeoff_land(self, cmd='takeoff'):
"""
发送起飞指令,通过MAVLink SET_MODE(#11)消息实现原地起飞
"""
base_mode = 129
if cmd == 'takeoff':
custom_mode = 0x2040000
elif cmd == 'land':
custom_mode = 0x6040000
msg = MAVLink_set_mode_message(self.tgt_system_id, base_mode=base_mode, custom_mode=custom_mode)
self.mav.send(msg)测试
与GCS进行通信测试
启动QGC
如果QGC软件放在桌面上,通过如下命令启动
$ cd Desktop
$ ./QGroundControl.AppImage 注意:如果QGC存放路径不是Desktop,则需要进入具体路径后启动即可。
启动UDP测试程序
添加一段测试代码:
def test_gcs():
local_addr = ('', 14540)
target_addr = ('127.0.0.1', 14550)
mavapi = MAVAPI(local_addr, target_addr)
if __name__ == '__main__':
test_gcs()
while True:
time.sleep(1)运行:
$ python3 test_drone_udp.py会看到UDP端会一直打印QGC发送的相关信息,进入QGC->Analyze Tools->MAVLink Inspector,可以看到UDP发送的heartbeat消息!
PX4 Simulator的通信测试
启动PX4 Simulator
这里我们使用PX4 gazebo仿真,需要下载PX4仓库(PX4-Autopilot)源码,具体配置方法可以参考官网:
进入PX4-Autopilot,运行如下命令启动:
$ make px4_sitl_default gazebo启动UDP测试程序
首先添加一段UDP测试程序,主要是调用mavapi中的解锁、起飞等控制接口,实现对无人机控制,如下:
def test_px4_sim():
local_addr = ('', 14540)
target_addr = ('127.0.0.1', 14580)
mavapi = MAVAPI(local_addr, target_addr)
time.sleep(3)
mavapi.arm_disarm(True)
time.sleep(2)
mavapi.takeoff_land('takeoff')
time.sleep(5)
mavapi.takeoff_land('land')
if __name__ == '__main__':
test_px4_sim() 注意这里用到的端口号!如果端口设置错误,无法与仿真器通信。
运行:
$ python3 test_drone_udp.py会看到UDP端一直打印无人机姿态信息,gazebo仿真环境中无人机能够解锁、起飞、降落、上锁! |
|