Categories
Python XPlane

Adding track following (Direct To) with cross track error to the Python X-Plane Autopilot

Continuing from the last post (Adding some polish to the X-Plane Python Autopilot with Flask, Redis, and WebSockets), I have a bit of momentum going on the Python X-Plane Autopilot stuff. There were a couple of items I wanted to complete before declaring the project “done”. The first is a “takeoff” button, which isn’t yet done. The other is the ability to fly along a track. That is now complete as of last night.

It is one thing to fly a bearing from A to B. That works fine as long as there is no wind in any direction. Flying a heading set by bearing is easy, and is part of the heading select & hold feature built out in a previous iteration of the code. To do so requires a “desired heading” and a heading error PID. The goal is to minimize the heading error, so we set the setpoint to 0. This controls a “roll” PID controller, which controls an aileron PID controller.

Each have limits in place to prevent excessive movement. For example, the roll PID controller is limited to +/- 30 degrees. Pitch is +/- 15 degrees.

To take this to the next step requires a few things:

  • A “track”, which is commonly defined as a start point and an end point. Both are simply lat/lon coordinate sets.
  • A current location, which is current lat/lon
  • A cross track distance (error), which is the distance the current location is off the track.
  • More PID loops, namely a cross track distance PID control, which, like the heading error PID, has a setpoint of 0 (i.e. the goal is to minimize the cross track distance).

Additionally, to make something actually useful, we need a “database” of navigation points. I parsed the fixed-width delimited text files of X-Plane for this, which was not fun.

To tie it all together, the web interface needs a way to type in a nav point, and a Direct To (D->To) button. Direct to is common in aviation GPS units to set a track from the location when the button is pushed to some point (VOR, fix, airport, etc). I’ve emulated that functionality.

Here’s the screenshot showing the example aircraft navigating to DVV, which is the KDEN VOR, from somewhere near KBJC. It shows a cross track error of 0.056 km, or 56 meters. ChatGPT helpfully generated the cross track error function with a resultant number in meters. I am comfortable with many kinds of units so I’ll leave this for now. The red line on the right map view is the aircraft’s interpretation of the direct to set at the same time as I clicked my autopilot’s Direct To button. There is a 4 kt wind coming from 029. I tested with greater, somewhat constant crosswinds in the 40-50 kt range with gusts of +/- 5 kts.

screenshot showing python autopilot code controlling xplane, flying aircraft along a track

The cross track error settles down to < 10 m after a minute or so. It is a little “lazy”. If it is on a track that is due east, and I flip the track to due west, it’ll dutifully do the 180, then attempt to rejoin the track but it overshoots a bit and settles down after ~1 oscillation. I could probably turn up the P on the xte PID and that would help. Below is a track of tacking off from KBJC and the doing direct to DVV. The X is where I clicked Direct to back to the BJC VOR, it turned left and rejoined the track, overshooting, then settling back in nicely.

plot showing the lat/lon track of the aircraft doing almost a complete 180 to rejoin the track going the opposite direction with python autopilot code

The Python Autopilot Code

I’m not going to pretend I wrote the cross track distance code, nor will I pretend to understand it. It works. The sign of the result depends on something along the lines of which side of the great circle line you are on. Luckily, aircraft (and boats and other things that follow tracks) don’t typically go from B to A. They go from A to B so this is consistent no matter which direction the track is facing. If they do need to go back to the start, the start becomes the end, if that makes sense.

This is the glorious cross track distance code along with some test code. Using Google Earth, the distance from the KBJC control tower to the centerline of 30R/12L should be ~0.44 km.

def cross_track_distance(point, start, end):
    # Convert all latitudes and longitudes from degrees to radians
    point_lat, point_lon = math.radians(point[0]), math.radians(point[1])
    start_lat, start_lon = math.radians(start[0]), math.radians(start[1])
    end_lat, end_lon = math.radians(end[0]), math.radians(end[1])

    # Calculate the angular distance from start to point
    # Ensure the argument is within the domain of acos
    acos_argument = math.sin(start_lat) * math.sin(point_lat) + math.cos(start_lat) * math.cos(point_lat) * math.cos(point_lon - start_lon)
    acos_argument = max(-1, min(1, acos_argument))  # Clamp the argument between -1 and 1
    delta_sigma = math.acos(acos_argument)

    # Calculate the bearing from start to point and start to end
    theta_point = math.atan2(math.sin(point_lon - start_lon) * math.cos(point_lat),
                             math.cos(start_lat) * math.sin(point_lat) - math.sin(start_lat) * math.cos(point_lat) * math.cos(point_lon - start_lon))
    theta_end = math.atan2(math.sin(end_lon - start_lon) * math.cos(end_lat),
                           math.cos(start_lat) * math.sin(end_lat) - math.sin(start_lat) * math.cos(end_lat) * math.cos(end_lon - start_lon))

    # Calculate the cross track distance
    cross_track_dist = math.asin(math.sin(delta_sigma) * math.sin(theta_point - theta_end))

    # Convert cross track distance to kilometers by multiplying by the Earth's radius (6371 km)
    cross_track_dist = cross_track_dist * 6371

    return cross_track_dist

kbjc_runways = {
	"30R/12L": {
		"Runway 12L": {
			"Latitude": 39.91529286666667,
			"Longitude": -105.12841313333334
		},
		"Runway 30R": {
			"Latitude": 39.901373883333335,
			"Longitude": -105.10191808333333
		}
	}
}


kbjc_runway_30R_start = (kbjc_runways["30R/12L"]["Runway 30R"]["Latitude"], kbjc_runways["30R/12L"]["Runway 30R"]["Longitude"])
kbjc_runway_30R_end = (kbjc_runways["30R/12L"]["Runway 12L"]["Latitude"], kbjc_runways["30R/12L"]["Runway 12L"]["Longitude"])
kbjc_tower = (test_locations["kbjc_tower"]["lat"], test_locations["kbjc_tower"]["lon"])
def test_cross_track_distance():
	print(f"start lat: {kbjc_runway_30R_start[0]}, start lon: {kbjc_runway_30R_start[1]}")
	print(f"end lat: {kbjc_runway_30R_end[0]}, end lon: {kbjc_runway_30R_end[1]}")
	print(f"tower lat: {kbjc_tower[0]}, tower lon: {kbjc_tower[1]}")

	dist = cross_track_distance(kbjc_tower, kbjc_runway_30R_start, kbjc_runway_30R_end)
	print(f"cross track distance: {dist}")

test_cross_track_distance()

And the rest of the magic happens in this block. If you recall from the last post (Adding some polish to the X-Plane Python Autopilot with Flask, Redis, and WebSockets), I am using Redis as a store to hold the setpoints from the web app controlling the autopilot. It is fast enough that I don’t need to worry about latency when running at 10 Hz (the loop durations are consistently less than 30 milliseconds, with the bulk of that time being used to get and set data from X-Plane itself).

# get the setpoints from redis
			setpoints = get_setpoints_from_redis()

			# check if we have just changed to direct-to mode and if so, update the direct to coords. same if the target waypoint has changed
			if (setpoints["hdg_mode"] == "d_to" and previous_nav_mode != "d_to") or (setpoints["target_wpt"] != previous_nav_target):
				print("reason for entering this block")
				print(f"previous nav mode: {previous_nav_mode}, setpoints hdg mode: {setpoints['hdg_mode']}, previous nav target: {previous_nav_target}, setpoints target wpt: {setpoints['target_wpt']}")
				
				# d_to_start_coords is the current position, in lat,lon tuple
				d_to_start_coords = (posi[0], posi[1])

				# this function does a lookup in the nav_points dataframe to get the lat, lon of the target waypoint
				# it could certainly be optimized to use something faster than a pandas dataframe
				d_to_target_coords = get_nav_point_lat_lon(setpoints["target_wpt"])

				# reset xte PID
				xte_PID.clear()
				print(f"setting d_to_start_coords to {d_to_start_coords}")

			# these are unchanged
			desired_alt = setpoints["desired_alt"]
			desired_speed = setpoints["desired_speed"]

			if setpoints["hdg_mode"] == "hdg":
				# if we're in heading mode, just use the desired heading. this is mostly unchanged from the previous iteration
				desired_hdg = setpoints["desired_hdg"]
				heading_error = get_angle_difference(desired_hdg, current_hdg)
				heading_error_PID.update(heading_error)
				
			elif setpoints["hdg_mode"] == "d_to":
				# if we're in direct-to mode, calculate the cross-track error and update the xte_PID.
				# I am using xte to mean cross-track error/distance
				xte = cross_track_distance((posi[0], posi[1]), d_to_start_coords, d_to_target_coords)
				xte_PID.update(xte)

				# calculate the heading correction based on the xte_PID output
				heading_correction = xte_PID.output

				# this is essentially saying for 1 km of cross-track error, we want to correct by 30 degrees
				heading_correction = heading_correction * 30

				# limit the heading correction to -45 to 45 degrees
				heading_correction = normalize(heading_correction, -45, 45)

				# calculate the track heading to the target waypoint. the track heading is the heading we would
				# need to fly to get to the target waypoint from the current position. it is used as an initial heading
				track_heading = get_bearing((posi[0], posi[1]), d_to_target_coords)

				# adjust the desired heading by the heading correction
				adjusted_desired_hdg = track_heading + heading_correction

				# make sure the adjusted desired heading is between 0 and 360
				adjusted_desired_hdg = adjusted_desired_hdg % 360

				# calculate the heading error based on the adjusted desired heading, this is no different than the hdg mode
				adjusted_heading_error = get_angle_difference(adjusted_desired_hdg, current_hdg)
				heading_error_PID.update(adjusted_heading_error)

				# log the current values
				print(f"track hdg: {track_heading:.1f}, heading corr: {heading_correction:.1f}, adj desired hdg: {adjusted_desired_hdg:.1f}, adj heading err: {adjusted_heading_error:.1f}")

				# write to a log file so we can make nice plots for the blog post
				log_line = f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]},{posi[0]},{posi[1]},{posi[2]},{xte},{xte_PID.output},{track_heading},{heading_correction},{adjusted_desired_hdg},{adjusted_heading_error}"
				with open(current_run_log_filename, "a") as log_file:
					log_file.write(log_line + "\n")

