Categories
AI Programming Python

Ditching Ultralytics: Training YOLOX for Aircraft Detection

I’ve been building SkySpottr, an AR app that overlays aircraft information on your phone’s screen. It uses your device’s location, orientation, and incoming aircraft data (ADS-B) to predict where planes should appear on screen, then uses a YOLO model to lock onto the actual aircraft and refine the overlay. YOLOv8 worked great for this… until I actually read the license.

Welcome to Austin’s Nerdy Things, where we train entire neural networks from scratch to avoid talking to lawyers.

The Problem with Ultralytics

YOLOvWhatever is excellent. Fast, accurate, easy to use, great documentation. But Ultralytics licenses it under AGPL-3.0, which means that if you use it in a product, you either need to open-source your entire application or pay for a commercial license. For a side-project AR app that I might eventually monetize? That’s a hard pass.

Enter YOLOX from Megvii (recommended as an alternative by either ChatGPT or Claude, I can’t remember which). MIT licensed. Do whatever you want with it. The catch? You have to train your own models from scratch instead of using Ultralytics’ pretrained weights and easy fine-tuning pipeline. I have since learned there are some pretrained YOLOX models; I didn’t use them.

So training from scratch is what I did. Over a few late nights in December 2025, I went from zero YOLOX experience to running custom-trained aircraft detection models in my iOS app. Here’s how it went.

The Setup

Hardware: RTX 3090 on my Windows machine, COCO2017 dataset on network storage (which turned out to be totally fine for training speed), and way too many terminal windows open.

I started with the official YOLOX repo and the aircraft class from COCO2017. The dataset has about 3,000 training images with airplanes, which is modest but enough to get started.

git clone https://github.com/Megvii-BaseDetection/YOLOX
pip install -v -e .

The first training run failed immediately because I forgot to install YOLOX as a package. Classic. Then it failed again because I was importing a class that didn’t exist in the version I had. Claude (who was helping me through this, and hallucinated said class) apologized and fixed the import. We got there eventually.

Training Configs: Nano, Tiny, Small, and “Nanoish”

YOLOX has a nice inheritance-based config system. You create a Python file, inherit from a base experiment class, and override what you want. I ended up with four different configs:

  • yolox_nano_aircraft.py – The smallest. 0.9M params, 1.6 GFLOPs. Runs on anything.
  • yolox_tiny_aircraft.py – Slightly bigger with larger input size for small object detection.
  • yolox_small_aircraft.py – 5M params, 26 GFLOPs. The “serious” model.
  • yolox_nanoish_aircraft.py – My attempt at something between nano and tiny.

The “nanoish” config was my own creation where I tried to find a sweet spot. I bumped the width multiplier from 0.25 to 0.33 and… immediately got a channel mismatch error, because 0.33 doesn’t produce whole-number channel counts in the architecture. Turns out you can’t just pick arbitrary numbers. I am a noob at these things. Lesson learned.

After some back-and-forth, I settled on a config with 0.3125 width (which is 0.25 × 1.25, mathematically clean) and 512×512 input. This gave me roughly 1.2M params – bigger than nano, smaller than tiny – and it actually worked.
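If you want to avoid my mistake, a quick sanity check helps before training: every scaled channel count has to come out a whole number. This sketch assumes the backbone’s base stage widths are 64/128/256/512/1024, which is my reading of the CSPDarknet defaults – verify against your YOLOX version:

BASE_CHANNELS = [64, 128, 256, 512, 1024]  # assumed CSPDarknet stage widths

def check_width(width):
    scaled = [width * c for c in BASE_CHANNELS]
    ok = all(s == int(s) for s in scaled)
    print(f"width={width}: {scaled} -> {'ok' if ok else 'channel mismatch'}")

check_width(0.25)    # nano: 16, 32, 64, 128, 256 - all clean
check_width(0.33)    # 21.12 in the first stage - the error I hit
check_width(0.3125)  # 5/16: 20, 40, 80, 160, 320 - clean again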

Here’s the small model config – the one that ended up in production. The key decisions are width = 0.50 (2x wider than nano for better feature extraction), 640×640 input for small object detection, and full mosaic + mixup augmentation:

class Exp(MyExp):
    def __init__(self):
        super(Exp, self).__init__()

        # Model config - YOLOX-Small architecture
        self.num_classes = 1  # Single class: airplane
        self.depth = 0.33
        self.width = 0.50  # 2x wider than nano for better feature extraction

        # Input/output config - larger input helps small object detection
        self.input_size = (640, 640)
        self.test_size = (640, 640)
        self.multiscale_range = 5  # Training will vary from 480-800

        # Data augmentation
        self.mosaic_prob = 1.0
        self.mosaic_scale = (0.1, 2.0)
        self.enable_mixup = True
        self.mixup_prob = 1.0
        self.flip_prob = 0.5
        self.hsv_prob = 1.0

        # Training config
        self.warmup_epochs = 5
        self.max_epoch = 400
        self.no_aug_epochs = 100
        self.basic_lr_per_img = 0.01 / 64.0
        self.scheduler = "yoloxwarmcos"

    def get_model(self):
        from yolox.models import YOLOX, YOLOPAFPN, YOLOXHead

        in_channels = [256, 512, 1024]
        # Small uses standard convolutions (no depthwise)
        backbone = YOLOPAFPN(self.depth, self.width, in_channels=in_channels, act=self.act)
        head = YOLOXHead(self.num_classes, self.width, in_channels=in_channels, act=self.act)
        self.model = YOLOX(backbone, head)
        return self.model

And the nanoish config for comparison – note the depthwise=True and the width of 0.3125 (5/16) that I landed on after the channel mismatch debacle:

class Exp(MyExp):
    def __init__(self):
        super(Exp, self).__init__()

        self.num_classes = 1
        self.depth = 0.33
        self.width = 0.3125  # 5/16 - halfway between nano (0.25) and tiny (0.375)

        self.input_size = (512, 512)
        self.test_size = (512, 512)

        # Lighter augmentation than small - this model is meant to be fast
        self.mosaic_prob = 0.5
        self.mosaic_scale = (0.5, 1.5)
        self.enable_mixup = False

    def get_model(self):
        from yolox.models import YOLOX, YOLOPAFPN, YOLOXHead

        in_channels = [256, 512, 1024]
        backbone = YOLOPAFPN(self.depth, self.width, in_channels=in_channels,
                             act=self.act, depthwise=True)  # Depthwise = lighter
        head = YOLOXHead(self.num_classes, self.width, in_channels=in_channels,
                         act=self.act, depthwise=True)
        self.model = YOLOX(backbone, head)
        return self.model

Training is then just:

python tools/train.py -f yolox_small_aircraft.py -d 1 -b 16 --fp16 -c yolox_s.pth

The -c yolox_s.pth loads YOLOX’s pretrained COCO weights as a starting point (transfer learning). The -d 1 is one GPU, -b 16 is batch size 16 (about 8GB VRAM on the 3090 with fp16), and --fp16 enables mixed precision training.
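To see how a checkpoint scores against the validation set, YOLOX ships an eval script with the same flag style. Something like this should work (best_ckpt.pth is the file the trainer drops in its output directory, if I’m reading the repo right):

python tools/eval.py -f yolox_small_aircraft.py -c YOLOX_outputs/yolox_small_aircraft/best_ckpt.pth -d 1 -b 16 --conf 0.25 --fp16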

The Small Object Problem

Here’s the thing about aircraft detection for an AR app: planes at cruise altitude look tiny. A 747-8 at 37,000 feet is maybe 20-30 pixels on your phone screen if you’re lucky, even with the 4x optical zoom of the newest iPhones (8x for the weird 12MP zoom mode). Standard YOLO models are tuned for reasonably sized objects, not specks in the sky. The COCO dataset has aircraft that are reasonably sized – think sitting at your gate at an airport and taking a picture of the aircraft 100 ft in front of you.

My first results were underwhelming. The nano model was detecting larger aircraft okay but completely missing anything at altitude. The evaluation metrics looked like this:

AP for airplane = 0.234
AR for small objects = 0.089

Not great. The model was basically only catching aircraft on approach or takeoff.

For the small config, I made some changes to help with tiny objects:

  • Increased input resolution to 640×640 (more pixels = more detail for small objects)
  • Enabled full mosaic and mixup augmentation (helps the model see varied object scales)
  • Switched from depthwise to regular convolutions (more capacity)
  • (I’ll be honest, I was leaning heavily on Claude for the ML-specific tuning decisions here)

This pushed the model to 26 GFLOPs though, which had me worried about phone performance.

Here’s what the small model’s accuracy looked like broken down by object size. You can see AP for small objects climbing from ~0.45 to ~0.65 over training, while large objects hit ~0.70. Progress, but small objects remain the hardest category – which tracks with the whole “specks in the sky” problem.

YOLOX-Small mAP by IoU threshold and object size

Will This Actually Run on a Phone?

The whole point of this exercise was to run inference on an iPhone. So here is some napkin math:

Model                     GFLOPs   Estimated Phone Inference
Nano                      1.6      ~15ms, smooth 30fps easy
Nanoish                   3.2      ~25ms, still good
Small                     26       ~80ms, might be sluggish
YOLOv8n (for reference)   8.7      ~27ms

My app was already running YOLOv8n at 15fps with plenty of headroom. So theoretically even the small model should work, but nano/nanoish would leave more room for everything else the app needs to do.

The plan: train everything, compare accuracy, quantize for deployment, and see what actually works in practice.

Training Results (And a Rookie Mistake)

After letting things run overnight (300 epochs takes a while even on a 3090), here’s what I got:

The nanoish model at epoch 100 was already showing 94% detection rate on test images, beating the fully-trained nano model. And it wasn’t even done training yet.

Quick benchmark on 50 COCO test images with aircraft (RTX 3090 GPU inference – not identical to phone, but close enough for the smaller models to be representative):

Model            Detection Rate   Avg Detections/Image   Avg Inference (ms)   FPS
YOLOv8n          58.6%            0.82                   33.6                 29.7
YOLOX nano       74.3%            1.04                   14.0                 71.4
YOLOX nanoish    81.4%            1.14                   15.0                 66.9
YOLOX tiny       91.4%            1.28                   16.5                 60.7
YOLOX small      92.9%            1.30                   17.4                 57.4
Ground Truth     -                1.40                   -                    -

YOLOv8n getting beaten by every single YOLOX variant while also being slower was… not what I expected. Here’s the mAP comparison across all the models over training – you can see the hierarchy pretty clearly:

mAP comparison across all YOLOX model variants

The big takeaway: more capacity = better accuracy, but with diminishing returns. The jump from nano to nanoish is huge, nanoish to small is solid, and tiny lands somewhere in between depending on the epoch. (You’ll notice two extra lines in the chart – a large model and a self-sourced variant. I kept training after this post’s story ends. More on the self-sourced pipeline later. You can also see the large model is clearly overfitting past epoch ~315 – loss keeps decreasing but mAP starts dropping. My first time overfitting a model.)

The nanoish model hit a nice sweet spot. Faster than YOLOv8n, better small object detection than pure nano, and still lightweight enough for mobile.

And here is the output from my plot_training.py script:

============================================================
SUMMARY
============================================================
Run                         Epochs   Final Loss    Best AP  Best AP50
------------------------------------------------------------
yolox_large_aircraft           391       0.6000     0.6620     0.8620
yolox_nano_aircraft            300       3.3000     0.4770     0.7390
yolox_nanoish_aircraft         142       4.3000     0.4390     0.7210
yolox_small_aircraft           302       2.2000     0.6360     0.8650
yolox_small_with_self_sou      400       1.4000     0.6420     0.8620
yolox_tiny_aircraft            300       2.5000     0.6060     0.8480
============================================================

====================================================================================================
mAP VALUES AT SPECIFIC EPOCHS
====================================================================================================
Run                           AP@280     AP50@280    APsmall@280     AP@290     AP50@290    APsmall@290     AP@299     AP50@299    APsmall@299
----------------------------------------------------------------------------------------------------
yolox_large_aircraft          0.6350       0.8410         0.6690     0.6390       0.8410         0.6750 0.6480(300)       0.8440         0.6780
yolox_nano_aircraft           0.4750       0.7360         0.4000     0.4740       0.7360         0.3970     0.4770       0.7380         0.4030
yolox_nanoish_aircraft           N/A          N/A            N/A        N/A          N/A            N/A        N/A          N/A            N/A
yolox_small_aircraft          0.5900       0.8440         0.5960     0.6230       0.8610         0.6340     0.6360       0.8630         0.6410
yolox_small_with_self_sou     0.5940       0.8430         0.5690     0.5900       0.8420         0.5660 0.5930(300)       0.8420         0.5630
yolox_tiny_aircraft           0.5800       0.8300         0.5650     0.5950       0.8340         0.5830     0.6060       0.8440         0.5780
====================================================================================================

But there was a problem I didn’t notice until later: my training dataset had zero images without aircraft in them. Every single training image contained at least one airplane. This is… not ideal if you want your model to learn what an airplane isn’t. More on that shortly.

How It Actually Works in the App

Before I get to results, here’s what the ML is actually doing in SkySpottr. The app combines multiple data sources to track aircraft:

  1. ADS-B data tells us where aircraft are in 3D space (lat, lon, altitude)
  2. Device GPS and orientation tell us where the phone is and which way it’s pointing
  3. Physics-based prediction places aircraft overlays on screen based on all the above

That prediction is usually pretty good, but phone sensors drift and aircraft positions are slightly delayed. So the overlays can be off by a couple degrees. This is where YOLO comes in.

The app runs the model on each camera frame looking for aircraft. When it finds one within a threshold distance of where the physics engine predicted an aircraft should be, it “snaps” the overlay to the actual detected position. The UI shows an orange circle around the aircraft and marks it as “SkySpottd” – confirmed via machine learning.

I call this “ML snap” mode. It’s the difference between “there’s probably a plane somewhere around here” and “that specific bright dot is definitely the aircraft.”
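For illustration, here’s roughly what that matching step looks like – a minimal Python sketch (the app does this in Swift; the threshold and names here are made up for the example, and azimuth wraparound is ignored for brevity):

import math

SNAP_THRESHOLD_DEG = 3.0  # illustrative - the real threshold is tuned in the app

def snap_overlays(predicted, detected):
    """Match predicted overlay positions to ML detections.

    Both lists hold (azimuth_deg, elevation_deg) angles in screen space.
    Returns {overlay_index: detection_index} for everything that snapped.
    """
    snapped = {}
    for i, (p_az, p_el) in enumerate(predicted):
        best_j, best_dist = None, SNAP_THRESHOLD_DEG
        for j, (d_az, d_el) in enumerate(detected):
            dist = math.hypot(p_az - d_az, p_el - d_el)
            if dist < best_dist:  # closest detection within the threshold wins
                best_j, best_dist = j, dist
        if best_j is not None:
            snapped[i] = best_j  # this overlay is now "SkySpottd"
    return snapped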

The model runs continuously on device, which is why inference time matters so much. Even at a 15fps cap, that’s 15 inference cycles per second competing with everything else the app needs to do (sensor fusion, WebSocket data, AR rendering, etc.). Early on I was seeing 130%+ CPU usage on my iPhone, which is not great for battery life. Every millisecond saved on inference is a win.

Getting YOLOX into CoreML

One thing the internet doesn’t tell you: YOLOX and Apple’s Vision framework don’t play nice together.

YOLOv8 exports to CoreML with a nice Vision-compatible interface. You hand it an image, it gives you detections. Easy. YOLOX expects different preprocessing – it wants pixel values in the 0-255 range (not normalized 0-1), and the output tensor layout is different.

The conversion pipeline goes PyTorch → TorchScript → CoreML. Here’s the core of it:

import torch
import coremltools as ct
from yolox.models import YOLOX, YOLOPAFPN, YOLOXHead

# Build model (same architecture as training config)
backbone = YOLOPAFPN(depth=0.33, width=0.50, in_channels=[256, 512, 1024], act="silu")
head = YOLOXHead(num_classes=1, width=0.50, in_channels=[256, 512, 1024], act="silu")
model = YOLOX(backbone, head)

# Load trained weights
ckpt = torch.load("yolox_small_best.pth", map_location="cpu", weights_only=False)
model.load_state_dict(ckpt["model"])
model.eval()
model.head.decode_in_inference = True  # Output pixel coords, not raw logits

# Trace and convert
dummy = torch.randn(1, 3, 640, 640)
traced = torch.jit.trace(model, dummy)
mlmodel = ct.convert(
    traced,
    inputs=[ct.TensorType(name="images", shape=(1, 3, 640, 640))],
    outputs=[ct.TensorType(name="output")],
    minimum_deployment_target=ct.target.iOS15,
    convert_to="mlprogram",
)
mlmodel.save("yolox_small_aircraft.mlpackage")

The decode_in_inference = True is crucial — without it, the model outputs raw logits and you’d need to implement the decode head in Swift. With it, the output is [1, N, 6] where 6 is [x_center, y_center, width, height, obj_conf, class_score] in pixel coordinates.
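For reference, consuming that [1, N, 6] tensor looks something like this in Python – a sketch with an arbitrary threshold, and you’d still want NMS on top if overlapping detections are possible:

import numpy as np

def postprocess(output, conf_thresh=0.5):
    """output: [1, N, 6] ndarray of [cx, cy, w, h, obj_conf, class_score] in pixels."""
    preds = output[0]
    scores = preds[:, 4] * preds[:, 5]  # combined confidence
    keep = preds[scores > conf_thresh]
    # center format -> corner format for drawing
    boxes = np.stack([keep[:, 0] - keep[:, 2] / 2,   # x1
                      keep[:, 1] - keep[:, 3] / 2,   # y1
                      keep[:, 0] + keep[:, 2] / 2,   # x2
                      keep[:, 1] + keep[:, 3] / 2], axis=1)
    return boxes, scores[scores > conf_thresh]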

On the Swift side, Claude ended up writing a custom detector that bypasses the Vision framework entirely. Here’s the preprocessing — the part that was hardest to get right:

/// Convert pixel buffer to MLMultiArray [1, 3, H, W] with 0-255 range
private func preprocess(pixelBuffer: CVPixelBuffer) -> MLMultiArray? {
    // GPU-accelerated resize via Core Image
    let ciImage = CIImage(cvPixelBuffer: pixelBuffer)
    let scaleX = CGFloat(inputSize) / ciImage.extent.width
    let scaleY = CGFloat(inputSize) / ciImage.extent.height
    let scaledImage = ciImage.transformed(by: CGAffineTransform(scaleX: scaleX, y: scaleY))

    // Reuse pixel buffer from pool (memory leak fix #1)
    var resizedBuffer: CVPixelBuffer?
    CVPixelBufferPoolCreatePixelBuffer(kCFAllocatorDefault, pool, &resizedBuffer)
    guard let buffer = resizedBuffer else { return nil }
    ciContext.render(scaledImage, to: buffer)

    // Reuse pre-allocated MLMultiArray (memory leak fix #2)
    guard let array = inputArray else { return nil }

    CVPixelBufferLockBaseAddress(buffer, .readOnly)
    defer { CVPixelBufferUnlockBaseAddress(buffer, .readOnly) }

    let bytesPerRow = CVPixelBufferGetBytesPerRow(buffer)
    let pixels = CVPixelBufferGetBaseAddress(buffer)!.assumingMemoryBound(to: UInt8.self)
    let arrayPtr = array.dataPointer.assumingMemoryBound(to: Float.self)
    let channelStride = inputSize * inputSize

    // BGRA → RGB, keep 0-255 range (YOLOX expects unnormalized pixels)
    // Direct pointer access is ~100x faster than MLMultiArray subscript
    for y in 0..<inputSize {
        let rowOffset = y * bytesPerRow
        let yOffset = y * inputSize
        for x in 0..<inputSize {
            let px = rowOffset + x * 4
            let idx = yOffset + x
            arrayPtr[idx] = Float(pixels[px + 2])                      // R
            arrayPtr[channelStride + idx] = Float(pixels[px + 1])      // G
            arrayPtr[2 * channelStride + idx] = Float(pixels[px])      // B
        }
    }
    return array
}

The two key gotchas: (1) BGRA byte order from the camera vs RGB that the model expects, and (2) YOLOX wants raw 0-255 pixel values, not the 0-1 normalized range that most CoreML models expect. If you normalize, everything silently breaks — the model runs, returns garbage, and you spend an evening wondering why.

For deployment, I used CoreML’s INT8 quantization (coremltools.optimize.coreml.linear_quantize_weights). This shrinks the model by about 50% with minimal accuracy loss. The small model went from ~17MB to 8.7MB, and inference time improved slightly.
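The quantization step itself is only a few lines with coremltools’ optimize API (this is my recollection of the coremltools 7 interface – check the docs for your version):

import coremltools as ct
import coremltools.optimize.coreml as cto

mlmodel = ct.models.MLModel("yolox_small_aircraft.mlpackage")
op_config = cto.OpLinearQuantizerConfig(mode="linear_symmetric")  # int8 weights by default
config = cto.OptimizationConfig(global_config=op_config)
quantized = cto.linear_quantize_weights(mlmodel, config)  # roughly halves the model size
quantized.save("yolox_small_aircraft_int8.mlpackage")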

Real World Results (Round 1)

I exported the nanoish model and got it running in SkySpottr. The good news: it works. The ML snap feature locks onto aircraft, the orange verification circles appear, and inference is fast enough that I don’t notice any lag.

The less good news: false positives. Trees, parts of houses, certain cloud formations – the model occasionally thinks these are aircraft. Remember that rookie mistake about no negative samples? Yeah.

I later set up a 3-way comparison to visualize exactly this kind of failure. The three panels show my COCO-only trained model (red boxes), a later model trained on self-sourced images (green boxes – I’ll explain this pipeline shortly), and YOLO26-X as a ground truth oracle (right panel, no boxes means no detection). The COCO-only model confidently detects an “aircraft” that is… a building. The other two correctly ignore it.

False positive comparison - COCO-only model detects a building as aircraft

The app handles this gracefully because of the matching threshold. Random false positives in empty sky don’t trigger the snap because there’s no predicted aircraft nearby to match against. But when there’s a tree branch right next to where a plane should be, the model sometimes locks onto the wrong thing.

The even less good news: it still struggles with truly distant aircraft. A plane at 35,000 feet that’s 50+ miles away is basically a single bright pixel. No amount of ML is going to reliably detect that. For those, the app falls back on pure ADS-B prediction, which is usually good enough to get the overlay in the right general area.

But when it works, it works. I’ll show some examples of successful detections in the self-sourced section below.

The Memory Leak Discovery (Fun Debugging Tangent)

While testing the YOLOX integration, I was also trying to get RevenueCat working for subscriptions. Had the app running for about 20 minutes while I debugged the in-app purchase flow. Noticed it was getting sluggish, opened Instruments, and… yikes.

Base memory for the app is around 200MB. After 20 minutes of continuous use, it had climbed to 450MB. Classic memory leak pattern.

The culprit was AI-induced and AI-resolved: the detector was creating a new CVPixelBuffer and MLMultiArray for every single frame. At 15fps, that’s 900 frames per minute, each allocating buffers that weren’t getting cleaned up fast enough.

The fix was straightforward – use a CVPixelBufferPool for the resize buffers and pre-allocate a single MLMultiArray that gets reused. Memory now stays flat even after hours of use.

(The RevenueCat thing? I ended up ditching it entirely and going with native StoreKit2. RevenueCat is great, but keeping debug and release builds separate was more hassle than it was worth for a side project. StoreKit2 is actually pretty nice these days if you don’t need the analytics. I’m at ~80 downloads and not a single purchase. Clearly my first paid app still needs some fine-tuning on the whole freemium thing.)

Round 2: Retraining with Negative Samples

After discovering the false positive issue, I went back and retrained. This time I made sure to include images without aircraft – random sky photos, clouds, trees, buildings, just random COCO2017 stuff. The model needs to learn what’s NOT an airplane just as much as what IS one.

Here’s the extraction script that handles the negative sampling. The key insight: you need to explicitly tell the model what empty sky looks like:

import json
import random

AIRPLANE_CATEGORY_ID = 5  # COCO category id for "airplane"

def extract_airplane_dataset(split="train", negative_ratio=0.2, seed=42):
    """Extract airplane images from COCO, with negative samples."""
    random.seed(seed)  # make the negative sample reproducible
    with open(f"instances_{split}2017.json") as f:
        coco_data = json.load(f)

    # Find all images WITH airplanes
    airplane_image_ids = set()
    for ann in coco_data['annotations']:
        if ann['category_id'] == AIRPLANE_CATEGORY_ID:  # 5 in COCO
            airplane_image_ids.add(ann['image_id'])

    # Find images WITHOUT airplanes for negative sampling
    all_ids = {img['id'] for img in coco_data['images']}
    negative_ids = all_ids - airplane_image_ids

    # Add 20% negative images (no airplanes = teach model what ISN'T a plane)
    num_negatives = int(len(airplane_image_ids) * negative_ratio)
    sampled_negatives = random.sample(list(negative_ids), num_negatives)
    # ... copy images and annotations to output directory

I also switched from nanoish to the small model. The accuracy improvement on distant aircraft was worth the extra compute, and with INT8 quantization the inference time came in at around 5.6ms on an iPhone – way better than my napkin math predicted. Apple’s Neural Engine is impressive.

The final production model: YOLOX-Small, 640×640 input, INT8 quantized, ~8.7MB on disk. It runs at 15fps with plenty of headroom for the rest of the app on my iPhone 17 Pro.

Round 3: Self-Sourced Images and Closing the Loop

So the model works, but it was trained entirely on COCO2017 – airport tarmac photos, stock images, that kind of thing. My app is pointing at the sky from the ground. Those are very different domains.

I added a debug flag to SkySpottr for my phone that saves every camera frame where the model fires a detection. Just flip it on, walk around outside for a while, and the app quietly collects real-world training data. Over a few weeks of casual use, I accumulated about 2,000 images from my phone.

The problem: these images don’t have ground truth labels. I’m not going to sit there and manually draw bounding boxes on 2,000 sky photos. So I used YOLO26-X (Ultralytics’ latest and greatest, which I’m fine using as an offline tool since it never ships in the app) as a teacher model. Run it on all the collected images, take its high-confidence detections as pseudo-labels, convert to COCO annotation format, and now I have a self-sourced dataset to mix in with the original COCO training data.

Here’s the pseudo-labeling pipeline. First, run the teacher model on all collected images:

from pathlib import Path

from tqdm import tqdm
from ultralytics import YOLO

AIRPLANE_CLASS_ID = 4  # "airplane" class index in Ultralytics COCO models

model = YOLO("yolo26x.pt")  # Big model, accuracy over speed

image_paths = sorted(Path("collected_frames").glob("*.jpg"))  # assumed dump directory
detections = {}  # keyed by image UUID (the filename stem)

for img_path in tqdm(image_paths, desc="Processing images"):
    results = model(str(img_path), conf=0.5, verbose=False)
    boxes = results[0].boxes
    airplane_boxes = boxes[boxes.cls == AIRPLANE_CLASS_ID]
    if len(airplane_boxes) == 0:
        continue

    image_dets = []
    for box in airplane_boxes:
        x1, y1, x2, y2 = box.xyxy[0].cpu().numpy().tolist()
        image_dets.append({
            "bbox_xywh": [x1, y1, x2 - x1, y2 - y1],  # COCO format
            "confidence": float(box.conf[0]),
        })
    detections[img_path.stem] = {"image_path": str(img_path), "detections": image_dets}

Then convert those detections to COCO annotation format so YOLOX can train on them:

import json
from pathlib import Path

from PIL import Image

def convert_to_coco(detections):
    """Convert YOLO26 detections to COCO training format."""
    coco_data = {
        "images": [], "annotations": [],
        "categories": [{"id": 1, "name": "airplane", "supercategory": "vehicle"}],
    }

    image_id, ann_id = 1, 1
    for uuid, data in detections.items():
        img_path = Path(data["image_path"])
        width, height = Image.open(img_path).size

        if width > 1024 or height > 1024:  # Skip oversized images
            continue

        coco_data["images"].append({"id": image_id, "file_name": f"{uuid}.jpg",
                                     "width": width, "height": height})

        for det in data["detections"]:
            coco_data["annotations"].append({
                "id": ann_id, "image_id": image_id, "category_id": 1,
                "bbox": det["bbox_xywh"], "area": det["bbox_xywh"][2] * det["bbox_xywh"][3],
                "iscrowd": 0,
            })
            ann_id += 1
        image_id += 1

    with open("instances_train.json", "w") as f:
        json.dump(coco_data, f)

Finally, combine both datasets in the training config using YOLOX’s ConcatDataset:

def get_dataset(self, cache=False, cache_type="ram"):
    from yolox.data import COCODataset, TrainTransform
    from yolox.data.datasets import ConcatDataset

    preproc = TrainTransform(max_labels=50, flip_prob=0.5, hsv_prob=1.0)

    # Original COCO aircraft dataset
    coco_dataset = COCODataset(data_dir=self.data_dir, json_file=self.train_ann,
                                img_size=self.input_size, preproc=preproc, cache=cache)

    # Self-sourced dataset (YOLO26-X validated)
    self_sourced = COCODataset(data_dir=self.self_sourced_dir, json_file=self.self_sourced_ann,
                                name="train", img_size=self.input_size, preproc=preproc, cache=cache)

    print(f"COCO aircraft images: {len(coco_dataset)}")
    print(f"Self-sourced images: {len(self_sourced)}")
    return ConcatDataset([coco_dataset, self_sourced])

Out of 2,000 images, YOLO26-X found aircraft in about 108 of them at a 0.5 confidence threshold – roughly a 5% hit rate, which makes sense since most frames are just empty sky between detections. I filtered out anything over 1024px and ended up with a nice supplementary dataset of aircraft-from-the-ground images.

The 3-way comparison images I showed earlier came from this pipeline. Here’s what successful detections look like – the COCO-only model (red), self-sourced model (green), and YOLO26-X (right panel, shown at full resolution so you can see what we’re actually detecting):

Aircraft detection comparison - all models detecting a plane in clear sky

That’s maybe 30 pixels of airplane against blue sky, detected with 0.88 and 0.92 confidence by the two YOLOX variants.

And here’s one I particularly like – aircraft spotted through pine tree branches. Real-world conditions, not a clean test image. Both YOLOX models nail it, YOLO26-X misses at this confidence threshold:

Aircraft detection through pine tree branches

And a recent one from February 12, 2026 – a pair of what appear to be F/A-18s over Denver at 4:22 PM MST, captured at 12x zoom. The model picks up both jets at 73-75% confidence, plus the bird in the bottom-right at 77% (a false positive the app filters out via ADS-B matching). Not bad for specks against an overcast sky.

F/A-18 pair detected over Denver, CO - Feb 12, 2026

I also trained a full YOLOX-Large model (depth 1.0, width 1.0, 1024×1024 input) on the combined dataset, just to see how far I could push it. Too heavy for phone deployment, but useful for understanding the accuracy ceiling.
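For reference, the large config is the same Exp pattern as the others with the dials turned all the way up – a sketch, with the augmentation and schedule settings omitted:

class Exp(MyExp):
    def __init__(self):
        super(Exp, self).__init__()

        self.num_classes = 1
        self.depth = 1.0   # full YOLOX-L depth
        self.width = 1.0   # full YOLOX-L width

        self.input_size = (1024, 1024)  # big input for the tiny-aircraft problem
        self.test_size = (1024, 1024)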

Conclusion

Was this worth it to avoid Ultralytics’ licensing? Yes – it took an afternoon and a couple of evenings of vibe-coding, so switching wasn’t hard. And not just because MIT is cleaner than AGPL: I learned a ton about how these models actually work. The Ultralytics ecosystem is so polished that it’s easy to treat it as a black box. Building on YOLOX forced me to understand some of the nuances, the training configs, and the tradeoffs between model size and accuracy.

Plus, I can now say I trained my own object detection model from scratch. That’s worth something at parties. Nerdy parties, anyway.

SkySpottr is live on the App Store if you want to see the model in action – point your phone at the sky and watch it lock onto aircraft in real-time.

The self-sourced pipeline is still running. Every time I use the app with the debug flag on, it collects more training data. The plan is to periodically retrain as the dataset grows – especially now that I’m getting images from different weather conditions, times of day, and altitudes. The COCO-only model was a solid start, but a model trained on actual ground-looking-up images of aircraft at altitude? That’s the endgame.

Categories
Python XPlane

Adding track following (Direct To) with cross track error to the Python X-Plane Autopilot

Continuing from the last post (Adding some polish to the X-Plane Python Autopilot with Flask, Redis, and WebSockets), I have a bit of momentum going on the Python X-Plane Autopilot stuff. There were a couple of items I wanted to complete before declaring the project “done”. The first is a “takeoff” button, which isn’t yet done. The other is the ability to fly along a track. That is now complete as of last night.

It is one thing to fly a bearing from A to B. That works fine as long as there is no wind. Flying a heading set by bearing is easy, and is part of the heading select & hold feature built out in a previous iteration of the code. Doing so requires a “desired heading” and a heading error PID. The goal is to minimize the heading error, so we set the setpoint to 0. That PID feeds a “roll” PID controller, which in turn feeds an aileron PID controller.

Each of these has limits in place to prevent excessive movement. For example, the roll PID controller is limited to +/- 30 degrees, and pitch to +/- 15 degrees.
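Condensed, the cascade looks like this (a sketch pulled from the shape of the real control loop in the autopilot code from the previous post; the limits here are the values that code actually uses):

# heading error -> roll setpoint -> aileron command
heading_error = get_angle_difference(desired_hdg, current_hdg)
heading_error_PID.update(heading_error)  # setpoint is 0

roll_PID.SetPoint = normalize(heading_error_PID.output, -25, 25)  # limit commanded bank angle
roll_PID.update(current_roll)

new_ail_ctrl = normalize(roll_PID.output, min=-1, max=1)  # limit control surface deflection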

To take this to the next step requires a few things:

  • A “track”, which is commonly defined as a start point and an end point. Both are simply lat/lon coordinate sets.
  • A current location, which is current lat/lon
  • A cross track distance (error), which is the distance the current location is off the track.
  • More PID loops, namely a cross track distance PID control, which, like the heading error PID, has a setpoint of 0 (i.e. the goal is to minimize the cross track distance).

Additionally, to make something actually useful, we need a “database” of navigation points. I parsed X-Plane’s fixed-width text files for this, which was not fun.

To tie it all together, the web interface needs a way to type in a nav point, and a Direct To (D->To) button. Direct to is common in aviation GPS units to set a track from the location when the button is pushed to some point (VOR, fix, airport, etc). I’ve emulated that functionality.

Here’s a screenshot showing the example aircraft navigating to DVV, which is the KDEN VOR, from somewhere near KBJC. It shows a cross track error of 0.056 km, or 56 meters. ChatGPT helpfully generated the cross track error function with a result in meters; I am comfortable with many kinds of units, so I’ll leave it for now. The red line on the right map view is the aircraft’s interpretation of the direct-to, set at the same time as I clicked my autopilot’s Direct To button. There is a 4 kt wind coming from 029. I also tested with stronger, somewhat constant crosswinds in the 40-50 kt range with gusts of +/- 5 kts.

screenshot showing python autopilot code controlling xplane, flying aircraft along a track

The cross track error settles down to < 10 m after a minute or so. It is a little “lazy”: if it is on a track that is due east and I flip the track to due west, it’ll dutifully do the 180, then attempt to rejoin the track, overshoot a bit, and settle down after about one oscillation. I could probably turn up the P on the xte PID and that would help. Below is a track of taking off from KBJC and then going direct to DVV. The X is where I clicked Direct To back to the BJC VOR; it turned left and rejoined the track, overshooting, then settling back in nicely.

plot showing the lat/lon track of the aircraft doing almost a complete 180 to rejoin the track going the opposite direction with python autopilot code

The Python Autopilot Code

I’m not going to pretend I wrote the cross track distance code, nor will I pretend to fully understand it. It works. The sign of the result depends on which side of the great circle track you are on. Luckily, aircraft (and boats and other things that follow tracks) don’t typically go from B to A. They go from A to B, so the sign is consistent no matter which direction the track is facing. If they do need to go back to the start, the start becomes the end, if that makes sense.

This is the glorious cross track distance code along with some test code. Using Google Earth, the distance from the KBJC control tower to the centerline of 30R/12L should be ~0.44 km.

import math

def cross_track_distance(point, start, end):
    # Convert all latitudes and longitudes from degrees to radians
    point_lat, point_lon = math.radians(point[0]), math.radians(point[1])
    start_lat, start_lon = math.radians(start[0]), math.radians(start[1])
    end_lat, end_lon = math.radians(end[0]), math.radians(end[1])

    # Calculate the angular distance from start to point
    # Ensure the argument is within the domain of acos
    acos_argument = math.sin(start_lat) * math.sin(point_lat) + math.cos(start_lat) * math.cos(point_lat) * math.cos(point_lon - start_lon)
    acos_argument = max(-1, min(1, acos_argument))  # Clamp the argument between -1 and 1
    delta_sigma = math.acos(acos_argument)

    # Calculate the bearing from start to point and start to end
    theta_point = math.atan2(math.sin(point_lon - start_lon) * math.cos(point_lat),
                             math.cos(start_lat) * math.sin(point_lat) - math.sin(start_lat) * math.cos(point_lat) * math.cos(point_lon - start_lon))
    theta_end = math.atan2(math.sin(end_lon - start_lon) * math.cos(end_lat),
                           math.cos(start_lat) * math.sin(end_lat) - math.sin(start_lat) * math.cos(end_lat) * math.cos(end_lon - start_lon))

    # Calculate the cross track distance
    cross_track_dist = math.asin(math.sin(delta_sigma) * math.sin(theta_point - theta_end))

    # Convert cross track distance to kilometers by multiplying by the Earth's radius (6371 km)
    cross_track_dist = cross_track_dist * 6371

    return cross_track_dist

kbjc_runways = {
	"30R/12L": {
		"Runway 12L": {
			"Latitude": 39.91529286666667,
			"Longitude": -105.12841313333334
		},
		"Runway 30R": {
			"Latitude": 39.901373883333335,
			"Longitude": -105.10191808333333
		}
	}
}


kbjc_runway_30R_start = (kbjc_runways["30R/12L"]["Runway 30R"]["Latitude"], kbjc_runways["30R/12L"]["Runway 30R"]["Longitude"])
kbjc_runway_30R_end = (kbjc_runways["30R/12L"]["Runway 12L"]["Latitude"], kbjc_runways["30R/12L"]["Runway 12L"]["Longitude"])
# test_locations (holding the KBJC tower lat/lon) is defined elsewhere in the file
kbjc_tower = (test_locations["kbjc_tower"]["lat"], test_locations["kbjc_tower"]["lon"])
def test_cross_track_distance():
	print(f"start lat: {kbjc_runway_30R_start[0]}, start lon: {kbjc_runway_30R_start[1]}")
	print(f"end lat: {kbjc_runway_30R_end[0]}, end lon: {kbjc_runway_30R_end[1]}")
	print(f"tower lat: {kbjc_tower[0]}, tower lon: {kbjc_tower[1]}")

	dist = cross_track_distance(kbjc_tower, kbjc_runway_30R_start, kbjc_runway_30R_end)
	print(f"cross track distance: {dist}")

test_cross_track_distance()

And the rest of the magic happens in this block. If you recall from the last post (Adding some polish to the X-Plane Python Autopilot with Flask, Redis, and WebSockets), I am using Redis as a store to hold the setpoints from the web app controlling the autopilot. It is fast enough that I don’t need to worry about latency when running at 10 Hz (the loop durations are consistently less than 30 milliseconds, with the bulk of that time being used to get and set data from X-Plane itself).

# get the setpoints from redis
			setpoints = get_setpoints_from_redis()

			# check if we have just changed to direct-to mode and if so, update the direct to coords. same if the target waypoint has changed
			if (setpoints["hdg_mode"] == "d_to" and previous_nav_mode != "d_to") or (setpoints["target_wpt"] != previous_nav_target):
				print("reason for entering this block")
				print(f"previous nav mode: {previous_nav_mode}, setpoints hdg mode: {setpoints['hdg_mode']}, previous nav target: {previous_nav_target}, setpoints target wpt: {setpoints['target_wpt']}")
				
				# d_to_start_coords is the current position, in lat,lon tuple
				d_to_start_coords = (posi[0], posi[1])

				# this function does a lookup in the nav_points dataframe to get the lat, lon of the target waypoint
				# it could certainly be optimized to use something faster than a pandas dataframe
				d_to_target_coords = get_nav_point_lat_lon(setpoints["target_wpt"])

				# reset xte PID
				xte_PID.clear()
				print(f"setting d_to_start_coords to {d_to_start_coords}")

			# these are unchanged
			desired_alt = setpoints["desired_alt"]
			desired_speed = setpoints["desired_speed"]

			if setpoints["hdg_mode"] == "hdg":
				# if we're in heading mode, just use the desired heading. this is mostly unchanged from the previous iteration
				desired_hdg = setpoints["desired_hdg"]
				heading_error = get_angle_difference(desired_hdg, current_hdg)
				heading_error_PID.update(heading_error)
				
			elif setpoints["hdg_mode"] == "d_to":
				# if we're in direct-to mode, calculate the cross-track error and update the xte_PID.
				# I am using xte to mean cross-track error/distance
				xte = cross_track_distance((posi[0], posi[1]), d_to_start_coords, d_to_target_coords)
				xte_PID.update(xte)

				# calculate the heading correction based on the xte_PID output
				heading_correction = xte_PID.output

				# this is essentially saying for 1 km of cross-track error, we want to correct by 30 degrees
				heading_correction = heading_correction * 30

				# limit the heading correction to -45 to 45 degrees
				heading_correction = normalize(heading_correction, -45, 45)

				# calculate the track heading to the target waypoint. the track heading is the heading we would
				# need to fly to get to the target waypoint from the current position. it is used as an initial heading
				track_heading = get_bearing((posi[0], posi[1]), d_to_target_coords)

				# adjust the desired heading by the heading correction
				adjusted_desired_hdg = track_heading + heading_correction

				# make sure the adjusted desired heading is between 0 and 360
				adjusted_desired_hdg = adjusted_desired_hdg % 360

				# calculate the heading error based on the adjusted desired heading, this is no different than the hdg mode
				adjusted_heading_error = get_angle_difference(adjusted_desired_hdg, current_hdg)
				heading_error_PID.update(adjusted_heading_error)

				# log the current values
				print(f"track hdg: {track_heading:.1f}, heading corr: {heading_correction:.1f}, adj desired hdg: {adjusted_desired_hdg:.1f}, adj heading err: {adjusted_heading_error:.1f}")

				# write to a log file so we can make nice plots for the blog post
				log_line = f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]},{posi[0]},{posi[1]},{posi[2]},{xte},{xte_PID.output},{track_heading},{heading_correction},{adjusted_desired_hdg},{adjusted_heading_error}"
				with open(current_run_log_filename, "a") as log_file:
					log_file.write(log_line + "\n")

Getting nav data from X-Plane data files

If you looked at the code closely, you will have seen that d_to_target_coords is set via a function called get_nav_point_lat_lon(nav_point). This looks up lat/lon in a file that was generated by parsing the X-Plane navigation data. In a previous job, I dealt with fixed-width data formats. It is not fun. I originally tried to split on spaces, but some of the nav point names have more than one space in them. I suppose I could just ignore the name, but this is already written. This code parses the earth_nav.dat file, specifically for type 3 rows, which are VOR/DME-like.

import pandas as pd

nav_filepath = r"C:\Users\Austin\Desktop\X-Plane 10\Resources\default data\earth_nav.dat"

raw_file_data = open(nav_filepath, 'r').readlines()

# remove first 3 lines
raw_file_data = raw_file_data[3:]

# remove last line
raw_file_data = raw_file_data[:-1]

# remove new line characters
raw_file_data = [line.replace('\n', '') for line in raw_file_data]

# Adjusting the function based on the new column map provided
def parse_nav_info(line):
    column_map = {
        'type': {'start': 0, 'end': 1},
        'lat_sign': {'start': 2, 'end': 3},
        'latitude': {'start': 3, 'end': 15},
        'lon_sign': {'start': 15, 'end': 16},
        'longitude': {'start': 16, 'end': 28},
        'elevation': {'start': 29, 'end': 35},
        'frequency': {'start': 36, 'end': 41},
        'range': {'start': 42, 'end': 45},
        'unknown': {'start': 46, 'end': 52},
        'identifier': {'start': 53, 'end': 56},
        'name': {'start': 56}  # Assuming end is not needed; take till the end of the line
    }

    nav_info = {}
    for column, column_info in column_map.items():
        start = column_info['start']
        end = column_info.get('end', None)
        value = line[start:end].strip()
        # print(f"attempting to parse {column} with value {value}")
        if column == 'latitude':
            lat_sign = line[column_map['lat_sign']['start']:column_map['lat_sign']['end']]
            lat_sign = -1 if lat_sign == '-' else 1
            value = lat_sign * float(value)
        elif column == 'longitude':
            lon_sign = line[column_map['lon_sign']['start']:column_map['lon_sign']['end']]
            lon_sign = -1 if lon_sign == '-' else 1
            value = lon_sign * float(value)
        elif column == 'elevation':
            value = int(value)
        elif column == 'frequency':
            value = int(value)
        elif column == 'range':
            value = int(value)
        nav_info[column] = value

    return nav_info

data = []
for line in raw_file_data:
    line_type = int(line[0:2])
    if line_type != 3:
        continue

    line_data = parse_nav_info(line)
    data.append(line_data)

df = pd.DataFrame(data)
columns_of_interest = ['identifier','latitude','longitude','elevation', 'frequency', 'range', 'name']
df = df[columns_of_interest]
df.head()

df.to_pickle('nav_data.pkl')

The code to read the file and import is at the beginning of the python_autopilot.py file and is fairly straightforward:

import pickle

# nav_data.pkl is a pandas dataframe. yes, this should use a dict or something faster.
nav_points = pickle.load(open("nav_data.pkl", "rb"))

def get_nav_point_lat_lon(id):
	nav_point = nav_points[nav_points["identifier"] == id]
	return nav_point["latitude"].values[0], nav_point["longitude"].values[0]
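Since the comment above calls it out, here is a minimal sketch of the dict-based version – built once at startup so each lookup is O(1) instead of a dataframe scan. (Caveat: VOR identifiers are not globally unique, so it’s last-one-wins here.)

import pickle

nav_points_df = pickle.load(open("nav_data.pkl", "rb"))
nav_lookup = {row.identifier: (row.latitude, row.longitude)
              for row in nav_points_df.itertuples()}

def get_nav_point_lat_lon(id):
    return nav_lookup[id]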

And for the Flask side of the house, we have index.html:

<!DOCTYPE html>
<html>
<head>
    <title>Autopilot Interface</title>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/4.0.1/socket.io.js"></script>
<script type="text/javascript" charset="utf-8">
    var socket;  // Declare socket globally

    // Define adjustSetpoint globally
    function adjustSetpoint(label, adjustment) {
        socket.emit('adjust_setpoint', {label: label, adjustment: adjustment});
    }

    function submitDirectTo() {
        const stationId = document.getElementById('target_wpt_input').value; // Grab the value from the input
        if (stationId) { // Check if the stationId is not empty
            adjustSetpoint('target_wpt', stationId); // Adjust the setpoint with the stationId as the value
            adjustSetpoint('hdg_mode', "d_to"); // Your existing function call
        } else {
            alert("Please enter a station ID.");
        }
    }

    document.addEventListener('DOMContentLoaded', () => {
        socket = io.connect(location.protocol + '//' + document.domain + ':' + location.port);
        
        socket.on('connect', () => {
            console.log("Connected to WebSocket server.");
        });

        // Listen for update_setpoints event to initialize the UI with Redis values
        socket.on('update_setpoints', function(setpoints) {
          for (const [label, value] of Object.entries(setpoints)) {
              const element = document.getElementById(label);
              if (element) {
                  element.innerHTML = value;
              }
          }
      });

        // Listen for update_setpoint events from the server
        socket.on('update_setpoint', data => {
            // Assuming 'data' is an object like {label: new_value}
            for (const [label, value] of Object.entries(data)) {
                // Update the displayed value on the webpage
                const element = document.getElementById(label);
                if (element) {
                    element.innerHTML = value;
                }
            }
        });
    });
</script>
<style>
    body {
        font-family: Arial, sans-serif;
        margin: 20px;
        background-color: #f4f4f4;
        color: #333;
    }
    h1 {
        color: #005288;
    }
    ul {
        list-style-type: none;
        padding: 0;
    }
    ul li {
        margin: 10px 0;
    }
    button, input[type="text"] {
        padding: 10px;
        margin-top: 5px;
        border: 1px solid #ccc;
        border-radius: 5px;
        cursor: pointer;
        font-size: 16px;
    }
    button:hover {
        background-color: #ddd;
    }
    .button-group {
        margin-bottom: 20px;
    }
    #target_wpt_input {
        margin-right: 10px;
    }
</style>

</head>
<body>
    <h1>Autopilot Interface</h1>
    <p>Current Setpoints:</p>
    <ul>
        <li>Heading: <span id="desired_hdg">0</span></li>
        <li>Altitude: <span id="desired_alt">0</span></li>
        <li>Speed: <span id="desired_speed">0</span></li>
        <li>Heading Mode: <span id="hdg_mode">0</span></li>
        <li>Target Waypoint: <span id="target_wpt">BJC</span></li>
    </ul>
    <p>Autopilot: <span id="autopilot_enabled">OFF</span></p>

    <!-- Example buttons for adjusting setpoints -->
    <div class="button-group">
        <button onclick="adjustSetpoint('desired_hdg', -10)">-10 HDG</button>
        <button onclick="adjustSetpoint('desired_hdg', 10)">+10 HDG</button>
    </div>
    <div class="button-group">
        <button onclick="adjustSetpoint('desired_alt', 500)">+500 ALT</button>
        <button onclick="adjustSetpoint('desired_alt', -500)">-500 ALT</button>
    </div>
    <div class="button-group">
        <button onclick="adjustSetpoint('desired_speed', 5)">+5 KTS</button>
        <button onclick="adjustSetpoint('desired_speed', -5)">-5 KTS</button>
    </div>
    <div class="button-group">
        <button onclick="adjustSetpoint('hdg_mode', 'hdg')">Follow Heading</button>
        <input type="text" id="target_wpt_input" value="BJC">
        <button onclick="submitDirectTo()">Direct To</button>
    </div>
    <div class="button-group">
        <button onclick="adjustSetpoint('autopilot_enabled', 1)">Enable Autopilot</button>
        <button onclick="adjustSetpoint('autopilot_enabled', 0)">Disable Autopilot</button>
    </div>

</body>
</html>

And the Flask app itself. I still think WebSockets are magic.

from flask import Flask, render_template
from flask_socketio import SocketIO, emit
import redis

app = Flask(__name__)
socketio = SocketIO(app)
r = redis.StrictRedis(host='localhost', port=6379, db=0)

setpoints_of_interest = ['desired_hdg', 'desired_alt', 'desired_speed']
# get initial setpoints from Redis, send to clients

@app.route('/')
def index():
    return render_template('index.html')  # You'll need to create an HTML template

def update_setpoint(label, adjustment):
    # This function can be adapted to update setpoints and then emit updates via WebSocket
    current_raw_value = r.get(label) if r.exists(label) else None
    current_value = 0.0  # default so a missing Redis key can't cause a NameError below
    if current_raw_value is not None:
        try:
            current_value = float(current_raw_value)
        except ValueError:
            current_value = current_raw_value.decode('utf-8')  # Redis returns bytes

    if label == 'desired_hdg':
        new_value = (current_value + adjustment) % 360
    elif label in ('autopilot_enabled', 'hdg_mode', 'target_wpt'):
        new_value = adjustment  # these are set directly rather than incremented
    else:
        new_value = current_value + adjustment

    r.set(label, new_value)
    # socketio.emit('update_setpoint', {label: new_value})  # Emit update to clients
    return new_value

@socketio.on('adjust_setpoint')
def handle_adjust_setpoint(json):
    label = json['label']
    adjustment = json['adjustment']
    # Your logic to adjust the setpoint in Redis and calculate new_value
    new_value = update_setpoint(label, adjustment)

    # Emit updated setpoint to all clients
    emit('update_setpoint', {label: new_value}, broadcast=True)

@socketio.on('connect')
def handle_connect():
    # Fetch initial setpoints from Redis
    initial_setpoints = {label: float(r.get(label)) if r.exists(label) else 0.0 for label in setpoints_of_interest}
    
    # Emit the initial setpoints to the connected client
    emit('update_setpoints', initial_setpoints)

if __name__ == '__main__':
    socketio.run(app)

And here’s the full code of the autopilot itself. This will be transferred to GitHub for the next post. It is a bit long and needs to be split out into a number of separate files.

Conclusion

With a cross track distance known, it isn’t terribly difficult to convert that distance (error) into a heading adjustment. We now have a functioning autopilot that can control our aircraft to any VOR-like point. I could extend the X-Plane nav data parsing to read all points, but I’ll leave that as an exercise for the reader. The X-Plane Python Autopilot is almost complete – all that I have left on the checklist is a “takeoff” button. Hope you enjoyed the post!

References

This page was pretty helpful for realizing that the cross track distance code would be complicated: http://www.movable-type.co.uk/scripts/latlong-vincenty.html. A good bit of the code was generated by ChatGPT as well.

Categories
Python XPlane

Adding some polish to the X-Plane Python Autopilot with Flask, Redis, and WebSockets

I revisited my Python X-Plane autopilot a few weeks ago because adjusting setpoints and the like was pretty clunky. The job I started 1.5 years ago is exclusively Python, so I wanted to redo a bit of it.

Quick aside: For the new PC I just built – Ryzen 9 7900x, 2x32GB 6000 MHz, etc, X-Plane 10 was the 2nd “game” I installed on it. The first was Factorio (I followed Nilaus’ megabase in a book and have got to 5k SPM). Haven’t tried the newer sims yet, but I think they’ll still be somewhat limited by my RTX 2080 Super.

Well, imagine my surprise when I woke up to 6x the normal daily hits by 7am. I checked the weblogs and found that my post was trending on Hacker News (news.ycombinator.com). So I am going to skip pretty much all background and just post the updated code for now; I will go back and clean up this post at some point.

Without further ado, here’s what the super basic dashboard looks like:

Screenshot showing HTML autopilot interface running x-plane via Python

I have it separated into two main running Python programs: the file that interacts with X-Plane itself, and the Flask part.

Check here for the updated code / next iteration where I add track following and direct-to functionality – Adding track following (Direct To) with cross track error to the Python X-Plane Autopilot.

Here’s the adjusted autopilot code to check with Redis for the setpoints every loop execution:

# https://onion.io/2bt-pid-control-python/
# https://github.com/ivmech/ivPID

import sys
import os
import xpc
from datetime import datetime, timedelta
import PID
import time
import math, numpy
import redis

r = redis.StrictRedis(host='localhost', port=6379, db=0)

setpoints = {
	"desired_roll": 0,
	"desired_pitch": 2,
	"desired_speed": 160,
	"desired_alt": 8000.0,
	"desired_hdg": 140,
	"autopilot_enabled": 0
}

for key in setpoints:
	# if the key exists in the redis db, use it
	# otherwise, set it
	if r.exists(key):
		setpoints[key] = float(r.get(key))
	else:
		r.set(key, setpoints[key])

update_interval = 0.10 #seconds
update_frequency = 1/update_interval

P = 0.05
I = 0.01
D = 0
MAX_DEFLECTION_PER_SECOND = 2.0

roll_PID = PID.PID(P*2, I*2, D)
roll_PID.SetPoint = setpoints["desired_roll"]

pitch_PID = PID.PID(P, I, D)
pitch_PID.SetPoint = setpoints["desired_pitch"]

altitude_PID = PID.PID(P*2, P/2, D)
altitude_PID.SetPoint = setpoints["desired_alt"]

speed_PID = PID.PID(P, I, D)
speed_PID.SetPoint = setpoints["desired_speed"]

heading_error_PID = PID.PID(1,0.05,0.1)
heading_error_PID.SetPoint = 0 # need heading error to be 0

DREFs = ["sim/cockpit2/gauges/indicators/airspeed_kts_pilot",
		"sim/cockpit2/gauges/indicators/heading_electric_deg_mag_pilot",
		"sim/flightmodel/failures/onground_any",
		"sim/flightmodel/misc/h_ind"]

def normalize(value, min=-1, max=1):
	if (value > max):
		return max
	elif (value < min):
		return min
	else:
		return value

def sleep_until_next_tick(update_frequency):
    # Calculate the update interval from the frequency
    update_interval = 1.0 / update_frequency

    # Get the current time
    current_time = time.time()

    # Calculate the time remaining until the next tick
    sleep_time = update_interval - (current_time % update_interval)

    # Sleep for the remaining time
    time.sleep(sleep_time)
	
# https://rosettacode.org/wiki/Angle_difference_between_two_bearings#Python
def get_angle_difference(b1, b2):
	r = (b2 - b1) % 360.0
	# Python modulus has same sign as divisor, which is positive here,
	# so no need to consider negative case
	if r >= 180.0:
		r -= 360.0
	return r

# https://gist.github.com/jeromer/2005586
def get_bearing(pointA, pointB):
    """
    Calculates the bearing between two points.
    The formulae used is the following:
        θ = atan2(sin(Δlong).cos(lat2),
                  cos(lat1).sin(lat2) − sin(lat1).cos(lat2).cos(Δlong))
    :Parameters:
      - `pointA: The tuple representing the latitude/longitude for the
        first point. Latitude and longitude must be in decimal degrees
      - `pointB: The tuple representing the latitude/longitude for the
        second point. Latitude and longitude must be in decimal degrees
    :Returns:
      The bearing in degrees
    :Returns Type:
      float
    """
    if (type(pointA) != tuple) or (type(pointB) != tuple):
        raise TypeError("Only tuples are supported as arguments")

    lat1 = math.radians(pointA[0])
    lat2 = math.radians(pointB[0])

    diffLong = math.radians(pointB[1] - pointA[1])

    x = math.sin(diffLong) * math.cos(lat2)
    y = math.cos(lat1) * math.sin(lat2) - (math.sin(lat1)
            * math.cos(lat2) * math.cos(diffLong))

    initial_bearing = math.atan2(x, y)

    # Now we have the initial bearing but math.atan2 return values
    # from -180° to + 180° which is not what we want for a compass bearing
    # The solution is to normalize the initial bearing as shown below
    initial_bearing = math.degrees(initial_bearing)
    compass_bearing = (initial_bearing + 360) % 360

    return compass_bearing

# https://janakiev.com/blog/gps-points-distance-python/
def haversine(coord1, coord2):
    R = 6372800  # Earth radius in meters
    lat1, lon1 = coord1
    lat2, lon2 = coord2
    
    phi1, phi2 = math.radians(lat1), math.radians(lat2) 
    dphi       = math.radians(lat2 - lat1)
    dlambda    = math.radians(lon2 - lon1)
    
    a = math.sin(dphi/2)**2 + \
        math.cos(phi1)*math.cos(phi2)*math.sin(dlambda/2)**2
    
    return 2*R*math.atan2(math.sqrt(a), math.sqrt(1 - a))

KBJC_lat = 39.9088056
KBJC_lon = -105.1171944

def write_position_to_redis(position):
	# position is a list of 7 floats
	# position_elements = [lat, lon, alt, pitch, roll, yaw, gear_indicator]
	position_elements = ["lat", "lon", "alt", "pitch", "roll", "yaw", "gear_indicator"]
	position_str = ','.join([str(x) for x in position])
	r.set('position', position_str)
	for i in range(len(position_elements)):
		r.set(f"position/{position_elements[i]}", position[i])
	
	# position_str = ','.join([str(x) for x in position])
	# r.publish('position_updates', position_str)
		
def get_setpoints_from_redis():
	setpoints = {
		"desired_roll": 0,
		"desired_pitch": 2,
		"desired_speed": 160,
		"desired_alt": 8000.0,
		"desired_hdg": 140
	}
	for key in setpoints:
		# if the key exists in the redis db, use it
		# otherwise, set it
		if r.exists(key):
			setpoints[key] = float(r.get(key))
		else:
			r.set(key, setpoints[key])
	return setpoints

def get_autopilot_enabled_from_redis():
	if r.exists("autopilot_enabled"):
		return int(r.get("autopilot_enabled").decode('utf-8')) == 1
	return False  # default to disabled if the key has not been set yet

ele_positions = []
ail_positions = []
thr_positions = []

def update_control_position_history(ctrl):
	ele_positions.append(ctrl[0])
	ail_positions.append(ctrl[1])
	thr_positions.append(ctrl[3])

	# if the list is longer than 20, pop the first element
	if len(ele_positions) > 20:
		ele_positions.pop(0)
		ail_positions.pop(0)
		thr_positions.pop(0)

def monitor():
	with xpc.XPlaneConnect() as client:
		while True:
			loop_start = datetime.now()
			print(f"loop start - {loop_start}")
			posi = client.getPOSI()
			write_position_to_redis(posi)

			ctrl = client.getCTRL()

			bearing_to_kbjc = get_bearing((posi[0], posi[1]), (KBJC_lat, KBJC_lon))
			dist_to_kbjc = haversine((posi[0], posi[1]), (KBJC_lat, KBJC_lon))
			#desired_hdg = 116 #bearing_to_kbjc

			multi_DREFs = client.getDREFs(DREFs) #speed=0, mag hdg=1, onground=2, alt=3

			current_roll = posi[4]
			current_pitch = posi[3]
			#current_hdg = posi[5] # posi[5] is true heading - use the DREF below to get magnetic heading
			current_hdg = multi_DREFs[1][0]
			current_altitude = multi_DREFs[3][0]
			current_asi = multi_DREFs[0][0]
			onground = multi_DREFs[2][0]

			# get the setpoints from redis
			setpoints = get_setpoints_from_redis()
			desired_hdg = setpoints["desired_hdg"]
			desired_alt = setpoints["desired_alt"]
			desired_speed = setpoints["desired_speed"]

			# outer loops first
			altitude_PID.SetPoint = desired_alt
			altitude_PID.update(current_altitude)

			heading_error = get_angle_difference(desired_hdg, current_hdg)
			heading_error_PID.update(heading_error)

			speed_PID.SetPoint = desired_speed
			

			new_pitch_from_altitude = normalize(altitude_PID.output, -10, 10)
			new_roll_from_heading_error = normalize(heading_error_PID.output, -25, 25)
			# if new_pitch_from_altitude > 15:
			# 	new_pitch_from_altitude = 15
			# elif new_pitch_from_altitude < -15:
			# 	new_pitch_from_altitude = -15
			
			pitch_PID.SetPoint = new_pitch_from_altitude
			roll_PID.SetPoint = new_roll_from_heading_error

			roll_PID.update(current_roll)
			speed_PID.update(current_asi)
			pitch_PID.update(current_pitch)

			new_ail_ctrl = normalize(roll_PID.output, min=-1, max=1)
			new_ele_ctrl = normalize(pitch_PID.output, min=-1, max=1)
			new_thr_ctrl = normalize(speed_PID.output, min=0, max=1)

			previous_ail_ctrl = ail_positions[-1] if len(ail_positions) > 0 else 0
			previous_ele_ctrl = ele_positions[-1] if len(ele_positions) > 0 else 0
			previous_thr_ctrl = thr_positions[-1] if len(thr_positions) > 0 else 0

			# not currently functional - need to work on this 
			# new_ail_ctrl_limited = previous_ail_ctrl + new_ail_ctrl * MAX_DEFLECTION_PER_SECOND / update_frequency
			# new_ele_ctrl_limited = previous_ele_ctrl + new_ele_ctrl * MAX_DEFLECTION_PER_SECOND / update_frequency
			# new_thr_ctrl_limited = previous_thr_ctrl + new_thr_ctrl * MAX_DEFLECTION_PER_SECOND / update_frequency

			# update the control positions
			# update_control_position_history((new_ele_ctrl_limited, new_ail_ctrl_limited, 0.0, new_thr_ctrl_limited))
			update_control_position_history((new_ele_ctrl, new_ail_ctrl, 0.0, new_thr_ctrl))

			onground = -1 # debugging override: force the on-ground check below to be skipped so controls are always sent
			if onground == 1:
				print("on ground, not sending controls")
			else:
				if get_autopilot_enabled_from_redis():
					# ctrl = [new_ele_ctrl_limited, new_ail_ctrl_limited, 0.0, new_thr_ctrl_limited]
					ctrl = [new_ele_ctrl, new_ail_ctrl, 0.0, new_thr_ctrl]
					client.sendCTRL(ctrl)

			loop_end = datetime.now()
			loop_duration = loop_end - loop_start

			output = f"current values --    roll: {current_roll: 0.3f},  pitch: {current_pitch: 0.3f},    hdg: {current_hdg:0.3f}, alt: {current_altitude:0.3f}, asi: {current_asi:0.3f}"
			output = output + "\n" + f"hdg error:                 {heading_error: 0.3f}"
			output = output + "\n" + f"new ctrl positions -- ail: {new_ail_ctrl: 0.4f},    ele: {new_ele_ctrl: 0.4f},   thr: {new_thr_ctrl:0.4f}"
			output = output + "\n" + f"PID outputs --   altitude: {altitude_PID.output: 0.4f},  pitch: {pitch_PID.output: 0.4f},   ail: {roll_PID.output: 0.3f},  hdg: {heading_error_PID.output: 0.3f}"
			output = output + "\n" + f"bearing to KBJC: {bearing_to_kbjc:3.1f}, dist: {dist_to_kbjc*0.000539957:0.2f} NM"
			output = output + "\n" + f"loop duration (ms): {loop_duration.total_seconds()*1000:0.2f} ms"
			print(output)
			sleep_until_next_tick(update_frequency)
			os.system('cls' if os.name == 'nt' else 'clear')



if __name__ == "__main__":
	monitor()

And the Flask backend/frontend. WebSockets are super cool – I’d never used them before this. I was thinking I’d have to make a bunch of endpoints for every type of autopilot change I needed, but a single WebSocket event handles it far more nicely:

from flask import Flask, render_template
from flask_socketio import SocketIO, emit
import redis

app = Flask(__name__)
socketio = SocketIO(app)
r = redis.StrictRedis(host='localhost', port=6379, db=0)

setpoints_of_interest = ['desired_hdg', 'desired_alt', 'desired_speed']
# get initial setpoints from Redis, send to clients

@app.route('/')
def index():
    return render_template('index.html')  # You'll need to create an HTML template

def update_setpoint(label, adjustment):
    # This function can be adapted to update setpoints and then emit updates via WebSocket
    current_value = float(r.get(label)) if r.exists(label) else 0.0

    if label == 'desired_hdg':
        new_value = (current_value + adjustment) % 360
    elif label == 'autopilot_enabled':
        new_value = adjustment
    else:
        new_value = current_value + adjustment

    r.set(label, new_value)
    # socketio.emit('update_setpoint', {label: new_value})  # Emit update to clients
    return new_value

@socketio.on('adjust_setpoint')
def handle_adjust_setpoint(json):
    label = json['label']
    adjustment = json['adjustment']
    # Your logic to adjust the setpoint in Redis and calculate new_value
    new_value = update_setpoint(label, adjustment)

    # Emit updated setpoint to all clients
    emit('update_setpoint', {label: new_value}, broadcast=True)

@socketio.on('connect')
def handle_connect():
    # Fetch initial setpoints from Redis
    initial_setpoints = {label: float(r.get(label)) if r.exists(label) else 0.0 for label in setpoints_of_interest}
    
    # Emit the initial setpoints to the connected client
    emit('update_setpoints', initial_setpoints)

if __name__ == '__main__':
    socketio.run(app)

And the HTML template:

<!DOCTYPE html>
<html>
<head>
    <title>Autopilot Interface</title>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/4.0.1/socket.io.js"></script>
<script type="text/javascript" charset="utf-8">
    var socket;  // Declare socket globally

    // Define adjustSetpoint globally
    function adjustSetpoint(label, adjustment) {
        socket.emit('adjust_setpoint', {label: label, adjustment: adjustment});
    }

    document.addEventListener('DOMContentLoaded', () => {
        socket = io.connect(location.protocol + '//' + document.domain + ':' + location.port);
        
        socket.on('connect', () => {
            console.log("Connected to WebSocket server.");
        });

        // Listen for update_setpoints events to initialize the UI with Redis values
        socket.on('update_setpoints', function(setpoints) {
            for (const [label, value] of Object.entries(setpoints)) {
                const element = document.getElementById(label);
                if (element) {
                    element.innerHTML = value;
                }
            }
        });

        // Listen for update_setpoint events from the server
        socket.on('update_setpoint', data => {
            // Assuming 'data' is an object like {label: new_value}
            for (const [label, value] of Object.entries(data)) {
                // Update the displayed value on the webpage
                const element = document.getElementById(label);
                if (element) {
                    element.innerHTML = value;
                }
            }
        });
    });
</script>

</head>
<body>
    <h1>Autopilot Interface</h1>
    <p>Current Setpoints:</p>
    <ul>
        <li>Heading: <span id="desired_hdg">0</span></li>
        <li>Altitude: <span id="desired_alt">0</span></li>
        <li>Speed: <span id="desired_speed">0</span></li>
    </ul>
    <p>Autopilot: <span id="autopilot_enabled">0</span></p>

    <!-- Example buttons for adjusting setpoints -->
    <button onclick="adjustSetpoint('desired_hdg', -10)">-10 HDG</button>
    <button onclick="adjustSetpoint('desired_hdg', 10)">+10 HDG</button>
    <br>
    <button onclick="adjustSetpoint('desired_alt', 500)">+500 ALT</button>
    <button onclick="adjustSetpoint('desired_alt', -500)">-500 ALT</button>
    <br>
    <button onclick="adjustSetpoint('desired_speed', 5)">+5 KTS</button>
    <button onclick="adjustSetpoint('desired_speed', -5)">-5 KTS</button>
    <br>
    <br>
    <button onclick="adjustSetpoint('autopilot_enabled', 1)">Enable Autopilot</button>
    <button onclick="adjustSetpoint('autopilot_enabled', 0)">Disable Autopilot</button>

</body>
</html>
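
If you want to poke at the setpoints without the web UI, they are just Redis keys, so a couple of lines of Python (or redis-cli) will do it. A minimal sketch, assuming the same Redis instance and key names as the scripts above:

import redis

r = redis.StrictRedis(host='localhost', port=6379, db=0)

# nudge the target heading 10 degrees right and bump the altitude 500 ft
r.set("desired_hdg", (float(r.get("desired_hdg")) + 10) % 360)
r.set("desired_alt", float(r.get("desired_alt")) + 500)

# engage the autopilot; the monitor loop picks these up on its next tick
r.set("autopilot_enabled", 1)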

This should be enough to get you going. I’ll come back and clean it up later (both my kids just woke up – 1.5 and 3.5 years old!).

Categories
Python

Controlling AsrockRack CPU & chassis fan speeds via ipmitool & PID loops

I have a 1U Datto NAS unit that I got for super cheap ($150 for 4x 3.5″ SAS3, D-1541, 4x32GB, 2400MHz, 2x 10GbaseT) that has worked quite well for me. The only downside, which is present among basically all 1U devices, is the noise.

During my research for how to control the tiny, high-RPM (like 8000+ RPM) fans, I stumbled across a thread on the FreeNAS forums – https://www.truenas.com/community/threads/script-to-control-fan-speed-in-response-to-hard-drive-temperatures.41294/. At the bottom of the post, there are a few other links to improvements. I ran the Perl logging scripts that made up the improvements for a bit, but I am no Perl expert, so I didn’t end up implementing them.

I am not 100% sure of the default AsrockRack behavior, but it seemed that if the CPU temp went above 60C, both the case and CPU fans would spike. My BlueIris instance sends videos over a couple of times an hour, which would spike the fans – annoying during my work-from-home weeks when I was working in the basement.

The idea of using a PID loop to control fan speeds stuck with me though, and with the help of GitHub Copilot, I busted out a proof of concept in an hour or so during a particularly boring set of meetings. There is a very high probability this will work for Supermicro motherboards as well with only minor tweaks.

This is how well it works. Note the drop at the end is due to changing CPU setpoint from 57C to 55C. The temperature is very, very steady.

Screenshot of TrueNAS Core reporting page for CPU temp showing very constant CPU temperature due to PID fan control loop

Without further ado, below is the main script (you’ll also need PID.py, which I borrowed a few years ago for the Coding a pitch/roll/altitude autopilot in X-Plane with Python series of posts). It can be run via SSH for debugging purposes (it is no fun to edit python via nano over ssh on FreeBSD), or with native commands if it detects it is running on the target system.

import logging
import time
import PID
import datetime
import socket
import subprocess

logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)

# don't care about debug/info level logging from either of these packages
loggers_to_set_to_warning = ['paramiko.transport', 'invoke']
for l in loggers_to_set_to_warning:
    logging.getLogger(l).setLevel(logging.WARNING)

user = "root"
password = r"password"
host = None # this is set via hostname detection below
DESIRED_CPU_TEMP = 55.0
DESIRED_MB_TEMP = 35.0
# HDD_TEMP_THRESHOLD = 44.0 # unused
MIN_FAN_PCT = 10.0
drives_to_monitor = ['da0', 'da1', 'da2', 'da3', 'nvme0','nvme1','nvme2']

# command to set fans via ipmitool:
# ipmitool raw 0x3a 0x01 0x00 0x04 0x04 0x04 0x04 0x04 0x04 0x04
#                        cpu  fan  fan  fan  fan  fan  fan  ????

BASE_RAW_IPMI = 'raw 0x3a 0x01'
INITIAL_STATE = ['20'] * 8 # hex 0x20 = 32/64 = half speed
FAN_CURRENT_STATE = list(INITIAL_STATE) # copy, so adjustments don't clobber INITIAL_STATE

hostname = socket.gethostname()
if 'truenas' in hostname or hostname == 'truenas-datto.home.fluffnet.net':
    host = 'localhost'
    c = None
else:
    from fabric import Connection # importing here because freebsd 13 (or whatever truenas core 13 is based on) lacks pip to install packages
    host = "10.98.1.9"
    c = Connection(host, port=22, user=user, connect_kwargs={'password': password})

current_sensor_readings = {}
cpu_temp_sensor = "CPU Temp"
cpu_fan_sensor = "CPU_FAN1"
case_fans = ["FRNT_FAN2","FRNT_FAN3","FRNT_FAN4"]
mb_temp_sensor = "MB Temp"

def limiter(input_value, min_value, max_value):
    if input_value < min_value:
        return min_value
    elif input_value > max_value:
        return max_value
    else:
        return input_value
    
def set_fans_via_ipmi(connection):
    # raw_ipmi_cmd = construct_raw_ipmi_cmd() # not needed unless debug and remote
    # logging.info(raw_ipmi_cmd)
    if host == 'localhost':
        result = subprocess.run(['ipmitool', 'raw', '0x3a', '0x01',
                                 '0x'+FAN_CURRENT_STATE[0], 
                                 '0x'+FAN_CURRENT_STATE[1],
                                 '0x'+FAN_CURRENT_STATE[2],
                                 '0x'+FAN_CURRENT_STATE[3],
                                 '0x'+FAN_CURRENT_STATE[4],
                                 '0x'+FAN_CURRENT_STATE[5],
                                 '0x'+FAN_CURRENT_STATE[6],
                                 '0x'+FAN_CURRENT_STATE[7]], stdout=subprocess.PIPE)
    else:
        raw_ipmi_cmd = construct_raw_ipmi_cmd()
        result = connection.run('ipmitool ' + raw_ipmi_cmd, hide=True)
    #logging.info(result.stdout)

def scale_to_64ths(input_percent):
    result = input_percent / 100.0 * 64.0
    result_int = int(result)
    # note: this returns the *decimal* digits as a string, which later gets an
    # '0x' prefix before being handed to ipmitool - so e.g. 50% -> 32 -> '0x32',
    # which ipmitool parses as hex (50 decimal). The fans on my board behave
    # sensibly with these values, but keep the quirk in mind on other hardware.
    result_str = str(result_int)
    if len(result_str) == 1:
        result_str = '0' + result_str # turn a 0x1 into a 0x01
    return result_str

def adjust_cpu_fan_setpoint(hex_value_64ths):
    FAN_CURRENT_STATE[0] = hex_value_64ths

def adjust_case_fan_setpoint(hex_value_64ths):
    for i in range(len(FAN_CURRENT_STATE) - 1):
        FAN_CURRENT_STATE[i + 1] = hex_value_64ths

def construct_raw_ipmi_cmd():
    new_state = BASE_RAW_IPMI
    for i in range(len(FAN_CURRENT_STATE)):
        new_state = new_state + ' 0x' + str(FAN_CURRENT_STATE[i])
    return new_state

def populate_sensor_readings(sensor, value):
    current_sensor_readings[sensor] = value

def query_ipmitool(connection):
    if host == 'localhost':
        result = subprocess.run(['ipmitool', 'sensor'], stdout=subprocess.PIPE)
        result = result.stdout.decode('utf-8')
    else:
        result = connection.run('ipmitool sensor', hide=True).stdout
    for line in result.split('\n'):
        if line == '':
            break

        row_data = line.split('|')
        sensor_name = row_data[0].strip()
        sensor_value = row_data[1].strip()
        populate_sensor_readings(sensor_name, sensor_value)
        logging.debug(sensor_name + " = " + sensor_value)

def wait_until_top_of_second():
    # calculate time until next top of second
    sleep_seconds = 1 - (time.time() % 1)
    time.sleep(sleep_seconds)

def get_drive_temp(connection, drive):
    ###########################################
    # this is copilot generated, and untested #
    # not sure about row_data[0] stuff        #
    ###########################################
    if host == 'localhost':
        result = subprocess.run(['smartctl', '-A', '/dev/' + drive], stdout=subprocess.PIPE)
        result = result.stdout.decode('utf-8')
    else:
        result = connection.run('smartctl -A /dev/' + drive, hide=True).stdout
    for line in result.split('\n'):
        if line == '':
            break

        row_data = line.split()
        if len(row_data) < 10:
            continue
        if row_data[0] == '194':
            drive_temp = row_data[9]
            logging.info(drive + " = " + drive_temp)

def query_drive_temps(connection):
    for drive in drives_to_monitor:
        get_drive_temp(connection, drive)

# tune these values. the first one is the most important and basically is the multiplier for
# how much you want the fans to run in proportion to the actual-setpoint delta.
# example: if setpoint is 55 and actual is 59, the delta is 4, which is multiplied by 4 for
# 16 output, which if converted to 64ths would be 25% fan speed.
# the 2nd parameter is the integral, which is a cumulative error counter of sorts.
# the 3rd parameter is derivative, which should probably be set to 0 (if tuned correctly, it prevents over/undershoot)
cpu_pid = PID.PID(4.0, 2.5, 0.1)
cpu_pid.SetPoint = DESIRED_CPU_TEMP

mb_pid = PID.PID(2.5, 1.5, 0.1)
mb_pid.SetPoint = DESIRED_MB_TEMP

wait_until_top_of_second()

# set last_execution to now minus one minute to force first execution
last_execution = datetime.datetime.now() - datetime.timedelta(minutes=1)

while(True):
    if datetime.datetime.now().minute != last_execution.minute:
        # TODO: get drive temps
        logging.info("getting drive temps")

    query_ipmitool(c)
    cpu_temp = float(current_sensor_readings[cpu_temp_sensor])
    mb_temp = float(current_sensor_readings[mb_temp_sensor])

    cpu_pid.update(cpu_temp)
    mb_pid.update(mb_temp)
    
    logging.info(f'CPU: {cpu_temp:5.2f} MB: {mb_temp:5.2f} CPU PID: {cpu_pid.output:5.2f} MB PID: {mb_pid.output:5.2f}')
    
    # note negative multiplier!!
    cpu_fan_setpoint = scale_to_64ths(limiter(-1*cpu_pid.output,MIN_FAN_PCT,100))
    case_fan_setpoint = scale_to_64ths(limiter(-1*mb_pid.output,MIN_FAN_PCT,100))
    adjust_cpu_fan_setpoint(cpu_fan_setpoint)
    adjust_case_fan_setpoint(case_fan_setpoint)
    set_fans_via_ipmi(c)

    last_execution = datetime.datetime.now()
    wait_until_top_of_second()

As you can see, it is not quite complete. I still need to add the hard drive temp detection stuff to ramp case fans a bit if the drives get hot. Those NVMe drives sure get hot (especially the Intel P4800X I have in one of the PCIe slots – see Intel Optane P1600X & P4800X as ZFS SLOG/ZIL for details).
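
The rough plan for that piece: have get_drive_temp() return the temperature instead of just logging it, take the max across all monitored drives, and use it to put a floor under the case fan duty. A sketch of the idea (untested; it assumes that return-value change to get_drive_temp(), uncommenting the HDD_TEMP_THRESHOLD constant, and HDD_FAN_FLOOR_PCT is a made-up number):

HDD_FAN_FLOOR_PCT = 35.0  # assumption: minimum case fan duty while any drive is hot

def case_fan_floor_from_drives(connection):
    # gather temps for the monitored drives; assumes get_drive_temp() has been
    # changed to return a float (or None on parse failure) instead of logging
    temps = [get_drive_temp(connection, d) for d in drives_to_monitor]
    temps = [t for t in temps if t is not None]
    if temps and max(temps) > HDD_TEMP_THRESHOLD:
        return HDD_FAN_FLOOR_PCT
    return MIN_FAN_PCT

# and in the main loop, the case fan line would become something like:
# case_fan_setpoint = scale_to_64ths(limiter(-1*mb_pid.output, case_fan_floor_from_drives(c), 100))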

This is what the output looks like (keep in mind the -1 multiplier in the setpoint stuff!):

screenshot showing second-by-second output of the PID fan control loop keeping a very consistent 55C CPU temp

And here is a high-level summary of the script provided by the ever-helpful ChatGPT. I fed it the code and said “write a blog post about this”. I took out the intro paragraph but left the rest.

The Script Overview

This script leverages the PID controller – a control loop mechanism that calculates an “error” value as the difference between a measured process variable and a desired setpoint. It attempts to minimize the error by adjusting the process control inputs.

In this script, we are implementing a fan speed control system that reacts to temperature changes dynamically. Our desired setpoint is the optimal temperature we want to maintain for both the CPU (DESIRED_CPU_TEMP) and the motherboard (DESIRED_MB_TEMP).
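
For the curious, the update law itself is tiny. A stripped-down sketch of what a PID like the one in PID.py computes each cycle (the real module also handles sample time and integral windup, which this toy version skips):

class ToyPID:
    def __init__(self, kp, ki, kd):
        self.kp, self.ki, self.kd = kp, ki, kd
        self.SetPoint = 0.0
        self._integral = 0.0
        self._last_error = 0.0
        self.output = 0.0

    def update(self, measured, dt=1.0):
        error = self.SetPoint - measured           # positive when below the setpoint
        self._integral += error * dt               # accumulated error history
        derivative = (error - self._last_error) / dt
        self._last_error = error
        self.output = self.kp * error + self.ki * self._integral + self.kd * derivative

Note that the error is setpoint minus measured, so when the CPU is hotter than the setpoint the output goes negative – which is why the main loop multiplies the PID output by -1 before scaling it to a fan duty.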

Exploring the Script

The Python script begins by setting up the necessary libraries and logging. The logging library is used to log useful debug information, such as the current CPU temperature and fan speed, which can help you understand what’s happening in the script.

Next, we have a section where we define some constants, such as the desired temperatures and minimum fan speed percentage. It also defines a connection to the localhost or to a remote host, depending on the hostname.

It uses ipmitool – a utility for managing and configuring devices that support the Intelligent Platform Management Interface (IPMI) to control fan speeds.

The limiter() function ensures the fan speed remains within the predefined minimum and maximum thresholds. It’s important as it prevents the fan speed from reaching potentially harmful levels.

The script also includes several functions to set and adjust fan speeds, as well as to construct the appropriate ipmitool command. One thing to note is that the fan speeds are set using hexadecimal values, so there are functions to convert the desired fan speed percentages to hexadecimal.

A very useful function is query_ipmitool(). This function runs the ipmitool command, gets the current sensor readings, and stores them in the current_sensor_readings dictionary for further processing.

The script utilizes two PID controllers, cpu_pid for the CPU and mb_pid for the motherboard, with specific setpoints set to desired temperatures.

The core logic is inside the infinite loop at the end of the script. The loop constantly reads temperature sensor data and adjusts the fan speeds accordingly. The loop runs once every second, so it can respond quickly to changes in CPU and motherboard temperatures.

Conclusion

This script demonstrates a neat way of controlling fan speed in response to CPU and motherboard temperatures. It’s an effective approach to ensure that your system runs smoothly and without overheating, while minimizing noise.

Categories
Home Assistant Home Automation Python

Ultra efficient “air conditioner” (fans controlled with Home Assistant and Python) using cold outside air

Just getting this up as a draft now for a Reddit user.

In short, this Python script reads the temperatures of two different sensors (outside, from an Ambient Weather WS-2902C weather station, and in our master bedroom, from a Govee Bluetooth Thermometer) plus the temperature set point of a generic thermostat entity, does some logic, and turns a switch on or off, all with the Home Assistant API. The switch controls two basic box fans that are set to blow air into our bedroom from outside. It runs every X minutes (currently set to 5). This method works great if nighttime temperatures drop below 70F before bedtime. We like the bedroom temp at 66F, so unless it gets below 70F by around 9PM, it probably won’t cool enough for us to be comfortable enough to fall asleep. My wife wakes up at 5:45am, me at 6:30am, pending what our 22 month old daughter thinks of that schedule, so we typically aim to be asleep by 10pm.

Today, 2022-05-14, was the day I got out the window AC. It will be in place for the rest of the summer.

Requirements:

  • A working Home Assistant installation (mine is Python venv install in a Ubuntu VM)
  • A bearer token authorization code for a/your Home Assistant user
  • A working MQTT installation
  • A switch controllable by Home Assistant
  • 1-2 box fans plugged into said switch controlled by Home Assistant

Here is what the Home Assistant control screen looks like. The buttons should be self explanatory. The generic thermostat entity doesn’t need to be on/active for this to work. It uses the set point for control purposes (set to 67.0F in the screenshot).

Home Assistant control screen for ultra efficient air conditioner system

I believe there is currently a logic bug with max cool not respecting the delta_temp variable. Other than that, it works perfectly. Below is a screenshot of the last 7 days showing the room cooling off nicely to the setpoint of 66F on nights 1-3 and 67F on nights 4-7. Switching a control device on and off every so often is a version of a bang-bang controller. If you look closely, you will notice that each on/off cycle results in a greater temperature drop, which is due to the colder outside air being blown in for the same duration regardless of delta T.

Master bedroom temperature with Home Assistant controlled fans blowing in cold air from outside. Setpoint was 66F for evening of 5/7-5/9 (first 3 nights) and 67F for the rest (next 4 nights).
Zoomed in view of the evening of 5/9 to the morning of 5/10. The temperature drops quickly to the setpoint of 66F and does not go much above. Not sure what the spike is right after 23:00. The outside temp starts at 65F for this same timeframe, dropping to 60 at 21:00 and 55 at 22:00, so a great night to use cold outside air for cooling (thus the “ultra efficient AC”).
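
Since I mentioned bang-bang control: it just means switching the output fully on or off around the setpoint, usually with a bit of hysteresis so the switch doesn’t chatter. A toy sketch of the concept (not the actual logic in the script below, which layers modes and an outdoor-temp check on top):

def bang_bang(current_temp, setpoint, fan_is_on, hysteresis=0.5):
    # turn on when the room drifts above the setpoint, off once it has
    # cooled slightly below; the band in between prevents rapid cycling
    if current_temp > setpoint + hysteresis:
        return True
    if current_temp < setpoint - hysteresis:
        return False
    return fan_is_on

On to the script itself: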
import json
import datetime
import time
from dateutil import parser
from requests import get, post
import paho.mqtt.client as mqttClient
import logging

logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.DEBUG)

loggers_to_set_to_warning = ['urllib3.connectionpool']
for l in loggers_to_set_to_warning:
    logging.getLogger(l).setLevel(logging.WARNING)

delta_temp = 3.0
mqtt_host = "mqtt.example.com"
mqtt_port = 1883

base_url = "http://ha.example.com:8123/"
states_url = base_url + "api/states/"
switch_url = base_url + "api/services/switch"
bearer_token = "ey...Vw"
full_bearer_token = "Bearer " + bearer_token
request_headers = {
    "Authorization": full_bearer_token,
    "content-type": "application/json"
}
endpoints = ["climate.masterbedfancooling",
             "sensor.real_outside_temp"]
fan_switch_entity_id = "switch.fan_switch"
states = {}
climate_topic = "climate/fan_control_state"
current_state = "off"
last_state = None
desired_seconds_to_sleep = 300
max_cool_outdoor_temp_limit = 66
desired_temp = None
outside_temp = None
current_temp = None
next_fan_action_time = datetime.datetime.now()


def set_fan_switch_state(state):
    new_fan_state = None
    if state == "on":
        new_fan_state = "on"
    elif state == "off":
        new_fan_state = "off"
    else:
        logging.warning("requested fan state unknown")
        return

    entity_info = {"entity_id": fan_switch_entity_id}
    full_url = switch_url + "/turn_" + new_fan_state
    response = post(full_url, headers=request_headers,
                    data=json.dumps(entity_info))
    if response.status_code != 200:
        logging.error(
            f"attempted to set fan state to {new_fan_state} but encountered error with status code: {response.status_code}")
    else:
        logging.info(f"successfully set fan state to {new_fan_state}")


def set_state_from_mqtt_message(message):
    global current_state, next_fan_action_time
    if message == "max_cool":
        current_state = "max_cool"
    elif message == "normal_cool":
        current_state = "normal_cool"
    elif message == "off":
        current_state = "off"
    elif message == "on":
        current_state = "on"
    else:
        logging.error(f"unable to determine state, setting to off")
        current_state = "off"
    logging.info(f"current_state set to: {current_state}")
    next_fan_action_time = datetime.datetime.now()


def connect_mqtt():
    # Set Connecting Client ID
    client = mqttClient.Client("python_window_fan_control")
    #client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(mqtt_host, mqtt_port)
    return client


def on_connect(client, userdata, flags, rc):
    if rc == 0:
        logging.info("Connected to MQTT Broker!")
    else:
        logging.info("Failed to connect, return code %d\n", rc)


def on_message(client, userdata, msg):
    logging.info(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
    set_state_from_mqtt_message(msg.payload.decode())


def on_subscribe(client, userdata, mid, granted_qos):
    logging.info(f"subscribed to topic")


def set_fan_state(state):
    if state == "on":
        logging.info(f"setting fan state to on")
        set_fan_switch_state("on")
    elif state == "off":
        logging.info(f"setting fan state to off")
        set_fan_switch_state("off")


def get_and_set_temperatures():
    global desired_temp, current_temp, outside_temp
    logging.debug("executing loop")

    for endpoint in endpoints:
        full_url = states_url + endpoint
        response = get(full_url, headers=request_headers)
        parsed_json = json.loads(response.text)
        entity = parsed_json['entity_id']
        hvac_action = ""
        if endpoint == 'climate.masterbedfancooling':
            desired_temp = float(
                parsed_json['attributes']['temperature'])
            current_temp = float(
                parsed_json['attributes']['current_temperature'])
            hvac_action = parsed_json['attributes']['hvac_action']
        elif endpoint == 'sensor.real_outside_temp':
            outside_temp = float(parsed_json['state'])
        last_updated = parser.parse(parsed_json['last_updated'])
    logging.info(
        f"temps: current={current_temp}, desired={desired_temp}, outside={outside_temp}")
    if desired_temp is None or current_temp is None or outside_temp is None:
        logging.error(
            "one or more temps invalid, turning off switch to be safe")
        set_fan_state("off")


client = mqttClient.Client("window-fan-client")
client.on_connect = on_connect
client.on_message = on_message
client.on_subscribe = on_subscribe
client.connect(mqtt_host, mqtt_port)
client.subscribe(climate_topic)
client.loop_start()

while(True):
    # logging.info("loop")
    if datetime.datetime.now() > next_fan_action_time:
        logging.info("fan action time")
        if current_state == "off":
            new_fan_state = "off"
            logging.info(
                f"current_state is {current_state}, turning fan {new_fan_state}")
            set_fan_state("off")
        else:
            get_and_set_temperatures()

        if current_state == "max_cool":
            logging.info(
                f"current_state is {current_state}, call for max cooling")
            # known issue (mentioned above): this branch doesn't respect delta_temp,
            # it only checks the outside temp against the hard limit
            if outside_temp < max_cool_outdoor_temp_limit:
                logging.info(
                    f"able to max_cool with outside temp: {outside_temp}, lower than {max_cool_outdoor_temp_limit} ")
                set_fan_state("on")
            elif outside_temp < (current_temp - delta_temp):
                logging.info(
                    f"unable to max cool, but still can cool with outside: {outside_temp} and inside: {current_temp}")
                set_fan_state("on")
            else:
                logging.info("unable to cool at all, turning fan off")
                set_fan_state("off")
        elif current_state == "normal_cool":
            logging.info(
                f"current_state is {current_state}")
            if (current_temp > desired_temp):
                logging.info(
                    f"call for cooling. current: {current_temp}, desired: {desired_temp}")
                # outside air must be at least delta_temp cooler than inside to be useful
                if (current_temp > (outside_temp + delta_temp)):
                    logging.info("can cool, turning fan on")
                    set_fan_state("on")
                else:
                    logging.info("can't cool, turning fan off")
                    set_fan_state("off")
            else:
                logging.info("no need for cooling, turning fans off")
                set_fan_state("off")
        elif current_state == "on":
            new_fan_state = "on"
            logging.info(
                f"current_state is {current_state}, turning fan {new_fan_state}")
            set_fan_state("on")

        last_state = current_state
        next_fan_action_time = datetime.datetime.now() \
            + datetime.timedelta(seconds=desired_seconds_to_sleep)
        logging.info(f"next fan action in {desired_seconds_to_sleep} seconds")
        logging.info("---------loop end-------------------")
    else:
        #logging.debug("not fan action time yet, sleeping 1s")
        time.sleep(0.25)
    # actual_seconds_to_sleep = desired_seconds_to_sleep - datetime.datetime.now().minute % desired_seconds_to_sleep
    # seconds_to_sleep = actual_minutes_to_sleep * 60.0
    # logging.info(f"sleeping {desired_seconds_to_sleep}s")
    # time.sleep(desired_seconds_to_sleep)
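
One last note: the script is driven entirely over MQTT. Publish max_cool, normal_cool, on, or off to the climate/fan_control_state topic and the loop reacts almost immediately, because set_state_from_mqtt_message() resets next_fan_action_time. For example, with paho-mqtt and the same broker as above:

import paho.mqtt.publish as publish

# tell the controller to start cooling toward the thermostat setpoint
publish.single("climate/fan_control_state", "normal_cool", hostname="mqtt.example.com")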