# https://onion.io/2bt-pid-control-python/ # https://github.com/ivmech/ivPID import sys import os import xpc from datetime import datetime, timedelta import PID import time import math, numpy import redis import pickle import pandas r = redis.StrictRedis(host='localhost', port=6379, db=0) nav_points = pickle.load(open("nav_data.pkl", "rb")) nav_points_dict = nav_points.to_dict(orient='records') def get_nav_point_lat_lon(id): nav_point = nav_points[nav_points["identifier"] == id] return nav_point["latitude"].values[0], nav_point["longitude"].values[0] setpoints = { "desired_roll": 0, "desired_pitch": 2, "desired_speed": 160, "desired_alt": 8000.0, "desired_hdg": 140, "autopilot_enabled": 0 } kbjc_runways = { "30R/12L": { "Runway 12L": { "Latitude": 39.91529286666667, "Longitude": -105.12841313333334 }, "Runway 30R": { "Latitude": 39.901373883333335, "Longitude": -105.10191808333333 } } } test_locations = { "kbjc_tower": { "lat": 39.9064, "lon": -105.1208 }, } for key in setpoints: # if the key exists in the redis db, use it # otherwise, set it if r.exists(key): setpoints[key] = float(r.get(key)) else: r.set(key, setpoints[key]) update_interval = 0.10 #seconds update_frequency = 1/update_interval P = 0.05 I = 0.01 D = 0 MAX_DEFLECTION_PER_SECOND = 2.0 roll_PID = PID.PID(P*2, I*2, D) roll_PID.SetPoint = setpoints["desired_roll"] pitch_PID = PID.PID(P, I, D) pitch_PID.SetPoint = setpoints["desired_pitch"] altitude_PID = PID.PID(P*2, P/2, D) altitude_PID.SetPoint = setpoints["desired_alt"] speed_PID = PID.PID(P, I, D) speed_PID.SetPoint = setpoints["desired_speed"] heading_error_PID = PID.PID(1,0.05,0.1) heading_error_PID.SetPoint = 0 # need heading error to be 0 xte_PID = PID.PID(2,0.03,0.2) xte_PID.SetPoint = 0 DREFs = ["sim/cockpit2/gauges/indicators/airspeed_kts_pilot", "sim/cockpit2/gauges/indicators/heading_electric_deg_mag_pilot", "sim/flightmodel/failures/onground_any", "sim/flightmodel/misc/h_ind"] def normalize(value, min=-1, max=1): if (value > max): return max elif (value < min): return min else: return value def sleep_until_next_tick(update_frequency): # Calculate the update interval from the frequency update_interval = 1.0 / update_frequency # Get the current time current_time = time.time() # Calculate the time remaining until the next tick sleep_time = update_interval - (current_time % update_interval) # Sleep for the remaining time time.sleep(sleep_time) # https://rosettacode.org/wiki/Angle_difference_between_two_bearings#Python def get_angle_difference(b1, b2): r = (b2 - b1) % 360.0 # Python modulus has same sign as divisor, which is positive here, # so no need to consider negative case if r >= 180.0: r -= 360.0 return r # https://gist.github.com/jeromer/2005586 def get_bearing(pointA, pointB): """ Calculates the bearing between two points. The formulae used is the following: θ = atan2(sin(Δlong).cos(lat2), cos(lat1).sin(lat2) − sin(lat1).cos(lat2).cos(Δlong)) :Parameters: - `pointA: The tuple representing the latitude/longitude for the first point. Latitude and longitude must be in decimal degrees - `pointB: The tuple representing the latitude/longitude for the second point. Latitude and longitude must be in decimal degrees :Returns: The bearing in degrees :Returns Type: float """ if (type(pointA) != tuple) or (type(pointB) != tuple): raise TypeError("Only tuples are supported as arguments") lat1 = math.radians(pointA[0]) lat2 = math.radians(pointB[0]) diffLong = math.radians(pointB[1] - pointA[1]) x = math.sin(diffLong) * math.cos(lat2) y = math.cos(lat1) * math.sin(lat2) - (math.sin(lat1) * math.cos(lat2) * math.cos(diffLong)) initial_bearing = math.atan2(x, y) # Now we have the initial bearing but math.atan2 return values # from -180° to + 180° which is not what we want for a compass bearing # The solution is to normalize the initial bearing as shown below initial_bearing = math.degrees(initial_bearing) compass_bearing = (initial_bearing + 360) % 360 return compass_bearing # https://janakiev.com/blog/gps-points-distance-python/ def haversine(coord1, coord2): R = 6372800 # Earth radius in meters lat1, lon1 = coord1 lat2, lon2 = coord2 phi1, phi2 = math.radians(lat1), math.radians(lat2) dphi = math.radians(lat2 - lat1) dlambda = math.radians(lon2 - lon1) a = math.sin(dphi/2)**2 + \ math.cos(phi1)*math.cos(phi2)*math.sin(dlambda/2)**2 return 2*R*math.atan2(math.sqrt(a), math.sqrt(1 - a)) import math def cross_track_distance(point, start, end): # Convert all latitudes and longitudes from degrees to radians point_lat, point_lon = math.radians(point[0]), math.radians(point[1]) start_lat, start_lon = math.radians(start[0]), math.radians(start[1]) end_lat, end_lon = math.radians(end[0]), math.radians(end[1]) # Calculate the angular distance from start to point # Ensure the argument is within the domain of acos acos_argument = math.sin(start_lat) * math.sin(point_lat) + math.cos(start_lat) * math.cos(point_lat) * math.cos(point_lon - start_lon) acos_argument = max(-1, min(1, acos_argument)) # Clamp the argument between -1 and 1 delta_sigma = math.acos(acos_argument) # Calculate the bearing from start to point and start to end theta_point = math.atan2(math.sin(point_lon - start_lon) * math.cos(point_lat), math.cos(start_lat) * math.sin(point_lat) - math.sin(start_lat) * math.cos(point_lat) * math.cos(point_lon - start_lon)) theta_end = math.atan2(math.sin(end_lon - start_lon) * math.cos(end_lat), math.cos(start_lat) * math.sin(end_lat) - math.sin(start_lat) * math.cos(end_lat) * math.cos(end_lon - start_lon)) # Calculate the cross track distance cross_track_dist = math.asin(math.sin(delta_sigma) * math.sin(theta_point - theta_end)) # Convert cross track distance to kilometers by multiplying by the Earth's radius (6371 km) cross_track_dist = cross_track_dist * 6371 return cross_track_dist def test_cross_track_distance(): kbjc_runway_30R_start = (kbjc_runways["30R/12L"]["Runway 30R"]["Latitude"], kbjc_runways["30R/12L"]["Runway 30R"]["Longitude"]) kbjc_runway_30R_end = (kbjc_runways["30R/12L"]["Runway 12L"]["Latitude"], kbjc_runways["30R/12L"]["Runway 12L"]["Longitude"]) kbjc_tower = (test_locations["kbjc_tower"]["lat"], test_locations["kbjc_tower"]["lon"]) print(f"start lat: {kbjc_runway_30R_start[0]}, start lon: {kbjc_runway_30R_start[1]}") print(f"end lat: {kbjc_runway_30R_end[0]}, end lon: {kbjc_runway_30R_end[1]}") print(f"tower lat: {kbjc_tower[0]}, tower lon: {kbjc_tower[1]}") dist = cross_track_distance(kbjc_tower, kbjc_runway_30R_start, kbjc_runway_30R_end) print(f"cross track distance: {dist}") test_cross_track_distance() KBJC_lat = 39.9088056 KBJC_lon = -105.1171944 def write_position_to_redis(position): # position is a list of 7 floats # position_elements = [lat, lon, alt, pitch, roll, yaw, gear_indicator] position_elements = ["lat", "lon", "alt", "pitch", "roll", "yaw", "gear_indicator"] position_str = ','.join([str(x) for x in position]) r.set('position', position_str) for i in range(len(position_elements)): r.set(f"position/{position_elements[i]}", position[i]) # position_str = ','.join([str(x) for x in position]) # r.publish('position_updates', position_str) def get_setpoints_from_redis(): setpoints = { "desired_roll": 0, "desired_pitch": 2, "desired_speed": 160, "desired_alt": 8000.0, "desired_hdg": 140, "autopilot_enabled": 0, "hdg_mode": "hdg", "target_wpt": "BJC" } for key in setpoints: # if the key exists in the redis db, use it # otherwise, set it if r.exists(key): try: setpoints[key] = float(r.get(key)) except: setpoints[key] = r.get(key).decode('utf-8') else: r.set(key, setpoints[key]) return setpoints def get_autopilot_enabled_from_redis(): if r.exists("autopilot_enabled"): return int(r.get("autopilot_enabled").decode('utf-8')) == 1 ele_positions = [] ail_positions = [] thr_positions = [] def update_control_position_history(ctrl): ele_positions.append(ctrl[0]) ail_positions.append(ctrl[1]) thr_positions.append(ctrl[3]) # if the list is longer than 20, pop the first element if len(ele_positions) > 20: ele_positions.pop(0) ail_positions.pop(0) thr_positions.pop(0) def monitor(): with xpc.XPlaneConnect() as client: previous_nav_mode = None previous_nav_target = None d_to_start_coords = None d_to_target_coords = None current_run_start_time = datetime.now() current_run_start_time_str = current_run_start_time.strftime("%Y-%m-%d_%H-%M-%S") current_run_log_filename = f"{current_run_start_time_str}.log" log_file_header = "time,lat,lon,alt,xte_km,xte_pid_out,trk_hdg,hdg_corr,adj_des_hdg,adj_hdg_err" with open(current_run_log_filename, "w") as log_file: log_file.write(log_file_header + "\n") while True: loop_start = datetime.now() print(f"loop start - {loop_start}") posi = client.getPOSI() write_position_to_redis(posi) ctrl = client.getCTRL() bearing_to_kbjc = get_bearing((posi[0], posi[1]), (KBJC_lat, KBJC_lon)) dist_to_kbjc = haversine((posi[0], posi[1]), (KBJC_lat, KBJC_lon)) #desired_hdg = 116 #bearing_to_kbjc multi_DREFs = client.getDREFs(DREFs) #speed=0, mag hdg=1, onground=2 current_roll = posi[4] current_pitch = posi[3] #current_hdg = posi[5] # this is true, need to use DREF to get mag '' current_hdg = multi_DREFs[1][0] current_altitude = multi_DREFs[3][0] current_asi = multi_DREFs[0][0] onground = multi_DREFs[2][0] # get the setpoints from redis setpoints = get_setpoints_from_redis() # check if we have just changed to direct-to mode and if so, update the direct to coords. same if the target waypoint has changed if (setpoints["hdg_mode"] == "d_to" and previous_nav_mode != "d_to") or (setpoints["target_wpt"] != previous_nav_target): print("reason for entering this block") print(f"previous nav mode: {previous_nav_mode}, setpoints hdg mode: {setpoints['hdg_mode']}, previous nav target: {previous_nav_target}, setpoints target wpt: {setpoints['target_wpt']}") # d_to_start_coords is the current position, in lat,lon tuple d_to_start_coords = (posi[0], posi[1]) # this function does a lookup in the nav_points dataframe to get the lat, lon of the target waypoint # it could certainly be optimized to use something faster than a pandas dataframe d_to_target_coords = get_nav_point_lat_lon(setpoints["target_wpt"]) # reset xte PID xte_PID.clear() print(f"setting d_to_start_coords to {d_to_start_coords}") # these are unchanged desired_alt = setpoints["desired_alt"] desired_speed = setpoints["desired_speed"] if setpoints["hdg_mode"] == "hdg": # if we're in heading mode, just use the desired heading. this is mostly unchanged from the previous iteration desired_hdg = setpoints["desired_hdg"] heading_error = get_angle_difference(desired_hdg, current_hdg) heading_error_PID.update(heading_error) elif setpoints["hdg_mode"] == "d_to": # if we're in direct-to mode, calculate the cross-track error and update the xte_PID. # I am using xte to mean cross-track error/distance xte = cross_track_distance((posi[0], posi[1]), d_to_start_coords, d_to_target_coords) xte_PID.update(xte) # calculate the heading correction based on the xte_PID output heading_correction = xte_PID.output # this is essentially saying for 1 km of cross-track error, we want to correct by 30 degrees heading_correction = heading_correction * 30 # limit the heading correction to -45 to 45 degrees heading_correction = normalize(heading_correction, -45, 45) # calculate the track heading to the target waypoint. the track heading is the heading we would # need to fly to get to the target waypoint from the current position. it is used as an initial heading track_heading = get_bearing((posi[0], posi[1]), d_to_target_coords) # adjust the desired heading by the heading correction adjusted_desired_hdg = track_heading + heading_correction # make sure the adjusted desired heading is between 0 and 360 adjusted_desired_hdg = adjusted_desired_hdg % 360 # calculate the heading error based on the adjusted desired heading, this is no different than the hdg mode adjusted_heading_error = get_angle_difference(adjusted_desired_hdg, current_hdg) heading_error_PID.update(adjusted_heading_error) # log the current values print(f"track hdg: {track_heading:.1f}, heading corr: {heading_correction:.1f}, adj desired hdg: {adjusted_desired_hdg:.1f}, adj heading err: {adjusted_heading_error:.1f}") # write to a log file so we can make nice plots for the blog post log_line = f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]},{posi[0]},{posi[1]},{posi[2]},{xte},{xte_PID.output},{track_heading},{heading_correction},{adjusted_desired_hdg},{adjusted_heading_error}" with open(current_run_log_filename, "a") as log_file: log_file.write(log_line + "\n") # outer loops first altitude_PID.SetPoint = desired_alt altitude_PID.update(current_altitude) speed_PID.SetPoint = desired_speed new_pitch_from_altitude = normalize(altitude_PID.output, -15, 15) new_roll_from_heading_error = normalize(heading_error_PID.output, -35, 35) pitch_PID.SetPoint = new_pitch_from_altitude roll_PID.SetPoint = new_roll_from_heading_error roll_PID.update(current_roll) speed_PID.update(current_asi) pitch_PID.update(current_pitch) new_ail_ctrl = normalize(roll_PID.output, min=-1, max=1) new_ele_ctrl = normalize(pitch_PID.output, min=-1, max=1) new_thr_ctrl = normalize(speed_PID.output, min=0, max=1) previous_ail_ctrl = ail_positions[-1] if len(ail_positions) > 0 else 0 previous_ele_ctrl = ele_positions[-1] if len(ele_positions) > 0 else 0 previous_thr_ctrl = thr_positions[-1] if len(thr_positions) > 0 else 0 # not currently functional - need to work on this # new_ail_ctrl_limited = previous_ail_ctrl + new_ail_ctrl * MAX_DEFLECTION_PER_SECOND / update_frequency # new_ele_ctrl_limited = previous_ele_ctrl + new_ele_ctrl * MAX_DEFLECTION_PER_SECOND / update_frequency # new_thr_ctrl_limited = previous_thr_ctrl + new_thr_ctrl * MAX_DEFLECTION_PER_SECOND / update_frequency # update the control positions # update_control_position_history((new_ele_ctrl_limited, new_ail_ctrl_limited, 0.0, new_thr_ctrl_limited)) update_control_position_history((new_ele_ctrl, new_ail_ctrl, 0.0, new_thr_ctrl)) onground = -1 if onground == 1: print("on ground, not sending controls") else: if get_autopilot_enabled_from_redis(): # ctrl = [new_ele_ctrl_limited, new_ail_ctrl_limited, 0.0, new_thr_ctrl_limited] ctrl = [new_ele_ctrl, new_ail_ctrl, 0.0, new_thr_ctrl] client.sendCTRL(ctrl) loop_end = datetime.now() loop_duration = loop_end - loop_start output = f"current values -- roll: {current_roll: 0.3f}, pitch: {current_pitch: 0.3f}, hdg: {current_hdg:0.3f}, alt: {current_altitude:0.3f}, asi: {current_asi:0.3f}" # output = output + "\n" + f"hdg error: {heading_error: 0.3f}" output = output + "\n" + f"new ctrl positions -- ail: {new_ail_ctrl: 0.4f}, ele: {new_ele_ctrl: 0.4f}, thr: {new_thr_ctrl:0.4f}" output = output + "\n" + f"PID outputs -- altitude: {altitude_PID.output: 0.4f}, pitch: {pitch_PID.output: 0.4f}, ail: {roll_PID.output: 0.3f}, hdg: {heading_error_PID.output: 0.3f}" output = output + "\n" + f"bearing to KBJC: {bearing_to_kbjc:3.1f}, dist: {dist_to_kbjc*0.000539957:0.2f} NM" output = output + "\n" + f"loop duration (ms): {loop_duration.total_seconds()*1000:0.2f} ms" if setpoints["hdg_mode"] == "d_to": output = output + "\n" + f"cross track error: {xte:0.3f} km" print(output) previous_nav_mode = setpoints["hdg_mode"] previous_nav_target = setpoints["target_wpt"] sleep_until_next_tick(update_frequency) os.system('cls' if os.name == 'nt' else 'clear') if __name__ == "__main__": monitor()