Getting nav data from X-Plane data files

If you looked at the code closely, you will see the d_to_target_coords is set via a function called get_nav_point_lat_lon(nav_point). This looks up lat/lon in a file that was generated by parsing the X-Plane navigation data. In my previous job, I dealt with fixed width data formats. It is not fun. I originally tried to split based on spaces, but some of the nav point names have more than one space in them. I suppose I could just ignore the name but this is already written. This code parses the earth_nav.dat file, specifically for type 3, which is VOR/DME-like.

import pandas as pd

nav_filepath = r"C:\Users\Austin\Desktop\X-Plane 10\Resources\default data\earth_nav.dat"

raw_file_data = open(nav_filepath, 'r').readlines()

# remove first 3 lines
raw_file_data = raw_file_data[3:]

# remove last line
raw_file_data = raw_file_data[:-1]

# remove new line characters
raw_file_data = [line.replace('\n', '') for line in raw_file_data]

# Adjusting the function based on the new column map provided
def parse_nav_info(line):
    column_map = {
        'type': {'start': 0, 'end': 1},
        'lat_sign': {'start': 2, 'end': 3},
        'latitude': {'start': 3, 'end': 15},
        'lon_sign': {'start': 15, 'end': 16},
        'longitude': {'start': 16, 'end': 28},
        'elevation': {'start': 29, 'end': 35},
        'frequency': {'start': 36, 'end': 41},
        'range': {'start': 42, 'end': 45},
        'unknown': {'start': 46, 'end': 52},
        'identifier': {'start': 53, 'end': 56},
        'name': {'start': 56}  # Assuming end is not needed; take till the end of the line
    }

    nav_info = {}
    for column, column_info in column_map.items():
        start = column_info['start']
        end = column_info.get('end', None)
        value = line[start:end].strip()
        # print(f"attempting to parse {column} with value {value}")
        if column == 'latitude':
            lat_sign = line[column_map['lat_sign']['start']:column_map['lat_sign']['end']]
            lat_sign = -1 if lat_sign == '-' else 1
            value = lat_sign * float(value)
        elif column == 'longitude':
            lon_sign = line[column_map['lon_sign']['start']:column_map['lon_sign']['end']]
            lon_sign = -1 if lon_sign == '-' else 1
            value = lon_sign * float(value)
        elif column == 'elevation':
            value = int(value)
        elif column == 'frequency':
            value = int(value)
        elif column == 'range':
            value = int(value)
        nav_info[column] = value

    return nav_info

i = 0
data = []
types = []
for line in raw_file_data:
    line_type = int(line[0:2])
    if line_type != 3:
        continue

    line_data = parse_nav_info(line)
    data.append(line_data)

df = pd.DataFrame(data)
columns_of_interest = ['identifier','latitude','longitude','elevation', 'frequency', 'range', 'name']
df = df[columns_of_interest]
df.head()

df.to_pickle('nav_data.pkl')

The code to read the file and import is at the beginning of the python_autopilot.py file and is fairly straightforward:

# nav_data.pkl is a pandas dataframe. yes, this should use a dict or something faster.
nav_points = pickle.load(open("nav_data.pkl", "rb"))

def get_nav_point_lat_lon(id):
	nav_point = nav_points[nav_points["identifier"] == id]
	return nav_point["latitude"].values[0], nav_point["longitude"].values[0]

And for the Flask side of the house, we have index.html:

<!DOCTYPE html>
<html>
<head>
    <title>Autopilot Interface</title>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/4.0.1/socket.io.js"></script>
<script type="text/javascript" charset="utf-8">
    var socket;  // Declare socket globally

    // Define adjustSetpoint globally
    function adjustSetpoint(label, adjustment) {
        socket.emit('adjust_setpoint', {label: label, adjustment: adjustment});
    }

    function submitDirectTo() {
        const stationId = document.getElementById('target_wpt_input').value; // Grab the value from the input
        if (stationId) { // Check if the stationId is not empty
            adjustSetpoint('target_wpt', stationId); // Adjust the setpoint with the stationId as the value
            adjustSetpoint('hdg_mode', "d_to"); // Your existing function call
        } else {
            alert("Please enter a station ID.");
        }
    }

    document.addEventListener('DOMContentLoaded', () => {
        socket = io.connect(location.protocol + '//' + document.domain + ':' + location.port);
        
        socket.on('connect', () => {
            console.log("Connected to WebSocket server.");
        });

        // Listen for update_setpoints event to initialize the UI with Redis values
        socket.on('update_setpoints', function(setpoints) {
          for (const [label, value] of Object.entries(setpoints)) {
              const element = document.getElementById(label);
              if (element) {
                  element.innerHTML = value;
              }
          }
      });

        // Listen for update_setpoint events from the server
        socket.on('update_setpoint', data => {
            // Assuming 'data' is an object like {label: new_value}
            for (const [label, value] of Object.entries(data)) {
                // Update the displayed value on the webpage
                const element = document.getElementById(label);
                if (element) {
                    element.innerHTML = value;
                }
            }
        });
    });
</script>
<style>
    body {
        font-family: Arial, sans-serif;
        margin: 20px;
        background-color: #f4f4f4;
        color: #333;
    }
    h1 {
        color: #005288;
    }
    ul {
        list-style-type: none;
        padding: 0;
    }
    ul li {
        margin: 10px 0;
    }
    button, input[type="text"] {
        padding: 10px;
        margin-top: 5px;
        border: 1px solid #ccc;
        border-radius: 5px;
        cursor: pointer;
        font-size: 16px;
    }
    button:hover {
        background-color: #ddd;
    }
    .button-group {
        margin-bottom: 20px;
    }
    #target_wpt_input {
        margin-right: 10px;
    }
</style>

</head>
<body>
    <h1>Autopilot Interface</h1>
    <p>Current Setpoints:</p>
    <ul>
        <li>Heading: <span id="desired_hdg">0</span></li>
        <li>Altitude: <span id="desired_alt">0</span></li>
        <li>Speed: <span id="desired_speed">0</span></li>
        <li>Heading Mode: <span id="hdg_mode">0</span></li>
        <li>Target Waypoint: <span id="target_wpt">BJC</span></li>
    </ul>
    <p>Autopilot: <span id="autopilot_enabled">OFF</span></p>

    <!-- Example buttons for adjusting setpoints -->
    <div class="button-group">
        <button onclick="adjustSetpoint('desired_hdg', -10)">-10 HDG</button>
        <button onclick="adjustSetpoint('desired_hdg', 10)">+10 HDG</button>
    </div>
    <div class="button-group">
        <button onclick="adjustSetpoint('desired_alt', 500)">+500 ALT</button>
        <button onclick="adjustSetpoint('desired_alt', -500)">-500 ALT</button>
    </div>
    <div class="button-group">
        <button onclick="adjustSetpoint('desired_speed', 5)">+5 KTS</button>
        <button onclick="adjustSetpoint('desired_speed', -5)">-5 KTS</button>
    </div>
    <div class="button-group">
        <button onclick="adjustSetpoint('hdg_mode', 'hdg')">Follow Heading</button>
        <input type="text" id="target_wpt_input" value="BJC">
        <button onclick="submitDirectTo()">Direct To</button>
    </div>
    <div class="button-group">
        <button onclick="adjustSetpoint('autopilot_enabled', 1)">Enable Autopilot</button>
        <button onclick="adjustSetpoint('autopilot_enabled', 0)">Disable Autopilot</button>
    </div>

