Categories
Python XPlane

Adding some polish to the X-Plane Python Autopilot with Flask, Redis, and WebSockets

I revisited my Python X-Plane autopilot a few weeks ago because it was pretty clunky for how to adjust setpoints and such. The job I started 1.5 years ago is exclusively Python, so I wanted to redo a bit.

Quick aside: For the new PC I just built – Ryzen 9 7900x, 2x32GB 6000 MHz, etc, X-Plane 10 was the 2nd “game” I installed on it. The first was Factorio (I followed Nilaus’ megabase in a book and have got to 5k SPM). Haven’t tried the newer sims yet, but I think they’ll still be somewhat limited by my RTX 2080 Super.

Well imagine my surprise when I woke up to 6x the normal daily hits by 7am. I checked the weblogs and found that my post was trending on ycombinator.com (Hacker News). So I am going to skip pretty much all background and just post the updated code for now, and will go back and clean up this post at some point.

Without further ado: here’s what the super basic dashboard looks like

Screenshot showing HTML autopilot interface running x-plane via Python

I have it separated into two main running python programs, the file that interacts with X-Plane itself, and the Flask part.

Check here for the updated code / next iteration where I add track following and direct-to functionality – Adding track following (Direct To) with cross track error to the Python X-Plane Autopilot.

Here’s the adjusted autopilot code to check with Redis for the setpoints every loop execution:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
# https://onion.io/2bt-pid-control-python/
# https://github.com/ivmech/ivPID
import sys
import os
import xpc
from datetime import datetime, timedelta
import PID
import time
import math, numpy
import redis
r = redis.StrictRedis(host='localhost', port=6379, db=0)
setpoints = {
"desired_roll": 0,
"desired_pitch": 2,
"desired_speed": 160,
"desired_alt": 8000.0,
"desired_hdg": 140,
"autopilot_enabled": 0
}
for key in setpoints:
# if the key exists in the redis db, use it
# otherwise, set it
if r.exists(key):
setpoints[key] = float(r.get(key))
else:
r.set(key, setpoints[key])
update_interval = 0.10 #seconds
update_frequency = 1/update_interval
P = 0.05
I = 0.01
D = 0
MAX_DEFLECTION_PER_SECOND = 2.0
roll_PID = PID.PID(P*2, I*2, D)
roll_PID.SetPoint = setpoints["desired_roll"]
pitch_PID = PID.PID(P, I, D)
pitch_PID.SetPoint = setpoints["desired_pitch"]
altitude_PID = PID.PID(P*2, P/2, D)
altitude_PID.SetPoint = setpoints["desired_alt"]
speed_PID = PID.PID(P, I, D)
speed_PID.SetPoint = setpoints["desired_speed"]
heading_error_PID = PID.PID(1,0.05,0.1)
heading_error_PID.SetPoint = 0 # need heading error to be 0
DREFs = ["sim/cockpit2/gauges/indicators/airspeed_kts_pilot",
"sim/cockpit2/gauges/indicators/heading_electric_deg_mag_pilot",
"sim/flightmodel/failures/onground_any",
"sim/flightmodel/misc/h_ind"]
def normalize(value, min=-1, max=1):
if (value > max):
return max
elif (value < min):
return min
else:
return value
def sleep_until_next_tick(update_frequency):
# Calculate the update interval from the frequency
update_interval = 1.0 / update_frequency
# Get the current time
current_time = time.time()
# Calculate the time remaining until the next tick
sleep_time = update_interval - (current_time % update_interval)
# Sleep for the remaining time
time.sleep(sleep_time)
# https://rosettacode.org/wiki/Angle_difference_between_two_bearings#Python
def get_angle_difference(b1, b2):
r = (b2 - b1) % 360.0
# Python modulus has same sign as divisor, which is positive here,
# so no need to consider negative case
if r >= 180.0:
r -= 360.0
return r
# https://gist.github.com/jeromer/2005586
def get_bearing(pointA, pointB):
"""
Calculates the bearing between two points.
The formulae used is the following:
θ = atan2(sin(Δlong).cos(lat2),
cos(lat1).sin(lat2) − sin(lat1).cos(lat2).cos(Δlong))
:Parameters:
- `pointA: The tuple representing the latitude/longitude for the
first point. Latitude and longitude must be in decimal degrees
- `pointB: The tuple representing the latitude/longitude for the
second point. Latitude and longitude must be in decimal degrees
:Returns:
The bearing in degrees
:Returns Type:
float
"""
if (type(pointA) != tuple) or (type(pointB) != tuple):
raise TypeError("Only tuples are supported as arguments")
lat1 = math.radians(pointA[0])
lat2 = math.radians(pointB[0])
diffLong = math.radians(pointB[1] - pointA[1])
x = math.sin(diffLong) * math.cos(lat2)
y = math.cos(lat1) * math.sin(lat2) - (math.sin(lat1)
* math.cos(lat2) * math.cos(diffLong))
initial_bearing = math.atan2(x, y)
# Now we have the initial bearing but math.atan2 return values
# from -180° to + 180° which is not what we want for a compass bearing
# The solution is to normalize the initial bearing as shown below
initial_bearing = math.degrees(initial_bearing)
compass_bearing = (initial_bearing + 360) % 360
return compass_bearing
# https://janakiev.com/blog/gps-points-distance-python/
def haversine(coord1, coord2):
R = 6372800 # Earth radius in meters
lat1, lon1 = coord1
lat2, lon2 = coord2
phi1, phi2 = math.radians(lat1), math.radians(lat2)
dphi = math.radians(lat2 - lat1)
dlambda = math.radians(lon2 - lon1)
a = math.sin(dphi/2)**2 + \
math.cos(phi1)*math.cos(phi2)*math.sin(dlambda/2)**2
return 2*R*math.atan2(math.sqrt(a), math.sqrt(1 - a))
KBJC_lat = 39.9088056
KBJC_lon = -105.1171944
def write_position_to_redis(position):
# position is a list of 7 floats
# position_elements = [lat, lon, alt, pitch, roll, yaw, gear_indicator]
position_elements = ["lat", "lon", "alt", "pitch", "roll", "yaw", "gear_indicator"]
position_str = ','.join([str(x) for x in position])
r.set('position', position_str)
for i in range(len(position_elements)):
r.set(f"position/{position_elements[i]}", position[i])
# position_str = ','.join([str(x) for x in position])
# r.publish('position_updates', position_str)
def get_setpoints_from_redis():
setpoints = {
"desired_roll": 0,
"desired_pitch": 2,
"desired_speed": 160,
"desired_alt": 8000.0,
"desired_hdg": 140
}
for key in setpoints:
# if the key exists in the redis db, use it
# otherwise, set it
if r.exists(key):
setpoints[key] = float(r.get(key))
else:
r.set(key, setpoints[key])
return setpoints
def get_autopilot_enabled_from_redis():
if r.exists("autopilot_enabled"):
return int(r.get("autopilot_enabled").decode('utf-8')) == 1
ele_positions = []
ail_positions = []
thr_positions = []
def update_control_position_history(ctrl):
ele_positions.append(ctrl[0])
ail_positions.append(ctrl[1])
thr_positions.append(ctrl[3])
# if the list is longer than 20, pop the first element
if len(ele_positions) > 20:
ele_positions.pop(0)
ail_positions.pop(0)
thr_positions.pop(0)
def monitor():
with xpc.XPlaneConnect() as client:
while True:
loop_start = datetime.now()
print(f"loop start - {loop_start}")
posi = client.getPOSI()
write_position_to_redis(posi)
ctrl = client.getCTRL()
bearing_to_kbjc = get_bearing((posi[0], posi[1]), (KBJC_lat, KBJC_lon))
dist_to_kbjc = haversine((posi[0], posi[1]), (KBJC_lat, KBJC_lon))
#desired_hdg = 116 #bearing_to_kbjc
multi_DREFs = client.getDREFs(DREFs) #speed=0, mag hdg=1, onground=2
current_roll = posi[4]
current_pitch = posi[3]
#current_hdg = posi[5] # this is true, need to use DREF to get mag ''
current_hdg = multi_DREFs[1][0]
current_altitude = multi_DREFs[3][0]
current_asi = multi_DREFs[0][0]
onground = multi_DREFs[2][0]
# get the setpoints from redis
setpoints = get_setpoints_from_redis()
desired_hdg = setpoints["desired_hdg"]
desired_alt = setpoints["desired_alt"]
desired_speed = setpoints["desired_speed"]
# outer loops first
altitude_PID.SetPoint = desired_alt
altitude_PID.update(current_altitude)
heading_error = get_angle_difference(desired_hdg, current_hdg)
heading_error_PID.update(heading_error)
speed_PID.SetPoint = desired_speed
new_pitch_from_altitude = normalize(altitude_PID.output, -10, 10)
new_roll_from_heading_error = normalize(heading_error_PID.output, -25, 25)
# if new_pitch_from_altitude > 15:
# new_pitch_from_altitude = 15
# elif new_pitch_from_altitude < -15:
# new_pitch_from_altitude = -15
pitch_PID.SetPoint = new_pitch_from_altitude
roll_PID.SetPoint = new_roll_from_heading_error
roll_PID.update(current_roll)
speed_PID.update(current_asi)
pitch_PID.update(current_pitch)
new_ail_ctrl = normalize(roll_PID.output, min=-1, max=1)
new_ele_ctrl = normalize(pitch_PID.output, min=-1, max=1)
new_thr_ctrl = normalize(speed_PID.output, min=0, max=1)
previous_ail_ctrl = ail_positions[-1] if len(ail_positions) > 0 else 0
previous_ele_ctrl = ele_positions[-1] if len(ele_positions) > 0 else 0
previous_thr_ctrl = thr_positions[-1] if len(thr_positions) > 0 else 0
# not currently functional - need to work on this
# new_ail_ctrl_limited = previous_ail_ctrl + new_ail_ctrl * MAX_DEFLECTION_PER_SECOND / update_frequency
# new_ele_ctrl_limited = previous_ele_ctrl + new_ele_ctrl * MAX_DEFLECTION_PER_SECOND / update_frequency
# new_thr_ctrl_limited = previous_thr_ctrl + new_thr_ctrl * MAX_DEFLECTION_PER_SECOND / update_frequency
# update the control positions
# update_control_position_history((new_ele_ctrl_limited, new_ail_ctrl_limited, 0.0, new_thr_ctrl_limited))
update_control_position_history((new_ele_ctrl, new_ail_ctrl, 0.0, new_thr_ctrl))
onground = -1
if onground == 1:
print("on ground, not sending controls")
else:
if get_autopilot_enabled_from_redis():
# ctrl = [new_ele_ctrl_limited, new_ail_ctrl_limited, 0.0, new_thr_ctrl_limited]
ctrl = [new_ele_ctrl, new_ail_ctrl, 0.0, new_thr_ctrl]
client.sendCTRL(ctrl)
loop_end = datetime.now()
loop_duration = loop_end - loop_start
output = f"current values -- roll: {current_roll: 0.3f}, pitch: {current_pitch: 0.3f}, hdg: {current_hdg:0.3f}, alt: {current_altitude:0.3f}, asi: {current_asi:0.3f}"
output = output + "\n" + f"hdg error: {heading_error: 0.3f}"
output = output + "\n" + f"new ctrl positions -- ail: {new_ail_ctrl: 0.4f}, ele: {new_ele_ctrl: 0.4f}, thr: {new_thr_ctrl:0.4f}"
output = output + "\n" + f"PID outputs -- altitude: {altitude_PID.output: 0.4f}, pitch: {pitch_PID.output: 0.4f}, ail: {roll_PID.output: 0.3f}, hdg: {heading_error_PID.output: 0.3f}"
output = output + "\n" + f"bearing to KBJC: {bearing_to_kbjc:3.1f}, dist: {dist_to_kbjc*0.000539957:0.2f} NM"
output = output + "\n" + f"loop duration (ms): {loop_duration.total_seconds()*1000:0.2f} ms"
print(output)
sleep_until_next_tick(update_frequency)
os.system('cls' if os.name == 'nt' else 'clear')
if __name__ == "__main__":
monitor()
# https://onion.io/2bt-pid-control-python/ # https://github.com/ivmech/ivPID import sys import os import xpc from datetime import datetime, timedelta import PID import time import math, numpy import redis r = redis.StrictRedis(host='localhost', port=6379, db=0) setpoints = { "desired_roll": 0, "desired_pitch": 2, "desired_speed": 160, "desired_alt": 8000.0, "desired_hdg": 140, "autopilot_enabled": 0 } for key in setpoints: # if the key exists in the redis db, use it # otherwise, set it if r.exists(key): setpoints[key] = float(r.get(key)) else: r.set(key, setpoints[key]) update_interval = 0.10 #seconds update_frequency = 1/update_interval P = 0.05 I = 0.01 D = 0 MAX_DEFLECTION_PER_SECOND = 2.0 roll_PID = PID.PID(P*2, I*2, D) roll_PID.SetPoint = setpoints["desired_roll"] pitch_PID = PID.PID(P, I, D) pitch_PID.SetPoint = setpoints["desired_pitch"] altitude_PID = PID.PID(P*2, P/2, D) altitude_PID.SetPoint = setpoints["desired_alt"] speed_PID = PID.PID(P, I, D) speed_PID.SetPoint = setpoints["desired_speed"] heading_error_PID = PID.PID(1,0.05,0.1) heading_error_PID.SetPoint = 0 # need heading error to be 0 DREFs = ["sim/cockpit2/gauges/indicators/airspeed_kts_pilot", "sim/cockpit2/gauges/indicators/heading_electric_deg_mag_pilot", "sim/flightmodel/failures/onground_any", "sim/flightmodel/misc/h_ind"] def normalize(value, min=-1, max=1): if (value > max): return max elif (value < min): return min else: return value def sleep_until_next_tick(update_frequency): # Calculate the update interval from the frequency update_interval = 1.0 / update_frequency # Get the current time current_time = time.time() # Calculate the time remaining until the next tick sleep_time = update_interval - (current_time % update_interval) # Sleep for the remaining time time.sleep(sleep_time) # https://rosettacode.org/wiki/Angle_difference_between_two_bearings#Python def get_angle_difference(b1, b2): r = (b2 - b1) % 360.0 # Python modulus has same sign as divisor, which is positive here, # so no need to consider negative case if r >= 180.0: r -= 360.0 return r # https://gist.github.com/jeromer/2005586 def get_bearing(pointA, pointB): """ Calculates the bearing between two points. The formulae used is the following: θ = atan2(sin(Δlong).cos(lat2), cos(lat1).sin(lat2) − sin(lat1).cos(lat2).cos(Δlong)) :Parameters: - `pointA: The tuple representing the latitude/longitude for the first point. Latitude and longitude must be in decimal degrees - `pointB: The tuple representing the latitude/longitude for the second point. Latitude and longitude must be in decimal degrees :Returns: The bearing in degrees :Returns Type: float """ if (type(pointA) != tuple) or (type(pointB) != tuple): raise TypeError("Only tuples are supported as arguments") lat1 = math.radians(pointA[0]) lat2 = math.radians(pointB[0]) diffLong = math.radians(pointB[1] - pointA[1]) x = math.sin(diffLong) * math.cos(lat2) y = math.cos(lat1) * math.sin(lat2) - (math.sin(lat1) * math.cos(lat2) * math.cos(diffLong)) initial_bearing = math.atan2(x, y) # Now we have the initial bearing but math.atan2 return values # from -180° to + 180° which is not what we want for a compass bearing # The solution is to normalize the initial bearing as shown below initial_bearing = math.degrees(initial_bearing) compass_bearing = (initial_bearing + 360) % 360 return compass_bearing # https://janakiev.com/blog/gps-points-distance-python/ def haversine(coord1, coord2): R = 6372800 # Earth radius in meters lat1, lon1 = coord1 lat2, lon2 = coord2 phi1, phi2 = math.radians(lat1), math.radians(lat2) dphi = math.radians(lat2 - lat1) dlambda = math.radians(lon2 - lon1) a = math.sin(dphi/2)**2 + \ math.cos(phi1)*math.cos(phi2)*math.sin(dlambda/2)**2 return 2*R*math.atan2(math.sqrt(a), math.sqrt(1 - a)) KBJC_lat = 39.9088056 KBJC_lon = -105.1171944 def write_position_to_redis(position): # position is a list of 7 floats # position_elements = [lat, lon, alt, pitch, roll, yaw, gear_indicator] position_elements = ["lat", "lon", "alt", "pitch", "roll", "yaw", "gear_indicator"] position_str = ','.join([str(x) for x in position]) r.set('position', position_str) for i in range(len(position_elements)): r.set(f"position/{position_elements[i]}", position[i]) # position_str = ','.join([str(x) for x in position]) # r.publish('position_updates', position_str) def get_setpoints_from_redis(): setpoints = { "desired_roll": 0, "desired_pitch": 2, "desired_speed": 160, "desired_alt": 8000.0, "desired_hdg": 140 } for key in setpoints: # if the key exists in the redis db, use it # otherwise, set it if r.exists(key): setpoints[key] = float(r.get(key)) else: r.set(key, setpoints[key]) return setpoints def get_autopilot_enabled_from_redis(): if r.exists("autopilot_enabled"): return int(r.get("autopilot_enabled").decode('utf-8')) == 1 ele_positions = [] ail_positions = [] thr_positions = [] def update_control_position_history(ctrl): ele_positions.append(ctrl[0]) ail_positions.append(ctrl[1]) thr_positions.append(ctrl[3]) # if the list is longer than 20, pop the first element if len(ele_positions) > 20: ele_positions.pop(0) ail_positions.pop(0) thr_positions.pop(0) def monitor(): with xpc.XPlaneConnect() as client: while True: loop_start = datetime.now() print(f"loop start - {loop_start}") posi = client.getPOSI() write_position_to_redis(posi) ctrl = client.getCTRL() bearing_to_kbjc = get_bearing((posi[0], posi[1]), (KBJC_lat, KBJC_lon)) dist_to_kbjc = haversine((posi[0], posi[1]), (KBJC_lat, KBJC_lon)) #desired_hdg = 116 #bearing_to_kbjc multi_DREFs = client.getDREFs(DREFs) #speed=0, mag hdg=1, onground=2 current_roll = posi[4] current_pitch = posi[3] #current_hdg = posi[5] # this is true, need to use DREF to get mag '' current_hdg = multi_DREFs[1][0] current_altitude = multi_DREFs[3][0] current_asi = multi_DREFs[0][0] onground = multi_DREFs[2][0] # get the setpoints from redis setpoints = get_setpoints_from_redis() desired_hdg = setpoints["desired_hdg"] desired_alt = setpoints["desired_alt"] desired_speed = setpoints["desired_speed"] # outer loops first altitude_PID.SetPoint = desired_alt altitude_PID.update(current_altitude) heading_error = get_angle_difference(desired_hdg, current_hdg) heading_error_PID.update(heading_error) speed_PID.SetPoint = desired_speed new_pitch_from_altitude = normalize(altitude_PID.output, -10, 10) new_roll_from_heading_error = normalize(heading_error_PID.output, -25, 25) # if new_pitch_from_altitude > 15: # new_pitch_from_altitude = 15 # elif new_pitch_from_altitude < -15: # new_pitch_from_altitude = -15 pitch_PID.SetPoint = new_pitch_from_altitude roll_PID.SetPoint = new_roll_from_heading_error roll_PID.update(current_roll) speed_PID.update(current_asi) pitch_PID.update(current_pitch) new_ail_ctrl = normalize(roll_PID.output, min=-1, max=1) new_ele_ctrl = normalize(pitch_PID.output, min=-1, max=1) new_thr_ctrl = normalize(speed_PID.output, min=0, max=1) previous_ail_ctrl = ail_positions[-1] if len(ail_positions) > 0 else 0 previous_ele_ctrl = ele_positions[-1] if len(ele_positions) > 0 else 0 previous_thr_ctrl = thr_positions[-1] if len(thr_positions) > 0 else 0 # not currently functional - need to work on this # new_ail_ctrl_limited = previous_ail_ctrl + new_ail_ctrl * MAX_DEFLECTION_PER_SECOND / update_frequency # new_ele_ctrl_limited = previous_ele_ctrl + new_ele_ctrl * MAX_DEFLECTION_PER_SECOND / update_frequency # new_thr_ctrl_limited = previous_thr_ctrl + new_thr_ctrl * MAX_DEFLECTION_PER_SECOND / update_frequency # update the control positions # update_control_position_history((new_ele_ctrl_limited, new_ail_ctrl_limited, 0.0, new_thr_ctrl_limited)) update_control_position_history((new_ele_ctrl, new_ail_ctrl, 0.0, new_thr_ctrl)) onground = -1 if onground == 1: print("on ground, not sending controls") else: if get_autopilot_enabled_from_redis(): # ctrl = [new_ele_ctrl_limited, new_ail_ctrl_limited, 0.0, new_thr_ctrl_limited] ctrl = [new_ele_ctrl, new_ail_ctrl, 0.0, new_thr_ctrl] client.sendCTRL(ctrl) loop_end = datetime.now() loop_duration = loop_end - loop_start output = f"current values -- roll: {current_roll: 0.3f}, pitch: {current_pitch: 0.3f}, hdg: {current_hdg:0.3f}, alt: {current_altitude:0.3f}, asi: {current_asi:0.3f}" output = output + "\n" + f"hdg error: {heading_error: 0.3f}" output = output + "\n" + f"new ctrl positions -- ail: {new_ail_ctrl: 0.4f}, ele: {new_ele_ctrl: 0.4f}, thr: {new_thr_ctrl:0.4f}" output = output + "\n" + f"PID outputs -- altitude: {altitude_PID.output: 0.4f}, pitch: {pitch_PID.output: 0.4f}, ail: {roll_PID.output: 0.3f}, hdg: {heading_error_PID.output: 0.3f}" output = output + "\n" + f"bearing to KBJC: {bearing_to_kbjc:3.1f}, dist: {dist_to_kbjc*0.000539957:0.2f} NM" output = output + "\n" + f"loop duration (ms): {loop_duration.total_seconds()*1000:0.2f} ms" print(output) sleep_until_next_tick(update_frequency) os.system('cls' if os.name == 'nt' else 'clear') if __name__ == "__main__": monitor()
# https://onion.io/2bt-pid-control-python/
# https://github.com/ivmech/ivPID

