
World’s Most Stable Raspberry Pi? 81% Better NTP with Thermal Management

I’ve written before about building microsecond-accurate NTP servers with Raspberry Pi and GPS PPS, and more recently about revisiting the setup in 2025. Both posts focused on the hardware setup and basic configuration to achieve sub-microsecond time synchronization using GPS Pulse Per Second (PPS) signals.

But there was a problem. Despite having a stable PPS reference, my NTP server’s frequency drift was exhibiting significant variation over time. After months (years) of monitoring the system with Grafana dashboards, I noticed something interesting: the frequency oscillations seemed to correlate with CPU temperature changes. The frequency would drift as the CPU heated up during the day and cooled down at night, even though the PPS reference remained rock-solid.

Like clockwork (no pun intended), I somehow get sucked back into trying to improve my setup every 6-8 weeks. This post is the latest on that never-ending quest.

This post details how I achieved an 81% reduction in frequency variability and 77% reduction in frequency standard deviation through a combination of CPU core pinning and thermal stabilization. Welcome to Austin’s Nerdy Things, where we solve problems that 99.999% of people (and 99% of datacenters) don’t have.

The Problem: Thermal-Induced Timing Jitter

Modern CPUs, including those in Raspberry Pis, use dynamic frequency scaling to save power and manage heat. When the CPU is idle, it runs at a lower frequency (and voltage). When load increases, it scales up. This is great for power efficiency, but terrible for precision timekeeping.

Why? Because timekeeping (with NTP/chronyd/others) relies on a stable system clock to discipline itself against reference sources. If the CPU frequency is constantly changing, the system clock’s tick rate varies, introducing jitter into the timing measurements. Even though my PPS signal was providing a mostly perfect 1-pulse-per-second reference, the CPU’s frequency bouncing around made it harder for chronyd to maintain a stable lock.
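You can watch the scaling happen on a stock Pi by sampling the cpufreq driver's reported frequency from sysfs. A minimal sketch (the sysfs paths below are the standard cpufreq locations on Raspberry Pi OS; adjust if your kernel exposes them differently):

#!/usr/bin/env python3
# Sample cpu0's current frequency a few times to see dynamic scaling in action.
import time

GOVERNOR = "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"
CUR_FREQ = "/sys/devices/system/cpu/cpu0/cpufreq/scaling_cur_freq"  # value is in kHz

with open(GOVERNOR) as f:
    print("governor:", f.read().strip())

for _ in range(10):
    with open(CUR_FREQ) as f:
        khz = int(f.read().strip())
    print(f"cpu0 frequency: {khz / 1000:.0f} MHz")
    time.sleep(1)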

But here’s the key insight: the system clock is ultimately derived from a crystal oscillator, and crystal oscillator frequency is temperature-dependent. The oscillator sits on the board near the CPU, and as the CPU heats up and cools down throughout the day, so does the crystal. Even a few degrees of temperature change can shift the oscillator’s frequency by parts per million – exactly what I was seeing in my frequency drift graphs. The CPU frequency scaling was one factor, but the underlying problem was that temperature changes were affecting the crystal oscillator itself. By stabilizing the CPU temperature, I could stabilize the thermal environment for the crystal oscillator, keeping its frequency consistent.

Looking at my Grafana dashboard, I could see the frequency offset wandering over a range of about 1 PPM (parts per million) as the Pi warmed up and cooled down throughout the day. The RMS offset was averaging around 86 nanoseconds, which isn’t terrible (it’s actually really, really, really good), but I knew it could be better.
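For a sense of scale, a frequency error in PPM converts directly to accumulated time error: 1 PPM is one microsecond of drift per second. Here is the back-of-the-envelope arithmetic (the 16-second figure matches the chronyd update interval shown later in this post, and the ±0.14 PPM band is the post-optimization result from the conclusions):

# Accumulated clock error from a constant frequency offset, in nanoseconds.
def drift_ns(freq_offset_ppm: float, seconds: float) -> float:
    return freq_offset_ppm * 1e-6 * seconds * 1e9

# A 1 PPM wander left uncorrected for one 16 s polling interval:
print(drift_ns(1.0, 16))   # 16000 ns (16 microseconds)

# The post-optimization +/-0.14 PPM band over the same interval:
print(drift_ns(0.14, 16))  # 2240 ns (2.24 microseconds)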

The Discovery

After staring at graphs for longer than I’d like to admit, I had an idea: what if I could keep the CPU at a constant temperature? If the temperature (and therefore the frequency) stayed stable, maybe the timing would stabilize too.

The solution came in two parts:

  1. CPU core isolation – Dedicate CPU 0 exclusively to timing-critical tasks (chronyd and PPS interrupts)
  2. Thermal stabilization – Keep the other CPUs busy to maintain a constant temperature, preventing frequency scaling

Here’s what happened when I turned on the thermal stabilization system on November 17, 2025 at 09:10 AM:

NTP Frequency Stability

The same graph, but with CPU temperature also plotted:

The vertical red line on the first plot marks when I activated the “time burner” process. Notice how the frequency oscillations immediately dampen and settle into a much tighter band? Let’s dive into how this works.

EDIT: 2025-11-25 I didn’t expect to wake up and see this at #2 on Hacker News – https://news.ycombinator.com/item?id=46042946

The Solution Part 1: CPU Core Pinning and Real-Time Priority

The first step is isolating timing-critical operations onto a dedicated CPU core. On a Raspberry Pi (4-core ARM), this means:

  • CPU 0: Reserved for chronyd and PPS interrupts
  • CPUs 1-3: Everything else, including our thermal load

I had AI (probably Claude Sonnet 4 ish, maybe 4.5) create a boot optimization script that runs at system startup:

#!/bin/bash
# PPS NTP Server Performance Optimization Script
# Sets CPU affinity, priorities, and performance governor at boot

set -e

echo "Setting up PPS NTP server performance optimizations..."

# Wait for system to be ready
sleep 5

# Set CPU governor to performance mode
echo "Setting CPU governor to performance..."
cpupower frequency-set -g performance

# Pin PPS interrupt to CPU0 (may fail if already pinned, that's OK)
echo "Configuring PPS interrupt affinity..."
echo 1 > /proc/irq/200/smp_affinity 2>/dev/null || echo "PPS IRQ already configured"

# Wait for chronyd to start
echo "Waiting for chronyd to start..."
timeout=30
while [ $timeout -gt 0 ]; do
    chronyd_pid=$(pgrep chronyd 2>/dev/null || echo "")
    if [ -n "$chronyd_pid" ]; then
        echo "Found chronyd PID: $chronyd_pid"
        break
    fi
    sleep 1
    ((timeout--))
done

if [ -z "$chronyd_pid" ]; then
    echo "Warning: chronyd not found after 30 seconds"
else
    # Set chronyd to real-time priority and pin to CPU 0
    echo "Setting chronyd to real-time priority and pinning to CPU 0..."
    chrt -f -p 50 $chronyd_pid
    taskset -cp 0 $chronyd_pid
fi

# Boost ksoftirqd/0 priority
echo "Boosting ksoftirqd/0 priority..."
ksoftirqd_pid=$(ps aux | grep '\[ksoftirqd/0\]' | grep -v grep | awk '{print $2}')
if [ -n "$ksoftirqd_pid" ]; then
    renice -n -10 $ksoftirqd_pid
    echo "ksoftirqd/0 priority boosted (PID: $ksoftirqd_pid)"
else
    echo "Warning: ksoftirqd/0 not found"
fi

echo "PPS NTP optimization complete!"

# Log current status
echo "=== Current Status ==="
echo "CPU Governor: $(cat /sys/devices/system/cpu/cpu0/cpufreq/scaling_governor)"
echo "PPS IRQ Affinity: $(cat /proc/irq/200/effective_affinity_list 2>/dev/null || echo 'not readable')"
if [ -n "$chronyd_pid" ]; then
    echo "chronyd Priority: $(chrt -p $chronyd_pid)"
fi
echo "======================"

What this does:

  1. Performance Governor: Forces all CPUs to run at maximum frequency, disabling frequency scaling
  2. PPS IRQ Pinning: Ensures PPS interrupt (IRQ 200) is handled exclusively by CPU 0
  3. Chronyd Real-Time Priority: Sets chronyd to SCHED_FIFO priority 50, giving it preferential CPU scheduling
  4. Chronyd CPU Affinity: Pins chronyd to CPU 0 using taskset
  5. ksoftirqd Priority Boost: Improves priority of the kernel softirq handler on CPU 0

This script can be added to /etc/rc.local or as a systemd service to run at boot.

The Solution Part 2: PID-Controlled Thermal Stabilization

Setting the performance governor helps, but on a Raspberry Pi, even at max frequency, the CPU temperature will still vary based on ambient conditions and load. Temperature changes affect the CPU’s actual operating frequency due to thermal characteristics of the silicon.

The solution? Keep the CPU at a constant temperature using a PID-controlled thermal load. I call it the “time burner” (inspired by CPU burn-in tools, but with precise temperature control).

As a reminder of what we’re really doing here: we’re maintaining a stable thermal environment for the crystal oscillator. The RPi 3B’s 19.2 MHz oscillator is physically located near the CPU on the Raspberry Pi board, so by actively controlling CPU temperature, we’re indirectly controlling the oscillator’s temperature. Since the oscillator’s frequency is temperature-dependent (this is basic physics of quartz crystals), keeping it at a constant temperature means keeping its frequency stable – which is exactly what we need for precise timekeeping.

Here’s how it works:

  1. Read CPU temperature from /sys/class/thermal/thermal_zone0/temp
  2. PID controller calculates how much CPU time to burn to maintain target temperature (I chose 54°C)
  3. Three worker processes run on CPUs 1, 2, and 3 (avoiding CPU 0)
  4. Each worker alternates between busy-loop (MD5 hashing) and sleeping based on PID output
  5. Temperature stabilizes at the setpoint, preventing thermal drift

Here’s the core implementation (simplified for readability):

#!/usr/bin/env python3
import time
import argparse
import multiprocessing
import hashlib
import os
from collections import deque

class PIDController:
    """Simple PID controller with output clamping and anti-windup."""
    def __init__(self, Kp, Ki, Kd, setpoint, output_limits=(0, 1), sample_time=1.0):
        self.Kp = Kp
        self.Ki = Ki
        self.Kd = Kd
        self.setpoint = setpoint
        self.output_limits = output_limits
        self.sample_time = sample_time
        self._last_time = time.time()
        self._last_error = 0.0
        self._integral = 0.0
        self._last_output = 0.0

    def update(self, measurement):
        """Compute new output of PID based on measurement."""
        now = time.time()
        dt = now - self._last_time

        if dt < self.sample_time:
            return self._last_output

        error = self.setpoint - measurement

        # Proportional
        P = self.Kp * error

        # Integral with anti-windup
        self._integral += error * dt
        I = self.Ki * self._integral

        # Derivative
        derivative = (error - self._last_error) / dt if dt > 0 else 0.0
        D = self.Kd * derivative

        # Combine and clamp
        output = P + I + D
        low, high = self.output_limits
        output = max(low, min(high, output))

        self._last_output = output
        self._last_error = error
        self._last_time = now

        return output

def read_cpu_temperature(path='/sys/class/thermal/thermal_zone0/temp'):
    """Return CPU temperature in Celsius."""
    with open(path, 'r') as f:
        temp_str = f.read().strip()
    return float(temp_str) / 1000.0

def burn_cpu(duration):
    """Busy-loop hashing for 'duration' seconds."""
    end_time = time.time() + duration
    m = hashlib.md5()
    while time.time() < end_time:
        m.update(b"burning-cpu")

def worker_loop(worker_id, cmd_queue, done_queue):
    """
    Worker process:
    - Pins itself to CPUs 1, 2, or 3 (avoiding CPU 0)
    - Burns CPU based on commands from main process
    """
    available_cpus = [1, 2, 3]
    cpu_to_use = available_cpus[worker_id % len(available_cpus)]
    os.sched_setaffinity(0, {cpu_to_use})
    print(f"Worker {worker_id} pinned to CPU {cpu_to_use}")

    while True:
        cmd = cmd_queue.get()
        if cmd is None:
            break

        burn_time, sleep_time = cmd
        burn_cpu(burn_time)
        time.sleep(sleep_time)
        done_queue.put(worker_id)

# Main control loop (simplified)
def main():
    target_temp = 54.0  # degrees Celsius
    control_window = 0.20  # 200ms cycle time

    pid = PIDController(Kp=0.05, Ki=0.02, Kd=0.0,
                        setpoint=target_temp,
                        sample_time=0.18)

    # Start 3 worker processes
    workers = []
    cmd_queues = []
    done_queue = multiprocessing.Queue()

    for i in range(3):
        q = multiprocessing.Queue()
        p = multiprocessing.Process(target=worker_loop, args=(i, q, done_queue))
        p.start()
        workers.append(p)
        cmd_queues.append(q)

    try:
        while True:
            # Measure temperature
            current_temp = read_cpu_temperature()

            # PID control: output is fraction of time to burn (0.0 to 1.0)
            output = pid.update(current_temp)

            # Convert to burn/sleep times
            burn_time = output * control_window
            sleep_time = control_window - burn_time

            # Send command to all workers
            for q in cmd_queues:
                q.put((burn_time, sleep_time))

            # Wait for workers to complete
            for _ in range(3):
                done_queue.get()

            print(f"Temp={current_temp:.2f}C, Output={output:.2f}, "
                  f"Burn={burn_time:.2f}s")

    except KeyboardInterrupt:
        for q in cmd_queues:
            q.put(None)
        for p in workers:
            p.join()

if __name__ == '__main__':
    main()

The full implementation includes a temperature filtering system to smooth out sensor noise and command-line arguments for tuning the PID parameters.
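The filtering isn't shown above (which is why the deque import goes unused in the simplified listing). Here is a minimal sketch of what a moving-average smoother for the thermal-zone readings might look like (illustrative only, not the exact filter from the full script):

from collections import deque

class TemperatureFilter:
    """Simple moving-average smoother for noisy thermal-zone readings."""
    def __init__(self, window=5):
        self.samples = deque(maxlen=window)

    def update(self, reading):
        self.samples.append(reading)
        return sum(self.samples) / len(self.samples)

# Usage inside the control loop (hypothetical names):
#   temp_filter = TemperatureFilter(window=5)
#   smoothed = temp_filter.update(read_cpu_temperature())
#   output = pid.update(smoothed)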

PID Tuning Notes:

  • Kp=0.05: Proportional gain – responds to current error
  • Ki=0.02: Integral gain – eliminates steady-state error
  • Kd=0.0: Derivative gain – set to zero because temperature changes slowly

The target temperature of 54°C was chosen empirically – high enough to keep the CPU from idling down, but low enough to avoid thermal throttling (which starts around 80°C on Raspberry Pi).
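If you want to sanity-check your own setpoint against the throttle point, the controller's thermal zone can be read directly. A quick sketch using the same sysfs path the time burner reads and the ~80°C throttle figure mentioned above:

# Report current CPU temperature and headroom to the ~80 C throttle point.
THERMAL_ZONE = "/sys/class/thermal/thermal_zone0/temp"  # millidegrees Celsius
TARGET_C = 54.0
THROTTLE_C = 80.0

with open(THERMAL_ZONE) as f:
    temp_c = int(f.read().strip()) / 1000.0

print(f"current temperature:  {temp_c:.1f} C")
print(f"distance to setpoint: {TARGET_C - temp_c:+.1f} C")
print(f"headroom to throttle: {THROTTLE_C - temp_c:.1f} C")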

The Results: Numbers Don’t Lie

The improvement was immediately visible. Here are the statistics comparing performance before and after the optimization:

A note on ambient conditions: The Raspberry Pi lives in a project enclosure in our master bedroom (chosen for its decent GPS reception and ADS-B coverage; the same Pi also runs the feeder for a new aircraft AR overlay app idea I’m working on). While the time burner maintains the CPU die temperature at 54°C, the enclosure is still subject to ambient temperature swings. Room temperature cycles from a low of 66°F (18.9°C) at 5:15 AM to a peak of 72°F (22.2°C) at 11:30 AM – a 6°F daily swing from our heating schedule. The fact that we see such dramatic frequency stability improvements despite this ambient variation speaks to how effective the thermal control is. The CPU’s active heating overwhelms the environmental changes, maintaining consistent silicon temperature where it matters most.

Frequency Stability

Frequency Variability
Metric                Before      After       Improvement
Mean RMS Offset       85.44 ns    43.54 ns    49.0% reduction
Median RMS Offset     80.13 ns    37.93 ns    52.7% reduction

The RMS offset is chronyd’s estimate of the timing uncertainty. Cutting this nearly in half means the system is maintaining significantly better time accuracy.

Setup Instructions

Want to replicate this? Here’s the step-by-step process:

Prerequisites

You need a working GPS PPS NTP server setup. If you don’t have one yet, follow my 2025 NTP guide first.

Step 0: Install Required Tools

sudo apt-get update
sudo apt-get install linux-cpupower python3 util-linux

Step 1: Create the Boot Optimization Script

Save the optimization script from earlier as /usr/local/bin/pps-optimize.sh:

sudo nano /usr/local/bin/pps-optimize.sh
# Paste the script content
sudo chmod +x /usr/local/bin/pps-optimize.sh

Step 2: Create Systemd Service for Boot Script

Create /etc/systemd/system/pps-optimize.service:

[Unit]
Description=PPS NTP Performance Optimization
After=chronyd.service
Requires=chronyd.service

[Service]
Type=oneshot
ExecStart=/usr/local/bin/pps-optimize.sh
RemainAfterExit=yes

[Install]
WantedBy=multi-user.target

Enable it:

sudo systemctl enable pps-optimize.service

Step 3: Install the Time Burner Script

Save the time burner Python script as /usr/local/bin/time_burner.py:

sudo nano /usr/local/bin/time_burner.py
# Paste the full time burner script
sudo chmod +x /usr/local/bin/time_burner.py

Step 4: Create Systemd Service for Time Burner

Create /etc/systemd/system/time-burner.service:

[Unit]
Description=CPU Thermal Stabilization for NTP
After=network.target

[Service]
Type=simple
User=root
ExecStart=/usr/bin/python3 /usr/local/bin/time_burner.py -t 54.0 -n 3
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target

Enable and start it:

sudo systemctl enable time-burner.service
sudo systemctl start time-burner.service

Step 5: Verify the Setup

Check that everything is running:

# Verify CPU governor
cat /sys/devices/system/cpu/cpu0/cpufreq/scaling_governor
# Should output: performance

# Check chronyd CPU affinity and priority
ps -eo pid,comm,psr,ni,rtprio | grep chronyd
# Should show psr=0 (CPU 0) and rtprio=50

# Check time burner processes
ps aux | grep time_burner
# Should show 4 processes (1 main + 3 workers)

# Monitor NTP performance
chronyc tracking

Example output from chronyc tracking:

Reference ID    : 50505300 (PPS)
Stratum         : 1
Ref time (UTC)  : Sun Nov 24 16:45:23 2025
System time     : 0.000000038 seconds fast of NTP time
Last offset     : -0.000000012 seconds
RMS offset      : 0.000000035 seconds
Frequency       : 1.685 ppm slow
Residual freq   : -0.001 ppm
Skew            : 0.002 ppm
Root delay      : 0.000000001 seconds
Root dispersion : 0.000010521 seconds
Update interval : 16.0 seconds
Leap status     : Normal

Notice the RMS offset of 35 nanoseconds – this is the kind of accuracy you can achieve with thermal stabilization.

Step 6: Monitor Over Time

(Topic for a future post)

Set up Grafana dashboards to monitor:

  • Frequency offset (PPM)
  • RMS offset (nanoseconds)
  • CPU temperature
  • System time offset

You’ll see the frequency stabilize within a few hours as the PID controller locks onto the target temperature.
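If chrony isn't already feeding your metrics pipeline, one low-effort approach is to poll chronyc's machine-readable output on a timer and record the fields you care about. A minimal sketch (this assumes chronyc's -c CSV mode; the field positions and log path below are assumptions, so verify them against your chrony version before trusting the numbers):

#!/usr/bin/env python3
# Append chronyd RMS offset, frequency, and skew to a CSV file once per minute.
import csv
import subprocess
import time

LOGFILE = "/var/log/chrony_stats.csv"  # hypothetical path

def read_tracking():
    # 'chronyc -c tracking' prints one comma-separated line of tracking fields.
    out = subprocess.run(["chronyc", "-c", "tracking"],
                         capture_output=True, text=True, check=True)
    fields = out.stdout.strip().split(",")
    # Assumed field positions: 6 = RMS offset (s), 7 = frequency (ppm), 9 = skew (ppm)
    return float(fields[6]), float(fields[7]), float(fields[9])

while True:
    rms_s, freq_ppm, skew_ppm = read_tracking()
    with open(LOGFILE, "a", newline="") as f:
        csv.writer(f).writerow([int(time.time()), rms_s * 1e9, freq_ppm, skew_ppm])
    time.sleep(60)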

Monitoring and Troubleshooting

Real-Time Monitoring

Watch chronyd tracking in real-time:

watch -n 1 "chronyc tracking"

Check time burner status:

sudo systemctl status time-burner.service

View time burner output:

sudo journalctl -u time-burner.service -f

Common Issues

Temperature overshoots or oscillates:

  • Adjust PID gains – reduce Kp if the temperature oscillates, increase Ki if there is persistent steady-state error
  • Try different target temperatures (50-60°C range)

High CPU usage (obviously):

  • This is intentional – the time burner uses ~90% of 3 cores
  • Not suitable for Pis running other workloads

Chronyd not pinned to CPU 0:

  • Check that the optimization script runs after chronyd starts
  • Adjust the timing in the systemd service dependencies

Trade-offs and Considerations

Let’s be honest about the downsides:

Power Consumption

The time burner keeps 3 cores at ~30% average utilization. My Pi now draws about 3-4W continuously (vs 1-2W idle). Over a year, that’s an extra 15-25 kWh, or about $2-3 in electricity (depending on your rates).

Heat

Running at 54°C means the Pi is warm to the touch. This is well within safe operating temperature (thermal throttling doesn’t start until 80°C), but you might want to ensure adequate ventilation. I added a small heatsink just to be safe.

CPU Resources

You’re dedicating 3 of 4 cores to burning cycles. This is fine for a dedicated NTP server, but not suitable if you’re running other services on the same Pi. That said, I am also running the feeder to my new ADS-B aircraft visualization app on it. My readsb instance regularly gets to 1200 msg/s with 200+ aircraft.

Is It Worth It?

For 99.999% of use cases: absolutely not.

Most applications don’t need better than millisecond accuracy, let alone the 35-nanosecond RMS offset I’m achieving. Even for distributed systems, microsecond-level accuracy is typically overkill.

When this might make sense:

  • Precision timing applications (scientific instrumentation, radio astronomy)
  • Distributed systems research requiring tight clock synchronization
  • Network testing where timing precision affects results
  • Because you can (the best reason for any homelab project)

For me, this falls squarely in the “because you can” category. I had the monitoring infrastructure in place, noticed the thermal correlation, and couldn’t resist solving the problem. Plus, I learned a lot about PID control, CPU thermal characteristics, and Linux real-time scheduling.

Future Improvements

Some ideas I’m considering:

Adaptive PID Tuning

The current PID gains are hand-tuned for a specific ambient temperature range. The fairly low P value is to avoid spikes when some load on the Pi kicks up the temp. The I is a balance to keep long term “burn” relatively consistent. Implementing an auto-tuning algorithm (like Ziegler-Nichols) or adaptive PID could handle seasonal temperature variations better.
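For reference, the classic Ziegler–Nichols rule derives PID gains from the ultimate gain Ku (the proportional-only gain at which the loop just sustains oscillation) and the resulting oscillation period Tu. A sketch of the textbook formula, not something I have validated against this particular thermal loop:

def ziegler_nichols_pid(ku: float, tu: float):
    """Classic Ziegler-Nichols PID gains from ultimate gain Ku and period Tu (seconds)."""
    kp = 0.6 * ku
    ki = 1.2 * ku / tu    # equals kp / (tu / 2)
    kd = 0.075 * ku * tu  # equals kp * (tu / 8)
    return kp, ki, kd

# Hypothetical example: the loop oscillates at Kp = 0.2 with a 120 s period
print(ziegler_nichols_pid(0.2, 120.0))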

Hardware Thermal Control

Instead of software thermal control, I could add an actively cooled heatsink with PWM fan control. This might achieve similar temperature stability while using less power overall.

Oven-Controlled Crystal Oscillator (OCXO)

For the ultimate in frequency stability, replacing the Pi’s crystal with a temperature-controlled OCXO would eliminate thermal drift at the source. This is how professional timing equipment works. I do have a BH3SAP GPSDO sitting next to me (subject to a future post)… Then again, I’m the person who just wrote 4000 words about optimizing a $50 time server, so who am I kidding?

Conclusions

Through a combination of CPU core isolation and PID-controlled thermal stabilization, I achieved:

  • 81% reduction in frequency variability
  • 77% reduction in frequency standard deviation
  • 74% reduction in frequency range
  • 49% reduction in RMS offset

The system now maintains 38-nanosecond median RMS offset from the GPS PPS reference, with frequency drift that’s barely detectable in the noise. The CPU runs at a constant 54°C, and in steady state, the frequency offset stays within a tight ±0.14 PPM band (compared to ±0.52 PPM before optimization).

Was this necessary? No. Did I learn a bunch about thermal management, PID control, and Linux real-time scheduling? Yes. Would I do it again? Absolutely.

Resource

I did come across a “burn” script that was the basis for this thermal management. I can’t find it at the moment, but when I do I’ll link it here.


Have questions or suggestions? Drop a comment below. I’m particularly interested to hear if anyone has tried alternative thermal management approaches or has experience with OCXO modules for Raspberry Pi timing applications.

Thanks for reading, and happy timekeeping!


Adding track following (Direct To) with cross track error to the Python X-Plane Autopilot

Continuing from the last post (Adding some polish to the X-Plane Python Autopilot with Flask, Redis, and WebSockets), I have a bit of momentum going on the Python X-Plane Autopilot stuff. There were a couple of items I wanted to complete before declaring the project “done”. The first is a “takeoff” button, which isn’t yet done. The other is the ability to fly along a track. That is now complete as of last night.

It is one thing to fly a bearing from A to B. That works fine as long as there is no wind in any direction. Flying a heading set by bearing is easy, and is part of the heading select & hold feature built out in a previous iteration of the code. To do so requires a “desired heading” and a heading error PID. The goal is to minimize the heading error, so we set the setpoint to 0. This controls a “roll” PID controller, which controls an aileron PID controller.

Each has limits in place to prevent excessive movement. For example, the roll PID controller is limited to +/- 30 degrees, and pitch to +/- 15 degrees.
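For readers who haven't seen the earlier code, here is a compressed two-loop sketch of that cascade: heading error drives a desired roll angle (clamped), and roll error drives the aileron command. The toy PID class and the gains are illustrative only, not the tuned values or the ivPID-style class the real autopilot uses.

# Minimal sketch of the heading -> roll -> aileron cascade described above.
class SimplePID:
    def __init__(self, kp, ki, kd, out_min, out_max):
        self.kp, self.ki, self.kd = kp, ki, kd
        self.out_min, self.out_max = out_min, out_max
        self.integral = 0.0
        self.prev_error = 0.0

    def update(self, error, dt):
        self.integral += error * dt
        derivative = (error - self.prev_error) / dt if dt > 0 else 0.0
        self.prev_error = error
        out = self.kp * error + self.ki * self.integral + self.kd * derivative
        return max(self.out_min, min(self.out_max, out))

# Illustrative gains; limits follow the text (+/-30 degrees of roll, +/-1 full-scale aileron)
heading_pid = SimplePID(1.0, 0.05, 0.1, out_min=-30, out_max=30)  # heading error (deg) -> desired roll (deg)
roll_pid = SimplePID(0.1, 0.02, 0.0, out_min=-1, out_max=1)       # roll error (deg) -> aileron deflection

def heading_hold_step(desired_hdg, current_hdg, current_roll, dt=0.1):
    heading_error = ((desired_hdg - current_hdg + 180) % 360) - 180  # shortest-way angle difference
    desired_roll = heading_pid.update(heading_error, dt)
    return roll_pid.update(desired_roll - current_roll, dt)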

To take this to the next step requires a few things:

  • A “track”, which is commonly defined as a start point and an end point. Both are simply lat/lon coordinate sets.
  • A current location, which is current lat/lon
  • A cross track distance (error), which is the distance the current location is off the track.
  • More PID loops, namely a cross track distance PID control, which, like the heading error PID, has a setpoint of 0 (i.e. the goal is to minimize the cross track distance).

Additionally, to make something actually useful, we need a “database” of navigation points. I parsed X-Plane’s fixed-width text data files for this, which was not fun.

To tie it all together, the web interface needs a way to type in a nav point, and a Direct To (D->To) button. Direct to is common in aviation GPS units to set a track from the location when the button is pushed to some point (VOR, fix, airport, etc). I’ve emulated that functionality.

Here’s the screenshot showing the example aircraft navigating to DVV, which is the KDEN VOR, from somewhere near KBJC. It shows a cross track error of 0.056 km, or 56 meters. ChatGPT helpfully generated the cross track error function with a resultant number in meters. I am comfortable with many kinds of units so I’ll leave this for now. The red line on the right map view is the aircraft’s interpretation of the direct to set at the same time as I clicked my autopilot’s Direct To button. There is a 4 kt wind coming from 029. I tested with greater, somewhat constant crosswinds in the 40-50 kt range with gusts of +/- 5 kts.

screenshot showing python autopilot code controlling xplane, flying aircraft along a track

The cross track error settles down to < 10 m after a minute or so. It is a little “lazy”. If it is on a track that is due east, and I flip the track to due west, it’ll dutifully do the 180, then attempt to rejoin the track, but it overshoots a bit and settles down after ~1 oscillation. I could probably turn up the P on the xte PID and that would help. Below is a track of taking off from KBJC and then doing a direct-to to DVV. The X is where I clicked Direct To back to the BJC VOR; the aircraft turned left and rejoined the track, overshot, then settled back in nicely.

plot showing the lat/lon track of the aircraft doing almost a complete 180 to rejoin the track going the opposite direction with python autopilot code

The Python Autopilot Code

I’m not going to pretend I wrote the cross track distance code, nor will I pretend to understand it. It works. The sign of the result depends on something along the lines of which side of the great circle line you are on. Luckily, aircraft (and boats and other things that follow tracks) don’t typically go from B to A. They go from A to B so this is consistent no matter which direction the track is facing. If they do need to go back to the start, the start becomes the end, if that makes sense.

This is the glorious cross track distance code along with some test code. Using Google Earth, the distance from the KBJC control tower to the centerline of 30R/12L should be ~0.44 km.

import math

def cross_track_distance(point, start, end):
    # Convert all latitudes and longitudes from degrees to radians
    point_lat, point_lon = math.radians(point[0]), math.radians(point[1])
    start_lat, start_lon = math.radians(start[0]), math.radians(start[1])
    end_lat, end_lon = math.radians(end[0]), math.radians(end[1])

    # Calculate the angular distance from start to point
    # Ensure the argument is within the domain of acos
    acos_argument = math.sin(start_lat) * math.sin(point_lat) + math.cos(start_lat) * math.cos(point_lat) * math.cos(point_lon - start_lon)
    acos_argument = max(-1, min(1, acos_argument))  # Clamp the argument between -1 and 1
    delta_sigma = math.acos(acos_argument)

    # Calculate the bearing from start to point and start to end
    theta_point = math.atan2(math.sin(point_lon - start_lon) * math.cos(point_lat),
                             math.cos(start_lat) * math.sin(point_lat) - math.sin(start_lat) * math.cos(point_lat) * math.cos(point_lon - start_lon))
    theta_end = math.atan2(math.sin(end_lon - start_lon) * math.cos(end_lat),
                           math.cos(start_lat) * math.sin(end_lat) - math.sin(start_lat) * math.cos(end_lat) * math.cos(end_lon - start_lon))

    # Calculate the cross track distance
    cross_track_dist = math.asin(math.sin(delta_sigma) * math.sin(theta_point - theta_end))

    # Convert cross track distance to kilometers by multiplying by the Earth's radius (6371 km)
    cross_track_dist = cross_track_dist * 6371

    return cross_track_dist

kbjc_runways = {
	"30R/12L": {
		"Runway 12L": {
			"Latitude": 39.91529286666667,
			"Longitude": -105.12841313333334
		},
		"Runway 30R": {
			"Latitude": 39.901373883333335,
			"Longitude": -105.10191808333333
		}
	}
}


kbjc_runway_30R_start = (kbjc_runways["30R/12L"]["Runway 30R"]["Latitude"], kbjc_runways["30R/12L"]["Runway 30R"]["Longitude"])
kbjc_runway_30R_end = (kbjc_runways["30R/12L"]["Runway 12L"]["Latitude"], kbjc_runways["30R/12L"]["Runway 12L"]["Longitude"])
# "test_locations" is not defined in the original snippet; the KBJC airport reference
# point is used here as a stand-in for the tower location so the test runs
test_locations = {"kbjc_tower": {"lat": 39.9088056, "lon": -105.1171944}}
kbjc_tower = (test_locations["kbjc_tower"]["lat"], test_locations["kbjc_tower"]["lon"])
def test_cross_track_distance():
	print(f"start lat: {kbjc_runway_30R_start[0]}, start lon: {kbjc_runway_30R_start[1]}")
	print(f"end lat: {kbjc_runway_30R_end[0]}, end lon: {kbjc_runway_30R_end[1]}")
	print(f"tower lat: {kbjc_tower[0]}, tower lon: {kbjc_tower[1]}")

	dist = cross_track_distance(kbjc_tower, kbjc_runway_30R_start, kbjc_runway_30R_end)
	print(f"cross track distance: {dist}")

test_cross_track_distance()

And the rest of the magic happens in this block. If you recall from the last post (Adding some polish to the X-Plane Python Autopilot with Flask, Redis, and WebSockets), I am using Redis as a store to hold the setpoints from the web app controlling the autopilot. It is fast enough that I don’t need to worry about latency when running at 10 Hz (the loop durations are consistently less than 30 milliseconds, with the bulk of that time being used to get and set data from X-Plane itself).

# get the setpoints from redis
			setpoints = get_setpoints_from_redis()

			# check if we have just changed to direct-to mode and if so, update the direct to coords. same if the target waypoint has changed
			if (setpoints["hdg_mode"] == "d_to" and previous_nav_mode != "d_to") or (setpoints["target_wpt"] != previous_nav_target):
				print("reason for entering this block")
				print(f"previous nav mode: {previous_nav_mode}, setpoints hdg mode: {setpoints['hdg_mode']}, previous nav target: {previous_nav_target}, setpoints target wpt: {setpoints['target_wpt']}")
				
				# d_to_start_coords is the current position, in lat,lon tuple
				d_to_start_coords = (posi[0], posi[1])

				# this function does a lookup in the nav_points dataframe to get the lat, lon of the target waypoint
				# it could certainly be optimized to use something faster than a pandas dataframe
				d_to_target_coords = get_nav_point_lat_lon(setpoints["target_wpt"])

				# reset xte PID
				xte_PID.clear()
				print(f"setting d_to_start_coords to {d_to_start_coords}")

			# these are unchanged
			desired_alt = setpoints["desired_alt"]
			desired_speed = setpoints["desired_speed"]

			if setpoints["hdg_mode"] == "hdg":
				# if we're in heading mode, just use the desired heading. this is mostly unchanged from the previous iteration
				desired_hdg = setpoints["desired_hdg"]
				heading_error = get_angle_difference(desired_hdg, current_hdg)
				heading_error_PID.update(heading_error)
				
			elif setpoints["hdg_mode"] == "d_to":
				# if we're in direct-to mode, calculate the cross-track error and update the xte_PID.
				# I am using xte to mean cross-track error/distance
				xte = cross_track_distance((posi[0], posi[1]), d_to_start_coords, d_to_target_coords)
				xte_PID.update(xte)

				# calculate the heading correction based on the xte_PID output
				heading_correction = xte_PID.output

				# this is essentially saying for 1 km of cross-track error, we want to correct by 30 degrees
				heading_correction = heading_correction * 30

				# limit the heading correction to -45 to 45 degrees
				heading_correction = normalize(heading_correction, -45, 45)

				# calculate the track heading to the target waypoint. the track heading is the heading we would
				# need to fly to get to the target waypoint from the current position. it is used as an initial heading
				track_heading = get_bearing((posi[0], posi[1]), d_to_target_coords)

				# adjust the desired heading by the heading correction
				adjusted_desired_hdg = track_heading + heading_correction

				# make sure the adjusted desired heading is between 0 and 360
				adjusted_desired_hdg = adjusted_desired_hdg % 360

				# calculate the heading error based on the adjusted desired heading, this is no different than the hdg mode
				adjusted_heading_error = get_angle_difference(adjusted_desired_hdg, current_hdg)
				heading_error_PID.update(adjusted_heading_error)

				# log the current values
				print(f"track hdg: {track_heading:.1f}, heading corr: {heading_correction:.1f}, adj desired hdg: {adjusted_desired_hdg:.1f}, adj heading err: {adjusted_heading_error:.1f}")

				# write to a log file so we can make nice plots for the blog post
				log_line = f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]},{posi[0]},{posi[1]},{posi[2]},{xte},{xte_PID.output},{track_heading},{heading_correction},{adjusted_desired_hdg},{adjusted_heading_error}"
				with open(current_run_log_filename, "a") as log_file:
					log_file.write(log_line + "\n")

Getting nav data from X-Plane data files

If you look at the code closely, you will see that d_to_target_coords is set via a function called get_nav_point_lat_lon(nav_point). This looks up lat/lon in a file that was generated by parsing the X-Plane navigation data. In my previous job, I dealt with fixed-width data formats. It is not fun. I originally tried to split based on spaces, but some of the nav point names have more than one space in them. I suppose I could just ignore the name, but this is already written. This code parses the earth_nav.dat file, specifically for type 3 rows, which are VOR/DME-like.

import pandas as pd

nav_filepath = r"C:\Users\Austin\Desktop\X-Plane 10\Resources\default data\earth_nav.dat"

raw_file_data = open(nav_filepath, 'r').readlines()

# remove first 3 lines
raw_file_data = raw_file_data[3:]

# remove last line
raw_file_data = raw_file_data[:-1]

# remove new line characters
raw_file_data = [line.replace('\n', '') for line in raw_file_data]

# Adjusting the function based on the new column map provided
def parse_nav_info(line):
    column_map = {
        'type': {'start': 0, 'end': 1},
        'lat_sign': {'start': 2, 'end': 3},
        'latitude': {'start': 3, 'end': 15},
        'lon_sign': {'start': 15, 'end': 16},
        'longitude': {'start': 16, 'end': 28},
        'elevation': {'start': 29, 'end': 35},
        'frequency': {'start': 36, 'end': 41},
        'range': {'start': 42, 'end': 45},
        'unknown': {'start': 46, 'end': 52},
        'identifier': {'start': 53, 'end': 56},
        'name': {'start': 56}  # Assuming end is not needed; take till the end of the line
    }

    nav_info = {}
    for column, column_info in column_map.items():
        start = column_info['start']
        end = column_info.get('end', None)
        value = line[start:end].strip()
        # print(f"attempting to parse {column} with value {value}")
        if column == 'latitude':
            lat_sign = line[column_map['lat_sign']['start']:column_map['lat_sign']['end']]
            lat_sign = -1 if lat_sign == '-' else 1
            value = lat_sign * float(value)
        elif column == 'longitude':
            lon_sign = line[column_map['lon_sign']['start']:column_map['lon_sign']['end']]
            lon_sign = -1 if lon_sign == '-' else 1
            value = lon_sign * float(value)
        elif column == 'elevation':
            value = int(value)
        elif column == 'frequency':
            value = int(value)
        elif column == 'range':
            value = int(value)
        nav_info[column] = value

    return nav_info

i = 0
data = []
types = []
for line in raw_file_data:
    line_type = int(line[0:2])
    if line_type != 3:
        continue

    line_data = parse_nav_info(line)
    data.append(line_data)

df = pd.DataFrame(data)
columns_of_interest = ['identifier','latitude','longitude','elevation', 'frequency', 'range', 'name']
df = df[columns_of_interest]
df.head()

df.to_pickle('nav_data.pkl')

The code to read the file and import is at the beginning of the python_autopilot.py file and is fairly straightforward:

import pickle

# nav_data.pkl is a pandas dataframe. yes, this should use a dict or something faster.
nav_points = pickle.load(open("nav_data.pkl", "rb"))

def get_nav_point_lat_lon(id):
	nav_point = nav_points[nav_points["identifier"] == id]
	return nav_point["latitude"].values[0], nav_point["longitude"].values[0]
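Since the comment above already admits the dataframe lookup could be faster, one option (a sketch of the idea, not what the posted code does) is to collapse it into a plain dict keyed by identifier once at startup:

# Build a dict once for O(1) lookups instead of filtering the dataframe on every call.
# Note: duplicate identifiers collapse to a single entry here.
nav_lookup = {
    row.identifier: (row.latitude, row.longitude)
    for row in nav_points.itertuples(index=False)
}

def get_nav_point_lat_lon(id):
    return nav_lookup[id]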

And for the Flask side of the house, we have index.html:

<!DOCTYPE html>
<html>
<head>
    <title>Autopilot Interface</title>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/4.0.1/socket.io.js"></script>
<script type="text/javascript" charset="utf-8">
    var socket;  // Declare socket globally

    // Define adjustSetpoint globally
    function adjustSetpoint(label, adjustment) {
        socket.emit('adjust_setpoint', {label: label, adjustment: adjustment});
    }

    function submitDirectTo() {
        const stationId = document.getElementById('target_wpt_input').value; // Grab the value from the input
        if (stationId) { // Check if the stationId is not empty
            adjustSetpoint('target_wpt', stationId); // Adjust the setpoint with the stationId as the value
            adjustSetpoint('hdg_mode', "d_to"); // Your existing function call
        } else {
            alert("Please enter a station ID.");
        }
    }

    document.addEventListener('DOMContentLoaded', () => {
        socket = io.connect(location.protocol + '//' + document.domain + ':' + location.port);
        
        socket.on('connect', () => {
            console.log("Connected to WebSocket server.");
        });

        // Listen for update_setpoints event to initialize the UI with Redis values
        socket.on('update_setpoints', function(setpoints) {
          for (const [label, value] of Object.entries(setpoints)) {
              const element = document.getElementById(label);
              if (element) {
                  element.innerHTML = value;
              }
          }
      });

        // Listen for update_setpoint events from the server
        socket.on('update_setpoint', data => {
            // Assuming 'data' is an object like {label: new_value}
            for (const [label, value] of Object.entries(data)) {
                // Update the displayed value on the webpage
                const element = document.getElementById(label);
                if (element) {
                    element.innerHTML = value;
                }
            }
        });
    });
</script>
<style>
    body {
        font-family: Arial, sans-serif;
        margin: 20px;
        background-color: #f4f4f4;
        color: #333;
    }
    h1 {
        color: #005288;
    }
    ul {
        list-style-type: none;
        padding: 0;
    }
    ul li {
        margin: 10px 0;
    }
    button, input[type="text"] {
        padding: 10px;
        margin-top: 5px;
        border: 1px solid #ccc;
        border-radius: 5px;
        cursor: pointer;
        font-size: 16px;
    }
    button:hover {
        background-color: #ddd;
    }
    .button-group {
        margin-bottom: 20px;
    }
    #target_wpt_input {
        margin-right: 10px;
    }
</style>

</head>
<body>
    <h1>Autopilot Interface</h1>
    <p>Current Setpoints:</p>
    <ul>
        <li>Heading: <span id="desired_hdg">0</span></li>
        <li>Altitude: <span id="desired_alt">0</span></li>
        <li>Speed: <span id="desired_speed">0</span></li>
        <li>Heading Mode: <span id="hdg_mode">0</span></li>
        <li>Target Waypoint: <span id="target_wpt">BJC</span></li>
    </ul>
    <p>Autopilot: <span id="autopilot_enabled">OFF</span></p>

    <!-- Example buttons for adjusting setpoints -->
    <div class="button-group">
        <button onclick="adjustSetpoint('desired_hdg', -10)">-10 HDG</button>
        <button onclick="adjustSetpoint('desired_hdg', 10)">+10 HDG</button>
    </div>
    <div class="button-group">
        <button onclick="adjustSetpoint('desired_alt', 500)">+500 ALT</button>
        <button onclick="adjustSetpoint('desired_alt', -500)">-500 ALT</button>
    </div>
    <div class="button-group">
        <button onclick="adjustSetpoint('desired_speed', 5)">+5 KTS</button>
        <button onclick="adjustSetpoint('desired_speed', -5)">-5 KTS</button>
    </div>
    <div class="button-group">
        <button onclick="adjustSetpoint('hdg_mode', 'hdg')">Follow Heading</button>
        <input type="text" id="target_wpt_input" value="BJC">
        <button onclick="submitDirectTo()">Direct To</button>
    </div>
    <div class="button-group">
        <button onclick="adjustSetpoint('autopilot_enabled', 1)">Enable Autopilot</button>
        <button onclick="adjustSetpoint('autopilot_enabled', 0)">Disable Autopilot</button>
    </div>

</body>
</html>

And the Flask app itself. I still think WebSockets are magic.

from flask import Flask, render_template
from flask_socketio import SocketIO, emit
import redis

app = Flask(__name__)
socketio = SocketIO(app)
r = redis.StrictRedis(host='localhost', port=6379, db=0)

setpoints_of_interest = ['desired_hdg', 'desired_alt', 'desired_speed']
# get initial setpoints from Redis, send to clients

@app.route('/')
def index():
    return render_template('index.html')  # You'll need to create an HTML template

def update_setpoint(label, adjustment):
    # This function can be adapted to update setpoints and then emit updates via WebSocket
    current_raw_value = r.get(label) if r.exists(label) else None
    if current_raw_value is not None:
        try:
            current_value = float(current_raw_value)
        except ValueError:
            current_value = current_raw_value
    else:
        current_value = 0.0  # default when the key does not exist in Redis yet

    if label == 'desired_hdg':
        new_value = (current_value + adjustment) % 360
    elif label == 'autopilot_enabled':
        new_value = adjustment
    elif label == 'hdg_mode':
        new_value = adjustment
    elif label == 'target_wpt':
        new_value = adjustment
    else:
        new_value = current_value + adjustment

    r.set(label, new_value)
    # socketio.emit('update_setpoint', {label: new_value})  # Emit update to clients
    return new_value

@socketio.on('adjust_setpoint')
def handle_adjust_setpoint(json):
    label = json['label']
    adjustment = json['adjustment']
    # Your logic to adjust the setpoint in Redis and calculate new_value
    new_value = update_setpoint(label, adjustment)

    # Emit updated setpoint to all clients
    emit('update_setpoint', {label: new_value}, broadcast=True)

@socketio.on('connect')
def handle_connect():
    # Fetch initial setpoints from Redis
    initial_setpoints = {label: float(r.get(label)) if r.exists(label) else 0.0 for label in setpoints_of_interest}
    
    # Emit the initial setpoints to the connected client
    emit('update_setpoints', initial_setpoints)

if __name__ == '__main__':
    socketio.run(app)

And here’s the full code of the autopilot itself. This will be transferred to GitHub for the next post. It is a bit long and needs to be split out into a number of separate files.

Conclusion

With a cross track distance known, it isn’t terribly difficult to convert that distance (error) into a heading adjustment. We now have a functioning autopilot that can control our aircraft to any VOR-like point. I could extend the X-Plane nav data parsing to read all points, but I’ll leave that as an exercise for the reader. The X-Plane Python Autopilot is almost complete – all that I have left on the checklist is a “takeoff” button. Hope you enjoyed the post!

References

This page was pretty helpful for realizing the code for determining the cross track distance would be complicated – http://www.movable-type.co.uk/scripts/latlong-vincenty.html. A good bit of the code was generated by ChatGPT as well.


Adding some polish to the X-Plane Python Autopilot with Flask, Redis, and WebSockets

I revisited my Python X-Plane autopilot a few weeks ago because adjusting setpoints and such was pretty clunky. The job I started 1.5 years ago is exclusively Python, so I wanted to redo a bit of it.

Quick aside: on the new PC I just built (Ryzen 9 7900X, 2x32GB 6000 MHz, etc.), X-Plane 10 was the 2nd “game” I installed. The first was Factorio (I followed Nilaus’ megabase-in-a-book and have gotten to 5k SPM). I haven’t tried the newer sims yet, but I think they’ll still be somewhat limited by my RTX 2080 Super.

Well imagine my surprise when I woke up to 6x the normal daily hits by 7am. I checked the weblogs and found that my post was trending on ycombinator.com (Hacker News). So I am going to skip pretty much all background and just post the updated code for now, and will go back and clean up this post at some point.

Without further ado: here’s what the super basic dashboard looks like

Screenshot showing HTML autopilot interface running x-plane via Python

I have it separated into two main running Python programs: the file that interacts with X-Plane itself, and the Flask part.

Check here for the updated code / next iteration where I add track following and direct-to functionality – Adding track following (Direct To) with cross track error to the Python X-Plane Autopilot.

Here’s the adjusted autopilot code to check with Redis for the setpoints every loop execution:

# https://onion.io/2bt-pid-control-python/
# https://github.com/ivmech/ivPID

import sys
import os
import xpc
from datetime import datetime, timedelta
import PID
import time
import math, numpy
import redis

r = redis.StrictRedis(host='localhost', port=6379, db=0)

setpoints = {
	"desired_roll": 0,
	"desired_pitch": 2,
	"desired_speed": 160,
	"desired_alt": 8000.0,
	"desired_hdg": 140,
	"autopilot_enabled": 0
}

for key in setpoints:
	# if the key exists in the redis db, use it
	# otherwise, set it
	if r.exists(key):
		setpoints[key] = float(r.get(key))
	else:
		r.set(key, setpoints[key])

update_interval = 0.10 #seconds
update_frequency = 1/update_interval

P = 0.05
I = 0.01
D = 0
MAX_DEFLECTION_PER_SECOND = 2.0

roll_PID = PID.PID(P*2, I*2, D)
roll_PID.SetPoint = setpoints["desired_roll"]

pitch_PID = PID.PID(P, I, D)
pitch_PID.SetPoint = setpoints["desired_pitch"]

altitude_PID = PID.PID(P*2, P/2, D)
altitude_PID.SetPoint = setpoints["desired_alt"]

speed_PID = PID.PID(P, I, D)
speed_PID.SetPoint = setpoints["desired_speed"]

heading_error_PID = PID.PID(1,0.05,0.1)
heading_error_PID.SetPoint = 0 # need heading error to be 0

DREFs = ["sim/cockpit2/gauges/indicators/airspeed_kts_pilot",
		"sim/cockpit2/gauges/indicators/heading_electric_deg_mag_pilot",
		"sim/flightmodel/failures/onground_any",
		"sim/flightmodel/misc/h_ind"]

def normalize(value, min=-1, max=1):
	if (value > max):
		return max
	elif (value < min):
		return min
	else:
		return value

def sleep_until_next_tick(update_frequency):
    # Calculate the update interval from the frequency
    update_interval = 1.0 / update_frequency

    # Get the current time
    current_time = time.time()

    # Calculate the time remaining until the next tick
    sleep_time = update_interval - (current_time % update_interval)

    # Sleep for the remaining time
    time.sleep(sleep_time)
	
# https://rosettacode.org/wiki/Angle_difference_between_two_bearings#Python
def get_angle_difference(b1, b2):
	r = (b2 - b1) % 360.0
	# Python modulus has same sign as divisor, which is positive here,
	# so no need to consider negative case
	if r >= 180.0:
		r -= 360.0
	return r

# https://gist.github.com/jeromer/2005586
def get_bearing(pointA, pointB):
    """
    Calculates the bearing between two points.
    The formulae used is the following:
        θ = atan2(sin(Δlong).cos(lat2),
                  cos(lat1).sin(lat2) − sin(lat1).cos(lat2).cos(Δlong))
    :Parameters:
      - `pointA: The tuple representing the latitude/longitude for the
        first point. Latitude and longitude must be in decimal degrees
      - `pointB: The tuple representing the latitude/longitude for the
        second point. Latitude and longitude must be in decimal degrees
    :Returns:
      The bearing in degrees
    :Returns Type:
      float
    """
    if (type(pointA) != tuple) or (type(pointB) != tuple):
        raise TypeError("Only tuples are supported as arguments")

    lat1 = math.radians(pointA[0])
    lat2 = math.radians(pointB[0])

    diffLong = math.radians(pointB[1] - pointA[1])

    x = math.sin(diffLong) * math.cos(lat2)
    y = math.cos(lat1) * math.sin(lat2) - (math.sin(lat1)
            * math.cos(lat2) * math.cos(diffLong))

    initial_bearing = math.atan2(x, y)

    # Now we have the initial bearing but math.atan2 return values
    # from -180° to + 180° which is not what we want for a compass bearing
    # The solution is to normalize the initial bearing as shown below
    initial_bearing = math.degrees(initial_bearing)
    compass_bearing = (initial_bearing + 360) % 360

    return compass_bearing

# https://janakiev.com/blog/gps-points-distance-python/
def haversine(coord1, coord2):
    R = 6372800  # Earth radius in meters
    lat1, lon1 = coord1
    lat2, lon2 = coord2
    
    phi1, phi2 = math.radians(lat1), math.radians(lat2) 
    dphi       = math.radians(lat2 - lat1)
    dlambda    = math.radians(lon2 - lon1)
    
    a = math.sin(dphi/2)**2 + \
        math.cos(phi1)*math.cos(phi2)*math.sin(dlambda/2)**2
    
    return 2*R*math.atan2(math.sqrt(a), math.sqrt(1 - a))

KBJC_lat = 39.9088056
KBJC_lon = -105.1171944

def write_position_to_redis(position):
	# position is a list of 7 floats
	# position_elements = [lat, lon, alt, pitch, roll, yaw, gear_indicator]
	position_elements = ["lat", "lon", "alt", "pitch", "roll", "yaw", "gear_indicator"]
	position_str = ','.join([str(x) for x in position])
	r.set('position', position_str)
	for i in range(len(position_elements)):
		r.set(f"position/{position_elements[i]}", position[i])
	
	# position_str = ','.join([str(x) for x in position])
	# r.publish('position_updates', position_str)
		
def get_setpoints_from_redis():
	setpoints = {
		"desired_roll": 0,
		"desired_pitch": 2,
		"desired_speed": 160,
		"desired_alt": 8000.0,
		"desired_hdg": 140
	}
	for key in setpoints:
		# if the key exists in the redis db, use it
		# otherwise, set it
		if r.exists(key):
			setpoints[key] = float(r.get(key))
		else:
			r.set(key, setpoints[key])
	return setpoints

def get_autopilot_enabled_from_redis():
	if r.exists("autopilot_enabled"):
		return int(r.get("autopilot_enabled").decode('utf-8')) == 1

ele_positions = []
ail_positions = []
thr_positions = []

def update_control_position_history(ctrl):
	ele_positions.append(ctrl[0])
	ail_positions.append(ctrl[1])
	thr_positions.append(ctrl[3])

	# if the list is longer than 20, pop the first element
	if len(ele_positions) > 20:
		ele_positions.pop(0)
		ail_positions.pop(0)
		thr_positions.pop(0)

def monitor():
	with xpc.XPlaneConnect() as client:
		while True:
			loop_start = datetime.now()
			print(f"loop start - {loop_start}")
			posi = client.getPOSI()
			write_position_to_redis(posi)

			ctrl = client.getCTRL()

			bearing_to_kbjc = get_bearing((posi[0], posi[1]), (KBJC_lat, KBJC_lon))
			dist_to_kbjc = haversine((posi[0], posi[1]), (KBJC_lat, KBJC_lon))
			#desired_hdg = 116 #bearing_to_kbjc

			multi_DREFs = client.getDREFs(DREFs) #speed=0, mag hdg=1, onground=2

			current_roll = posi[4]
			current_pitch = posi[3]
			#current_hdg = posi[5] # this is true, need to use DREF to get mag ''
			current_hdg = multi_DREFs[1][0]
			current_altitude = multi_DREFs[3][0]
			current_asi = multi_DREFs[0][0]
			onground = multi_DREFs[2][0]

			# get the setpoints from redis
			setpoints = get_setpoints_from_redis()
			desired_hdg = setpoints["desired_hdg"]
			desired_alt = setpoints["desired_alt"]
			desired_speed = setpoints["desired_speed"]

			# outer loops first
			altitude_PID.SetPoint = desired_alt
			altitude_PID.update(current_altitude)

			heading_error = get_angle_difference(desired_hdg, current_hdg)
			heading_error_PID.update(heading_error)

			speed_PID.SetPoint = desired_speed
			

			new_pitch_from_altitude = normalize(altitude_PID.output, -10, 10)
			new_roll_from_heading_error = normalize(heading_error_PID.output, -25, 25)
			# if new_pitch_from_altitude > 15:
			# 	new_pitch_from_altitude = 15
			# elif new_pitch_from_altitude < -15:
			# 	new_pitch_from_altitude = -15
			
			pitch_PID.SetPoint = new_pitch_from_altitude
			roll_PID.SetPoint = new_roll_from_heading_error

			roll_PID.update(current_roll)
			speed_PID.update(current_asi)
			pitch_PID.update(current_pitch)

			new_ail_ctrl = normalize(roll_PID.output, min=-1, max=1)
			new_ele_ctrl = normalize(pitch_PID.output, min=-1, max=1)
			new_thr_ctrl = normalize(speed_PID.output, min=0, max=1)

			previous_ail_ctrl = ail_positions[-1] if len(ail_positions) > 0 else 0
			previous_ele_ctrl = ele_positions[-1] if len(ele_positions) > 0 else 0
			previous_thr_ctrl = thr_positions[-1] if len(thr_positions) > 0 else 0

			# not currently functional - need to work on this 
			# new_ail_ctrl_limited = previous_ail_ctrl + new_ail_ctrl * MAX_DEFLECTION_PER_SECOND / update_frequency
			# new_ele_ctrl_limited = previous_ele_ctrl + new_ele_ctrl * MAX_DEFLECTION_PER_SECOND / update_frequency
			# new_thr_ctrl_limited = previous_thr_ctrl + new_thr_ctrl * MAX_DEFLECTION_PER_SECOND / update_frequency

			# update the control positions
			# update_control_position_history((new_ele_ctrl_limited, new_ail_ctrl_limited, 0.0, new_thr_ctrl_limited))
			update_control_position_history((new_ele_ctrl, new_ail_ctrl, 0.0, new_thr_ctrl))

			onground = -1  # overrides the on-ground DREF result so controls are always sent
			if onground == 1:
				print("on ground, not sending controls")
			else:
				if get_autopilot_enabled_from_redis():
					# ctrl = [new_ele_ctrl_limited, new_ail_ctrl_limited, 0.0, new_thr_ctrl_limited]
					ctrl = [new_ele_ctrl, new_ail_ctrl, 0.0, new_thr_ctrl]
					client.sendCTRL(ctrl)

			loop_end = datetime.now()
			loop_duration = loop_end - loop_start

			output = f"current values --    roll: {current_roll: 0.3f},  pitch: {current_pitch: 0.3f},    hdg: {current_hdg:0.3f}, alt: {current_altitude:0.3f}, asi: {current_asi:0.3f}"
			output = output + "\n" + f"hdg error:                 {heading_error: 0.3f}"
			output = output + "\n" + f"new ctrl positions -- ail: {new_ail_ctrl: 0.4f},    ele: {new_ele_ctrl: 0.4f},   thr: {new_thr_ctrl:0.4f}"
			output = output + "\n" + f"PID outputs --   altitude: {altitude_PID.output: 0.4f},  pitch: {pitch_PID.output: 0.4f},   ail: {roll_PID.output: 0.3f},  hdg: {heading_error_PID.output: 0.3f}"
			output = output + "\n" + f"bearing to KBJC: {bearing_to_kbjc:3.1f}, dist: {dist_to_kbjc*0.000539957:0.2f} NM"
			output = output + "\n" + f"loop duration (ms): {loop_duration.total_seconds()*1000:0.2f} ms"
			print(output)
			sleep_until_next_tick(update_frequency)
			os.system('cls' if os.name == 'nt' else 'clear')



if __name__ == "__main__":
	monitor()

And the flask backend/front end. WebSockets are super cool – never used them before this. I was thinking I’d have to make a bunch of endpoints for every type of autopilot change I need. But this handles it far nicer:

from flask import Flask, render_template
from flask_socketio import SocketIO, emit
import redis

app = Flask(__name__)
socketio = SocketIO(app)
r = redis.StrictRedis(host='localhost', port=6379, db=0)

setpoints_of_interest = ['desired_hdg', 'desired_alt', 'desired_speed']
# get initial setpoints from Redis, send to clients

@app.route('/')
def index():
    return render_template('index.html')  # You'll need to create an HTML template

def update_setpoint(label, adjustment):
    # This function can be adapted to update setpoints and then emit updates via WebSocket
    current_value = float(r.get(label)) if r.exists(label) else 0.0

    if label == 'desired_hdg':
        new_value = (current_value + adjustment) % 360
    elif label == 'autopilot_enabled':
        new_value = adjustment
    else:
        new_value = current_value + adjustment

    r.set(label, new_value)
    # socketio.emit('update_setpoint', {label: new_value})  # Emit update to clients
    return new_value

@socketio.on('adjust_setpoint')
def handle_adjust_setpoint(json):
    label = json['label']
    adjustment = json['adjustment']
    # Your logic to adjust the setpoint in Redis and calculate new_value
    new_value = update_setpoint(label, adjustment)

    # Emit updated setpoint to all clients
    emit('update_setpoint', {label: new_value}, broadcast=True)

@socketio.on('connect')
def handle_connect():
    # Fetch initial setpoints from Redis
    initial_setpoints = {label: float(r.get(label)) if r.exists(label) else 0.0 for label in setpoints_of_interest}
    
    # Emit the initial setpoints to the connected client
    emit('update_setpoints', initial_setpoints)

if __name__ == '__main__':
    socketio.run(app)

And the HTML template:

<!DOCTYPE html>
<html>
<head>
    <title>Autopilot Interface</title>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/4.0.1/socket.io.js"></script>
<script type="text/javascript" charset="utf-8">
    var socket;  // Declare socket globally

    // Define adjustSetpoint globally
    function adjustSetpoint(label, adjustment) {
        socket.emit('adjust_setpoint', {label: label, adjustment: adjustment});
    }

    document.addEventListener('DOMContentLoaded', () => {
        socket = io.connect(location.protocol + '//' + document.domain + ':' + location.port);
        
        socket.on('connect', () => {
            console.log("Connected to WebSocket server.");
        });

        // Listen for update_setpoints event to initialize the UI with Redis values
        socket.on('update_setpoints', function(setpoints) {
          for (const [label, value] of Object.entries(setpoints)) {
              const element = document.getElementById(label);
              if (element) {
                  element.innerHTML = value;
              }
          }
      });

        // Listen for update_setpoint events from the server
        socket.on('update_setpoint', data => {
            // Assuming 'data' is an object like {label: new_value}
            for (const [label, value] of Object.entries(data)) {
                // Update the displayed value on the webpage
                const element = document.getElementById(label);
                if (element) {
                    element.innerHTML = value;
                }
            }
        });
    });
</script>

</head>
<body>
    <h1>Autopilot Interface</h1>
    <p>Current Setpoints:</p>
    <ul>
        <li>Heading: <span id="desired_hdg">0</span></li>
        <li>Altitude: <span id="desired_alt">0</span></li>
        <li>Speed: <span id="desired_speed">0</span></li>
    </ul>
    <p>Autopilot: <span id="autopilot_enabled">0</span></p>

    <!-- Example buttons for adjusting setpoints -->
    <button onclick="adjustSetpoint('desired_hdg', -10)">-10 HDG</button>
    <button onclick="adjustSetpoint('desired_hdg', 10)">+10 HDG</button>
    <br>
    <button onclick="adjustSetpoint('desired_alt', 500)">+500 ALT</button>
    <button onclick="adjustSetpoint('desired_alt', -500)">-500 ALT</button>
    <br>
    <button onclick="adjustSetpoint('desired_speed', 5)">+5 KTS</button>
    <button onclick="adjustSetpoint('desired_speed', -5)">-5 KTS</button>
    <br>
    <br>
    <button onclick="adjustSetpoint('autopilot_enabled', 1)">Enable Autopilot</button>
    <button onclick="adjustSetpoint('autopilot_enabled', 0)">Disable Autopilot</button>

</body>
</html>

This should be enough to get you going. I’ll come back and clean it up later (both my kids just woke up – 1.5 and 3.5 years old!).

Categories
Python

Controlling AsrockRack CPU & chassis fan speeds via ipmitool & PID loops

I have a 1U Datto NAS unit that I got for super cheap ($150 for 4x 3.5″ SAS3, D-1541, 4x32GB, 2400MHz, 2x 10GbaseT) that has worked quite well for me. The only downside, which is present among basically all 1U devices, is the noise.

During my research for how to control the tiny, high-RPM (like 8000+ RPM) fans, I stumbled across a thread on the FreeNAS forums – https://www.truenas.com/community/threads/script-to-control-fan-speed-in-response-to-hard-drive-temperatures.41294/. At the bottom of the post, there are a few other links to improvements. I ran the Perl logging scripts that made up the improvements for a bit, but I am no Perl expert, so I didn’t end up implementing it.

I am not 100% sure of the default AsrockRack behavior, but it seemed that if the CPU temp exceeded 60C, both the case and CPU fans would spike. My BlueIris instance sends videos over a couple of times an hour, which would spike the fans – annoying during my work-from-home weeks when I was down in the basement working.

The idea of using a PID loop to control fan speeds stuck with me though, and with the help of GitHub Copilot, I busted out a proof of concept in an hour or so during a particularly boring set of meetings. There is a very high probability this will work for Supermicro motherboards as well with only minor tweaks.

This is how well it works. Note the drop at the end is due to changing CPU setpoint from 57C to 55C. The temperature is very, very steady.

Screenshot of TrueNAS Core reporting page for CPU temp showing very constant CPU temperature due to PID fan control loop

Without further ado, below is the main script (you’ll also need PID.py, which I borrowed a few years ago for the Coding a pitch/roll/altitude autopilot in X-Plane with Python series of posts). It can be run remotely over SSH for debugging purposes (it is no fun to edit Python via nano over SSH on FreeBSD), or with native commands if it detects it is running on the target system.
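
PID.py itself isn’t reproduced in this post. For reference, this is roughly the interface the fan script expects from it – a SetPoint attribute, an update(feedback) method, and an output value. The sketch below is a minimal, assumed textbook positional PID, not the actual file (which may include extras like integral windup limits):

import time

class PID:
    """Minimal positional PID sketch (illustrative; not the actual PID.py)."""
    def __init__(self, P=0.2, I=0.0, D=0.0):
        self.Kp, self.Ki, self.Kd = P, I, D
        self.SetPoint = 0.0
        self.output = 0.0
        self._integral = 0.0
        self._last_error = 0.0
        self._last_time = time.time()

    def update(self, feedback_value):
        # error is setpoint minus measurement, so a too-hot CPU produces a
        # negative output (hence the -1 multiplier later in the fan script)
        error = self.SetPoint - feedback_value
        now = time.time()
        dt = max(now - self._last_time, 1e-6)
        self._integral += error * dt
        derivative = (error - self._last_error) / dt
        self.output = self.Kp * error + self.Ki * self._integral + self.Kd * derivative
        self._last_error = error
        self._last_time = now

With that interface in mind, here is the main script: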

import logging
import time
import PID
import datetime
import socket
import subprocess

logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)

# don't care about debug/info level logging from either of these packages
loggers_to_set_to_warning = ['paramiko.transport', 'invoke']
for l in loggers_to_set_to_warning:
    logging.getLogger(l).setLevel(logging.WARNING)

user = "root"
password = r"password"
host = None # this is set via hostname detection below
DESIRED_CPU_TEMP = 55.0
DESIRED_MB_TEMP = 35.0
# HDD_TEMP_THRESHOLD = 44.0 # unused
MIN_FAN_PCT = 10.0
drives_to_monitor = ['da0', 'da1', 'da2', 'da3', 'nvme0','nvme1','nvme2']

# command to set fans via ipmitool
# ipmitool raw 0x3a 0x01 0x00 0x04 0x04 0x04 0x04 0x04 0x04 0x04
#                        cpu  fan  fan  fan  fan  fan  fan  ????

BASE_RAW_IPMI = 'raw 0x3a 0x01'
INITIAL_STATE = [32,32,32,32,32,32,32,32] # all 32/64 = half speed; these placeholder ints are overwritten with string values from scale_to_64ths() each loop before being sent
FAN_CURRENT_STATE = INITIAL_STATE

hostname = socket.gethostname()
if 'truenas' in hostname or hostname == 'truenas-datto.home.fluffnet.net':
    host = 'localhost'
    c = None
else:
    from fabric import Connection # importing here because FreeBSD 13 (or whatever TrueNAS Core 13 is based on) lacks pip to install packages
    host = "10.98.1.9"
    c = Connection(host, port=22, user=user, connect_kwargs={'password': password})

current_sensor_readings = {}
cpu_temp_sensor = "CPU Temp"
cpu_fan_sensor = "CPU_FAN1"
case_fans = ["FRNT_FAN2","FRNT_FAN3","FRNT_FAN4"]
mb_temp_sensor = "MB Temp"

def limiter(input_value, min_value, max_value):
    if input_value < min_value:
        return min_value
    elif input_value > max_value:
        return max_value
    else:
        return input_value
    
def set_fans_via_ipmi(connection):
    # raw_ipmi_cmd = construct_raw_ipmi_cmd() # not needed unless debug and remote
    # logging.info(raw_ipmi_cmd)
    if host == 'localhost':
        result = subprocess.run(['ipmitool', 'raw', '0x3a', '0x01',
                                 '0x'+FAN_CURRENT_STATE[0], 
                                 '0x'+FAN_CURRENT_STATE[1],
                                 '0x'+FAN_CURRENT_STATE[2],
                                 '0x'+FAN_CURRENT_STATE[3],
                                 '0x'+FAN_CURRENT_STATE[4],
                                 '0x'+FAN_CURRENT_STATE[5],
                                 '0x'+FAN_CURRENT_STATE[6],
                                 '0x'+FAN_CURRENT_STATE[7]], stdout=subprocess.PIPE)
    else:
        raw_ipmi_cmd = construct_raw_ipmi_cmd()
        result = connection.run('ipmitool ' + raw_ipmi_cmd, hide=True)
    #logging.info(result.stdout)

def scale_to_64ths(input_percent):
    result = input_percent / 100.0 * 64.0
    # prepend 0 to make it a hex value
    result_int = int(result)
    result_str = str(result_int)
    if len(result_str) == 1:
        result_str = '0' + result_str # turn a 0x1 into a 0x01
    return result_str

def adjust_cpu_fan_setpoint(hex_value_64ths):
    FAN_CURRENT_STATE[0] = hex_value_64ths

def adjust_case_fan_setpoint(hex_value_64ths):
    for i in range(len(FAN_CURRENT_STATE) - 1):
        FAN_CURRENT_STATE[i + 1] = hex_value_64ths

def construct_raw_ipmi_cmd():
    new_state = BASE_RAW_IPMI
    for i in range(len(FAN_CURRENT_STATE)):
        new_state = new_state + ' 0x' + str(FAN_CURRENT_STATE[i])
    return new_state

def populate_sensor_readings(sensor, value):
    current_sensor_readings[sensor] = value

def query_ipmitool(connection):
    if host == 'localhost':
        result = subprocess.run(['ipmitool', 'sensor'], stdout=subprocess.PIPE)
        result = result.stdout.decode('utf-8')
    else:
        result = connection.run('ipmitool sensor', hide=True).stdout
    for line in result.split('\n'):
        if line == '':
            break

        row_data = line.split('|')
        sensor_name = row_data[0].strip()
        sensor_value = row_data[1].strip()
        populate_sensor_readings(sensor_name, sensor_value)
        logging.debug(sensor_name + " = " + sensor_value)

def wait_until_top_of_second():
    # calculate time until next top of second
    sleep_seconds = 1 - (time.time() % 1)
    time.sleep(sleep_seconds)

def get_drive_temp(connection, drive):
    ###########################################
    # this is copilot generated, and untested #
    # not sure about row_data[0] stuff        #
    ###########################################
    if host == 'localhost':
        result = subprocess.run(['smartctl', '-A', '/dev/' + drive], stdout=subprocess.PIPE)
        result = result.stdout.decode('utf-8')
    else:
        result = connection.run('smartctl -A /dev/' + drive, hide=True).stdout
    for line in result.split('\n'):
        if line == '':
            break

        row_data = line.split()
        if len(row_data) < 10:
            continue
        if row_data[0] == '194':
            drive_temp = row_data[9]
            logging.info(drive + " = " + drive_temp)

def query_drive_temps(connection):
    for drive in drives_to_monitor:
        get_drive_temp(connection, drive)

# tune these values. the first one is the most important and basically is the multiplier for
# how much you want the fans to run in proportion to the actual-setpoint delta.
# example: if setpoint is 55 and actual is 59, the delta is 4, which is multiplied by 4 for
# 16 output, which if converted to 64ths would be 25% fan speed.
# the 2nd parameter is the integral, which is a cumulative error counter of sorts.
# the 3rd parameter is derivative, which should probably be set to 0 (if tuned correctly, it prevents over/undershoot)
cpu_pid = PID.PID(4.0, 2.5, 0.1)
cpu_pid.SetPoint = DESIRED_CPU_TEMP

mb_pid = PID.PID(2.5, 1.5, 0.1)
mb_pid.SetPoint = DESIRED_MB_TEMP

wait_until_top_of_second()

# set last_execution to now minus one minute to force first execution
last_execution = datetime.datetime.now() - datetime.timedelta(minutes=1)

while(True):
    if datetime.datetime.now().minute != last_execution.minute:
        # TODO: get drive temps
        logging.info("getting drive temps")

    query_ipmitool(c)
    cpu_temp = float(current_sensor_readings[cpu_temp_sensor])
    mb_temp = float(current_sensor_readings[mb_temp_sensor])

    cpu_pid.update(cpu_temp)
    mb_pid.update(mb_temp)
    
    logging.info(f'CPU: {cpu_temp:5.2f} MB: {mb_temp:5.2f} CPU PID: {cpu_pid.output:5.2f} MB PID: {mb_pid.output:5.2f}')
    
    # note negative multiplier!!
    cpu_fan_setpoint = scale_to_64ths(limiter(-1*cpu_pid.output,MIN_FAN_PCT,100))
    case_fan_setpoint = scale_to_64ths(limiter(-1*mb_pid.output,MIN_FAN_PCT,100))
    adjust_cpu_fan_setpoint(cpu_fan_setpoint)
    adjust_case_fan_setpoint(case_fan_setpoint)
    set_fans_via_ipmi(c)

    last_execution = datetime.datetime.now()
    wait_until_top_of_second()

As you can see, it is not quite complete. I still need to add the hard drive temp detection stuff to ramp case fans a bit if the drives get hot. Those NVMe drives sure get hot (especially the Intel P4800X I have in one of the PCIe slots – see Intel Optane P1600X & P4800X as ZFS SLOG/ZIL for details).

This is what the output looks like (keep in mind the -1 multiplier in the setpoint stuff!):

screenshot showing second-by-second output of the PID fan control loop keeping a very consistent 55C CPU temp

And here is a high-level summary of the script provided by the ever-helpful ChatGPT. I fed it the code and said “write a blog post about this”. I took out the intro paragraph but left the rest.

The Script Overview

This script leverages the PID controller – a control loop mechanism that calculates an “error” value as the difference between a measured process variable and a desired setpoint. It attempts to minimize the error by adjusting the process control inputs.

In this script, we are implementing a fan speed control system that reacts to temperature changes dynamically. Our desired setpoint is the optimal temperature we want to maintain for both the CPU (DESIRED_CPU_TEMP) and the motherboard (DESIRED_MB_TEMP).

Exploring the Script

The Python script begins by setting up the necessary libraries and logging. The logging library is used to log useful debug information, such as the current CPU temperature and fan speed, which can help you understand what’s happening in the script.

Next, we have a section where we define some constants, such as the desired temperatures and minimum fan speed percentage. It also defines a connection to the localhost or to a remote host, depending on the hostname.

It uses ipmitool – a utility for managing and configuring devices that support the Intelligent Platform Management Interface (IPMI) to control fan speeds.

The limiter() function ensures the fan speed remains within the predefined minimum and maximum thresholds. It’s important as it prevents the fan speed from reaching potentially harmful levels.

The script also includes several functions to set and adjust fan speeds, as well as to construct the appropriate ipmitool command. One thing to note is that the fan speeds are set using hexadecimal values, so there are functions to convert the desired fan speed percentages to hexadecimal.

A very useful function is query_ipmitool(). This function runs the ipmitool command, gets the current sensor readings, and stores them in the current_sensor_readings dictionary for further processing.

The script utilizes two PID controllers, cpu_pid for the CPU and mb_pid for the motherboard, with specific setpoints set to desired temperatures.

The core logic is inside the infinite loop at the end of the script. The loop constantly reads temperature sensor data and adjusts the fan speeds accordingly. The loop runs once every second, so it can respond quickly to changes in CPU and motherboard temperatures.

Conclusion

This script demonstrates a neat way of controlling fan speed in response to CPU and motherboard temperatures. It’s an effective approach to ensure that your system runs smoothly and without overheating, while minimizing noise.

Categories
AI Blog Admin

Using ChatGPT to fight spam on WordPress

Like all other WordPress blogs, this one attracts a good number of spam comments. I usually get 5-10 per day, but yesterday there were like 30. Almost all of them contain Cyrillic characters:

screenshot showing spam comments containing cyrillic characters

Since I specify that all comments are held until approved, I need to either approve, trash, or mark as spam every single comment.

Enter ChatGPT

I use ChatGPT (specifically GPT 4) for a number of minor coding tasks. I find it helpful. It is not perfect. That doesn’t mean it isn’t useful. I decided to have it ponder this issue. I work with Python a lot at work and it’s typically my scripting language of choice. My initial request is as follows:

write a python script to log into a wordpress site as an admin, get the top 5 comments, see if there are any Cyrillic characters in them, and if there are, delete said comment

It was understandably unhappy about potentially being asked to “hack” a WordPress site, so I had to reassure it that I was the owner of said site:

yes, I have my own blog. I am the owner. I have the admin credentials. please proceed with how to delete those comments

It happily complied and spit out some very usable code:

chatgpt conversation around a python script to delete blog spam comments

After a bit more back and forth:

does this get comments in a pending state? I don't let them be published instantly because most of them are spam

I was informed there are 5 different comment states: approved, hold, spam, trash, unapproved.

perfect. can you please adjust the script to get the pending, unapproved, and hold comments. also make it top 20

It ran perfectly after copying and pasting the Python. Unfortunately, I had created an application password for my main login on this site and forgot to change the placeholder delete URL, so it happily sent my username and application password to yourwebsite.com. After revoking that password and realizing there should be a base URL:

please split out the site url (https://austinsnerdythings.com) from base_url for both retrieving the comments as well as deleting

I was left with a 100% functional script. This took 3-4 minutes of back and forth with ChatGPT 4.0. I definitely could’ve coded this up myself with the basic structure in 15 minutes or so, but I would’ve had to work out the JSON format for comments and all that. It is so much easier to just test out what ChatGPT provides and alter as necessary:

import requests
import json
import re

def has_cyrillic(text):
    return bool(re.search('[\u0400-\u04FF]', text))

site_url = "https://austinsnerdythings.com"
base_url = f"{site_url}/wp-json/wp/v2/comments?per_page=20&status="
statuses = ['pending', 'hold', 'unapproved']
auth = ('yourusername', 'yourpassword')

for status in statuses:
    url = base_url + status
    response = requests.get(url, auth=auth)
    comments = json.loads(response.text)

    cyrillic_comments = []

    for comment in comments:
        if has_cyrillic(comment['content']['rendered']):
            cyrillic_comments.append(comment)

    # delete comments with Cyrillic characters
    for comment in cyrillic_comments:
        delete_url = f"{site_url}/wp-json/wp/v2/comments/" + str(comment['id'])
        response = requests.delete(delete_url, auth=auth)
        if response.status_code == 200:
            print(f"Successfully deleted comment with id {comment['id']}")
        else:
            print(f"Failed to delete comment with id {comment['id']}. Response code: {response.status_code}")

Finishing touches

The other finishing touches I did were as follows:

  • Created a user specifically for comment moderation. I used the ‘Members’ plugin to create a very limited role (the only permissions granted are the necessary ones: Moderate Comments, Read, Edit Posts, Edit Others’ Posts, Edit Published Posts) and assigned said user to it. This greatly limits the potential for abuse if the account password falls into the wrong hands.
  • Copied the script to the web host running the blog
  • Set it to be executed hourly via crontab

Now I have a fully automated script that deletes any blog comments with any Cyrillic characters!

You may be asking yourself why I don’t use Akismet or Recaptcha or anything like that. I found the speed tradeoff to not be worthwhile. They definitely slowed down my site for minimal benefit. It only took a couple minutes a day to delete the spam comments. But now it takes no time because it’s automated!

Here’s the link to the full ChatGPT conversation:

https://chat.openai.com/share/aad6a095-9b90-42c5-b1ca-de2a18828ba2

Results

I created a spam comment and ran the script (after adding a print line to show the comment). Here’s the output:
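
(The print line itself was just a one-liner dropped into the deletion loop – something along these lines; the exact line isn’t in the post:)

# the deletion loop from the script above, with the hypothetical print added
for comment in cyrillic_comments:
    print(f"Deleting comment {comment['id']}: {comment['content']['rendered'][:80]}")
    delete_url = f"{site_url}/wp-json/wp/v2/comments/" + str(comment['id'])
    response = requests.delete(delete_url, auth=auth)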

And the web logs showing the three statuses being retrieved via GET and the DELETE for the single spam comment:

I am quite satisfied with this basic solution. It took me far longer to type up this blog post than it did to get the script working.