</body>
</html>

And the Flask app itself. I still think WebSockets are magic.

from flask import Flask, render_template
from flask_socketio import SocketIO, emit
import redis

app = Flask(__name__)
socketio = SocketIO(app)
r = redis.StrictRedis(host='localhost', port=6379, db=0)

setpoints_of_interest = ['desired_hdg', 'desired_alt', 'desired_speed']
# get initial setpoints from Redis, send to clients

@app.route('/')
def index():
    return render_template('index.html')  # You'll need to create an HTML template

def update_setpoint(label, adjustment):
    # This function can be adapted to update setpoints and then emit updates via WebSocket
    current_raw_value = r.get(label) if r.exists(label) else None
    if current_raw_value is not None:
        try:
            current_value = float(current_raw_value)
        except ValueError:
            current_value = current_raw_value

    if label == 'desired_hdg':
        new_value = (current_value + adjustment) % 360
    elif label == 'autopilot_enabled':
        new_value = adjustment
    elif label == 'hdg_mode':
        new_value = adjustment
    elif label == 'target_wpt':
        new_value = adjustment
    else:
        new_value = current_value + adjustment

    r.set(label, new_value)
    # socketio.emit('update_setpoint', {label: new_value})  # Emit update to clients
    return new_value

@socketio.on('adjust_setpoint')
def handle_adjust_setpoint(json):
    label = json['label']
    adjustment = json['adjustment']
    # Your logic to adjust the setpoint in Redis and calculate new_value
    new_value = update_setpoint(label, adjustment)

    # Emit updated setpoint to all clients
    emit('update_setpoint', {label: new_value}, broadcast=True)

@socketio.on('connect')
def handle_connect():
    # Fetch initial setpoints from Redis
    initial_setpoints = {label: float(r.get(label)) if r.exists(label) else 0.0 for label in setpoints_of_interest}
    
    # Emit the initial setpoints to the connected client
    emit('update_setpoints', initial_setpoints)

if __name__ == '__main__':
    socketio.run(app)

And here’s the full code of the autopilot itself. This will be transferred to GitHub for the next post. It is a bit long and needs to be split out into a number of separate files.

Conclusion

With a cross track distance known, it isn’t terribly difficult to convert that distance (error) into a heading adjustment. We now have a functioning autopilot that can control our aircraft to any VOR-like point. I could extend the X-Plane nav data parsing to read all points, but I’ll leave that as an exercise for the reader. The X-Plane Python Autopilot is almost complete – all that I have left on the checklist is a “takeoff” button. Hope you enjoyed the post!

References

This page was pretty helpful for realizing the code for determining the cross track distance would be complicated – http://www.movable-type.co.uk/scripts/latlong-vincenty.html. A good bit of the code was generated by ChatGPT as well.

Categories
Python XPlane

Adding some polish to the X-Plane Python Autopilot with Flask, Redis, and WebSockets

I revisited my Python X-Plane autopilot a few weeks ago because it was pretty clunky for how to adjust setpoints and such. The job I started 1.5 years ago is exclusively Python, so I wanted to redo a bit.

Quick aside: For the new PC I just built – Ryzen 9 7900x, 2x32GB 6000 MHz, etc, X-Plane 10 was the 2nd “game” I installed on it. The first was Factorio (I followed Nilaus’ megabase in a book and have got to 5k SPM). Haven’t tried the newer sims yet, but I think they’ll still be somewhat limited by my RTX 2080 Super.

Well imagine my surprise when I woke up to 6x the normal daily hits by 7am. I checked the weblogs and found that my post was trending on ycombinator.com (Hacker News). So I am going to skip pretty much all background and just post the updated code for now, and will go back and clean up this post at some point.

Without further ado: here’s what the super basic dashboard looks like

Screenshot showing HTML autopilot interface running x-plane via Python

I have it separated into two main running python programs, the file that interacts with X-Plane itself, and the Flask part.

Check here for the updated code / next iteration where I add track following and direct-to functionality – Adding track following (Direct To) with cross track error to the Python X-Plane Autopilot.

Here’s the adjusted autopilot code to check with Redis for the setpoints every loop execution:

# https://onion.io/2bt-pid-control-python/
# https://github.com/ivmech/ivPID

import sys
import os
import xpc
from datetime import datetime, timedelta
import PID
import time
import math, numpy
import redis

r = redis.StrictRedis(host='localhost', port=6379, db=0)

setpoints = {
	"desired_roll": 0,
	"desired_pitch": 2,
	"desired_speed": 160,
	"desired_alt": 8000.0,
	"desired_hdg": 140,
	"autopilot_enabled": 0
}

for key in setpoints:
	# if the key exists in the redis db, use it
	# otherwise, set it
	if r.exists(key):
		setpoints[key] = float(r.get(key))
	else:
		r.set(key, setpoints[key])

update_interval = 0.10 #seconds
update_frequency = 1/update_interval

P = 0.05
I = 0.01
D = 0
MAX_DEFLECTION_PER_SECOND = 2.0

roll_PID = PID.PID(P*2, I*2, D)
roll_PID.SetPoint = setpoints["desired_roll"]

pitch_PID = PID.PID(P, I, D)
pitch_PID.SetPoint = setpoints["desired_pitch"]

altitude_PID = PID.PID(P*2, P/2, D)
altitude_PID.SetPoint = setpoints["desired_alt"]

speed_PID = PID.PID(P, I, D)
speed_PID.SetPoint = setpoints["desired_speed"]

heading_error_PID = PID.PID(1,0.05,0.1)
heading_error_PID.SetPoint = 0 # need heading error to be 0

DREFs = ["sim/cockpit2/gauges/indicators/airspeed_kts_pilot",
		"sim/cockpit2/gauges/indicators/heading_electric_deg_mag_pilot",
		"sim/flightmodel/failures/onground_any",
		"sim/flightmodel/misc/h_ind"]

def normalize(value, min=-1, max=1):
	if (value > max):
		return max
	elif (value < min):
		return min
	else:
		return value

def sleep_until_next_tick(update_frequency):
    # Calculate the update interval from the frequency
    update_interval = 1.0 / update_frequency

    # Get the current time
    current_time = time.time()

    # Calculate the time remaining until the next tick
    sleep_time = update_interval - (current_time % update_interval)

    # Sleep for the remaining time
    time.sleep(sleep_time)
	
# https://rosettacode.org/wiki/Angle_difference_between_two_bearings#Python
def get_angle_difference(b1, b2):
	r = (b2 - b1) % 360.0
	# Python modulus has same sign as divisor, which is positive here,
	# so no need to consider negative case
	if r >= 180.0:
		r -= 360.0
	return r

# https://gist.github.com/jeromer/2005586
def get_bearing(pointA, pointB):
    """
    Calculates the bearing between two points.
    The formulae used is the following:
        θ = atan2(sin(Δlong).cos(lat2),
                  cos(lat1).sin(lat2) − sin(lat1).cos(lat2).cos(Δlong))
    :Parameters:
      - `pointA: The tuple representing the latitude/longitude for the
        first point. Latitude and longitude must be in decimal degrees
      - `pointB: The tuple representing the latitude/longitude for the
        second point. Latitude and longitude must be in decimal degrees
    :Returns:
      The bearing in degrees
    :Returns Type:
      float
    """
    if (type(pointA) != tuple) or (type(pointB) != tuple):
        raise TypeError("Only tuples are supported as arguments")

    lat1 = math.radians(pointA[0])
    lat2 = math.radians(pointB[0])

    diffLong = math.radians(pointB[1] - pointA[1])

    x = math.sin(diffLong) * math.cos(lat2)
    y = math.cos(lat1) * math.sin(lat2) - (math.sin(lat1)
            * math.cos(lat2) * math.cos(diffLong))

    initial_bearing = math.atan2(x, y)

    # Now we have the initial bearing but math.atan2 return values
    # from -180° to + 180° which is not what we want for a compass bearing
    # The solution is to normalize the initial bearing as shown below
    initial_bearing = math.degrees(initial_bearing)
    compass_bearing = (initial_bearing + 360) % 360

    return compass_bearing

