I revisited my Python X-Plane autopilot a few weeks ago because it was pretty clunky for how to adjust setpoints and such. The job I started 1.5 years ago is exclusively Python, so I wanted to redo a bit.
Quick aside: For the new PC I just built – Ryzen 9 7900x, 2x32GB 6000 MHz, etc, X-Plane 10 was the 2nd “game” I installed on it. The first was Factorio (I followed Nilaus’ megabase in a book and have got to 5k SPM). Haven’t tried the newer sims yet, but I think they’ll still be somewhat limited by my RTX 2080 Super.
Well imagine my surprise when I woke up to 6x the normal daily hits by 7am. I checked the weblogs and found that my post was trending on ycombinator.com (Hacker News). So I am going to skip pretty much all background and just post the updated code for now, and will go back and clean up this post at some point.
Without further ado: here’s what the super basic dashboard looks like

I have it separated into two main running python programs, the file that interacts with X-Plane itself, and the Flask part.
Check here for the updated code / next iteration where I add track following and direct-to functionality – Adding track following (Direct To) with cross track error to the Python X-Plane Autopilot.
Here’s the adjusted autopilot code to check with Redis for the setpoints every loop execution:
# https://onion.io/2bt-pid-control-python/
# https://github.com/ivmech/ivPID
import sys
import os
import xpc
from datetime import datetime, timedelta
import PID
import time
import math, numpy
import redis
r = redis.StrictRedis(host='localhost', port=6379, db=0)
setpoints = {
"desired_roll": 0,
"desired_pitch": 2,
"desired_speed": 160,
"desired_alt": 8000.0,
"desired_hdg": 140,
"autopilot_enabled": 0
}
for key in setpoints:
# if the key exists in the redis db, use it
# otherwise, set it
if r.exists(key):
setpoints[key] = float(r.get(key))
else:
r.set(key, setpoints[key])
update_interval = 0.10 #seconds
update_frequency = 1/update_interval
P = 0.05
I = 0.01
D = 0
MAX_DEFLECTION_PER_SECOND = 2.0
roll_PID = PID.PID(P*2, I*2, D)
roll_PID.SetPoint = setpoints["desired_roll"]
pitch_PID = PID.PID(P, I, D)
pitch_PID.SetPoint = setpoints["desired_pitch"]
altitude_PID = PID.PID(P*2, P/2, D)
altitude_PID.SetPoint = setpoints["desired_alt"]
speed_PID = PID.PID(P, I, D)
speed_PID.SetPoint = setpoints["desired_speed"]
heading_error_PID = PID.PID(1,0.05,0.1)
heading_error_PID.SetPoint = 0 # need heading error to be 0
DREFs = ["sim/cockpit2/gauges/indicators/airspeed_kts_pilot",
"sim/cockpit2/gauges/indicators/heading_electric_deg_mag_pilot",
"sim/flightmodel/failures/onground_any",
"sim/flightmodel/misc/h_ind"]
def normalize(value, min=-1, max=1):
if (value > max):
return max
elif (value < min):
return min
else:
return value
def sleep_until_next_tick(update_frequency):
# Calculate the update interval from the frequency
update_interval = 1.0 / update_frequency
# Get the current time
current_time = time.time()
# Calculate the time remaining until the next tick
sleep_time = update_interval - (current_time % update_interval)
# Sleep for the remaining time
time.sleep(sleep_time)
# https://rosettacode.org/wiki/Angle_difference_between_two_bearings#Python
def get_angle_difference(b1, b2):
r = (b2 - b1) % 360.0
# Python modulus has same sign as divisor, which is positive here,
# so no need to consider negative case
if r >= 180.0:
r -= 360.0
return r
# https://gist.github.com/jeromer/2005586
def get_bearing(pointA, pointB):
"""
Calculates the bearing between two points.
The formulae used is the following:
θ = atan2(sin(Δlong).cos(lat2),
cos(lat1).sin(lat2) − sin(lat1).cos(lat2).cos(Δlong))
:Parameters:
- `pointA: The tuple representing the latitude/longitude for the
first point. Latitude and longitude must be in decimal degrees
- `pointB: The tuple representing the latitude/longitude for the
second point. Latitude and longitude must be in decimal degrees
:Returns:
The bearing in degrees
:Returns Type:
float
"""
if (type(pointA) != tuple) or (type(pointB) != tuple):
raise TypeError("Only tuples are supported as arguments")
lat1 = math.radians(pointA[0])
lat2 = math.radians(pointB[0])
diffLong = math.radians(pointB[1] - pointA[1])
x = math.sin(diffLong) * math.cos(lat2)
y = math.cos(lat1) * math.sin(lat2) - (math.sin(lat1)
* math.cos(lat2) * math.cos(diffLong))
initial_bearing = math.atan2(x, y)
# Now we have the initial bearing but math.atan2 return values
# from -180° to + 180° which is not what we want for a compass bearing
# The solution is to normalize the initial bearing as shown below
initial_bearing = math.degrees(initial_bearing)
compass_bearing = (initial_bearing + 360) % 360
return compass_bearing
# https://janakiev.com/blog/gps-points-distance-python/
def haversine(coord1, coord2):
R = 6372800 # Earth radius in meters
lat1, lon1 = coord1
lat2, lon2 = coord2
phi1, phi2 = math.radians(lat1), math.radians(lat2)
dphi = math.radians(lat2 - lat1)
dlambda = math.radians(lon2 - lon1)
a = math.sin(dphi/2)**2 + \
math.cos(phi1)*math.cos(phi2)*math.sin(dlambda/2)**2
return 2*R*math.atan2(math.sqrt(a), math.sqrt(1 - a))
KBJC_lat = 39.9088056
KBJC_lon = -105.1171944
def write_position_to_redis(position):
# position is a list of 7 floats
# position_elements = [lat, lon, alt, pitch, roll, yaw, gear_indicator]
position_elements = ["lat", "lon", "alt", "pitch", "roll", "yaw", "gear_indicator"]
position_str = ','.join([str(x) for x in position])
r.set('position', position_str)
for i in range(len(position_elements)):
r.set(f"position/{position_elements[i]}", position[i])
# position_str = ','.join([str(x) for x in position])
# r.publish('position_updates', position_str)
def get_setpoints_from_redis():
setpoints = {
"desired_roll": 0,
"desired_pitch": 2,
"desired_speed": 160,
"desired_alt": 8000.0,
"desired_hdg": 140
}
for key in setpoints:
# if the key exists in the redis db, use it
# otherwise, set it
if r.exists(key):
setpoints[key] = float(r.get(key))
else:
r.set(key, setpoints[key])
return setpoints
def get_autopilot_enabled_from_redis():
if r.exists("autopilot_enabled"):
return int(r.get("autopilot_enabled").decode('utf-8')) == 1
ele_positions = []
ail_positions = []
thr_positions = []
def update_control_position_history(ctrl):
ele_positions.append(ctrl[0])
ail_positions.append(ctrl[1])
thr_positions.append(ctrl[3])
# if the list is longer than 20, pop the first element
if len(ele_positions) > 20:
ele_positions.pop(0)
ail_positions.pop(0)
thr_positions.pop(0)
def monitor():
with xpc.XPlaneConnect() as client:
while True:
loop_start = datetime.now()
print(f"loop start - {loop_start}")
posi = client.getPOSI()
write_position_to_redis(posi)
ctrl = client.getCTRL()
bearing_to_kbjc = get_bearing((posi[0], posi[1]), (KBJC_lat, KBJC_lon))
dist_to_kbjc = haversine((posi[0], posi[1]), (KBJC_lat, KBJC_lon))
#desired_hdg = 116 #bearing_to_kbjc
multi_DREFs = client.getDREFs(DREFs) #speed=0, mag hdg=1, onground=2
current_roll = posi[4]
current_pitch = posi[3]
#current_hdg = posi[5] # this is true, need to use DREF to get mag ''
current_hdg = multi_DREFs[1][0]
current_altitude = multi_DREFs[3][0]
current_asi = multi_DREFs[0][0]
onground = multi_DREFs[2][0]
# get the setpoints from redis
setpoints = get_setpoints_from_redis()
desired_hdg = setpoints["desired_hdg"]
desired_alt = setpoints["desired_alt"]
desired_speed = setpoints["desired_speed"]
# outer loops first
altitude_PID.SetPoint = desired_alt
altitude_PID.update(current_altitude)
heading_error = get_angle_difference(desired_hdg, current_hdg)
heading_error_PID.update(heading_error)
speed_PID.SetPoint = desired_speed
new_pitch_from_altitude = normalize(altitude_PID.output, -10, 10)
new_roll_from_heading_error = normalize(heading_error_PID.output, -25, 25)
# if new_pitch_from_altitude > 15:
# new_pitch_from_altitude = 15
# elif new_pitch_from_altitude < -15:
# new_pitch_from_altitude = -15
pitch_PID.SetPoint = new_pitch_from_altitude
roll_PID.SetPoint = new_roll_from_heading_error
roll_PID.update(current_roll)
speed_PID.update(current_asi)
pitch_PID.update(current_pitch)
new_ail_ctrl = normalize(roll_PID.output, min=-1, max=1)
new_ele_ctrl = normalize(pitch_PID.output, min=-1, max=1)
new_thr_ctrl = normalize(speed_PID.output, min=0, max=1)
previous_ail_ctrl = ail_positions[-1] if len(ail_positions) > 0 else 0
previous_ele_ctrl = ele_positions[-1] if len(ele_positions) > 0 else 0
previous_thr_ctrl = thr_positions[-1] if len(thr_positions) > 0 else 0
# not currently functional - need to work on this
# new_ail_ctrl_limited = previous_ail_ctrl + new_ail_ctrl * MAX_DEFLECTION_PER_SECOND / update_frequency
# new_ele_ctrl_limited = previous_ele_ctrl + new_ele_ctrl * MAX_DEFLECTION_PER_SECOND / update_frequency
# new_thr_ctrl_limited = previous_thr_ctrl + new_thr_ctrl * MAX_DEFLECTION_PER_SECOND / update_frequency
# update the control positions
# update_control_position_history((new_ele_ctrl_limited, new_ail_ctrl_limited, 0.0, new_thr_ctrl_limited))
update_control_position_history((new_ele_ctrl, new_ail_ctrl, 0.0, new_thr_ctrl))
onground = -1
if onground == 1:
print("on ground, not sending controls")
else:
if get_autopilot_enabled_from_redis():
# ctrl = [new_ele_ctrl_limited, new_ail_ctrl_limited, 0.0, new_thr_ctrl_limited]
ctrl = [new_ele_ctrl, new_ail_ctrl, 0.0, new_thr_ctrl]
client.sendCTRL(ctrl)
loop_end = datetime.now()
loop_duration = loop_end - loop_start
output = f"current values -- roll: {current_roll: 0.3f}, pitch: {current_pitch: 0.3f}, hdg: {current_hdg:0.3f}, alt: {current_altitude:0.3f}, asi: {current_asi:0.3f}"
output = output + "\n" + f"hdg error: {heading_error: 0.3f}"
output = output + "\n" + f"new ctrl positions -- ail: {new_ail_ctrl: 0.4f}, ele: {new_ele_ctrl: 0.4f}, thr: {new_thr_ctrl:0.4f}"
output = output + "\n" + f"PID outputs -- altitude: {altitude_PID.output: 0.4f}, pitch: {pitch_PID.output: 0.4f}, ail: {roll_PID.output: 0.3f}, hdg: {heading_error_PID.output: 0.3f}"
output = output + "\n" + f"bearing to KBJC: {bearing_to_kbjc:3.1f}, dist: {dist_to_kbjc*0.000539957:0.2f} NM"
output = output + "\n" + f"loop duration (ms): {loop_duration.total_seconds()*1000:0.2f} ms"
print(output)
sleep_until_next_tick(update_frequency)
os.system('cls' if os.name == 'nt' else 'clear')
if __name__ == "__main__":
monitor()
And the flask backend/front end. WebSockets are super cool – never used them before this. I was thinking I’d have to make a bunch of endpoints for every type of autopilot change I need. But this handles it far nicer:
from flask import Flask, render_template
from flask_socketio import SocketIO, emit
import redis
app = Flask(__name__)
socketio = SocketIO(app)
r = redis.StrictRedis(host='localhost', port=6379, db=0)
setpoints_of_interest = ['desired_hdg', 'desired_alt', 'desired_speed']
# get initial setpoints from Redis, send to clients
@app.route('/')
def index():
return render_template('index.html') # You'll need to create an HTML template
def update_setpoint(label, adjustment):
# This function can be adapted to update setpoints and then emit updates via WebSocket
current_value = float(r.get(label)) if r.exists(label) else 0.0
if label == 'desired_hdg':
new_value = (current_value + adjustment) % 360
elif label == 'autopilot_enabled':
new_value = adjustment
else:
new_value = current_value + adjustment
r.set(label, new_value)
# socketio.emit('update_setpoint', {label: new_value}) # Emit update to clients
return new_value
@socketio.on('adjust_setpoint')
def handle_adjust_setpoint(json):
label = json['label']
adjustment = json['adjustment']
# Your logic to adjust the setpoint in Redis and calculate new_value
new_value = update_setpoint(label, adjustment)
# Emit updated setpoint to all clients
emit('update_setpoint', {label: new_value}, broadcast=True)
@socketio.on('connect')
def handle_connect():
# Fetch initial setpoints from Redis
initial_setpoints = {label: float(r.get(label)) if r.exists(label) else 0.0 for label in setpoints_of_interest}
# Emit the initial setpoints to the connected client
emit('update_setpoints', initial_setpoints)
if __name__ == '__main__':
socketio.run(app)
And the http template:
<!DOCTYPE html>
<html>
<head>
<title>Autopilot Interface</title>
<script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/4.0.1/socket.io.js"></script>
<script type="text/javascript" charset="utf-8">
var socket; // Declare socket globally
// Define adjustSetpoint globally
function adjustSetpoint(label, adjustment) {
socket.emit('adjust_setpoint', {label: label, adjustment: adjustment});
}
document.addEventListener('DOMContentLoaded', () => {
socket = io.connect(location.protocol + '//' + document.domain + ':' + location.port);
socket.on('connect', () => {
console.log("Connected to WebSocket server.");
});
// Listen for update_setpoints event to initialize the UI with Redis values
socket.on('update_setpoints', function(setpoints) {
for (const [label, value] of Object.entries(setpoints)) {
const element = document.getElementById(label);
if (element) {
element.innerHTML = value;
}
}
});
// Listen for update_setpoint events from the server
socket.on('update_setpoint', data => {
// Assuming 'data' is an object like {label: new_value}
for (const [label, value] of Object.entries(data)) {
// Update the displayed value on the webpage
const element = document.getElementById(label);
if (element) {
element.innerHTML = value;
}
}
});
});
</script>
</head>
<body>
<h1>Autopilot Interface</h1>
<p>Current Setpoints:</p>
<ul>
<li>Heading: <span id="desired_hdg">0</span></li>
<li>Altitude: <span id="desired_alt">0</span></li>
<li>Speed: <span id="desired_speed">0</span></li>
</ul>
<p>Autopilot: <span id="autopilot_enabled">0</span></p>
<!-- Example buttons for adjusting setpoints -->
<button onclick="adjustSetpoint('desired_hdg', -10)">-10 HDG</button>
<button onclick="adjustSetpoint('desired_hdg', 10)">+10 HDG</button>
<br>
<button onclick="adjustSetpoint('desired_alt', 500)">+500 ALT</button>
<button onclick="adjustSetpoint('desired_alt', -500)">-500 ALT</button>
<br>
<button onclick="adjustSetpoint('desired_speed', 5)">+5 KTS</button>
<button onclick="adjustSetpoint('desired_speed', -5)">-5 KTS</button>
<br>
<br>
<button onclick="adjustSetpoint('autopilot_enabled', 1)">Enable Autopilot</button>
<button onclick="adjustSetpoint('autopilot_enabled', 0)">Disable Autopilot</button>
</body>
</html>
This should be enough to get you going. I’ll come back and clean it up later (both my kids just woke up – 1.5 and 3.5 years!)

4 replies on “Adding some polish to the X-Plane Python Autopilot with Flask, Redis, and WebSockets”
[…] March 2024 update – we now have a 1.5 year old AND the 15 month old is 3.5 years old! Link to the next post in this series – Adding some polish to the X-Plane Python Autopilot with Flask, Redis, and WebSockets […]
[…] Most recent post (added 2024-03-24) – Adding some polish to the X-Plane Python Autopilot with Flask, Redis, and WebSockets […]
[…] from the last post (Adding some polish to the X-Plane Python Autopilot with Flask, Redis, and WebSockets), I have a bit of momentum going on the Python X-Plane Autopilot stuff. There were a couple of […]
I noticed that in the altitude PID you set
`altitude_PID = PID.PID(P*2, P/2, D)`
Is it intentional that you use P for the second parameter rather than I? Given the values, P/2 is only slightly greater than I*2, but just curious.