import sys
import os
import xpc
from datetime import datetime, timedelta
import PID
import time
import math, numpy
import redis

r = redis.StrictRedis(host='localhost', port=6379, db=0)

setpoints = {
	"desired_roll": 0,
	"desired_pitch": 2,
	"desired_speed": 160,
	"desired_alt": 8000.0,
	"desired_hdg": 140,
	"autopilot_enabled": 0
}

for key in setpoints:
	# if the key exists in the redis db, use it
	# otherwise, set it
	if r.exists(key):
		setpoints[key] = float(r.get(key))
	else:
		r.set(key, setpoints[key])

update_interval = 0.10 #seconds
update_frequency = 1/update_interval

P = 0.05
I = 0.01
D = 0
MAX_DEFLECTION_PER_SECOND = 2.0

roll_PID = PID.PID(P*2, I*2, D)
roll_PID.SetPoint = setpoints["desired_roll"]

pitch_PID = PID.PID(P, I, D)
pitch_PID.SetPoint = setpoints["desired_pitch"]

altitude_PID = PID.PID(P*2, P/2, D)
altitude_PID.SetPoint = setpoints["desired_alt"]

speed_PID = PID.PID(P, I, D)
speed_PID.SetPoint = setpoints["desired_speed"]

heading_error_PID = PID.PID(1,0.05,0.1)
heading_error_PID.SetPoint = 0 # need heading error to be 0