# https://janakiev.com/blog/gps-points-distance-python/
def haversine(coord1, coord2):
    R = 6372800  # Earth radius in meters
    lat1, lon1 = coord1
    lat2, lon2 = coord2
    
    phi1, phi2 = math.radians(lat1), math.radians(lat2) 
    dphi       = math.radians(lat2 - lat1)
    dlambda    = math.radians(lon2 - lon1)
    
    a = math.sin(dphi/2)**2 + \
        math.cos(phi1)*math.cos(phi2)*math.sin(dlambda/2)**2
    
    return 2*R*math.atan2(math.sqrt(a), math.sqrt(1 - a))

KBJC_lat = 39.9088056
KBJC_lon = -105.1171944

def write_position_to_redis(position):
	# position is a list of 7 floats
	# position_elements = [lat, lon, alt, pitch, roll, yaw, gear_indicator]
	position_elements = ["lat", "lon", "alt", "pitch", "roll", "yaw", "gear_indicator"]
	position_str = ','.join([str(x) for x in position])
	r.set('position', position_str)
	for i in range(len(position_elements)):
		r.set(f"position/{position_elements[i]}", position[i])
	
	# position_str = ','.join([str(x) for x in position])
	# r.publish('position_updates', position_str)
		
def get_setpoints_from_redis():
	setpoints = {
		"desired_roll": 0,
		"desired_pitch": 2,
		"desired_speed": 160,
		"desired_alt": 8000.0,
		"desired_hdg": 140
	}
	for key in setpoints:
		# if the key exists in the redis db, use it
		# otherwise, set it
		if r.exists(key):
			setpoints[key] = float(r.get(key))
		else:
			r.set(key, setpoints[key])
	return setpoints

def get_autopilot_enabled_from_redis():
	if r.exists("autopilot_enabled"):
		return int(r.get("autopilot_enabled").decode('utf-8')) == 1

ele_positions = []
ail_positions = []
thr_positions = []

def update_control_position_history(ctrl):
	ele_positions.append(ctrl[0])
	ail_positions.append(ctrl[1])
	thr_positions.append(ctrl[3])

	# if the list is longer than 20, pop the first element
	if len(ele_positions) > 20:
		ele_positions.pop(0)
		ail_positions.pop(0)
		thr_positions.pop(0)

def monitor():
	with xpc.XPlaneConnect() as client:
		while True:
			loop_start = datetime.now()
			print(f"loop start - {loop_start}")
			posi = client.getPOSI()
			write_position_to_redis(posi)

			ctrl = client.getCTRL()

			bearing_to_kbjc = get_bearing((posi[0], posi[1]), (KBJC_lat, KBJC_lon))
			dist_to_kbjc = haversine((posi[0], posi[1]), (KBJC_lat, KBJC_lon))
			#desired_hdg = 116 #bearing_to_kbjc

			multi_DREFs = client.getDREFs(DREFs) #speed=0, mag hdg=1, onground=2

			current_roll = posi[4]
			current_pitch = posi[3]
			#current_hdg = posi[5] # this is true, need to use DREF to get mag ''
			current_hdg = multi_DREFs[1][0]
			current_altitude = multi_DREFs[3][0]
			current_asi = multi_DREFs[0][0]
			onground = multi_DREFs[2][0]

			# get the setpoints from redis
			setpoints = get_setpoints_from_redis()
			desired_hdg = setpoints["desired_hdg"]
			desired_alt = setpoints["desired_alt"]
			desired_speed = setpoints["desired_speed"]

			# outer loops first
			altitude_PID.SetPoint = desired_alt
			altitude_PID.update(current_altitude)

			heading_error = get_angle_difference(desired_hdg, current_hdg)
			heading_error_PID.update(heading_error)

			speed_PID.SetPoint = desired_speed
			

			new_pitch_from_altitude = normalize(altitude_PID.output, -10, 10)
			new_roll_from_heading_error = normalize(heading_error_PID.output, -25, 25)
			# if new_pitch_from_altitude > 15:
			# 	new_pitch_from_altitude = 15
			# elif new_pitch_from_altitude < -15:
			# 	new_pitch_from_altitude = -15
			
			pitch_PID.SetPoint = new_pitch_from_altitude
			roll_PID.SetPoint = new_roll_from_heading_error

			roll_PID.update(current_roll)
			speed_PID.update(current_asi)
			pitch_PID.update(current_pitch)

			new_ail_ctrl = normalize(roll_PID.output, min=-1, max=1)
			new_ele_ctrl = normalize(pitch_PID.output, min=-1, max=1)
			new_thr_ctrl = normalize(speed_PID.output, min=0, max=1)

			previous_ail_ctrl = ail_positions[-1] if len(ail_positions) > 0 else 0
			previous_ele_ctrl = ele_positions[-1] if len(ele_positions) > 0 else 0
			previous_thr_ctrl = thr_positions[-1] if len(thr_positions) > 0 else 0

			# not currently functional - need to work on this 
			# new_ail_ctrl_limited = previous_ail_ctrl + new_ail_ctrl * MAX_DEFLECTION_PER_SECOND / update_frequency
			# new_ele_ctrl_limited = previous_ele_ctrl + new_ele_ctrl * MAX_DEFLECTION_PER_SECOND / update_frequency
			# new_thr_ctrl_limited = previous_thr_ctrl + new_thr_ctrl * MAX_DEFLECTION_PER_SECOND / update_frequency

			# update the control positions
			# update_control_position_history((new_ele_ctrl_limited, new_ail_ctrl_limited, 0.0, new_thr_ctrl_limited))
			update_control_position_history((new_ele_ctrl, new_ail_ctrl, 0.0, new_thr_ctrl))

			onground = -1
			if onground == 1:
				print("on ground, not sending controls")
			else:
				if get_autopilot_enabled_from_redis():
					# ctrl = [new_ele_ctrl_limited, new_ail_ctrl_limited, 0.0, new_thr_ctrl_limited]
					ctrl = [new_ele_ctrl, new_ail_ctrl, 0.0, new_thr_ctrl]
					client.sendCTRL(ctrl)

			loop_end = datetime.now()
			loop_duration = loop_end - loop_start

			output = f"current values --    roll: {current_roll: 0.3f},  pitch: {current_pitch: 0.3f},    hdg: {current_hdg:0.3f}, alt: {current_altitude:0.3f}, asi: {current_asi:0.3f}"
			output = output + "\n" + f"hdg error:                 {heading_error: 0.3f}"
			output = output + "\n" + f"new ctrl positions -- ail: {new_ail_ctrl: 0.4f},    ele: {new_ele_ctrl: 0.4f},   thr: {new_thr_ctrl:0.4f}"
			output = output + "\n" + f"PID outputs --   altitude: {altitude_PID.output: 0.4f},  pitch: {pitch_PID.output: 0.4f},   ail: {roll_PID.output: 0.3f},  hdg: {heading_error_PID.output: 0.3f}"
			output = output + "\n" + f"bearing to KBJC: {bearing_to_kbjc:3.1f}, dist: {dist_to_kbjc*0.000539957:0.2f} NM"
			output = output + "\n" + f"loop duration (ms): {loop_duration.total_seconds()*1000:0.2f} ms"
			print(output)
			sleep_until_next_tick(update_frequency)
			os.system('cls' if os.name == 'nt' else 'clear')



if __name__ == "__main__":
	monitor()

And the flask backend/front end. WebSockets are super cool – never used them before this. I was thinking I’d have to make a bunch of endpoints for every type of autopilot change I need. But this handles it far nicer:

from flask import Flask, render_template
from flask_socketio import SocketIO, emit
import redis

app = Flask(__name__)
socketio = SocketIO(app)
r = redis.StrictRedis(host='localhost', port=6379, db=0)

setpoints_of_interest = ['desired_hdg', 'desired_alt', 'desired_speed']
# get initial setpoints from Redis, send to clients

@app.route('/')
def index():
    return render_template('index.html')  # You'll need to create an HTML template

def update_setpoint(label, adjustment):
    # This function can be adapted to update setpoints and then emit updates via WebSocket
    current_value = float(r.get(label)) if r.exists(label) else 0.0

    if label == 'desired_hdg':
        new_value = (current_value + adjustment) % 360
    elif label == 'autopilot_enabled':
        new_value = adjustment
    else:
        new_value = current_value + adjustment

    r.set(label, new_value)
    # socketio.emit('update_setpoint', {label: new_value})  # Emit update to clients
    return new_value

@socketio.on('adjust_setpoint')
def handle_adjust_setpoint(json):
    label = json['label']
    adjustment = json['adjustment']
    # Your logic to adjust the setpoint in Redis and calculate new_value
    new_value = update_setpoint(label, adjustment)

    # Emit updated setpoint to all clients
    emit('update_setpoint', {label: new_value}, broadcast=True)

