
Ultra efficient “air conditioner” (fans controlled with Home Assistant and Python) using cold outside air

Just getting this up as a draft now for a Reddit user.

In short, this Python script reads the temperatures from two sensors (outside from an Ambient Weather WS-2902C weather station, and in our master bedroom from a Govee Bluetooth Thermometer) plus the temperature set point of a generic thermostat entity, does some logic, and turns a switch on or off, all through the Home Assistant API. The switch controls two basic box fans that are set to blow air into our bedroom from outside. It runs every X minutes (currently set to 5). This method works great if nighttime temperatures drop below 70F before bedtime. We like the bedroom temp at 66F, so unless it gets below 70F by around 9PM, it probably won’t cool enough for us to be comfortable enough to fall asleep. My wife wakes up at 5:45am, me at 6:30am, pending what our 22 month old daughter thinks of that schedule, so we typically aim to be asleep by 10pm.

Today, 2022-05-14, was the day I got out the window AC. It will be in place for the rest of the summer.

Requirements:

  • A working Home Assistant installation (mine is a Python venv install in an Ubuntu VM)
  • A bearer token authorization code for a/your Home Assistant user
  • A working MQTT installation
  • A switch controllable by Home Assistant
  • 1-2 box fans plugged into said switch controlled by Home Assistant
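To show what the bearer token is for, here is a minimal sketch of how a request to the Home Assistant REST API gets put together and how a sensor reading is pulled out of the reply. The helper names `build_state_request` and `temperature_from_state` are made up for this example, the token is a placeholder, and the JSON body is hand-written sample data rather than a real response.

```python
import json


def build_state_request(base_url, token, entity_id):
    """Return (url, headers) for a GET of /api/states/<entity_id>."""
    url = base_url.rstrip("/") + "/api/states/" + entity_id
    headers = {
        "Authorization": "Bearer " + token,
        "Content-Type": "application/json",
    }
    return url, headers


def temperature_from_state(state_json):
    """Return the entity state as a float, or None if it is not numeric."""
    state = json.loads(state_json).get("state")
    try:
        return float(state)
    except (TypeError, ValueError):
        # e.g. the sensor reports "unavailable" or "unknown"
        return None


# placeholder token and hand-written sample body (assumptions, not real data)
url, headers = build_state_request(
    "http://ha.example.com:8123", "YOUR_LONG_LIVED_TOKEN", "sensor.real_outside_temp")
sample_body = '{"entity_id": "sensor.real_outside_temp", "state": "61.3"}'
print(url, temperature_from_state(sample_body))
```

In the real script, `url` and `headers` would be handed to `requests.get()`, and the response text would take the place of `sample_body`.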

Here is what the Home Assistant control screen looks like. The buttons should be self-explanatory. The generic thermostat entity doesn’t need to be on/active for this to work. The script only uses its set point for control purposes (set to 67.0F in the screenshot).

Home Assistant control screen for ultra efficient air conditioner system

I believe there is currently a logic bug with max cool not respecting the delta_temp variable. Other than that it works perfectly. Below is a screenshot of the last 7 days showing the room cooling off nicely to the setpoint of 66F on nights 1-3 and 67F on nights 4-7. Switching a control device on and off like this is a version of a bang-bang controller. If you look closely, you will notice that each successive on/off cycle results in a greater temperature drop, because progressively colder outside air is blown in for the same duration regardless of delta T.
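The on/off decision can be boiled down to one small function. This is only a sketch of how I read the intended behavior, not a copy of the script: the function name `fan_should_run` is made up, and treating max cool as "ignore the set point but still require the outside air to be delta_temp colder than the room" is an assumption about how the max cool bug could be handled. It also leaves out the separate 66F outdoor limit the script uses for max cool.

```python
def fan_should_run(mode, current, desired, outside, delta=3.0):
    """Bang-bang decision: True means fans on, False means fans off."""
    if mode == "on":
        return True
    if mode == "off":
        return False
    # a missing reading means we cannot decide safely, so keep the fans off
    if None in (current, desired, outside):
        return False
    # outside air only helps if it is at least `delta` degrees colder than the room
    outside_is_useful = outside < (current - delta)
    if mode == "max_cool":
        # assumption: max cool ignores the set point but still respects delta
        return outside_is_useful
    if mode == "normal_cool":
        return current > desired and outside_is_useful
    # unknown mode: fail safe
    return False


print(fan_should_run("normal_cool", current=72, desired=66, outside=60))
print(fan_should_run("normal_cool", current=72, desired=66, outside=71))
```

Keeping the decision in a pure function like this makes it easy to check the edge cases without touching the fans.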

Master bedroom temperature with Home Assistant controlled fans blowing in cold air from outside. Setpoint was 66F for evening of 5/7-5/9 (first 3 nights) and 67F for the rest (next 4 nights).
Zoomed in view of the evening of 5/9 to the morning of 5/10. The temperature drops quickly to the setpoint of 66F and does not go much above. Not sure what the spike is right after 23:00. The outside temp starts at 65F for this same timeframe, dropping to 60 at 21:00 and 55 at 22:00, so a great night to use cold outside air for cooling (thus the “ultra efficient AC”).
import json
import datetime
import time
from dateutil import parser
from requests import get, post
import paho.mqtt.client as mqttClient
import logging

logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.DEBUG)

loggers_to_set_to_warning = ['urllib3.connectionpool']
for l in loggers_to_set_to_warning:
    logging.getLogger(l).setLevel(logging.WARNING)

delta_temp = 3.0
mqtt_host = "mqtt.example.com"
mqtt_port = 1883

base_url = "http://ha.example.com:8123/"
states_url = base_url + "api/states/"
switch_url = base_url + "api/services/switch"
bearer_token = "ey...Vw"
full_bearer_token = "Bearer " + bearer_token
request_headers = {
    "Authorization": full_bearer_token,
    "content-type": "application/json"
}
endpoints = ["climate.masterbedfancooling",
             "sensor.real_outside_temp"]
fan_switch_entity_id = "switch.fan_switch"
states = {}
climate_topic = "climate/fan_control_state"
current_state = "off"
last_state = None
desired_seconds_to_sleep = 300
max_cool_outdoor_temp_limit = 66
desired_temp = None
outside_temp = None
current_temp = None
next_fan_action_time = datetime.datetime.now()


def set_fan_switch_state(state):
    new_fan_state = None
    if state == "on":
        new_fan_state = "on"
    elif state == "off":
        new_fan_state = "off"
    else:
        # logging.warn() is deprecated; logging.warning() is the supported name
        logging.warning("requested fan state unknown")
        return

    entity_info = {"entity_id": fan_switch_entity_id}
    full_url = switch_url + "/turn_" + new_fan_state
    response = post(full_url, headers=request_headers,
                    data=json.dumps(entity_info))
    if response.status_code != 200:
        logging.error(
            f"attempted to set fan state to {new_fan_state} but encountered error with status code: {response.status_code}")
    else:
        logging.info(f"successfully set fan state to {new_fan_state}")


def set_state_from_mqtt_message(message):
    global current_state, next_fan_action_time
    if message == "max_cool":
        current_state = "max_cool"
    elif message == "normal_cool":
        current_state = "normal_cool"
    elif message == "off":
        current_state = "off"
    elif message == "on":
        current_state = "on"
    else:
        logging.error(f"unable to determine state, setting to off")
        current_state = "off"
    logging.info(f"current_state set to: {current_state}")
    next_fan_action_time = datetime.datetime.now()


def connect_mqtt():
    # Set Connecting Client ID
    client = mqttClient.Client("python_window_fan_control")
    #client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(mqtt_host, mqtt_port)
    return client


def on_connect(client, userdata, flags, rc):
    if rc == 0:
        logging.info("Connected to MQTT Broker!")
    else:
        logging.error("Failed to connect, return code %d", rc)


def on_message(client, userdata, msg):
    logging.info(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
    set_state_from_mqtt_message(msg.payload.decode())


def on_subscribe(client, userdata, mid, granted_qos):
    logging.info(f"subscribed to topic")


def set_fan_state(state):
    if state == "on":
        logging.info(f"setting fan state to on")
        set_fan_switch_state("on")
    elif state == "off":
        logging.info(f"setting fan state to off")
        set_fan_switch_state("off")


def get_and_set_temperatures():
    global desired_temp, current_temp, outside_temp
    logging.debug("executing loop")

    for endpoint in endpoints:
        full_url = states_url + endpoint
        response = get(full_url, headers=request_headers)
        parsed_json = json.loads(response.text)
        entity = parsed_json['entity_id']
        hvac_action = ""
        if endpoint == 'climate.masterbedfancooling':
            desired_temp = float(
                parsed_json['attributes']['temperature'])
            current_temp = float(
                parsed_json['attributes']['current_temperature'])
            hvac_action = parsed_json['attributes']['hvac_action']
        elif endpoint == 'sensor.real_outside_temp':
            outside_temp = float(parsed_json['state'])
        last_updated = parser.parse(parsed_json['last_updated'])
    logging.info(
        f"temps: current={current_temp}, desired={desired_temp}, outside={outside_temp}")
    if desired_temp is None or current_temp is None or outside_temp is None:
        logging.error(
            "one or more temps invalid, turning off switch and breaking execution")
        set_fan_state("off")


client = mqttClient.Client("window-fan-client")
client.on_connect = on_connect
client.on_message = on_message
client.on_subscribe = on_subscribe
client.connect(mqtt_host, mqtt_port)
client.subscribe(climate_topic)
client.loop_start()

while(True):
    # logging.info("loop")
    if datetime.datetime.now() > next_fan_action_time:
        logging.info("fan action time")
        if current_state == "off":
            new_fan_state = "off"
            logging.info(
                f"current_state is {current_state}, turning fan {new_fan_state}")
            set_fan_state("off")
        else:
            get_and_set_temperatures()

        if current_state == "max_cool":
            logging.info(
                f"current_state is {current_state}, call for max cooling")
            if outside_temp < max_cool_outdoor_temp_limit:
                logging.info(
                    f"able to max_cool with outside temp: {outside_temp}, lower than {max_cool_outdoor_temp_limit} ")
                set_fan_state("on")
            elif outside_temp < (current_temp - delta_temp):
                logging.info(
                    f"unable to max cool, but still can cool with outside: {outside_temp} and inside: {current_temp}")
                set_fan_state("on")
            else:
                logging.info("unable to cool at all, turning fan off")
                set_fan_state("off")
        elif current_state == "normal_cool":
            logging.info(
                f"current_state is {current_state}")
            if (current_temp > desired_temp):
                logging.info(
                    f"call for cooling. current: {current_temp}, desired: {desired_temp}")
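                # only run the fans if outside air is at least delta_temp colder than the room
                # (same test as the max_cool branch above)
                if (outside_temp < (current_temp - delta_temp)):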
                    logging.info("can cool, turning fan on")
                    set_fan_state("on")
                else:
                    logging.info("can't cool, turning fan off")
                    set_fan_state("off")
            else:
                logging.info("no need for cooling, turning fans off")
                set_fan_state("off")
        elif current_state == "on":
            new_fan_state = "on"
            logging.info(
                f"current_state is {current_state}, turning fan {new_fan_state}")
            set_fan_state("on")

        last_state = current_state
        next_fan_action_time = datetime.datetime.now() \
            + datetime.timedelta(seconds=desired_seconds_to_sleep)
        logging.info(f"next fan action in {desired_seconds_to_sleep} seconds")
        logging.info("---------loop end-------------------")
    else:
        #logging.debug("not fan action time yet, sleeping 1s")
        time.sleep(0.25)
    # actual_seconds_to_sleep = desired_seconds_to_sleep - datetime.datetime.now().minute % desired_seconds_to_sleep
    # seconds_to_sleep = actual_minutes_to_sleep * 60.0
    # logging.info(f"sleeping {desired_seconds_to_sleep}s")
    # time.sleep(desired_seconds_to_sleep)

Using the Govee Bluetooth Thermometer with Home Assistant (Python and MQTT)

Introduction

Like many other Home Automation enthusiasts, I have been on the lookout for a cheap thermometer/hygrometer that has either WiFi or Bluetooth connectivity. I think I found the answer in the $12 USD Govee Bluetooth Digital Thermometer and Hygrometer. The fact that the device broadcasts the current temperature and humidity every 2 seconds via Bluetooth Low Energy (BLE) is the metaphorical icing on the cake. Here is a pic of the unit in our garage:

Govee bluetooth thermometer and hygrometer showing 56*F/24% in a garage

Specifications

This is a pretty basic device. It has a screen that shows the current temperature, humidity, and min/max values. It sends the current readings (along with battery health) every 2 seconds via Bluetooth Low Energy (BLE). The description on Amazon says it has a “Swiss-made smart hygrometer sensor”. Dunno if I believe that but for $12 it is good enough. The temperature is accurate to +/- 0.54F and humidity is +/- 3% RH. If you use the app, it is apparently possible to read the last 20 days or 2 years of data from the device (I haven’t used the app at all).

Enough with the boring stuff. Let’s get it connected to Home Assistant.

Reading the Govee bluetooth advertisements with Python

I found some sample code on tchen’s GitHub page (link) to help get me going in the right direction.

Without further ado, here is the code I’m using to read the data and publish via MQTT:

observe.py (updated 2022-01-04 with some logging improvements. default logging level is now WARNING, which disables printing every advertisement)

# basic govee bluetooth data reading code for model https://amzn.to/3z14BIi
# written/modified by Austin of austinsnerdythings.com 2021-12-27
# original source: https://gist.github.com/tchen/65d6b29a20dd1ef01b210538143c0bf4
import logging
import json
from time import sleep
from basic_mqtt import basic_mqtt
from bleson import get_provider, Observer

logging.basicConfig(
	format='%(asctime)s.%(msecs)03d - %(name)s - %(levelname)s - %(message)s',
	datefmt='%Y-%m-%d %H:%M:%S',
	level=logging.WARNING)

# I did write all the mqtt stuff. I kept it in a separate class
mqtt = basic_mqtt()
mqtt.connect()

# writing code on my windows computer, committing, pushing, and then 
# pulling the new code on the Raspberry Pi is a tedious "debug" process.
# there are quite a few errors that spit out from bleson, which as far
# as I can tell, isn't super polished. if we set the debug level to
# critical, most of those disappear.
logging.getLogger("bleson").setLevel(logging.CRITICAL)

# basic celsius to fahrenheit function
def c2f(val):
    return round(32 + 9*val/5, 2)

last = {}

# I didn't write this, but it takes the raw BT data and spits out the data of interest
def temp_hum(values, battery, address):
    global last
    #########################
    #
    # there is a fix for temperatures below freezing here - 
    # https://github.com/joshgordon/govee_ble_to_mqtt/blob/master/observe.py
    # I will adjust post code afternoon of 2022-11-28
    #
    #########################
    values = int.from_bytes(values, 'big')
    if address not in last or last[address] != values:
        last[address] = values
        temp = float(values / 10000)
        hum = float((values % 1000) / 10)
        # this print looks like this:
        # 2021-12-27T11:22:17.040469 BDAddress('A4:C1:38:9F:1B:A9') Temp: 45.91 F  Humidity: 25.8 %  Battery: 100 %
        logging.info(f"decoded values: {address} Temp: {c2f(temp)} F  Humidity: {hum} %  Battery: {battery}")
        # this code originally just printed the data, but we need it to publish to mqtt.
        # added the return values to be used elsewhere
        return c2f(temp), hum, battery

def on_advertisement(advertisement):
    #print(advertisement)
    mfg_data = advertisement.mfg_data

    # there are lots of BLE advertisements flying around. only look at ones that have mfg_data
    if mfg_data is not None:
        #print(advertisement)
        # there are a few Govee models and the data is in different positions depending on which
        # the unit of interest isn't either of these hardcoded values, so they are skipped
        if advertisement.name == 'GVH5177_9835':
            address = advertisement.address
            temp_hum(mfg_data[4:7], mfg_data[7], address)
        elif advertisement.name == 'GVH5075_391D':
            address = advertisement.address
            temp_hum(mfg_data[3:6], mfg_data[6], address)
        elif advertisement.name != None:
            # this is where all of the advertisements for our unit of interest will be processed
            address = advertisement.address
            if 'GVH' in advertisement.name:
                #print(advertisement)
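                reading = temp_hum(mfg_data[3:6], mfg_data[6], address)
                # temp_hum() returns None when the values are unchanged since the
                # last advertisement, so there is nothing new to publish
                if reading is None:
                    return
                temp_f, hum, battery = reading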

                if temp_f > 180.0 or temp_f < -30.0:
                    return
                # as far as I can tell bleson doesn't have a string representation of the MAC address
                # address is of type BDAddress(). str(address) is BDAddress('A4:C1:38:9F:1B:A9')
                # substring 11:-2 is just the MAC address
                mac_addr = str(address)[11:-2]

                # construct dict with relevant info
                msg = {'temp_f':temp_f,
                        'hum':hum,
                        'batt':battery}
                
                # looks like this:
                # msg data: {'temp_f': 45.73, 'hum': 25.5, 'batt': 100}
                logging.info(f"MQTT msg data: {msg}")

                # turn into JSON for publishing
                json_string = json.dumps(msg, indent=4)

                # publish to topic separated by MAC address
                mqtt.publish(f"govee/{mac_addr}", json_string)

# base stuff from the original gist
logging.warning(f"initializing bluetooth")
adapter = get_provider().get_adapter()
observer = Observer(adapter)
observer.on_advertising_data = on_advertisement

logging.warning(f"starting observer")
observer.start()
logging.warning(f"listening for events and publishing to MQTT")
while True:
    # unsure about this loop and how much of a delay works
    sleep(1)
observer.stop()
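The comment in temp_hum() mentions a fix for temperatures below freezing. Here is a rough sketch of what that decoding could look like, pulled out into a standalone function. It assumes the convention used by community decoders for this family of sensors: the three bytes form one big-endian integer, the top bit (0x800000) flags a negative temperature, and the remaining digits pack temperature in tenths of a degree C followed by humidity in tenths of a percent. I have not verified this against every model, and the function name `decode_temp_hum` is made up for the example.

```python
def decode_temp_hum(raw):
    """Decode the 3 payload bytes into (temp_c, humidity_percent)."""
    value = int.from_bytes(raw, "big")
    # assumption: the highest bit marks a below-zero temperature
    negative = bool(value & 0x800000)
    value &= 0x7FFFFF
    # the digits above the last three are the temperature in tenths of a degree C
    temp_c = (value // 1000) / 10
    if negative:
        temp_c = -temp_c
    # the last three digits are the humidity in tenths of a percent
    hum = (value % 1000) / 10
    return temp_c, hum


# hand-built sample payloads (not captured from a real device)
above_freezing = (77258).to_bytes(3, "big")
below_freezing = (0x800000 | 52301).to_bytes(3, "big")
print(decode_temp_hum(above_freezing), decode_temp_hum(below_freezing))
```

Unlike the float(values / 10000) in the script above, the integer division here keeps the humidity digits from leaking into the temperature.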

And for the MQTT helper class (basic_mqtt.py):

# basic MQTT helper class. really needed to write one of these to simplify basic MQTT operations
# written/modified by Austin of austinsnerdythings.com 2021-12-27
# original source: https://gist.github.com/fisherds/f302b253cf7a11c2a0d814acd424b9bb
# filename is basic_mqtt.py
from paho.mqtt import client as mqtt_client
import logging
import datetime
logging.basicConfig(
	format='%(asctime)s.%(msecs)03d - %(name)s - %(levelname)s - %(message)s',
	datefmt='%Y-%m-%d %H:%M:%S',
	level=logging.INFO)


mqtt_host = "mqtt.home.fluffnet.net"
test_topic = "mqtt_test_topic"

# this is really not polished. it was a stream of consciousness project to pound
# something out to do basic MQTT publish stuff in a reusable fashion.
class basic_mqtt:
	def __init__(self):
		self.client = mqtt_client.Client()
		self.subscription_topic_name = None
		self.publish_topic_name = None
		self.callback = None
		self.host = mqtt_host
	
	def connect(self):
		self.client.on_connect = self.on_connect
		self.client.on_subscribe = self.on_subscribe
		self.client.on_message = self.on_message
		# use the host stored on the instance rather than the module-level default
		logging.info(f"connecting to MQTT broker at {self.host}")
		self.client.connect(host=self.host,keepalive=30)
		self.client.loop_start()

	def on_connect(self, client, userdata, flags, rc):
		print("Connected with result code "+str(rc))

		# Subscribing in on_connect() means that if we lose the connection and
		# reconnect then subscriptions will be renewed.
		#self.client.subscribe("$SYS/#")

	def on_message(self, client, userdata, msg):
		print(f"got message of topic {msg.topic} with payload {msg.payload}")

	def on_subscribe(self, client, userdata, mid, granted_qos):
		print("Subscribed: " + str(mid) + " " + str(granted_qos))

	def publish(self, topic, msg):
		self.client.publish(topic=topic, payload=msg)

	def subscribe_to_test_topic(self, topic=test_topic):
		self.client.subscribe(topic)

	def send_test_message(self, topic=test_topic):

		self.publish(topic=topic, msg=f"test message from python script at {datetime.datetime.now()}")

	def disconnect(self):
		self.client.disconnect()

	def loop(self):
		self.client.loop_forever()

if __name__ == "__main__":
	logging.info("running MQTT test")
	mqtt_helper = basic_mqtt()
	mqtt_helper.connect()
	mqtt_helper.subscribe_to_test_topic()
	mqtt_helper.send_test_message()
	mqtt_helper.disconnect()

Results

Running this script on a Raspberry Pi 3 shows the advertisements coming in as expected. The updates are very quick compared to the usual 16 second update interval for my Acurite stuff.

screenshot of Python code running to receive BLE advertisements from Govee Bluetooth Thermometer

And running mosquitto_sub with the right arguments (mosquitto_sub -h mqtt -v -t "govee/#") shows the MQTT messages are being published as expected:

mosquitto_sub showing our published MQTT messages with temperature/humidity data from the Govee sensor
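If you want to consume these messages from Python instead of mosquitto_sub, the topic and JSON payload are easy to pick apart. A small sketch, with a made-up helper name (`parse_govee_message`) and the sample payload copied from the log comment in observe.py:

```python
import json


def parse_govee_message(topic, payload):
    """Split a govee/<MAC> topic and its JSON payload into one dict."""
    prefix, _, mac = topic.partition("/")
    if prefix != "govee" or not mac:
        return None  # not one of our sensor topics
    data = json.loads(payload)  # accepts str or bytes
    return {
        "mac": mac,
        "temp_f": float(data["temp_f"]),
        "hum": float(data["hum"]),
        "batt": int(data["batt"]),
    }


reading = parse_govee_message(
    "govee/A4:C1:38:9F:1B:A9",
    '{"temp_f": 45.73, "hum": 25.5, "batt": 100}')
print(reading)
```

A function like this would be called from a paho on_message callback with msg.topic and msg.payload.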

Getting Govee MQTT data into Home Assistant

Lastly, we need to add a MQTT sensor to get the data importing into Home Assistant:

- platform: mqtt
  state_topic: "govee/A4:C1:38:9F:1B:A9"
  value_template: "{{ value_json.temp_f }}"
  name: "garage temp"
  unit_of_measurement: "F"

And from there you can do whatever you want with the collected data!

Home Assistant displaying data from Govee Bluetooth temperature and humidity sensor

Conclusion

This was a relatively quick post and code development. I really hate the cycle of developing on my Bluetooth-less Windows computer, committing the code, pushing to Git, pulling on the Pi, and running to “debug”. Thus, the code isn’t as good as it can be. I probably did 20-25 iterations before calling it good enough.

Regardless, I think this $12 Govee Bluetooth Thermometer and Hygrometer is a great little tool for collecting data around the house. You don’t need an SDR to get Acurite beacons, and you don’t need to spend a lot either. You just need a way to receive BLE advertisements (basically any Bluetooth-capable device can do this). There is even a 2 pack of just the sensors that I just discovered on Amazon for $24 – 2 pack of Govee bluetooth thermometer and hygrometer. I know there are other Bluetooth devices but they’re quite a bit more expensive.

Disclosure: Some of the links on this post are Amazon affiliate links. This means that, at zero cost to you, I will earn an affiliate commission if you click through the link and finalize a purchase.

Also, at least for me, these are available with same day shipping on Amazon. Scratch that instant gratification itch.

Same day shipping available on Amazon near Denver for the Govee sensors

Coding a pitch/roll/altitude autopilot in X-Plane with Python

Introduction

Sorry it has taken me so long to write this post! The last post on the blog was October 19th – almost 6 weeks ago. Life happens. We have a 15 month old running around and she is a handful!

Anyways, back to the next topic – coding a pitch/roll (2 axis) autopilot in X-Plane with Python with altitude and heading hold. Today we will be adding the following:

  • Real-time graphing for 6 parameters
  • Additional method to grab data out of X-Plane
  • A normalize function to limit outputs to reasonable values
  • Altitude preselect and hold function

The full code will be at the end of this post.

Video Link

coming soon

Contents

  1. Adding PyQtGraph
  2. Developing a normalize function
  3. Initializing the data structures to hold the graph values
  4. Defining the PyQtGraph window and parameters
  5. Getting more data points out of X-Plane
  6. Feeding the graph data structures with data
  7. Adding altitude preselect and hold

1 – Adding PyQtGraph

Pip is my preferred tool to manage Python packages. It is easy and works well. I initially tried graphing with MatPlotLib but it took 200-300ms to update, which really dragged down the control loop to the point of being unusable. Instead, we will be using PyQtGraph. Install it with Pip:

pip install pyqtgraph

2 – Developing a normalize function

This task is pretty straightforward. There are a couple places where we want to pass values that need to be within a certain range. The first example is the client.sendCTRL() method, which sets the control surfaces in X-Plane. The documentation states values are expected to be from -1 to 1. I have gotten some really weird results sending values outside that range (specifically for throttle: if you send something like 4, you can end up with 400% throttle, which is wayyy more than the engines can actually output).

# this function takes an input and either passes it through or adjusts
# the input to fit within the specified max/min values
def normalize(value, min=-1, max=1):
	# if value = 700, and max = 20, return 20
	# if value = -200, and min = -20, return -20
	if (value > max):
		return max
	elif (value < min):
		return min
	else:
		return value
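For comparison, the same limiting (usually called clamping) can be written in one line with the built-in min() and max(). This is just an alternative sketch under a different name (`clamp`, with `low`/`high` parameters that are my naming) so it doesn't shadow the built-ins the way the min/max arguments above do:

```python
def clamp(value, low=-1.0, high=1.0):
    """Limit value to the range [low, high]."""
    return max(low, min(high, value))


print(clamp(4))              # too large for a control input, limited to 1.0
print(clamp(-200, -20, 20))  # limited to -20
print(clamp(0.3))            # already in range, passed through
```

Either version works; the rest of this post keeps using normalize().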

3 – Initializing the graphing data structures

We need a couple of arrays to store the data for our graphs. We need (desired data to plot) + 1 arrays. The +1 is the x-axis, which will just store values like 0,1,2,3,etc. The others will be the y-values. We haven’t added the altitude stuff yet, so you can add them but they won’t be used yet.

x_axis_counters = [] #0, 1, 2, 3, etc. just basic x-axis values used for plotting
roll_history = []
pitch_history = []
#altitude_history = []
roll_setpoint_history = []
pitch_setpoint_history = []
#altitude_setpoint_history = []
plot_array_max_length = 100 # how many data points to hold in our arrays and graph
i = 1 # initialize x_axis_counter

4 – Defining the PyQtGraph window and parameters

Working with PyQtGraph more or less means we’ll be working with a full blown GUI (just stripped down).

# first the base app needs to be instantiated
app = pg.mkQApp("python xplane autopilot monitor")

# now the window itself is defined and sized
win = pg.GraphicsLayoutWidget(show=True)
win.resize(1000,600) #pixels
win.setWindowTitle("XPlane autopilot system control")

# we have 3 subplots
p1 = win.addPlot(title="roll",row=0,col=0)
p2 = win.addPlot(title="pitch",row=1,col=0)
p3 = win.addPlot(title="altitude", row=2, col=0)

# show the y grid lines to make it easier to interpret the graphs
p1.showGrid(y=True)
p2.showGrid(y=True)
p3.showGrid(y=True)

5 – Getting more data points out of X-Plane

The initial .getPOSI() method that came in the example has worked well for us so far. But at this point we need more data that isn’t available in the .getPOSI() method. We will be utilizing a different method called .getDREFs() which is short for ‘get data references’. We will need to construct a list of data references we want to retrieve, pass that list to the method, and then parse the output. It is more granular than .getPOSI(). I haven’t evaluated the performance but I don’t think it is a problem.

The DREFs we want are for indicated airspeed, magnetic heading (.getPOSI() has true heading, not magnetic), an indicator to show if we are on the ground or not, and height as understood by the flight model. Thus, we can define our DREFs as follows:

DREFs = ["sim/cockpit2/gauges/indicators/airspeed_kts_pilot",
		"sim/cockpit2/gauges/indicators/heading_electric_deg_mag_pilot",
		"sim/flightmodel/failures/onground_any",
		"sim/flightmodel/misc/h_ind"]

And we can get the data with client.getDREFs(DREFs). The returned object is a 2d array. We need to parse out our values of interest. The full data gathering code looks like this:

posi = client.getPOSI()
ctrl = client.getCTRL()
multi_DREFs = client.getDREFs(DREFs)

current_roll = posi[4]
current_pitch = posi[3]
current_hdg = multi_DREFs[1][0]
current_altitude = multi_DREFs[3][0]
current_asi = multi_DREFs[0][0]
onground = multi_DREFs[2][0]
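Indexing by position gets hard to read once the DREF list grows. One option is to zip the names and results into a dict. This is a sketch, assuming the result has one row per requested DREF in the same order, which is how it is indexed above. The helper name `drefs_to_dict` is made up, and the values below are invented sample numbers, not real X-Plane output.

```python
DREF_NAMES = ["sim/cockpit2/gauges/indicators/airspeed_kts_pilot",
              "sim/cockpit2/gauges/indicators/heading_electric_deg_mag_pilot",
              "sim/flightmodel/failures/onground_any",
              "sim/flightmodel/misc/h_ind"]


def drefs_to_dict(names, values):
    """Map each DREF name to the first element of its result row."""
    return {name: row[0] for name, row in zip(names, values)}


# invented sample data standing in for client.getDREFs(DREF_NAMES)
fake_result = [(120.5,), (271.0,), (0.0,), (7950.0,)]
state = drefs_to_dict(DREF_NAMES, fake_result)
print(state["sim/flightmodel/misc/h_ind"])
```

With that in place, current_altitude becomes state["sim/flightmodel/misc/h_ind"] and reordering the DREF list no longer breaks anything.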

With those data points, we have everything we need to start plotting the state of our aircraft and monitoring for PID tuning.

6 – Feeding the real-time graphs with data

Next up is actually adding data to be plotted. There are two scenarios to consider when adding data to the arrays: 1) the arrays have not yet reached the limit we set earlier (100 points), and 2) they have. Case 1 is easy. We just append the current values to the arrays:

x_axis_counters.append(i)
roll_history.append(current_roll)
roll_setpoint_history.append(desired_roll)
pitch_history.append(current_pitch)
pitch_setpoint_history.append(pitch_PID.SetPoint)
altitude_history.append(current_altitude)
altitude_setpoint_history.append(desired_altitude)

The above code will work perfectly fine if you want the arrays to grow infinitely large over time. Ain’t nobody got time for that so we need to check how long the arrays are and delete data. We’ll check the length of the x-axis array as a proxy for all the others and use that to determine what to do. Typing code that looks very similar over and over again means it’s probably time to abstract it into classes or something else. The more you type something over and over again, the stronger the indication that you need to do something about it. But for now we’ll leave it like this for ease of reading and comprehension.

# if we reach our data limit set point, evict old data and add new.
# this helps keep the graph clean and prevents it from growing infinitely
if(len(x_axis_counters) > plot_array_max_length):
	x_axis_counters.pop(0)
	roll_history.pop(0)
	roll_setpoint_history.pop(0)
	pitch_history.pop(0)
	pitch_setpoint_history.pop(0)
	altitude_history.pop(0)
	altitude_setpoint_history.pop(0)

	x_axis_counters.append(i)
	roll_history.append(current_roll)
	roll_setpoint_history.append(desired_roll)
	pitch_history.append(current_pitch)
	pitch_setpoint_history.append(pitch_PID.SetPoint)
	altitude_history.append(current_altitude)
	altitude_setpoint_history.append(desired_altitude)
# else, just add new. we are not yet at limit.
else:
	x_axis_counters.append(i)
	roll_history.append(current_roll)
	roll_setpoint_history.append(desired_roll)
	pitch_history.append(current_pitch)
	pitch_setpoint_history.append(pitch_PID.SetPoint)
	altitude_history.append(current_altitude)
	altitude_setpoint_history.append(desired_altitude)
i = i + 1
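If you do want to abstract the repeated pop/append code, one way is a small class built on collections.deque, which drops the oldest item by itself once maxlen is reached. This is only a sketch of that idea, with a made-up class name (`History`), not what the full code in this post uses:

```python
from collections import deque


class History:
    """Fixed-length rolling buffers, one per named series."""

    def __init__(self, names, max_length=100):
        # a deque with maxlen evicts the oldest value automatically
        self.series = {name: deque(maxlen=max_length) for name in names}

    def add(self, **values):
        for name, value in values.items():
            self.series[name].append(value)

    def get(self, name):
        # return a plain list, which is easy to hand to a plotting call
        return list(self.series[name])


history = History(["x", "roll", "roll_setpoint"], max_length=3)
for n in range(1, 6):
    history.add(x=n, roll=n * 0.5, roll_setpoint=0)
print(history.get("x"), history.get("roll"))
```

Each pass through the control loop then needs a single history.add(...) call, and the length check goes away entirely.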

You will notice that there are quite a few entries for altitude. We haven’t done anything with that yet so just set desired_altitude to an integer somewhere in the code so it doesn’t error out.

To complete the graphing portion, we need to actually plot the data. The clear=True in the below lines clears out the plot so we’re not replotting on top of the old data. We also need to process events to actually draw the graph:

# process events means draw the graphs
pg.QtGui.QApplication.processEvents()

# arguments are x values, y values, options
# pen is a different line in the plot
p1.plot(x_axis_counters, roll_history, pen=0, clear=True)
p1.plot(x_axis_counters, roll_setpoint_history, pen=1)

p2.plot(x_axis_counters, pitch_history, pen=0,clear=True)
p2.plot(x_axis_counters, pitch_setpoint_history, pen=1)

p3.plot(x_axis_counters, altitude_history, pen=0,clear=True)
p3.plot(x_axis_counters, altitude_setpoint_history, pen=1)

You can now run the code to see your graph populating with data!

PyQtGraph plotting the aircraft’s roll/pitch and desired roll/pitch

7 – Adding altitude autopilot (preselect and hold)

Ok so now that we have eye candy with the real-time graphs, we can make our autopilot do something useful: go to a selected altitude and hold it.

We already have the roll and pitch PIDs functioning as desired. How do we couple the pitch PID to get to the desired altitude? One cannot directly control altitude. Altitude is controlled via a combination of pitch and airspeed (and time).

We will call the coupled PIDs an inner loop (pitch) and an outer loop (altitude). The outer loop runs and its output will feed the input of the inner loop. The altitude PID will be fed a desired altitude and current altitude. The output will then mostly be the error (desired altitude – current altitude) multiplied by our P setting. Of course I and D will have a say in the output but by and large it will be some proportion of the error.
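To make the inner/outer loop idea concrete, here is a stripped-down sketch with numbers. It uses a tiny proportional-only controller written just for this example (the `POnly` class, the `clamp` helper and the `step` function are not from the PID library), so I and D are ignored. The gains and limits match the ones used in this post.

```python
def clamp(value, low, high):
    return max(low, min(high, value))


class POnly:
    """Proportional-only stand-in for a PID controller (illustration only)."""

    def __init__(self, kp, setpoint=0.0):
        self.kp = kp
        self.setpoint = setpoint
        self.output = 0.0

    def update(self, measurement):
        # error is (desired - current), same convention as in the text
        self.output = self.kp * (self.setpoint - measurement)
        return self.output


altitude_loop = POnly(kp=0.1, setpoint=8000)  # outer loop
pitch_loop = POnly(kp=0.1)                    # inner loop


def step(current_altitude, current_pitch):
    # outer loop: altitude error becomes a pitch setpoint, limited to -15..10 degrees
    pitch_loop.setpoint = clamp(altitude_loop.update(current_altitude), -15, 10)
    # inner loop: pitch error becomes an elevator command, limited to -1..1
    elevator = clamp(pitch_loop.update(current_pitch), -1, 1)
    return pitch_loop.setpoint, elevator


# 100 ft below target: pitch setpoint 10 degrees, elevator about 0.8
print(step(current_altitude=7900, current_pitch=2))
# 4000 ft above target: both values hit their lower limits
print(step(current_altitude=12000, current_pitch=2))
```

Note how quickly the outer loop saturates: with P=0.1, anything more than 100 ft below the target already asks for the maximum 10 degrees of pitch.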

Let’s start with defining the altitude PID and desired altitude:

altitude_PID = PID.PID(P, I, D)
desired_altitude = 8000
altitude_PID.SetPoint = desired_altitude

With those defined, we now move to the main loop. The outer loop needs to be updated first. From there, we will normalize the output from the altitude PID and use that to set the pitch PID. The pitch PID will also be normalized to keep values in a reasonable range:

# update outer loops first
altitude_PID.update(current_altitude)

# if alt=12000 and setpoint=10000, the error is 10000-12000=-2000. if P=0.1, the P term is -2000*0.1=-200,
# which normalize() then limits to -15 (degrees of pitch)
pitch_PID.SetPoint = normalize(altitude_PID.output, min=-15, max=10)

# update PIDs
roll_PID.update(current_roll)
pitch_PID.update(current_pitch)

# update control outputs
new_ail_ctrl = normalize(roll_PID.output)
new_ele_ctrl = normalize(pitch_PID.output)

Now we just need to send those new control surface commands and we’ll be controlling the plane!

Outputting the control deflections should basically be the last part of the loop. We’ll put it right before the debug output:

# sending actual control values to XPlane
ctrl = [new_ele_ctrl, new_ail_ctrl, 0.0, -998] # ele, ail, rud, thr. -998 means don't change
client.sendCTRL(ctrl)

Full code of pitch_roll_autopilot_with_graphing.py

import sys
import xpc
import PID
from datetime import datetime, timedelta
import pyqtgraph as pg
from pyqtgraph.Qt import QtCore, QtGui
import time

def normalize(value, min=-1, max=1):
	# if value = 700, and max = 20, return 20
	# if value = -200, and min = -20, return -20
	if (value > max):
		return max
	elif (value < min):
		return min
	else:
		return value

update_interval = 0.050 # seconds, 0.05 = 20 Hz
start = datetime.now()
last_update = start

# defining the initial PID values
P = 0.1 # PID library default = 0.2
I = P/10 # default = 0
D = 0 # default = 0

# initializing PID controllers
roll_PID = PID.PID(P, I, D)
pitch_PID = PID.PID(P, I, D)
altitude_PID = PID.PID(P, I, D)

# setting the desired values
# roll = 0 means wings level
# pitch = 2 means slightly nose up, which is required for level flight
desired_roll = 0
desired_pitch = 2
desired_altitude = 8000

# setting the PID set points with our desired values
roll_PID.SetPoint = desired_roll
pitch_PID.SetPoint = desired_pitch
altitude_PID.SetPoint = desired_altitude

x_axis_counters = [] #0, 1, 2, 3, etc. just basic x-axis values used for plotting
roll_history = []
pitch_history = []
altitude_history = []
roll_setpoint_history = []
pitch_setpoint_history = []
altitude_setpoint_history = []
plot_array_max_length = 300 # how many data points to hold in our arrays and graph
i = 1 # initialize x_axis_counter

# first the base app needs to be instantiated
app = pg.mkQApp("python xplane autopilot monitor")

# now the window itself is defined and sized
win = pg.GraphicsLayoutWidget(show=True)
win.resize(1000,600) #pixels
win.setWindowTitle("XPlane autopilot system control")

# we have 3 subplots
p1 = win.addPlot(title="roll",row=0,col=0)
p2 = win.addPlot(title="pitch",row=1,col=0)
p3 = win.addPlot(title="altitude", row=2, col=0)

# show the y grid lines to make it easier to interpret the graphs
p1.showGrid(y=True)
p2.showGrid(y=True)
p3.showGrid(y=True)

DREFs = ["sim/cockpit2/gauges/indicators/airspeed_kts_pilot",
		"sim/cockpit2/gauges/indicators/heading_electric_deg_mag_pilot",
		"sim/flightmodel/failures/onground_any",
		"sim/flightmodel/misc/h_ind"]

def monitor():
	global i
	global last_update
	with xpc.XPlaneConnect() as client:
		while True:
			if (datetime.now() > last_update + timedelta(milliseconds = update_interval * 1000)):
				last_update = datetime.now()
				print(f"loop start - {datetime.now()}")

				posi = client.getPOSI()
				ctrl = client.getCTRL()
				multi_DREFs = client.getDREFs(DREFs)

				current_roll = posi[4]
				current_pitch = posi[3]
				current_hdg = multi_DREFs[1][0]
				current_altitude = multi_DREFs[3][0]
				current_asi = multi_DREFs[0][0]
				onground = multi_DREFs[2][0]

				# update the display
				pg.QtGui.QApplication.processEvents()

				# update outer loops first
				altitude_PID.update(current_altitude)

				# e.g. alt=12000, setpoint=10000: error = setpoint - alt = -2000. with P=0.1 the output is roughly -2000*0.1=-200, which normalize() clamps to -15
				pitch_PID.SetPoint = normalize(altitude_PID.output, min=-15, max=10)

				# update PIDs
				roll_PID.update(current_roll)
				pitch_PID.update(current_pitch)

				# update control outputs
				new_ail_ctrl = normalize(roll_PID.output)
				new_ele_ctrl = normalize(pitch_PID.output)

				# if we reach our data limit set point, evict old data and add new.
				# this helps keep the graph clean and prevents it from growing infinitely
				if(len(x_axis_counters) > plot_array_max_length):
					x_axis_counters.pop(0)
					roll_history.pop(0)
					roll_setpoint_history.pop(0)
					pitch_history.pop(0)
					pitch_setpoint_history.pop(0)
					altitude_history.pop(0)
					altitude_setpoint_history.pop(0)

					x_axis_counters.append(i)
					roll_history.append(current_roll)
					roll_setpoint_history.append(desired_roll)
					pitch_history.append(current_pitch)
					pitch_setpoint_history.append(pitch_PID.SetPoint)
					altitude_history.append(current_altitude)
					altitude_setpoint_history.append(desired_altitude)
				# else, just add new. we are not yet at limit.
				else:
					x_axis_counters.append(i)
					roll_history.append(current_roll)
					roll_setpoint_history.append(desired_roll)
					pitch_history.append(current_pitch)
					pitch_setpoint_history.append(pitch_PID.SetPoint)
					altitude_history.append(current_altitude)
					altitude_setpoint_history.append(desired_altitude)
				i = i + 1

				p1.plot(x_axis_counters, roll_history, pen=0, clear=True)
				p1.plot(x_axis_counters, roll_setpoint_history, pen=1)

				p2.plot(x_axis_counters, pitch_history, pen=0,clear=True)
				p2.plot(x_axis_counters, pitch_setpoint_history, pen=1)

				p3.plot(x_axis_counters, altitude_history, pen=0,clear=True)
				p3.plot(x_axis_counters, altitude_setpoint_history, pen=1)

				# sending actual control values to XPlane
				ctrl = [new_ele_ctrl, new_ail_ctrl, 0.0, -998] # ele, ail, rud, thr. -998 means don't change
				client.sendCTRL(ctrl)

				output = f"current values --    roll: {current_roll: 0.3f},  pitch: {current_pitch: 0.3f}"
				output = output + "\n" + f"PID outputs    --    roll: {roll_PID.output: 0.3f},  pitch: {pitch_PID.output: 0.3f}"
				output = output + "\n"
				print(output)

if __name__ == "__main__":
	monitor()

Using the autopilot / Conclusion

To use the autopilot, fire up XPlane, hop in a small-ish plane (gross weight less than 10k lb), take off, climb 1000′, then execute the code. Your plane should bring roll to 0 pretty quickly and start the climb/descent to the desired altitude.

X-Plane Python autopilot leveling off at 8000′ with pitch/roll/altitude graphed in real-time
Categories
Python XPlane

Coding a wing leveler autopilot in X-Plane with Python

Introduction

Continuing from the first post, where we hooked up X-Plane to our Python code, we will build a wing leveler today. The first post was just about making sure we could get data into and out of X-Plane. Today we will add a few features to our XPlane autopilot written in Python.

  • A control loop timer (we will be targeting a loop frequency of 10 Hz, or 10 updates per second)
  • Additional data feeds from X-Plane
  • Two PID controllers, one for roll, one for pitch
  • Some debugging output to keep track of what the PIDs are doing

The full code will be at the end of this post.

Video Link

Python Tutorial: code a wing leveler in X-Plane using PID loops

Contents

  1. Control loop timer/limiter
  2. Obtaining current pitch/roll values from X-Plane
  3. Initializing the PID controllers
  4. Feeding the PID controllers within the control loop
  5. Controlling the aircraft with the new PID output
  6. Monitoring the control loops via debug prints

1 – Control loop timer/limiter

Since we will be targeting a 10 Hz update rate, we need to develop a method to ensure the loop does not run more frequently than once every 100 milliseconds. We do not want the loop running uninhibited, because that will result in variable loop execution times and we like to keep those things constant. It could potentially execute thousands of times per second, which is entirely unnecessary. Most control loop algorithms run in the 10-100 Hz range (10-100 times per second). For reference, my RC plane running ArduPlane on Pixhawk uses 50 Hz as a standard update frequency.

To accomplish this task, we need to set up some timing variables.

First of all, add an import statement for datetime and timedelta:

from datetime import datetime, timedelta

Next, define the timing variables:

update_interval = 0.100 # this value is in seconds. 1/0.100 = 10, which is the update frequency in Hz

# start is set to the time the line of code is executed, which is essentially when the program started
start = datetime.now()

# last_update needs to be set to start for the first execution of the loop to successfully run.
# alternatively, we could've set start to something in the past.
last_update = start

# last update needs to be defined as a global within the monitor() function:
def monitor():
	global last_update

That handles the variables. Now we need to limit the loop execution. To do so requires wrapping all of the loop code into a new if statement that evaluates the time and only executes if at least 100 milliseconds have passed since the last loop execution:

# loop is the "while True:" statement
while True:
	# this if statement is evaluated with every loop execution
	if (datetime.now() > last_update + timedelta(milliseconds = update_interval * 1000)):
		# when the if statement evaluates to true, the first thing we'll do is set the last update to the current time so the next iteration fires at the correct time
		last_update = datetime.now()

		# rest of the loop code goes here
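The if-statement limiter above keeps checking the clock as fast as it can between updates. As a possible variation (not what this post uses), the sketch below sleeps until the next scheduled time instead, using the standard library's time.monotonic() and time.sleep(). The function name run_at_fixed_rate and the fixed iteration count are made up for illustration. A real autopilot loop would run until stopped.

```python
import time

def run_at_fixed_rate(step, interval_s, iterations):
    """Call step() at most once per interval_s and return when each call started."""
    next_run = time.monotonic()  # monotonic clock: unaffected by system clock changes
    timestamps = []
    for _ in range(iterations):
        # sleep until the next scheduled time instead of spinning
        delay = next_run - time.monotonic()
        if delay > 0:
            time.sleep(delay)
        timestamps.append(time.monotonic())
        step()
        # schedule from the previous target, so small delays do not accumulate
        next_run += interval_s
    return timestamps

# 10 Hz means one update every 0.100 seconds
stamps = run_at_fixed_rate(lambda: None, interval_s=0.100, iterations=3)
print([round(later - earlier, 3) for earlier, later in zip(stamps, stamps[1:])])
```

Scheduling from the previous target time rather than from "now" keeps the average rate at 10 Hz even if one pass through the loop runs a little long.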

2 – Obtaining current roll/pitch values from X-Plane

This task is pretty straightforward. In the first post, the monitorExample.py code included obtaining the position with posi = client.getPOSI(). There are 7 elements returned with that method, and two of them are roll and pitch.

Put the below after the .getPOSI() call.

current_roll = posi[4]
current_pitch = posi[3]
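If the bare indexes are hard to remember, the 7 values can be given names. This is a sketch only: it assumes the order is latitude, longitude, altitude, pitch, roll, heading, gear, which is my reading of the XPlaneConnect documentation, so verify it against your plug-in version. The sample tuple is made up and stands in for a real client.getPOSI() result.

```python
from collections import namedtuple

# assumed field order of getPOSI(); only pitch (3) and roll (4) are confirmed by this post
Posi = namedtuple("Posi", "lat lon alt pitch roll heading gear")

# made-up sample standing in for the return value of client.getPOSI()
sample = (39.915, -105.128, 1719.0, 1.994, 0.0, 270.0, 1.0)

posi = Posi(*sample)
current_roll = posi.roll    # same value as sample[4]
current_pitch = posi.pitch  # same value as sample[3]
print(f"roll: {current_roll}, pitch: {current_pitch}")
```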

3 – Initializing the PID controllers

First you need to get the PID control file from the Ivmech (Ivmech Mechatronics Ltd.) GitHub page. Direct link PID.py file here. Put the file in the same working directory as everything else then import it with import PID at the top with the rest of the imports.

Then we can initialize the control instances. PID controllers are magic. But they do need to be set with smart values for the initial run. The default values with the PID library are P=0.2, I=0, D=0. This essentially means the output will be 20% of the error between the setpoint and the current value. For example, if the aircraft has a roll of 10 degrees to the left (-10), the setpoint is 0, and P=0.2, the error is 0 - (-10) = 10 and the output from the PID controller will be 2, which commands a roll back to the right.

When setting PID values, it is almost always a good idea to start small and work your way up. If your gains are too high, you could get giant oscillations and other undesirable behaviors. In the YouTube video, I talk about the various types of PID controllers. They will basically always have a P (proportional) set. Many will also have an I (integral) value set, but not many use the D term (derivative).

Going with the “less is more” truism with PID controllers, I started with a P value of 0.1, and an I value of 0.01 (P/10). The I term (integral) is meant to take care of accumulated errors (which are usually long term errors). An example is your car’s cruise control going up a hill. If your cruise control is set to 65 mph, it will hold 65 no problem on flat roads. If you start going up a hill, the controller will slowly apply more throttle. With an I of 0, your car would never get to 65. It would probably stay around 62 or so. With the integrator going, the error will accumulate and boost the output (throttle) to get you back up to your desired 65. In the linked YouTube video, I show what happens when the I value is set to 0 and why it is necessary to correct long-term errors.

These values (P=0.1, I=0.01, D=0) turned out to work perfectly.
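The cruise control example can be reproduced with a toy simulation. The sketch below is not the PID library and not an aircraft model. It is a made-up one-line "car" where the hill subtracts a constant 3 units of throttle, with toy gains (P=1.0, I=P/10) picked so the numbers line up with the 62 vs 65 mph story above.

```python
def simulate_cruise(kp, ki, target=65.0, hill_load=3.0, dt=0.1, steps=2000):
    """Toy cruise control: speed changes by (throttle - hill_load) per second."""
    speed = target   # start on target, right as the hill begins
    integral = 0.0
    for _ in range(steps):
        error = target - speed      # setpoint minus measurement
        integral += error * dt      # accumulated error feeds the I term
        throttle = kp * error + ki * integral
        speed += (throttle - hill_load) * dt
    return speed

p_only = simulate_cruise(kp=1.0, ki=0.0)
with_i = simulate_cruise(kp=1.0, ki=0.1)
print(f"P only:  {p_only:.2f} mph")
print(f"P and I: {with_i:.2f} mph")
```

With P only, the speed settles where the throttle exactly cancels the hill, which in this toy model is hill_load / kp = 3 mph below the target. With the I term, the accumulated error keeps adding throttle until the speed is back at the target.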

Place the following before the monitor function:

# defining the initial PID values
P = 0.1 # PID library default = 0.2
I = P/10 # default = 0
D = 0 # default = 0

# initializing both PID controllers
roll_PID = PID.PID(P, I, D)
pitch_PID = PID.PID(P, I, D)

# setting the desired values
# roll = 0 means wings level
# pitch = 2 means slightly nose up, which is required for level flight
desired_roll = 0
desired_pitch = 2

# setting the PID set points with our desired values
roll_PID.SetPoint = desired_roll
pitch_PID.SetPoint = desired_pitch

4 – Updating the PID controllers within the control loop

With the PIDs created, they will need to be updated with the new, current pitch and roll values with every loop execution.

Place the following after current_roll and current_pitch are assigned:

roll_PID.update(current_roll)
pitch_PID.update(current_pitch)

5 – Controlling the aircraft with the new PID output

Updating the PID controller instances will generate new outputs. We can use those outputs to set the control surfaces. You can place these two lines directly below the update lines:

new_ail_ctrl = roll_PID.output
new_ele_ctrl = pitch_PID.output

So we have new control surface values – now we need to actually move the control surfaces. This is accomplished by sending an array of 4 floats with .sendCTRL():

# ele, ail, rud, thr. -998 means don't set/change
ctrl = [new_ele_ctrl, new_ail_ctrl, 0.0, -998]
client.sendCTRL(ctrl)

6 – Monitoring the control loops via debug prints

The last bit to tie a bunch of this together is printing out values with every loop execution to ensure things are heading in the right direction. We will turn these into graphs in the next post or two.

We will be concatenating strings because it’s easy and we aren’t working with enough strings for it to be a performance problem.

In newer Python versions (3.6+), placing ‘f’ before the quotes of a string (f””) means the string is interpolated. This means you can put variables and expressions directly inside the string in curly braces, which makes creating print statements much easier and cleaner.

The first line will print out the current roll and pitch value (below). We are using current_roll and current_pitch interpolated. The colon, then blank space with 0.3f is a string formatter. It rounds the value to 3 decimal places and leaves space for a negative. It results in things being lined up quite nicely.

output = f"current values --    roll: {current_roll: 0.3f},  pitch: {current_pitch: 0.3f}"

The next code statement will add a new line to the previous output, and also add the PID outputs for reference:

output = output + "\n" + f"PID outputs    --    roll: {roll_PID.output: 0.3f},  pitch: {pitch_PID.output: 0.3f}"

The final line will just add another new line to keep each loop execution’s print statements grouped together:

output = output + "\n"

Finally, we print the output with print(output), which will look like this:

loop start - 2021-10-19 14:24:26.208945
current values --    roll:  0.000,  pitch:  1.994
PID outputs    --    roll: -0.000,  pitch:  0.053
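The format spec can be tried on its own without X-Plane running. In this sketch the helper name format_line and the sample numbers are made up. The format spec itself is the same one used in the output lines above.

```python
def format_line(label, roll, pitch):
    # the space before 0.3f reserves one column for the sign:
    # positive numbers get a blank, negative numbers get "-"
    return f"{label} --    roll: {roll: 0.3f},  pitch: {pitch: 0.3f}"

print(format_line("current values", 0.0, 1.994))
print(format_line("PID outputs   ", -0.00004, 0.0532))
```

A tiny negative value such as -0.00004 rounds to -0.000 and keeps its minus sign, which explains the -0.000 roll output in the sample above.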

Full code of pitch_roll_autopilot.py

import sys
import xpc
import PID
from datetime import datetime, timedelta

update_interval = 0.100 # seconds
start = datetime.now()
last_update = start

# defining the initial PID values
P = 0.1 # PID library default = 0.2
I = P/10 # default = 0
D = 0 # default = 0

# initializing both PID controllers
roll_PID = PID.PID(P, I, D)
pitch_PID = PID.PID(P, I, D)

# setting the desired values
# roll = 0 means wings level
# pitch = 2 means slightly nose up, which is required for level flight
desired_roll = 0
desired_pitch = 2

# setting the PID set points with our desired values
roll_PID.SetPoint = desired_roll
pitch_PID.SetPoint = desired_pitch

def monitor():
	global last_update
	with xpc.XPlaneConnect() as client:
		while True:
			if (datetime.now() > last_update + timedelta(milliseconds = update_interval * 1000)):
				last_update = datetime.now()
				print(f"loop start - {datetime.now()}")

				posi = client.getPOSI();
				ctrl = client.getCTRL();

				current_roll = posi[4]
				current_pitch = posi[3]

				roll_PID.update(current_roll)
				pitch_PID.update(current_pitch)

				new_ail_ctrl = roll_PID.output
				new_ele_ctrl = pitch_PID.output

				ctrl = [new_ele_ctrl, new_ail_ctrl, 0.0, -998] # ele, ail, rud, thr. -998 means don't change
				client.sendCTRL(ctrl)

				output = f"current values --    roll: {current_roll: 0.3f},  pitch: {current_pitch: 0.3f}"
				output = output + "\n" + f"PID outputs    --    roll: {roll_PID.output: 0.3f},  pitch: {pitch_PID.output: 0.3f}"
				output = output + "\n"
				print(output)

if __name__ == "__main__":
	monitor()

Using the autopilot / Conclusion

To use the autopilot, fire up XPlane, hop in a small-ish plane (gross weight less than 10k lb), take off, climb 1000′, then execute the code. Your plane should level off within a second or two in both axes.

Here is a screenshot of the output after running for a few seconds:

Screenshot showing current pitch and roll values along with their respective PID outputs

The linked YouTube video shows the aircraft snapping to the desired pitch and roll angles very quickly from a diving turn. The plane is righted to within 0.5 degrees of the setpoints within 2 seconds.

A note about directly setting the control surfaces

If you are familiar with PIDs and/or other parts of what I’ve discussed here, you’ll realize that we could be setting large values for the control surfaces (i.e. greater than 1 or less than -1). We will address that in the next post with a normalization function. It will quickly become a problem when a pitch of 100+ is commanded for altitude hold. I have found that XPlane will allow throttle values of more than 100% (I’ve seen as high as ~430%) if sent huge throttle values.

Categories
Python XPlane

Creating an autopilot in X-Plane using Python – part 1

Introduction

Today’s post takes us in a slightly different direction than the last few: hooking up some Python code to the X-Plane flight simulator to enable development of an autopilot using PID (proportional-integral-derivative) controllers. I’ve been a fan of flight simulators for quite some time (I distinctly remember getting Microsoft Flight Simulator 98 for my birthday when I was like 8 or 9) but have only recently started working with interfacing them to code. X-Plane is a well-known flight simulator developed by another Austin – Austin Meyer. It is regarded as having one of the best flight models and has tons of options for getting data into/out of the simulator. More than one FAA-certified simulator setup runs X-Plane as the primary driver software.

I got started thinking about writing some code for X-Plane while playing another game, Factorio. I drive a little plane or car in the game to get around my base and I just added a plug-in that “snaps” the vehicle to a heading, which makes it easier to go in straight lines. I thought – “hmm how hard could this be to duplicate in a flight sim?”. So here we are.

This post will get X-Plane hooked up to Python. The real programming will start with the next post.

Video Link

Contents

  1. Download and install X-Plane (I used X-Plane 10 because it uses fewer resources than X-Plane 11 and we don’t need the graphics/scenery to look super pretty to do coding. It also loads faster.)
  2. Download and install NASA’s XPlaneConnect X-Plane plug-in to X-Plane
  3. Verify the XPlaneConnect plug-in is active in X-Plane
  4. Download sample code from XPlaneConnect’s GitHub page
  5. Run the sample script to verify data is being transmitted from X-Plane via UDP to the XPlaneConnect code

1 – Download and install X-Plane 10 or X-Plane 11

I’ll leave this one up to you. I just discovered that X-Plane 10 is hard to find these days. X-Plane 11 is available on Steam for $59.99 as of writing. I just tested, and the plug-in/code works fine on X-Plane 11 (but the flight models are definitely different and will need different PID values). My screenshots might jump between the two versions but the content/message will be the same.

2 – Download and install NASA’s XPlaneConnect plug-in

NASA (yes, that NASA, the National Aeronautics and Space Administration) has written a bunch of code to interface with X-Plane. They have adapters for C, C++, Java, Matlab, and Python. They work with X-Plane 9, 10, and 11.

  1. Download the latest version from the XPlaneConnect GitHub releases page, 1.3 RC6 as of writing
  2. Open the .zip and place the contents in the [X-Plane directory]/Resources/plugins folder. There are a few other folders already present in this directory. Mine looked like this after adding the XPlaneConnect folder:
Screenshot of X-Plane 10 plugins directory with XPlaneConnect folder added
Screenshot of X-Plane 11 plugins directory with XPlaneConnect folder added

3 – Verify XPlaneConnect is active in X-Plane

Now we’ll load up X-Plane and check the plug-ins to verify XPlaneConnect is enabled. Go to the top menu and select Plugins -> Plugin Admin. You should see X-Plane Connect checked in the enabled column:

Screenshot showing XPlaneConnect plug-in active in X-Plane 11
Screenshot showing XPlaneConnect plug-in active in X-Plane 10

4 – Download sample code from XPlaneConnect’s GitHub page

From the Python3 portion of the GitHub code, download xpc.py and monitorExample.py and stick them in your working directory (doesn’t matter where). For me, I just downloaded the entire git structure so the code is at C:\Users\Austin\source\repos\XPlaneConnect\Python3\src:

Screenshot showing xpc.py and monitorExample.py in my working directory

5 – Run sample code to verify data is making it from X-Plane to our code

With X-Plane running with a plane on a runway (or anywhere really), go ahead and run monitorExample.py! I will be using Visual Studio Code to program this XPlane Python autopilot stuff so that’s where I’ll run it from.

You will start seeing lines scroll by very fast with 6 pieces of information – latitude, longitude, elevation (in meters), and the control deflections for aileron, elevator, and rudder (normalized from -1 to 1, with 0 being centered). In the below screenshot, we see a lat/lon of 39.915, -105.128, with an elevation of 1719m. First one to tell me in the comments what runway that is wins internet points!

Screenshot showing Visual Studio Code running monitorExample.py in front of X-Plane 10 and the output scrolling by.

Conclusion

In this post, we have successfully downloaded the XPlaneConnect plug-in, and demonstrated that it can successfully interface with Python code in both X-Plane 10 and X-Plane 11.

Next up is to start controlling the plane with a basic wing leveler. As of writing this post, I have the following completely functional:

  • Pitch / roll hold at reasonable angles (-25 to 25)
  • Altitude set and hold
  • Heading set and hold
  • Airspeed set and hold
  • Navigate directly to a lat/lon point

See you at the next post! Next post – Coding a wing leveler autopilot in X-Plane with Python

Categories
Offgrid Solar Uncategorized

Sending MPP inverter data to MQTT and InfluxDB with Python

Catching up

Hey there, welcome back to Austin’s Nerdy Things. It’s been a while since my last post – life happens. I haven’t lost sight of the blog. Just needed to do some other things and finish up some projects before documenting stuff.

Background

If you recall, my DIY hybrid solar setup with battery backup is up and running. I wrote about it here – DIY solar with battery backup – up and running!

The MPP LV2424 inverter I’m using puts out a lot of data. Some of it is quite useful, some of it not so much. Regardless, I am capturing all of it in my InfluxDB database with Python. This allows me to chart it in Grafana, which I use for basic monitoring stuff around the house. This post will document getting data from the MPP inverter to Grafana.

Jumping ahead

Final product first. This is a screenshot showing my complete (work-in-progress) Grafana dashboard for my DIY hybrid solar setup.

screenshot showing Grafana dashboard which contains various charts and graphs of MPP solar inverter voltages and data
Solar dashboard as seen in Grafana pulling data from InfluxDB

The screenshot shows the last 6 hours worth of data from my system. It was a cloudy and then stormy afternoon/evening here in Denver (we got 0.89 inches of rain since midnight as of typing this post!), so the solar production wasn’t great. The panels are as follows:

  • Temperatures – showing temperatures from 3 sources: the MPP LV2424 inverter heat sink, and both BMS temperature probes (one is on the BMS board itself, the other is a probe on the battery bank). The inverter has at least 2 temperature readings, maybe 3. They all basically show the same thing.
  • Solar input – shows solar voltage as seen by the solar charge controller as well as solar power (in watts) going through the charge controller.
  • Cell voltages – the voltage reading of each of my 8 battery bank cells as reported by the BMS. The green graph also shows the delta between max and min cells. They are still pretty balanced in the flat part of the discharge curve.
  • Total power – a mashup of what the inverter is putting out, what the solar is putting in, the difference between the two, and what the BMS is reporting. I’m still trying to figure out all the nuances here. There is definitely a discrepancy between what the inverter is putting out and what the solar is putting in when the batteries are fully charged. I believe the difference is the power required to keep the transformer energized (typically ranges from 30-60W).
  • DC voltages – as reported by the inverter and the BMS. The inverter is accurate to 0.1V, the BMS goes to 0.01V.
  • AC voltages – shows the input voltage from the grid and the output voltage from the inverter. These will match unless the inverter is disconnected from the grid.
  • Data table – miscellaneous information from the inverter that isn’t graphable
  • Output load – how much output I’m using compared to the inverter’s limit
  • Total Generation – how much total energy has been captured by the inverter/solar panels. This is limited because I’m not back feeding the grid.

Getting data out of the MPP Solar LV2424 inverter with Python to MQTT

I am using two cables to plug the inverter into a computer. The first is the serial cable that came with the inverter. The second is a simple USB to RS232 serial adapter plugged into a Dell Micro 3070.

The computer is running Proxmox, which is a virtual machine hypervisor. That doesn’t matter for this post, so we can ignore it. Just pretend the USB cable is plugged directly into a computer running Ubuntu 20.04 Linux.

The main bit of software I’m using is published on GitHub by jblance under the name ‘mpp-solar’ – https://github.com/jblance/mpp-solar. This is a utility written in Python to communicate with MPP inverters.

There was a good bit of fun trying to figure out exactly what command I needed to run to get data out of the inverter as evidenced by the history command:

Trying to figure out the usage of the Python mpp-solar utility

In the end, what worked for me to get the data is the following. I believe the protocol (-P) will be different for different MPP Solar inverters:

sudo mpp-solar -p /dev/ttyUSB0 -b 2400 -P PI18 --getstatus

And the results are below. The grid voltage reported is a bit low because the battery started charging from the grid a few minutes before this was run.

[email protected]:~/mpp-solar-python$ sudo mpp-solar -p /dev/ttyUSB0 -b 2400 -P PI18 --getstatus
Command: ET - Total Generated Energy query
------------------------------------------------------------
Parameter                       Value           Unit
working_mode                    Hybrid mode(Line mode, Grid mode)
grid_voltage                    111.2           V
grid_frequency                  59.9            Hz
ac_output_voltage               111.2           V
ac_output_frequency             59.9            Hz
ac_output_apparent_power        155             VA
ac_output_active_power          139             W
output_load_percent             5               %
battery_voltage                 27.1            V
battery_voltage_from_scc        0.0             V
battery_voltage_from_scc2       0.0             V
battery_discharge_current       0               A
battery_charging_current        19              A
battery_capacity                76              %
inverter_heat_sink_temperature  30              °C
mppt1_charger_temperature       0               °C
mppt2_charger_temperature       0               °C
pv1_input_power                 0               W
pv2_input_power                 0               W
pv1_input_voltage               0.0             V
pv2_input_voltage               0.0             V
setting_value_configuration_state       Something changed
mppt1_charger_status            abnormal
mppt2_charger_status            abnormal
load_connection                 connect
battery_power_direction         charge
dc/ac_power_direction           AC-DC
line_power_direction            input
local_parallel_id               0
total_generated_energy          91190           Wh

And to get the same data right into MQTT I am using the following:

sudo mpp-solar -p /dev/ttyUSB0 -P PI18 --getstatus -o mqtt -q mqtt --mqtttopic mpp

The above command is being run as a cron job once a minute. The default baud rate for the inverter is 2400 bps (yes, bits per second), which is super slow so a full poll takes ~6 seconds. Kind of annoying in 2021 but not a huge problem. The cron entry for the command is this:

# this program feeds a systemd service to convert the outputted mqtt to influx points
* * * * * /usr/local/bin/mpp-solar -p /dev/ttyUSB0 -P PI18 --getstatus -o mqtt -q mqtt --mqtttopic mpp

So with that we have MPP inverter data going to MQTT.

Putting MPP data into InfluxDB from MQTT

Here we need another script written in… take a guess… Python! This Python script basically just opens a connection to an MQTT broker and transmits any updates to InfluxDB. The full script is a bit more complicated and I actually stripped a lot out because my MQTT topic names didn’t fit the template the original author used. I have started using this framework in other places to do the MQTT to InfluxDB translation. I like things going to the intermediate MQTT so they can be picked up for easy viewing in Home Assistant. Original code from https://github.com/KHoos/mqtt-to-influxdb-forwarder. The original code seems like it was built to be more robust than what I’m using it for (read: I have no idea what half of it does) but it worked for my purposes.

You’ll need a simple text file with your InfluxDB password and then reference it in the arguments. If your password is ‘password’, the only contents of the file should be ‘password’. I added the isFloat() function to basically make sure that strings weren’t getting added to the numeric tables in InfluxDB. I honestly find the structure/layout of storing stuff in Influx quite confusing so I’m sure there’s a better way to do this.
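As a quick standalone illustration of the float check idea (a sketch with made-up names is_float and to_influx_value, not the functions from the script itself), numeric-looking payloads become floats and everything else stays a string. The sample payloads are taken from the status output earlier in this post.

```python
def is_float(text):
    # True if float() can parse the payload, False for anything else
    try:
        float(text)
        return True
    except (TypeError, ValueError):
        return False

def to_influx_value(payload):
    # numeric payloads become floats; everything else is passed through unchanged
    return float(payload) if is_float(payload) else payload

# sample payloads as they might arrive from MQTT (always strings)
for payload in ("27.1", "19", "charge", "Something changed"):
    print(repr(payload), "->", repr(to_influx_value(payload)))
```

One caveat: float() also accepts strings like "nan" and "inf", so a check like this will treat those as numbers too.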

#!/usr/bin/env python
# -*- coding: utf-8 -*-
####################################
# originally found at/modified from https://github.com/KHoos/mqtt-to-influxdb-forwarder
####################################

# forwarder.py - forwards IoT sensor data from MQTT to InfluxDB
#
# Copyright (C) 2016 Michael Haas <[email protected]>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA

import argparse
import paho.mqtt.client as mqtt
from influxdb import InfluxDBClient
import json
import re
import logging
import sys
import requests.exceptions


class MessageStore(object):

        def store_msg(self, node_name, measurement_name, value):
                raise NotImplementedError()

class InfluxStore(MessageStore):

        logger = logging.getLogger("forwarder.InfluxStore")

        def __init__(self, host, port, username, password_file, database):
                password = open(password_file).read().strip()
                self.influx_client = InfluxDBClient(
                        host=host, port=port, username=username, password=password, database=database)

        def store_msg(self, measurement, sensor, value):
                # 'measurement' is the InfluxDB measurement (roughly a table) the point is written to
                influx_msg = {
                        'measurement': measurement,
                        'tags': {'sensor': sensor},
                        'fields': {'value' : value}
                }
                self.logger.debug("Writing InfluxDB point: %s", influx_msg)
                try:
                        self.influx_client.write_points([influx_msg])
                except requests.exceptions.ConnectionError as e:
                        self.logger.exception(e)

class MessageSource(object):

        def register_store(self, store):
                if not hasattr(self, '_stores'):
                        self._stores = []
                self._stores.append(store)

        @property
        def stores(self):
                # return copy
                return list(self._stores)

def isFloat(str_val):
  try:
    float(str_val)
    return True
  except ValueError:
    return False

def convertToFloat(str_val):
    if isFloat(str_val):
        fl_result = float(str_val)
        return fl_result
    else:
        return str_val

class MQTTSource(MessageSource):

        logger = logging.getLogger("forwarder.MQTTSource")

        def __init__(self, host, port, node_names, stringify_values_for_measurements):
                self.host = host
                self.port = port
                self.node_names = node_names
                self.stringify = stringify_values_for_measurements
                self._setup_handlers()

        def _setup_handlers(self):
                self.client = mqtt.Client()

                def on_connect(client, userdata, flags, rc):
                        self.logger.info("Connected with result code  %s", rc)
                        # The topic is hardcoded here, so --node-name is currently ignored.
                        # Original per-node subscription, kept for reference:
                        #for node_name in self.node_names:
                        # topic = "{node_name}/#".format(node_name=node_name)
                        topic = "get_status/status/#"
                        self.logger.info("Subscribing to topic %s", topic)
                        client.subscribe(topic)

                def on_message(client, userdata, msg):
                        self.logger.debug("Received MQTT message for topic %s with payload %s", msg.topic, msg.payload)
                        list_of_topics = msg.topic.split('/')
                        # topics look like get_status/status/<measurement>/...
                        if len(list_of_topics) < 3:
                                return
                        measurement = list_of_topics[2]
                        # skip the companion "unit" topics, only values are stored
                        if list_of_topics[-1] == 'unit':
                                return
                        decoded_value = msg.payload.decode('UTF-8')
                        if isFloat(decoded_value):
                                # numeric payloads go to the numeric measurement
                                float_value = convertToFloat(decoded_value)
                                for store in self.stores:
                                        store.store_msg("power_measurement",measurement,float_value)
                        else:
                                # everything else is kept as a string in a separate measurement
                                for store in self.stores:
                                        store.store_msg("power_measurement_strings",measurement,decoded_value)

                self.client.on_connect = on_connect
                self.client.on_message = on_message

        def start(self):
                print(f"starting mqtt on host: {self.host} and port: {self.port}")
                self.client.connect(self.host, self.port)
                # Blocking call that processes network traffic, dispatches callbacks and
                # handles reconnecting.
                # Other loop*() functions are available that give a threaded interface and a
                # manual interface.
                self.client.loop_forever()


def main():
        parser = argparse.ArgumentParser(
                description='MQTT to InfluxDB bridge for IOT data.')
        parser.add_argument('--mqtt-host', default="mqtt", help='MQTT host')
        parser.add_argument('--mqtt-port', default=1883, type=int, help='MQTT port')
        parser.add_argument('--influx-host', default="dashboard", help='InfluxDB host')
        parser.add_argument('--influx-port', default=8086, type=int, help='InfluxDB port')
        parser.add_argument('--influx-user', default="power", help='InfluxDB username')
        # NOTE: --influx-pass is parsed but never used below; only --influx-pass-file is read
        parser.add_argument('--influx-pass', default="<I have a password here, unclear if the pass-file takes precedence>", help='InfluxDB password')
        parser.add_argument('--influx-pass-file', default="/home/austin/mpp-solar-python/pass.file", help='InfluxDB password file')
        parser.add_argument('--influx-db', default="power", help='InfluxDB database')
        # the default must be a list because action="append" appends to it
        parser.add_argument('--node-name', default=['get_status'], help='Sensor node name', action="append")
        parser.add_argument('--stringify-values-for-measurements', required=False, help='Force str() on measurements of the given name', action="append")
        parser.add_argument('--verbose', help='Enable verbose output to stdout', default=False, action='store_true')
        args = parser.parse_args()

        if args.verbose:
                logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
        else:
                logging.basicConfig(stream=sys.stdout, level=logging.WARNING)

        print("creating influxstore")
        store = InfluxStore(host=args.influx_host, port=args.influx_port, username=args.influx_user, password_file=args.influx_pass_file, database=args.influx_db)
        print("creating mqttsource")
        source = MQTTSource(host=args.mqtt_host,
                                                port=args.mqtt_port, node_names=args.node_name,
                                                stringify_values_for_measurements=args.stringify_values_for_measurements)
        print("registering store")
        source.register_store(store)
        print("start")
        source.start()

if __name__ == '__main__':
        main()
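
To make the routing inside on_message easier to follow (and to test without a broker), here is a minimal sketch of the same decision pulled out into a plain function. The function name route_message and the example topic names are hypothetical; the measurement names and the "unit" skip come from the script above. One assumption that differs from the original listing: topics with fewer than three parts are skipped instead of raising an IndexError.

```python
def is_float(text):
    """Return True if the text parses as a float."""
    try:
        float(text)
        return True
    except ValueError:
        return False


def route_message(topic, payload):
    """Decide where one MQTT message goes.

    Returns (influx_measurement, sensor, value), or None if the message
    should be skipped.
    """
    parts = topic.split("/")
    # Expect get_status/status/<sensor>/...; ignore shorter topics and unit topics.
    if len(parts) < 3 or parts[-1] == "unit":
        return None
    sensor = parts[2]
    text = payload.decode("utf-8")
    if is_float(text):
        # Numeric payloads are stored as floats in the numeric measurement.
        return ("power_measurement", sensor, float(text))
    # Everything else is stored as a string in its own measurement.
    return ("power_measurement_strings", sensor, text)


if __name__ == "__main__":
    # Hypothetical topics, just to show the three outcomes.
    print(route_message("get_status/status/ac_output_voltage/value", b"120.1"))
    print(route_message("get_status/status/device_mode/value", b"Battery"))
    print(route_message("get_status/status/ac_output_voltage/unit", b"V"))
```

One thing worth knowing: float() also accepts strings like "nan" and "inf", so payloads like that would be routed to the numeric measurement too.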

Running the MQTT to InfluxDB script as a system daemon

Next up, we need to run the MQTT to InfluxDB Python script as a daemon so it starts with the machine and runs in the background. If you haven’t noticed by now, this is the standard pattern for most of the stuff I do – either a cron job or daemon to get data and another daemon to put it where I want it. Sometimes they’re the same.

$ cat /etc/systemd/system/mpp-solar.service
[Unit]
Description=MPP inverter data - MQTT to influx
After=multi-user.target

[Service]
User=austin
Type=simple
Restart=always
RestartSec=10
# data feeds this script from a root cronjob running every 60s
ExecStart=/usr/bin/python3 /home/austin/mpp-solar-python/main.py

[Install]
WantedBy=multi-user.target

Then activate it:

$ sudo systemctl daemon-reload
$ sudo systemctl enable mpp-solar.service
$ sudo systemctl start mpp-solar.service
$ sudo systemctl status mpp-solar.service
● mpp-solar.service - MPP inverter data - MQTT to influx
     Loaded: loaded (/etc/systemd/system/mpp-solar.service; enabled; vendor preset: enabled)
     Active: active (running) since Sat 2021-07-31 02:21:32 UTC; 1 day 13h ago
   Main PID: 462825 (python3)
      Tasks: 1 (limit: 1072)
     Memory: 19.2M
     CGroup: /system.slice/mpp-solar.service
             └─462825 /usr/bin/python3 /home/austin/mpp-solar-python/main.py

Jul 31 02:21:32 mpp-linux systemd[1]: Started MPP inverter data - MQTT to influx.

All done

With that, you should now have data flowing into your InfluxDB instance from your MPP inverter via this Python script. This is exactly what I’m using for my LV2424, but it should work with other inverters like the PIP LV2424 MSD, PIP-4048MS, IPS stuff, LV5048, and probably a lot of others.
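
If you want to double check that points are actually arriving, a minimal sketch like the one below asks InfluxDB for the newest few rows. It assumes the same influxdb Python client the forwarder uses and the measurement names from the script; the function names latest_points_query and fetch_latest are hypothetical, and the connection values are whatever you passed to the forwarder.

```python
def latest_points_query(measurement, limit=5):
    """Build an InfluxQL query for the newest points in one measurement."""
    return 'SELECT * FROM "{}" ORDER BY time DESC LIMIT {}'.format(
        measurement, int(limit))


def fetch_latest(host, port, username, password, database,
                 measurement="power_measurement", limit=5):
    """Return the newest points as a list of dicts (needs a reachable InfluxDB)."""
    # Imported here so the query builder above works without the library installed.
    from influxdb import InfluxDBClient

    client = InfluxDBClient(host=host, port=port, username=username,
                            password=password, database=database)
    result = client.query(latest_points_query(measurement, limit))
    return list(result.get_points())
```

Calling fetch_latest with your host, port 8086, your username, your password, and your database name should return a handful of dicts with time, sensor, and value keys if the forwarder is writing data. An empty list suggests nothing has been stored in that measurement yet.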

Next Steps

Next post will cover designing the Grafana dashboard to show this data.