DREFs = ["sim/cockpit2/gauges/indicators/airspeed_kts_pilot",
		"sim/cockpit2/gauges/indicators/heading_electric_deg_mag_pilot",
		"sim/flightmodel/failures/onground_any",
		"sim/flightmodel/misc/h_ind"]

def normalize(value, min=-1, max=1):
	if (value > max):
		return max
	elif (value < min):
		return min
	else:
		return value

def sleep_until_next_tick(update_frequency):
    # Calculate the update interval from the frequency
    update_interval = 1.0 / update_frequency

    # Get the current time
    current_time = time.time()

    # Calculate the time remaining until the next tick
    sleep_time = update_interval - (current_time % update_interval)

    # Sleep for the remaining time
    time.sleep(sleep_time)
	
# https://rosettacode.org/wiki/Angle_difference_between_two_bearings#Python
def get_angle_difference(b1, b2):
	r = (b2 - b1) % 360.0
	# Python modulus has same sign as divisor, which is positive here,
	# so no need to consider negative case
	if r >= 180.0:
		r -= 360.0
	return r

# https://gist.github.com/jeromer/2005586
def get_bearing(pointA, pointB):
    """
    Calculates the bearing between two points.
    The formulae used is the following:
        θ = atan2(sin(Δlong).cos(lat2),
                  cos(lat1).sin(lat2) − sin(lat1).cos(lat2).cos(Δlong))
    :Parameters:
      - `pointA: The tuple representing the latitude/longitude for the
        first point. Latitude and longitude must be in decimal degrees
      - `pointB: The tuple representing the latitude/longitude for the
        second point. Latitude and longitude must be in decimal degrees
    :Returns:
      The bearing in degrees
    :Returns Type:
      float
    """
    if (type(pointA) != tuple) or (type(pointB) != tuple):
        raise TypeError("Only tuples are supported as arguments")

    lat1 = math.radians(pointA[0])
    lat2 = math.radians(pointB[0])

    diffLong = math.radians(pointB[1] - pointA[1])

    x = math.sin(diffLong) * math.cos(lat2)
    y = math.cos(lat1) * math.sin(lat2) - (math.sin(lat1)
            * math.cos(lat2) * math.cos(diffLong))

    initial_bearing = math.atan2(x, y)

    # Now we have the initial bearing but math.atan2 return values
    # from -180° to + 180° which is not what we want for a compass bearing
    # The solution is to normalize the initial bearing as shown below
    initial_bearing = math.degrees(initial_bearing)
    compass_bearing = (initial_bearing + 360) % 360

    return compass_bearing