@socketio.on('connect')
def handle_connect():
    # Fetch initial setpoints from Redis
    initial_setpoints = {label: float(r.get(label)) if r.exists(label) else 0.0 for label in setpoints_of_interest}
    
    # Emit the initial setpoints to the connected client
    emit('update_setpoints', initial_setpoints)

if __name__ == '__main__':
    socketio.run(app)

And the http template:

<!DOCTYPE html>
<html>
<head>
    <title>Autopilot Interface</title>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/4.0.1/socket.io.js"></script>
<script type="text/javascript" charset="utf-8">
    var socket;  // Declare socket globally

    // Define adjustSetpoint globally
    function adjustSetpoint(label, adjustment) {
        socket.emit('adjust_setpoint', {label: label, adjustment: adjustment});
    }

    document.addEventListener('DOMContentLoaded', () => {
        socket = io.connect(location.protocol + '//' + document.domain + ':' + location.port);
        
        socket.on('connect', () => {
            console.log("Connected to WebSocket server.");
        });

        // Listen for update_setpoints event to initialize the UI with Redis values
        socket.on('update_setpoints', function(setpoints) {
          for (const [label, value] of Object.entries(setpoints)) {
              const element = document.getElementById(label);
              if (element) {
                  element.innerHTML = value;
              }
          }
      });

        // Listen for update_setpoint events from the server
        socket.on('update_setpoint', data => {
            // Assuming 'data' is an object like {label: new_value}
            for (const [label, value] of Object.entries(data)) {
                // Update the displayed value on the webpage
                const element = document.getElementById(label);
                if (element) {
                    element.innerHTML = value;
                }
            }
        });
    });
</script>

</head>
<body>
    <h1>Autopilot Interface</h1>
    <p>Current Setpoints:</p>
    <ul>
        <li>Heading: <span id="desired_hdg">0</span></li>
        <li>Altitude: <span id="desired_alt">0</span></li>
        <li>Speed: <span id="desired_speed">0</span></li>
    </ul>
    <p>Autopilot: <span id="autopilot_enabled">0</span></p>

    <!-- Example buttons for adjusting setpoints -->
    <button onclick="adjustSetpoint('desired_hdg', -10)">-10 HDG</button>
    <button onclick="adjustSetpoint('desired_hdg', 10)">+10 HDG</button>
    <br>
    <button onclick="adjustSetpoint('desired_alt', 500)">+500 ALT</button>
    <button onclick="adjustSetpoint('desired_alt', -500)">-500 ALT</button>
    <br>
    <button onclick="adjustSetpoint('desired_speed', 5)">+5 KTS</button>
    <button onclick="adjustSetpoint('desired_speed', -5)">-5 KTS</button>
    <br>
    <br>
    <button onclick="adjustSetpoint('autopilot_enabled', 1)">Enable Autopilot</button>
    <button onclick="adjustSetpoint('autopilot_enabled', 0)">Disable Autopilot</button>

</body>
</html>

This should be enough to get you going. I’ll come back and clean it up later (both my kids just woke up – 1.5 and 3.5 years!)

Categories
Python XPlane

Coding a pitch/roll/altitude autopilot in X-Plane with Python

Introduction

Sorry it has taken me so long to write this post! The last post on the blog was October 19th – almost 6 weeks ago. Life happens. We have a 15 month old running around and she is a handful!

March 2024 update – we now have a 1.5 year old AND the 15 month old is 3.5 years old! Link to the next post in this series – Adding some polish to the X-Plane Python Autopilot with Flask, Redis, and WebSockets

Anyways, back to the current topic – coding a pitch/roll (2 axis) autopilot in X-Plane with Python with altitude and heading hold. Today we will be adding the following:

  • Real-time graphing for 6 parameters
  • Additional method to grab data out of X-Plane
  • A normalize function to limit outputs to reasonable values
  • Altitude preselect and hold function

The full code will be at the end of this post.

Video Link

coming soon

Contents

  1. Adding PyQtGraph
  2. Developing a normalize function
  3. Initializing the data structures to hold the graph values
  4. Defining the PyQtGraph window and parameters
  5. Getting more data points out of X-Plane
  6. Feeding the graph data structures with data
  7. Adding altitude preselect and hold

1 – Adding PyQtGraph

Pip is my preferred tool to manage Python packages. It is easy and works well. I initially tried graphing with MatPlotLib but it took 200-300ms to update, which really dragged down the control loop to the point of being unusable. Instead, we will be using PyQtGraph. Install it with Pip:

pip install pyqtgraph

2 – Developing a normalize function

This task is pretty straightforward. There are a couple places where we want to pass values that need to be within a certain range. The first example is to the client.sendCTRL() method to set the control surfaces in X-Plane. The documentation states values are expected to be from -1 to 1. I have got some really weird results sending values outside that range (specifically for throttle, if you send something like 4, you can end up with 400% throttle which is wayyy more than the engines can actually output).

# this function takes an input and either passes it through or adjusts
# the input to fit within the specified max/min values
def normalize(value, min=-1, max=1):
	# if value = 700, and max = 20, return 20
	# if value = -200, and min = -20, return -20
	if (value > max):
		return max
	elif (value < min):
		return min
	else:
		return value

3 – Initializing the graphing data structures

We need a couple of arrays to store the data for our graphs. We need (desired data to plot) + 1 arrays. The +1 is the x-axis, which will just store values like 0,1,2,3,etc. The others will be the y-values. We haven’t added the altitude stuff yet, so you can add them but they won’t be used yet.

x_axis_counters = [] #0, 1, 2, 3, etc. just basic x-axis values used for plotting
roll_history = []
pitch_history = []
#altitude_history = []
roll_setpoint_history = []
pitch_setpoint_history = []
#altitude_setpoint_history = []
plot_array_max_length = 100 # how many data points to hold in our arrays and graph
i = 1 # initialize x_axis_counter

4 – Defining the PyQtGraph window and parameters

Working with PyQtGraph more or less means we’ll be working with a full blown GUI (just stripped down).

# first the base app needs to be instantiated
app = pg.mkQApp("python xplane autopilot monitor")

# now the window itself is defined and sized
win = pg.GraphicsLayoutWidget(show=True)
win.resize(1000,600) #pixels
win.setWindowTitle("XPlane autopilot system control")

# we have 3 subplots
p1 = win.addPlot(title="roll",row=0,col=0)
p2 = win.addPlot(title="pitch",row=1,col=0)
p3 = win.addPlot(title="altitude", row=2, col=0)

# show the y grid lines to make it easier to interpret the graphs
p1.showGrid(y=True)
p2.showGrid(y=True)
p3.showGrid(y=True)

5 – Getting more data points out of X-Plane

The initial .getPOSI() method that came in the example has worked well for us so far. But at this point we need more data that isn’t available in the .getPOSI() method. We will be utilizing a different method called .getDREFs() which is short for ‘get data references’. We will need to construct a list of data references we want to retrieve, pass that list to the method, and then parse the output. It is more granular than .getPOSI(). I haven’t evaluated the performance but I don’t think it is a problem.

The DREFs we want are for indicated airspeed, magnetic heading (.getPOSI() has true heading, not magnetic), an indicator to show if we are on the ground or not, and height as understood by the flight model. Thus, we can define our DREFs as follows:

DREFs = ["sim/cockpit2/gauges/indicators/airspeed_kts_pilot",
		"sim/cockpit2/gauges/indicators/heading_electric_deg_mag_pilot",
		"sim/flightmodel/failures/onground_any",
		"sim/flightmodel/misc/h_ind"]

And we can get the data with client.getDREFs(DREFs). The returned object is a 2d array. We need to parse out our values of interest. The full data gathering code looks like this:

posi = client.getPOSI();
ctrl = client.getCTRL();
multi_DREFs = client.getDREFs(DREFs)

current_roll = posi[4]
current_pitch = posi[3]
current_hdg = multi_DREFs[1][0]
current_altitude = multi_DREFs[3][0]
current_asi = multi_DREFs[0][0]
onground = multi_DREFs[2][0]

With those data points, we have everything we need to start plotting the state of our aircraft and monitoring for PID tuning.

6 – Feeding the real-time graphs with data

Next up is actually adding data to be plotted. There are two scenarios to consider when adding data to the arrays: 1) the arrays have not yet reached the limit we set earlier (100 points), and 2) they have. Case 1 is easy. We just append the current values to the arrays:

x_axis_counters.append(i)
roll_history.append(current_roll)
roll_setpoint_history.append(desired_roll)
pitch_history.append(current_pitch)
pitch_setpoint_history.append(pitch_PID.SetPoint)
altitude_history.append(current_altitude)
altitude_setpoint_history.append(desired_altitude)