# https://janakiev.com/blog/gps-points-distance-python/
def haversine(coord1, coord2):
    R = 6372800  # Earth radius in meters
    lat1, lon1 = coord1
    lat2, lon2 = coord2
    
    phi1, phi2 = math.radians(lat1), math.radians(lat2) 
    dphi       = math.radians(lat2 - lat1)
    dlambda    = math.radians(lon2 - lon1)
    
    a = math.sin(dphi/2)**2 + \
        math.cos(phi1)*math.cos(phi2)*math.sin(dlambda/2)**2
    
    return 2*R*math.atan2(math.sqrt(a), math.sqrt(1 - a))

KBJC_lat = 39.9088056
KBJC_lon = -105.1171944

def write_position_to_redis(position):
	# position is a list of 7 floats
	# position_elements = [lat, lon, alt, pitch, roll, yaw, gear_indicator]
	position_elements = ["lat", "lon", "alt", "pitch", "roll", "yaw", "gear_indicator"]
	position_str = ','.join([str(x) for x in position])
	r.set('position', position_str)
	for i in range(len(position_elements)):
		r.set(f"position/{position_elements[i]}", position[i])
	
	# position_str = ','.join([str(x) for x in position])
	# r.publish('position_updates', position_str)
		
def get_setpoints_from_redis():
	setpoints = {
		"desired_roll": 0,
		"desired_pitch": 2,
		"desired_speed": 160,
		"desired_alt": 8000.0,
		"desired_hdg": 140
	}
	for key in setpoints:
		# if the key exists in the redis db, use it
		# otherwise, set it
		if r.exists(key):
			setpoints[key] = float(r.get(key))
		else:
			r.set(key, setpoints[key])
	return setpoints