The above code will work perfectly fine if you want the arrays to grow infinitely large over time. Ain’t nobody got time for that so we need to check how long the arrays are and delete data. We’ll check the length of the x-axis array as a proxy for all the others and use that to determine what to do. Typing this code that looks very similar over and over again means it’s probably time to abstract it into classes or something else. The more you type something over and over again, the larger indication you have that you need to so something about it. But for now we’ll leave it like this for ease of reading and comprehension.

# if we reach our data limit set point, evict old data and add new.
# this helps keep the graph clean and prevents it from growing infinitely
if(len(x_axis_counters) > plot_array_max_length):
	x_axis_counters.pop(0)
	roll_history.pop(0)
	roll_setpoint_history.pop(0)
	pitch_history.pop(0)
	pitch_setpoint_history.pop(0)
	altitude_history.pop(0)
	altitude_setpoint_history.pop(0)

	x_axis_counters.append(i)
	roll_history.append(current_roll)
	roll_setpoint_history.append(desired_roll)
	pitch_history.append(current_pitch)
	pitch_setpoint_history.append(pitch_PID.SetPoint)
	altitude_history.append(current_altitude)
	altitude_setpoint_history.append(desired_altitude)
# else, just add new. we are not yet at limit.
else:
	x_axis_counters.append(i)
	roll_history.append(current_roll)
	roll_setpoint_history.append(desired_roll)
	pitch_history.append(current_pitch)
	pitch_setpoint_history.append(pitch_PID.SetPoint)
	altitude_history.append(current_altitude)
	altitude_setpoint_history.append(desired_altitude)
i = i + 1

You will notice that there are quite a few entries for altitude. We haven’t done anything with that yet so just set desired_altitude to an integer somewhere in the code so it doesn’t error out.

To complete the graphing portion, we need to actually plot the data. The clear=true in the below lines clears out the plot so we’re not replotting on top of the old data. We also need to process events to actually draw the graph:

# process events means draw the graphs
pg.QtGui.QApplication.processEvents()

# arguments are x values, y values, options
# pen is a different line in the plot
p1.plot(x_axis_counters, roll_history, pen=0, clear=True)
p1.plot(x_axis_counters, roll_setpoint_history, pen=1)

p2.plot(x_axis_counters, pitch_history, pen=0,clear=True)
p2.plot(x_axis_counters, pitch_setpoint_history, pen=1)

p3.plot(x_axis_counters, altitude_history, pen=0,clear=True)
p3.plot(x_axis_counters, altitude_setpoint_history, pen=1)

You can now run the code to see your graph populating with data!

PyQtGraph plotting the aircraft’s roll/pitch and desired roll/pitch

7 – Adding altitude autopilot (preselect and hold)

Ok so now that we have eye candy with the real-time graphs, we can make our autopilot do something useful: go to a selected altitude and hold it.

We already have the roll and pitch PIDs functioning as desired. How do we couple the pitch PID to get to the desired altitude? One cannot directly control altitude. Altitude is controlled via a combination of pitch and airspeed (and time).

We will call the coupled PIDs an inner loop (pitch) and an outer loop (altitude). The outer loop runs and its output will feed the input of the inner loop. The altitude PID will be fed a desired altitude and current altitude. The output will then mostly be the error (desired altitude – current altitude) multiplied by our P setting. Of course I and D will have a say in the output but by and large it will be some proportion of the error.

Let’s start with defining the altitude PID and desired altitude:

altitude_PID = PID.PID(P, I, D)
desired_altitude = 8000
altitude_PID.SetPoint = desired_altitude

With those defined, we now move to the main loop. The outer loop needs to be updated first. From there, we will normalize the output from the altitude PID and use that to set the pitch PID. The pitch PID will also be normalized to keep values in a reasonable range:

# update outer loops first
altitude_PID.update(current_altitude)

# if alt=12000, setpoint = 10000, the error is 2000. if P=0.1, output will be 2000*0.1=200
pitch_PID.SetPoint = normalize(altitude_PID.output, min=-15, max=10)

# update PIDs
roll_PID.update(current_roll)
pitch_PID.update(current_pitch)

# update control outputs
new_ail_ctrl = normalize(roll_PID.output)
new_ele_ctrl = normalize(pitch_PID.output)

Now we just need to send those new control surface commands and we’ll be controlling the plane!

Outputting the control deflections should basically be the last part of the loop. We’ll put it right before the debug output:

# sending actual control values to XPlane
ctrl = [new_ele_ctrl, new_ail_ctrl, 0.0, -998] # ele, ail, rud, thr. -998 means don't change
client.sendCTRL(ctrl)

Full code of pitch_roll_autopilot_with_graphing.py

import sys
import xpc
import PID
from datetime import datetime, timedelta
import pyqtgraph as pg
from pyqtgraph.Qt import QtCore, QtGui
import time

def normalize(value, min=-1, max=1):
	# if value = 700, and max = 20, return 20
	# if value = -200, and min = -20, return -20
	if (value > max):
		return max
	elif (value < min):
		return min
	else:
		return value

update_interval = 0.050 # seconds, 0.05 = 20 Hz
start = datetime.now()
last_update = start

# defining the initial PID values
P = 0.1 # PID library default = 0.2
I = P/10 # default = 0
D = 0 # default = 0

# initializing PID controllers
roll_PID = PID.PID(P, I, D)
pitch_PID = PID.PID(P, I, D)
altitude_PID = PID.PID(P, I, D)

# setting the desired values
# roll = 0 means wings level
# pitch = 2 means slightly nose up, which is required for level flight
desired_roll = 0
desired_pitch = 2
desired_altitude = 8000

# setting the PID set points with our desired values
roll_PID.SetPoint = desired_roll
pitch_PID.SetPoint = desired_pitch
altitude_PID.SetPoint = desired_altitude

x_axis_counters = [] #0, 1, 2, 3, etc. just basic x-axis values used for plotting
roll_history = []
pitch_history = []
altitude_history = []
roll_setpoint_history = []
pitch_setpoint_history = []
altitude_setpoint_history = []
plot_array_max_length = 300 # how many data points to hold in our arrays and graph
i = 1 # initialize x_axis_counter

# first the base app needs to be instantiated
app = pg.mkQApp("python xplane autopilot monitor")

# now the window itself is defined and sized
win = pg.GraphicsLayoutWidget(show=True)
win.resize(1000,600) #pixels
win.setWindowTitle("XPlane autopilot system control")

# we have 3 subplots
p1 = win.addPlot(title="roll",row=0,col=0)
p2 = win.addPlot(title="pitch",row=1,col=0)
p3 = win.addPlot(title="altitude", row=2, col=0)

# show the y grid lines to make it easier to interpret the graphs
p1.showGrid(y=True)
p2.showGrid(y=True)
p3.showGrid(y=True)

DREFs = ["sim/cockpit2/gauges/indicators/airspeed_kts_pilot",
		"sim/cockpit2/gauges/indicators/heading_electric_deg_mag_pilot",
		"sim/flightmodel/failures/onground_any",
		"sim/flightmodel/misc/h_ind"]