def get_autopilot_enabled_from_redis():
	if r.exists("autopilot_enabled"):
		return int(r.get("autopilot_enabled").decode('utf-8')) == 1

ele_positions = []
ail_positions = []
thr_positions = []

def update_control_position_history(ctrl):
	ele_positions.append(ctrl[0])
	ail_positions.append(ctrl[1])
	thr_positions.append(ctrl[3])

	# if the list is longer than 20, pop the first element
	if len(ele_positions) > 20:
		ele_positions.pop(0)
		ail_positions.pop(0)
		thr_positions.pop(0)

def monitor():
	with xpc.XPlaneConnect() as client:
		while True:
			loop_start = datetime.now()
			print(f"loop start - {loop_start}")
			posi = client.getPOSI()
			write_position_to_redis(posi)

			ctrl = client.getCTRL()

			bearing_to_kbjc = get_bearing((posi[0], posi[1]), (KBJC_lat, KBJC_lon))
			dist_to_kbjc = haversine((posi[0], posi[1]), (KBJC_lat, KBJC_lon))
			#desired_hdg = 116 #bearing_to_kbjc

			multi_DREFs = client.getDREFs(DREFs) #speed=0, mag hdg=1, onground=2

			current_roll = posi[4]
			current_pitch = posi[3]
			#current_hdg = posi[5] # this is true, need to use DREF to get mag ''
			current_hdg = multi_DREFs[1][0]
			current_altitude = multi_DREFs[3][0]
			current_asi = multi_DREFs[0][0]
			onground = multi_DREFs[2][0]

			# get the setpoints from redis
			setpoints = get_setpoints_from_redis()
			desired_hdg = setpoints["desired_hdg"]
			desired_alt = setpoints["desired_alt"]
			desired_speed = setpoints["desired_speed"]

			# outer loops first
			altitude_PID.SetPoint = desired_alt
			altitude_PID.update(current_altitude)

			heading_error = get_angle_difference(desired_hdg, current_hdg)
			heading_error_PID.update(heading_error)

			speed_PID.SetPoint = desired_speed
			

			new_pitch_from_altitude = normalize(altitude_PID.output, -10, 10)
			new_roll_from_heading_error = normalize(heading_error_PID.output, -25, 25)
			# if new_pitch_from_altitude > 15:
			# 	new_pitch_from_altitude = 15
			# elif new_pitch_from_altitude < -15:
			# 	new_pitch_from_altitude = -15
			
			pitch_PID.SetPoint = new_pitch_from_altitude
			roll_PID.SetPoint = new_roll_from_heading_error

			roll_PID.update(current_roll)
			speed_PID.update(current_asi)
			pitch_PID.update(current_pitch)

			new_ail_ctrl = normalize(roll_PID.output, min=-1, max=1)
			new_ele_ctrl = normalize(pitch_PID.output, min=-1, max=1)
			new_thr_ctrl = normalize(speed_PID.output, min=0, max=1)

			previous_ail_ctrl = ail_positions[-1] if len(ail_positions) > 0 else 0
			previous_ele_ctrl = ele_positions[-1] if len(ele_positions) > 0 else 0
			previous_thr_ctrl = thr_positions[-1] if len(thr_positions) > 0 else 0

			# not currently functional - need to work on this 
			# new_ail_ctrl_limited = previous_ail_ctrl + new_ail_ctrl * MAX_DEFLECTION_PER_SECOND / update_frequency
			# new_ele_ctrl_limited = previous_ele_ctrl + new_ele_ctrl * MAX_DEFLECTION_PER_SECOND / update_frequency
			# new_thr_ctrl_limited = previous_thr_ctrl + new_thr_ctrl * MAX_DEFLECTION_PER_SECOND / update_frequency

			# update the control positions
			# update_control_position_history((new_ele_ctrl_limited, new_ail_ctrl_limited, 0.0, new_thr_ctrl_limited))
			update_control_position_history((new_ele_ctrl, new_ail_ctrl, 0.0, new_thr_ctrl))

			onground = -1
			if onground == 1:
				print("on ground, not sending controls")
			else:
				if get_autopilot_enabled_from_redis():
					# ctrl = [new_ele_ctrl_limited, new_ail_ctrl_limited, 0.0, new_thr_ctrl_limited]
					ctrl = [new_ele_ctrl, new_ail_ctrl, 0.0, new_thr_ctrl]
					client.sendCTRL(ctrl)

			loop_end = datetime.now()
			loop_duration = loop_end - loop_start

			output = f"current values --    roll: {current_roll: 0.3f},  pitch: {current_pitch: 0.3f},    hdg: {current_hdg:0.3f}, alt: {current_altitude:0.3f}, asi: {current_asi:0.3f}"
			output = output + "\n" + f"hdg error:                 {heading_error: 0.3f}"
			output = output + "\n" + f"new ctrl positions -- ail: {new_ail_ctrl: 0.4f},    ele: {new_ele_ctrl: 0.4f},   thr: {new_thr_ctrl:0.4f}"
			output = output + "\n" + f"PID outputs --   altitude: {altitude_PID.output: 0.4f},  pitch: {pitch_PID.output: 0.4f},   ail: {roll_PID.output: 0.3f},  hdg: {heading_error_PID.output: 0.3f}"
			output = output + "\n" + f"bearing to KBJC: {bearing_to_kbjc:3.1f}, dist: {dist_to_kbjc*0.000539957:0.2f} NM"
			output = output + "\n" + f"loop duration (ms): {loop_duration.total_seconds()*1000:0.2f} ms"
			print(output)
			sleep_until_next_tick(update_frequency)
			os.system('cls' if os.name == 'nt' else 'clear')