def monitor():
	global i
	global last_update
	with xpc.XPlaneConnect() as client:
		while True:
			if (datetime.now() > last_update + timedelta(milliseconds = update_interval * 1000)):
				last_update = datetime.now()
				print(f"loop start - {datetime.now()}")

				posi = client.getPOSI();
				ctrl = client.getCTRL();
				multi_DREFs = client.getDREFs(DREFs)

				current_roll = posi[4]
				current_pitch = posi[3]
				current_hdg = multi_DREFs[1][0]
				current_altitude = multi_DREFs[3][0]
				current_asi = multi_DREFs[0][0]
				onground = multi_DREFs[2][0]

				# update the display
				pg.QtGui.QApplication.processEvents()

				# update outer loops first
				altitude_PID.update(current_altitude)

				# if alt=12000, setpoint = 10000, the error is 2000. if P=0.1, output will be 2000*0.1=200
				pitch_PID.SetPoint = normalize(altitude_PID.output, min=-15, max=10)

				# update PIDs
				roll_PID.update(current_roll)
				pitch_PID.update(current_pitch)

				# update control outputs
				new_ail_ctrl = normalize(roll_PID.output)
				new_ele_ctrl = normalize(pitch_PID.output)

				# if we reach our data limit set point, evict old data and add new.
				# this helps keep the graph clean and prevents it from growing infinitely
				if(len(x_axis_counters) > plot_array_max_length):
					x_axis_counters.pop(0)
					roll_history.pop(0)
					roll_setpoint_history.pop(0)
					pitch_history.pop(0)
					pitch_setpoint_history.pop(0)
					altitude_history.pop(0)
					altitude_setpoint_history.pop(0)

					x_axis_counters.append(i)
					roll_history.append(current_roll)
					roll_setpoint_history.append(desired_roll)
					pitch_history.append(current_pitch)
					pitch_setpoint_history.append(pitch_PID.SetPoint)
					altitude_history.append(0)
					altitude_setpoint_history.append(desired_altitude)
				# else, just add new. we are not yet at limit.
				else:
					x_axis_counters.append(i)
					roll_history.append(current_roll)
					roll_setpoint_history.append(desired_roll)
					pitch_history.append(current_pitch)
					pitch_setpoint_history.append(pitch_PID.SetPoint)
					altitude_history.append(0)
					altitude_setpoint_history.append(desired_altitude)
				i = i + 1

				p1.plot(x_axis_counters, roll_history, pen=0, clear=True)
				p1.plot(x_axis_counters, roll_setpoint_history, pen=1)

				p2.plot(x_axis_counters, pitch_history, pen=0,clear=True)
				p2.plot(x_axis_counters, pitch_setpoint_history, pen=1)

				p3.plot(x_axis_counters, altitude_history, pen=0,clear=True)
				p3.plot(x_axis_counters, altitude_setpoint_history, pen=1)

				# sending actual control values to XPlane
				ctrl = [new_ele_ctrl, new_ail_ctrl, 0.0, -998] # ele, ail, rud, thr. -998 means don't change
				client.sendCTRL(ctrl)

				output = f"current values --    roll: {current_roll: 0.3f},  pitch: {current_pitch: 0.3f}"
				output = output + "\n" + f"PID outputs    --    roll: {roll_PID.output: 0.3f},  pitch: {pitch_PID.output: 0.3f}"
				output = output + "\n"
				print(output)

if __name__ == "__main__":
	monitor()

Using the autopilot / Conclusion

To use the autopilot, fire up XPlane, hop in a small-ish plane (gross weight less than 10k lb), take off, climb 1000′, then execute the code. Your plane should bring roll to 0 pretty quick and start the climb/descent to the desired altitude.

X-Plane Python autopilot leveling off at 8000' with pitch/roll/altitude graphed in real-time
X-Plane Python autopilot leveling off at 8000′ with pitch/roll/altitude graphed in real-time
Categories
Python XPlane

Coding a wing leveler autopilot in X-Plane with Python

Introduction

Continuing from the first post, where we hooked up X-Plane to our Python code, we will build a wing leveler today. The first post was just about making sure we could get data into and out of X-Plane. Today will add a few features to our XPlane autopilot written in Python.

  • A control loop timer (we will be targeting a loop frequency of 10 Hz, or 10 updates per second)
  • Additional data feeds from X-Plane
  • Two PID controllers, one for roll, one for pitch
  • Some debugging output to keep track of what the PIDs are doing

The full code will be at the end of this post.

Video Link

Python Tutorial: code a wing leveler in X-Plane using PID loops

Contents

  1. Control loop timer/limiter
  2. Obtaining current pitch/roll values from X-Plane
  3. Initializing the PID controllers
  4. Feeding the PID controllers within the control loop
  5. Controlling the aircraft with the new PID output
  6. Monitoring the control loops via debug prints

1 – Control loop timer/limiter

Since we will be targeting a 10 Hz update rate, we need to develop a method to ensure the loop does not run more frequent than once every 100 milliseconds. We do not want the loop running uninhibited, because that will result in variable loop execution times and we like to keep those things constant. It could potentially execute thousands of times per second, which is entirely unnecessary. Most control loop algorithms run in the 10-100 Hz range (10-100 times per second). For reference, my RC plane running ArduPlane on Pixhawk uses 50 Hz as a standard update frequency.

To accomplish this task, we need to set up some timing variables.

First of all, add an import statment for datetime and timedelta:

from datetime import datetime, timedelta

Next, define the timing variables:

update_interval = 0.100 # this value is in seconds. 1/0.100 = 10 which is the update interval in Hz

# start is set to the time the line of code is executed, which is essential when the program started
start = datetime.now()

# last_update needs to be set to start for the first execution of the loop to successfully run.
# alternatively, we could've set start to something in the past.
last_update = start

# last update needs to be defined as a global within the monitor() function:
def monitor():
	global last_update

That handles the variables. Now we need to limit the loop execution. To do so requires wrapping all of the loop code into a new if statement that evaluates the time and only executes if the current time is 100 milliseconds greater than the last loop execution:

# loop is the "while True:" statement
while True:
	# this if statement is evaluated with every loop execution
	if (datetime.now() > last_update + timedelta(milliseconds = update_interval * 1000)):
		# when the if statement evaluates to true, the first thing we'll do is set the last update to the current time so the next iteration fires at the correct time
		last_update = datetime.now()

		# rest of the loop code goes here

2 – Obtaining current roll/pitch values from X-Plane

This task is pretty straightforward. In the first post, the monitorExample.py code included obtaining the position with posi = client.getPOSI(). There are 7 elements returned with that method, and two of them are roll and pitch.

Put the below after the .getPOSI() call.

current_roll = posi[4]
current_pitch = posi[3]

3 – Initializing the PID controllers

First you need to get the PID control file from the Ivmech (Ivmech Mechatronics Ltd.) GitHub page. Direct link PID.py file here. Put the file in the same working directory as everything else then import it with import PID at the top with the rest of the imports.

Then we can initialize the control instances. PID controllers are magic. But they do need to be set with smart values for the initial run. The default values with the PID library are P=0.2, I=0, D=0. This essentially means make the output be 20% of the error between the setpoint and the current value. For example, if the aircraft has a roll of 10 degrees to the left (-10), and the P=0.2, the output from the PID controller will be -2.

When setting PID values, it is almost always a good idea to start small and work your way up. If your gains are too high, you could get giant oscillations and other undesirable behaviors. In the YouTube video, I talk about the various types of PID controllers. They will basically always have a P (proportional) set. Many will also have an I (integral) value set, but not many use the D term (derivative).

Going with the “less is more” truism with PID controllers, I started with a P value of 0.1, and an I value of 0.01 (P/10). The I term (integral) is meant to take care of accumulated errors (which are usually long term errors). An example is your car’s cruise control going up a hill. If your cruise control is set to 65 mph, it will hold 65 no problem on flat roads. If you start going up a hill, the controller will slowly apply more throttle. With an I of 0, your car would never get to 65. It would probably stay around 62 or so. With the integrator going, the error will accumulate and boost the output (throttle) to get you back up to your desired 65. In the linked YouTube video, I show what happens when the I value is set to 0 and why it is necessary to correct long-term errors.

These values (P=0.1, I=0.01, D=0) turned out to work perfectly.

Place the following before the monitor function:

# defining the initial PID values
P = 0.1 # PID library default = 0.2
I = P/10 # default = 0
D = 0 # default = 0

# initializing both PID controllers
roll_PID = PID.PID(P, I, D)
pitch_PID = PID.PID(P, I, D)

# setting the desired values
# roll = 0 means wings level
# pitch = 2 means slightly nose up, which is required for level flight
desired_roll = 0
desired_pitch = 2

# setting the PID set points with our desired values
roll_PID.SetPoint = desired_roll
pitch_PID.SetPoint = desired_pitch

4 – Updating the PID controllers within the control loop

With the PIDs created, they will need to be updated with the new, current pitch and roll values with every loop execution.

Place the following after current_roll and current_pitch are assigned:

roll_PID.update(current_roll)
pitch_PID.update(current_pitch)

5 – Controlling the aircraft with the new PID output

Updating the PID controller instances will generate new outputs. We can use those outputs to set the control surfaces. You can place these two lines directly below the update lines:

new_ail_ctrl = roll_PID.output
new_ele_ctrl = pitch_PID.output

So we have new control surface values – now we need to actually move the control surfaces. This is accomplished by sending an array of 4 floats with .sendCTRL():

# ele, ail, rud, thr. -998 means don't set/change
ctrl = [new_ele_ctrl, new_ail_ctrl, 0.0, -998]
client.sendCTRL(ctrl)

6 – Monitoring the control loops via debug prints

The last bit to tie a bunch of this together is printing out values with every loop execution to ensure things are heading the right direction. We will turn these into graphs in the next post or two.

We will be concatenating strings because it’s easy and we aren’t working with enough strings for it to be a performance problem.

In newer Python versions (3.6+), placing ‘f’ before the quotes in a print statement (f””) means the string is interpolated. This means you can basically put code in the print statement, which makes creating print statements much easier and cleaner.