if __name__ == "__main__":
	monitor()

And the flask backend/front end. WebSockets are super cool – never used them before this. I was thinking I’d have to make a bunch of endpoints for every type of autopilot change I need. But this handles it far nicer:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
from flask import Flask, render_template
from flask_socketio import SocketIO, emit
import redis
app = Flask(__name__)
socketio = SocketIO(app)
r = redis.StrictRedis(host='localhost', port=6379, db=0)
setpoints_of_interest = ['desired_hdg', 'desired_alt', 'desired_speed']
# get initial setpoints from Redis, send to clients
@app.route('/')
def index():
return render_template('index.html') # You'll need to create an HTML template
def update_setpoint(label, adjustment):
# This function can be adapted to update setpoints and then emit updates via WebSocket
current_value = float(r.get(label)) if r.exists(label) else 0.0
if label == 'desired_hdg':
new_value = (current_value + adjustment) % 360
elif label == 'autopilot_enabled':
new_value = adjustment
else:
new_value = current_value + adjustment
r.set(label, new_value)
# socketio.emit('update_setpoint', {label: new_value}) # Emit update to clients
return new_value
@socketio.on('adjust_setpoint')
def handle_adjust_setpoint(json):
label = json['label']
adjustment = json['adjustment']
# Your logic to adjust the setpoint in Redis and calculate new_value
new_value = update_setpoint(label, adjustment)
# Emit updated setpoint to all clients
emit('update_setpoint', {label: new_value}, broadcast=True)
@socketio.on('connect')
def handle_connect():
# Fetch initial setpoints from Redis
initial_setpoints = {label: float(r.get(label)) if r.exists(label) else 0.0 for label in setpoints_of_interest}
# Emit the initial setpoints to the connected client
emit('update_setpoints', initial_setpoints)
if __name__ == '__main__':
socketio.run(app)
from flask import Flask, render_template from flask_socketio import SocketIO, emit import redis app = Flask(__name__) socketio = SocketIO(app) r = redis.StrictRedis(host='localhost', port=6379, db=0) setpoints_of_interest = ['desired_hdg', 'desired_alt', 'desired_speed'] # get initial setpoints from Redis, send to clients @app.route('/') def index(): return render_template('index.html') # You'll need to create an HTML template def update_setpoint(label, adjustment): # This function can be adapted to update setpoints and then emit updates via WebSocket current_value = float(r.get(label)) if r.exists(label) else 0.0 if label == 'desired_hdg': new_value = (current_value + adjustment) % 360 elif label == 'autopilot_enabled': new_value = adjustment else: new_value = current_value + adjustment r.set(label, new_value) # socketio.emit('update_setpoint', {label: new_value}) # Emit update to clients return new_value @socketio.on('adjust_setpoint') def handle_adjust_setpoint(json): label = json['label'] adjustment = json['adjustment'] # Your logic to adjust the setpoint in Redis and calculate new_value new_value = update_setpoint(label, adjustment) # Emit updated setpoint to all clients emit('update_setpoint', {label: new_value}, broadcast=True) @socketio.on('connect') def handle_connect(): # Fetch initial setpoints from Redis initial_setpoints = {label: float(r.get(label)) if r.exists(label) else 0.0 for label in setpoints_of_interest} # Emit the initial setpoints to the connected client emit('update_setpoints', initial_setpoints) if __name__ == '__main__': socketio.run(app)
from flask import Flask, render_template
from flask_socketio import SocketIO, emit
import redis

app = Flask(__name__)
socketio = SocketIO(app)
r = redis.StrictRedis(host='localhost', port=6379, db=0)

setpoints_of_interest = ['desired_hdg', 'desired_alt', 'desired_speed']
# get initial setpoints from Redis, send to clients

@app.route('/')
def index():
    return render_template('index.html')  # You'll need to create an HTML template

def update_setpoint(label, adjustment):
    # This function can be adapted to update setpoints and then emit updates via WebSocket
    current_value = float(r.get(label)) if r.exists(label) else 0.0

    if label == 'desired_hdg':
        new_value = (current_value + adjustment) % 360
    elif label == 'autopilot_enabled':
        new_value = adjustment
    else:
        new_value = current_value + adjustment

    r.set(label, new_value)
    # socketio.emit('update_setpoint', {label: new_value})  # Emit update to clients
    return new_value

@socketio.on('adjust_setpoint')
def handle_adjust_setpoint(json):
    label = json['label']
    adjustment = json['adjustment']
    # Your logic to adjust the setpoint in Redis and calculate new_value
    new_value = update_setpoint(label, adjustment)

    # Emit updated setpoint to all clients
    emit('update_setpoint', {label: new_value}, broadcast=True)

@socketio.on('connect')
def handle_connect():
    # Fetch initial setpoints from Redis
    initial_setpoints = {label: float(r.get(label)) if r.exists(label) else 0.0 for label in setpoints_of_interest}
    
    # Emit the initial setpoints to the connected client
    emit('update_setpoints', initial_setpoints)

if __name__ == '__main__':
    socketio.run(app)