The first line will print out the current roll and pitch value (below). We are using current_roll and current_pitch interpolated. The colon, then blank space with 0.3f is a string formatter. It rounds the value to 3 decimal places and leaves space for a negative. It results in things being lined up quite nicely.

output = f"current values --    roll: {current_roll: 0.3f},  pitch: {current_pitch: 0.3f}"

The next code statement will add a new line to the previous output, and also add the PID outputs for reference:

output = output + "\n" + f"PID outputs    --    roll: {roll_PID.output: 0.3f},  pitch: {pitch_PID.output: 0.3f}"

The final line will just add another new line to keep each loop execution’s print statements grouped together:

output = output + "\n"

Finally, we print the output with print(output), which will look like this:

loop start - 2021-10-19 14:24:26.208945
current values --    roll:  0.000,  pitch:  1.994
PID outputs    --    roll: -0.000,  pitch:  0.053

Full code of pitch_roll_autopilot.py

import sys
import xpc
import PID
from datetime import datetime, timedelta

update_interval = 0.100 # seconds
start = datetime.now()
last_update = start

# defining the initial PID values
P = 0.1 # PID library default = 0.2
I = P/10 # default = 0
D = 0 # default = 0

# initializing both PID controllers
roll_PID = PID.PID(P, I, D)
pitch_PID = PID.PID(P, I, D)

# setting the desired values
# roll = 0 means wings level
# pitch = 2 means slightly nose up, which is required for level flight
desired_roll = 0
desired_pitch = 2

# setting the PID set points with our desired values
roll_PID.SetPoint = desired_roll
pitch_PID.SetPoint = desired_pitch

def monitor():
	global last_update
	with xpc.XPlaneConnect() as client:
		while True:
			if (datetime.now() > last_update + timedelta(milliseconds = update_interval * 1000)):
				last_update = datetime.now()
				print(f"loop start - {datetime.now()}")

				posi = client.getPOSI();
				ctrl = client.getCTRL();

				current_roll = posi[4]
				current_pitch = posi[3]

				roll_PID.update(current_roll)
				pitch_PID.update(current_pitch)

				new_ail_ctrl = roll_PID.output
				new_ele_ctrl = pitch_PID.output

				ctrl = [new_ele_ctrl, new_ail_ctrl, 0.0, -998] # ele, ail, rud, thr. -998 means don't change
				client.sendCTRL(ctrl)

				output = f"current values --    roll: {current_roll: 0.3f},  pitch: {current_pitch: 0.3f}"
				output = output + "\n" + f"PID outputs    --    roll: {roll_PID.output: 0.3f},  pitch: {pitch_PID.output: 0.3f}"
				output = output + "\n"
				print(output)

if __name__ == "__main__":
	monitor()

Using the autopilot / Conclusion

To use the autopilot, fire up XPlane, hop in a small-ish plane (gross weight less than 10k lb), take off, climb 1000′, then execute the code. Your plane should level off within a second or two in both axis.

Here is a screenshot of the output after running for a few seconds:

Screenshot showing current pitch and roll values along with their respective PID outputs

The linked YouTube video shows the aircraft snapping to the desired pitch and roll angles very quickly from a diving turn. The plane is righted to within 0.5 degrees of the setpoints within 2 seconds.

A note about directly setting the control surfaces

If you are familiar with PIDs and/or other parts of what I’ve discussed here, you’ll realize that we could be setting large values for the control surfaces (i.e. greater than 1 or less than -1). We will address that next post with a normalization function. It will quickly become a problem when a pitch of 100+ is commanded for altitude hold. I have found that XPlane will allow throttle values of more than 100% (I’ve seen as high as ~430%) if sent huge throttle values.

Categories
Python XPlane

Creating an autopilot in X-Plane using Python – part 1

Introduction

Today’s post will take us in a slightly different direction than the last few. Today’s post will be about hooking up some Python code to the X-Plane flight simulator to enable development of an autopilot using PID (proportional-integral-derivative) controllers. I’ve been a fan of flight simulators for quite some time (I distinctly remember getting Microsoft Flight Simulator 98 for my birthday when I was like 8 or 9) but have only recently started working with interfacing them to code. X-Plane is a well-known flight simulator developed by another Austin – Austin Meyer. It is regarded as having one of the best flight models and has tons of options for getting data into/out of the simulator. More than one FAA-certified simulator setups are running X-Plane as the primary driver software.

I got started thinking about writing some code for X-Plane while playing another game, Factorio. I drive a little plane or car in the game to get around my base and I just added a plug-in that “snaps” the vehicle to a heading, which makes it easier to go in straight lines. I thought – “hmm how hard could this be to duplicate in a flight sim?”. So here we are.

This post will get X-Plane hooked up to Python. The real programming will start with the next post.

2nd most recent post (added 2024-03-24) – Adding some polish to the X-Plane Python Autopilot with Flask, Redis, and WebSockets

Most recent post – Adding track following (Direct To) with cross track error to the Python X-Plane Autopilot

Video Link

Contents

  1. Download and install X-Plane (I used X-Plane 10 because it uses less resources than X-Plane 11 and we don’t need the graphics/scenery to look super pretty to do coding. It also loads faster.)
  2. Download and install NASA’s XPlaneConnect X-Plane plug-in to X-Plane
  3. Verify the XPlaneConnect plug-in is active in X-Plane
  4. Download sample code from XPlaneConnect’s GitHub page
  5. Run the sample script to verify data is being transmitted from X-Plane via UDP to the XPlaneConnect code

1 – Download and install X-Plane 10 or X-Plane 11

I’ll leave this one up to you. X-Plane 10 is hard to find these days I just discovered. X-Plane 11 is available on Steam for $59.99 as of writing. I just tested and the plug-in/code works fine on X-Plane 11 (but the flight models are definitely different and will need different PID values). My screenshots might jump between the two versions but the content/message will be the same.

2 – Download and install NASA’s XPlaneConnect plug-in

NASA (yes, that NASA, the National Aeronautics and Space Administration) has wrote a bunch of code to interface with X-Plane. They have adapters for C, C++, Java, Matlab, and Python. They work with X-Plane 9, 10, and 11.

  1. Download the latest version from the XPlaneConnect GitHub releases page, 1.3 RC6 as of writing
  2. Open the .zip and place the contents in the [X-Plane directory]/Resources/plugins folder. There are few other folders already present in this directory. Mine looked like this after adding the XPlaneConnect folder:
Screenshot of X-Plane 10 plugins directory with XPlaneConnect folder added
Screenshot of X-Plane 11 plugins directory with XPlaneConnect folder added

3 – Verify XPlaneConnect is active in X-Plane

Now we’ll load up X-Plane and check the plug-ins to verify XPlaneConnect is enabled. Go to the top menu and select Plugins -> Plugin Admin. You should see X-Plane Connect checked in the enabled column:

Screenshot showing XPlaneConnect plug-in active in X-Plane 11
Screenshot showing XPlaneConnect plug-in active in X-Plane 10

4 – Download sample code from XPlaneConnect’s GitHub page

From the Python3 portion of the GitHub code, download xpc.py and monitorExample.py and stick them in your working directory (doesn’t matter where). For me, I just downloaded the entire git structure so the code is at C:\Users\Austin\source\repos\XPlaneConnect\Python3\src:

Screenshot showing xpc.py and monitorExample.py in my working directory

5 – Run sample code to verify data is making it from X-Plane to our code

With X-Plane running with a plane on a runway (or anywhere really), go ahead and run monitorExample.py! I will be using Visual Studio Code to program this XPlane Python autopilot stuff so that’s where I’ll run it from.

You will start seeing lines scroll by very fast with 6 pieces of information – latitude, longitude, elevation (in meters), and the control deflections for aileron, elevator, and rudder (normalized from -1 to 1, with 0 being centered). In the below screenshot, we see a lat/lon of 39.915, -105.128, with an elevation of 1719m. First one to tell me in the comments what runway that is wins internet points!

Screenshot showing Visual Studio Code running monitorExample.py in front of X-Plane 10 and the output scrolling by.

Conclusion

In this post, we have successfully downloaded the XPlaneConnect plug-in, and demonstrated that it can successfully interface with Python code in both X-Plane 10 and X-Plane 11.

Next up is to start controlling the plane with a basic wing leveler. As of writing this post, I have the following completely functional:

  • Pitch / roll hold at reasonable angles (-25 to 25)
  • Altitude set and hold
  • Heading set and hold
  • Airspeed set and hold
  • Navigate directly to a lat/lon point

See you at the next post! Next post – Coding a wing leveler autopilot in X-Plane with Python