And the http template:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
<!DOCTYPE html>
<html>
<head>
<title>Autopilot Interface</title>
<script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/4.0.1/socket.io.js"></script>
<script type="text/javascript" charset="utf-8">
var socket; // Declare socket globally
// Define adjustSetpoint globally
function adjustSetpoint(label, adjustment) {
socket.emit('adjust_setpoint', {label: label, adjustment: adjustment});
}
document.addEventListener('DOMContentLoaded', () => {
socket = io.connect(location.protocol + '//' + document.domain + ':' + location.port);
socket.on('connect', () => {
console.log("Connected to WebSocket server.");
});
// Listen for update_setpoints event to initialize the UI with Redis values
socket.on('update_setpoints', function(setpoints) {
for (const [label, value] of Object.entries(setpoints)) {
const element = document.getElementById(label);
if (element) {
element.innerHTML = value;
}
}
});
// Listen for update_setpoint events from the server
socket.on('update_setpoint', data => {
// Assuming 'data' is an object like {label: new_value}
for (const [label, value] of Object.entries(data)) {
// Update the displayed value on the webpage
const element = document.getElementById(label);
if (element) {
element.innerHTML = value;
}
}
});
});
</script>
</head>
<body>
<h1>Autopilot Interface</h1>
<p>Current Setpoints:</p>
<ul>
<li>Heading: <span id="desired_hdg">0</span></li>
<li>Altitude: <span id="desired_alt">0</span></li>
<li>Speed: <span id="desired_speed">0</span></li>
</ul>
<p>Autopilot: <span id="autopilot_enabled">0</span></p>
<!-- Example buttons for adjusting setpoints -->
<button onclick="adjustSetpoint('desired_hdg', -10)">-10 HDG</button>
<button onclick="adjustSetpoint('desired_hdg', 10)">+10 HDG</button>
<br>
<button onclick="adjustSetpoint('desired_alt', 500)">+500 ALT</button>
<button onclick="adjustSetpoint('desired_alt', -500)">-500 ALT</button>
<br>
<button onclick="adjustSetpoint('desired_speed', 5)">+5 KTS</button>
<button onclick="adjustSetpoint('desired_speed', -5)">-5 KTS</button>
<br>
<br>
<button onclick="adjustSetpoint('autopilot_enabled', 1)">Enable Autopilot</button>
<button onclick="adjustSetpoint('autopilot_enabled', 0)">Disable Autopilot</button>
</body>
</html>
<!DOCTYPE html> <html> <head> <title>Autopilot Interface</title> <script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/4.0.1/socket.io.js"></script> <script type="text/javascript" charset="utf-8"> var socket; // Declare socket globally // Define adjustSetpoint globally function adjustSetpoint(label, adjustment) { socket.emit('adjust_setpoint', {label: label, adjustment: adjustment}); } document.addEventListener('DOMContentLoaded', () => { socket = io.connect(location.protocol + '//' + document.domain + ':' + location.port); socket.on('connect', () => { console.log("Connected to WebSocket server."); }); // Listen for update_setpoints event to initialize the UI with Redis values socket.on('update_setpoints', function(setpoints) { for (const [label, value] of Object.entries(setpoints)) { const element = document.getElementById(label); if (element) { element.innerHTML = value; } } }); // Listen for update_setpoint events from the server socket.on('update_setpoint', data => { // Assuming 'data' is an object like {label: new_value} for (const [label, value] of Object.entries(data)) { // Update the displayed value on the webpage const element = document.getElementById(label); if (element) { element.innerHTML = value; } } }); }); </script> </head> <body> <h1>Autopilot Interface</h1> <p>Current Setpoints:</p> <ul> <li>Heading: <span id="desired_hdg">0</span></li> <li>Altitude: <span id="desired_alt">0</span></li> <li>Speed: <span id="desired_speed">0</span></li> </ul> <p>Autopilot: <span id="autopilot_enabled">0</span></p> <!-- Example buttons for adjusting setpoints --> <button onclick="adjustSetpoint('desired_hdg', -10)">-10 HDG</button> <button onclick="adjustSetpoint('desired_hdg', 10)">+10 HDG</button> <br> <button onclick="adjustSetpoint('desired_alt', 500)">+500 ALT</button> <button onclick="adjustSetpoint('desired_alt', -500)">-500 ALT</button> <br> <button onclick="adjustSetpoint('desired_speed', 5)">+5 KTS</button> <button onclick="adjustSetpoint('desired_speed', -5)">-5 KTS</button> <br> <br> <button onclick="adjustSetpoint('autopilot_enabled', 1)">Enable Autopilot</button> <button onclick="adjustSetpoint('autopilot_enabled', 0)">Disable Autopilot</button> </body> </html>
<!DOCTYPE html>
<html>
<head>
    <title>Autopilot Interface</title>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/4.0.1/socket.io.js"></script>
<script type="text/javascript" charset="utf-8">
    var socket;  // Declare socket globally

    // Define adjustSetpoint globally
    function adjustSetpoint(label, adjustment) {
        socket.emit('adjust_setpoint', {label: label, adjustment: adjustment});
    }

    document.addEventListener('DOMContentLoaded', () => {
        socket = io.connect(location.protocol + '//' + document.domain + ':' + location.port);
        
        socket.on('connect', () => {
            console.log("Connected to WebSocket server.");
        });

        // Listen for update_setpoints event to initialize the UI with Redis values
        socket.on('update_setpoints', function(setpoints) {
          for (const [label, value] of Object.entries(setpoints)) {
              const element = document.getElementById(label);
              if (element) {
                  element.innerHTML = value;
              }
          }
      });

        // Listen for update_setpoint events from the server
        socket.on('update_setpoint', data => {
            // Assuming 'data' is an object like {label: new_value}
            for (const [label, value] of Object.entries(data)) {
                // Update the displayed value on the webpage
                const element = document.getElementById(label);
                if (element) {
                    element.innerHTML = value;
                }
            }
        });
    });
</script>

</head>
<body>
    <h1>Autopilot Interface</h1>
    <p>Current Setpoints:</p>
    <ul>
        <li>Heading: <span id="desired_hdg">0</span></li>
        <li>Altitude: <span id="desired_alt">0</span></li>
        <li>Speed: <span id="desired_speed">0</span></li>
    </ul>
    <p>Autopilot: <span id="autopilot_enabled">0</span></p>

    <!-- Example buttons for adjusting setpoints -->
    <button onclick="adjustSetpoint('desired_hdg', -10)">-10 HDG</button>
    <button onclick="adjustSetpoint('desired_hdg', 10)">+10 HDG</button>
    <br>
    <button onclick="adjustSetpoint('desired_alt', 500)">+500 ALT</button>
    <button onclick="adjustSetpoint('desired_alt', -500)">-500 ALT</button>
    <br>
    <button onclick="adjustSetpoint('desired_speed', 5)">+5 KTS</button>
    <button onclick="adjustSetpoint('desired_speed', -5)">-5 KTS</button>
    <br>
    <br>
    <button onclick="adjustSetpoint('autopilot_enabled', 1)">Enable Autopilot</button>
    <button onclick="adjustSetpoint('autopilot_enabled', 0)">Disable Autopilot</button>

</body>
</html>

This should be enough to get you going. I’ll come back and clean it up later (both my kids just woke up – 1.5 and 3.5 years!)

4 replies on “Adding some polish to the X-Plane Python Autopilot with Flask, Redis, and WebSockets”

I noticed that in the altitude PID you set
`altitude_PID = PID.PID(P*2, P/2, D)`
Is it intentional that you use P for the second parameter rather than I? Given the values, P/2 is only slightly greater than I*2, but just curious.

Leave a Reply

Your email address will not be published. Required fields are marked *