from dateutil import parser
from requests import get, post
import paho.mqtt.client as mqttClient
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.DEBUG)
loggers_to_set_to_warning = ['urllib3.connectionpool']
for l in loggers_to_set_to_warning:
logging.getLogger(l).setLevel(logging.WARNING)
mqtt_host = "mqtt.example.com"
base_url = "http://ha.example.come:8123/"
states_url = base_url + "api/states/"
switch_url = base_url + "api/services/switch"
full_bearer_token = "Bearer " + bearer_token
"Authorization": full_bearer_token,
"content-type": "application/json"
endpoints = ["climate.masterbedfancooling",
"sensor.real_outside_temp"]
fan_switch_entity_id = "switch.fan_switch"
climate_topic = "climate/fan_control_state"
desired_seconds_to_sleep = 300
max_cool_outdoor_temp_limit = 66
next_fan_action_time = datetime.datetime.now()
def set_fan_switch_state(state):
logging.warn("requested fan state unknown")
entity_info = {"entity_id": fan_switch_entity_id}
full_url = switch_url + "/turn_" + new_fan_state
response = post(full_url, headers=request_headers,
data=json.dumps(entity_info))
if response.status_code != 200:
f"attempted to set fan state to {new_fan_state} but encountered error with status code: {response.status_code}")
logging.info(f"successfully set fan state to {new_fan_state}")
def set_state_from_mqtt_message(message):
global current_state, next_fan_action_time
if message == "max_cool":
current_state = "max_cool"
elif message == "normal_cool":
current_state = "normal_cool"
logging.error(f"unable to determine state, setting to off")
logging.info(f"current_state set to: {current_state}")
next_fan_action_time = datetime.datetime.now()
# Set Connecting Client ID
client = mqttClient.Client("python_window_fan_control")
#client.username_pw_set(username, password)
client.on_connect = on_connect
client.connect(mqtt_host, mqtt_port)
def on_connect(client, userdata, flags, rc):
logging.info("Connected to MQTT Broker!")
logging.info("Failed to connect, return code %d\n", rc)
def on_message(client, userdata, msg):
logging.info(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
set_state_from_mqtt_message(msg.payload.decode())
def on_subscribe(client, userdata, mid, granted_qos):
logging.info(f"subscribed to topic")
def set_fan_state(state):
logging.info(f"setting fan state to on")
set_fan_switch_state("on")
logging.info(f"setting fan state to off")
set_fan_switch_state("off")
def get_and_set_temperatures():
global desired_temp, current_temp, outside_temp
logging.debug("executing loop")
for endpoint in endpoints:
full_url = states_url + endpoint
response = get(full_url, headers=request_headers)
parsed_json = json.loads(response.text)
entity = parsed_json['entity_id']
if endpoint == 'climate.masterbedfancooling':
parsed_json['attributes']['temperature'])
parsed_json['attributes']['current_temperature'])
hvac_action = parsed_json['attributes']['hvac_action']
elif endpoint == 'sensor.real_outside_temp':
outside_temp = float(parsed_json['state'])
last_updated = parser.parse(parsed_json['last_updated'])
f"temps: current={current_temp}, desired={desired_temp}, outside={outside_temp}")
if desired_temp == None or current_temp == None or outside_temp == None:
"one or more temps invalid, turning off switch and breaking execution")
client = mqttClient.Client("window-fan-client")
client.on_connect = on_connect
client.on_message = on_message
client.on_subscribe = on_subscribe
client.connect(mqtt_host, mqtt_port)
client.subscribe(climate_topic)
if datetime.datetime.now() > next_fan_action_time:
logging.info("fan action time")
if current_state == "off":
f"current_state is {current_state}, turning fan {new_fan_state}")
get_and_set_temperatures()
if current_state == "max_cool":
f"current_state is {current_state}, call for max cooling")
if outside_temp < max_cool_outdoor_temp_limit:
f"able to max_cool with outside temp: {outside_temp}, lower than {max_cool_outdoor_temp_limit} ")
elif outside_temp < (current_temp - delta_temp):
f"unable to max cool, but still can cool with outside: {outside_temp} and inside: {current_temp}")
logging.info("unable to cool at all, turning fan off")
elif current_state == "normal_cool":
f"current_state is {current_state}")
if (current_temp > desired_temp):
f"call for cooling. current: {current_temp}, desired: {desired_temp}")
if (current_temp > (outside_temp - delta_temp)):
logging.info("can cool, turning fan on")
logging.info("can't cool, turning fan off")
logging.info("no need for cooling, turning fans off")
elif current_state == "on":
f"current_state is {current_state}, turning fan {new_fan_state}")
last_state = current_state
next_fan_action_time = datetime.datetime.now() \
+ datetime.timedelta(seconds=desired_seconds_to_sleep)
logging.info(f"next fan action in {desired_seconds_to_sleep} seconds")
logging.info("---------loop end-------------------")
#logging.debug("not fan action time yet, sleeping 1s")
# actual_seconds_to_sleep = desired_seconds_to_sleep - datetime.datetime.now().minute % desired_seconds_to_sleep
# seconds_to_sleep = actual_minutes_to_sleep * 60.0
# logging.info(f"sleeping {desired_seconds_to_sleep}s")
# time.sleep(desired_seconds_to_sleep)
import datetime
import json
import logging
import time

import paho.mqtt.client as mqttClient
from dateutil import parser
from requests import get, post
from requests.exceptions import RequestException
# Root logger: timestamped records, everything from DEBUG upwards.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)

# Loggers listed here are capped at WARNING so they do not flood the
# DEBUG-level output configured above.
loggers_to_set_to_warning = ['urllib3.connectionpool']
for noisy_logger_name in loggers_to_set_to_warning:
    logging.getLogger(noisy_logger_name).setLevel(logging.WARNING)

# Margin applied by the main loop when comparing inside and outside
# temperatures.
delta_temp = 3.0

# MQTT broker used for receiving the requested fan mode.
mqtt_host = "mqtt.example.com"
mqtt_port = 1883

# Home Assistant REST endpoints.
# NOTE(review): this host ends in ".come" while mqtt_host ends in ".com" --
# confirm the spelling is intended.
base_url = "http://ha.example.come:8123/"
states_url = f"{base_url}api/states/"
switch_url = f"{base_url}api/services/switch"

# NOTE(review): the API token is stored in the source file itself; consider
# loading it from the environment or a secrets store instead.
bearer_token = "ey...Vw"
full_bearer_token = f"Bearer {bearer_token}"
request_headers = {
    "Authorization": full_bearer_token,
    "content-type": "application/json",
}

# Entities read by get_and_set_temperatures() and the switch that is driven.
endpoints = [
    "climate.masterbedfancooling",
    "sensor.real_outside_temp",
]
fan_switch_entity_id = "switch.fan_switch"

# Not referenced by any code visible in this file.
states = {}

# Topic on which the requested mode arrives.
climate_topic = "climate/fan_control_state"

# Mode currently requested over MQTT; rewritten by
# set_state_from_mqtt_message().
current_state = "off"
last_state = None

# Seconds between two fan decisions in the main loop.
desired_seconds_to_sleep = 300
# In "max_cool" mode the fan runs whenever the outside temperature is below
# this value (unit not stated in this file -- presumably the same unit the
# sensors report; confirm).
max_cool_outdoor_temp_limit = 66

# Latest readings; None until get_and_set_temperatures() has stored a value.
desired_temp = None
outside_temp = None
current_temp = None

# Earliest moment at which the main loop may act again.
next_fan_action_time = datetime.datetime.now()
def set_fan_switch_state(state):
    """Ask Home Assistant to turn the fan switch on or off.

    Posts to the ``turn_on`` / ``turn_off`` switch service for
    ``fan_switch_entity_id``. Every failure (unknown state, network error,
    non-200 reply) is logged rather than raised, so one bad request does not
    take the caller down.

    Args:
        state: ``"on"`` or ``"off"``. Any other value is logged as a warning
            and ignored.
    """
    if state not in ("on", "off"):
        # logging.warn is a deprecated alias that was removed in Python 3.13.
        logging.warning("requested fan state unknown")
        return
    new_fan_state = state

    entity_info = {"entity_id": fan_switch_entity_id}
    full_url = switch_url + "/turn_" + new_fan_state
    try:
        # requests waits forever unless a timeout is given; bound the wait so
        # an unreachable server cannot hang the caller.
        response = post(full_url, headers=request_headers,
                        data=json.dumps(entity_info), timeout=10)
    except RequestException as err:
        logging.error(
            f"attempted to set fan state to {new_fan_state} but the request failed: {err}")
        return

    if response.status_code != 200:
        logging.error(
            f"attempted to set fan state to {new_fan_state} but encountered error with status code: {response.status_code}")
    else:
        logging.info(f"successfully set fan state to {new_fan_state}")
def set_state_from_mqtt_message(message):
    """Store the mode requested by an MQTT message and schedule an immediate fan action.

    Args:
        message: Decoded payload. ``"max_cool"``, ``"normal_cool"``, ``"off"``
            and ``"on"`` are stored as-is in ``current_state``; anything else
            is logged as an error and stored as ``"off"``.
    """
    global current_state, next_fan_action_time

    recognised_states = ("max_cool", "normal_cool", "off", "on")
    if message in recognised_states:
        current_state = message
    else:
        logging.error("unable to determine state, setting to off")
        current_state = "off"

    logging.info(f"current_state set to: {current_state}")
    # Resetting the deadline to "now" lets the main loop act on the new mode
    # on its next pass.
    next_fan_action_time = datetime.datetime.now()
def connect_mqtt():
    """Create an MQTT client, attach ``on_connect`` and connect to the broker.

    Returns:
        The ``mqttClient.Client`` instance, after ``connect(mqtt_host,
        mqtt_port)`` has been called on it.
    """
    # Fixed client ID passed to the constructor.
    broker_client = mqttClient.Client("python_window_fan_control")
    # No credentials are configured; the original left this call disabled:
    # broker_client.username_pw_set(username, password)
    broker_client.on_connect = on_connect
    broker_client.connect(mqtt_host, mqtt_port)
    return broker_client
def on_connect(client, userdata, flags, rc):
    """Connection callback installed on the MQTT client: log the outcome.

    Args:
        client: Unused.
        userdata: Unused.
        flags: Unused.
        rc: Connection result code; ``0`` is treated as success, any other
            value as a failure.
    """
    if rc == 0:
        logging.info("Connected to MQTT Broker!")
    else:
        # A refused connection is a failure, so report it at ERROR like the
        # other failure paths in this file instead of burying it at INFO.
        logging.error("Failed to connect, return code %d\n", rc)
def on_message(client, userdata, msg):
    """Message callback installed on the MQTT client: log the payload and apply it as the new mode.

    Args:
        client: Unused.
        userdata: Unused.
        msg: Received message; ``msg.payload`` (bytes) and ``msg.topic`` are
            read.
    """
    # Decode once. errors="replace" keeps a payload that is not valid UTF-8
    # from raising inside the callback; the substituted text matches no known
    # mode, so set_state_from_mqtt_message() falls back to "off".
    payload = msg.payload.decode(errors="replace")
    logging.info(f"Received `{payload}` from `{msg.topic}` topic")
    set_state_from_mqtt_message(payload)
def on_subscribe(client, userdata, mid, granted_qos):
    """Subscribe callback installed on the MQTT client.

    Logs a fixed message; none of the arguments are inspected.
    """
    logging.info("subscribed to topic")
def set_fan_state(state):
    """Log the requested fan state and forward it to ``set_fan_switch_state``.

    Args:
        state: ``"on"`` or ``"off"``. Any other value is ignored without
            logging.
    """
    if state not in ("on", "off"):
        return
    logging.info(f"setting fan state to {state}")
    set_fan_switch_state(state)
def _fetch_entity_state(endpoint):
    """Return the decoded state object for *endpoint*, or None if it cannot be read."""
    full_url = states_url + endpoint
    try:
        # Bounded wait: requests blocks indefinitely without a timeout.
        response = get(full_url, headers=request_headers, timeout=10)
        response.raise_for_status()
        return json.loads(response.text)
    except (RequestException, ValueError) as err:
        # ValueError also covers a body that is not valid JSON.
        logging.error(f"unable to read {endpoint}: {err}")
        return None


def _as_temperature(raw):
    """Convert *raw* to float; None when it is missing or not numeric."""
    try:
        return float(raw)
    except (TypeError, ValueError):
        return None


def get_and_set_temperatures():
    """Refresh ``desired_temp``, ``current_temp`` and ``outside_temp`` from Home Assistant.

    Each entity in ``endpoints`` is fetched from ``states_url``. The climate
    entity supplies the desired and current temperature (from its
    ``attributes``); the outside sensor supplies the outside temperature
    (from its ``state``).

    A reading that cannot be fetched or parsed is stored as ``None``. When
    any of the three is ``None`` an error is logged and the fan is switched
    off. Callers must check the globals for ``None`` before comparing them.
    """
    global desired_temp, current_temp, outside_temp
    logging.debug("executing loop")

    # Start from a clean slate so a failed read can never leave a value from
    # an earlier cycle looking valid.
    desired_temp = current_temp = outside_temp = None

    for endpoint in endpoints:
        parsed_json = _fetch_entity_state(endpoint)
        if not isinstance(parsed_json, dict):
            continue
        if endpoint == 'climate.masterbedfancooling':
            attributes = parsed_json.get('attributes')
            if not isinstance(attributes, dict):
                attributes = {}
            desired_temp = _as_temperature(attributes.get('temperature'))
            current_temp = _as_temperature(
                attributes.get('current_temperature'))
        elif endpoint == 'sensor.real_outside_temp':
            outside_temp = _as_temperature(parsed_json.get('state'))

    logging.info(
        f"temps: current={current_temp}, desired={desired_temp}, outside={outside_temp}")
    if desired_temp is None or current_temp is None or outside_temp is None:
        logging.error(
            "one or more temps invalid, turning off switch and breaking execution")
        set_fan_state("off")
def decide_fan_state(state, current, desired, outside, *, delta, max_cool_limit):
    """Decide whether the fan should run for the requested mode.

    Args:
        state: Requested mode: ``"off"``, ``"on"``, ``"max_cool"`` or
            ``"normal_cool"``.
        current: Inside temperature, or None if unknown.
        desired: Target temperature, or None if unknown.
        outside: Outside temperature, or None if unknown.
        delta: How much cooler than the inside the outside air must be before
            the fan is considered able to cool.
        max_cool_limit: In ``"max_cool"`` mode the fan always runs while the
            outside temperature is below this value.

    Returns:
        ``"on"`` or ``"off"``, or ``None`` for an unrecognised mode (the fan
        is then left untouched).
    """
    if state in ("off", "on"):
        # Manual modes: the temperatures are irrelevant.
        logging.info(f"current_state is {state}, turning fan {state}")
        return state
    if state not in ("max_cool", "normal_cool"):
        return None

    if current is None or desired is None or outside is None:
        # Comparing None with a number raises TypeError; fail safe instead.
        logging.error("temperatures unavailable, keeping fan off")
        return "off"

    outside_can_cool = outside < (current - delta)

    if state == "max_cool":
        logging.info(f"current_state is {state}, call for max cooling")
        if outside < max_cool_limit:
            logging.info(
                f"able to max_cool with outside temp: {outside}, lower than {max_cool_limit} ")
            return "on"
        if outside_can_cool:
            logging.info(
                f"unable to max cool, but still can cool with outside: {outside} and inside: {current}")
            return "on"
        logging.info("unable to cool at all, turning fan off")
        return "off"

    # normal_cool
    logging.info(f"current_state is {state}")
    if current <= desired:
        logging.info("no need for cooling, turning fans off")
        return "off"
    logging.info(f"call for cooling. current: {current}, desired: {desired}")
    # The original test here was `current > outside - delta`, which is true
    # even when the outside air is warmer than the room. Use the same
    # "outside is at least delta cooler" rule as the max_cool fallback.
    if outside_can_cool:
        logging.info("can cool, turning fan on")
        return "on"
    logging.info("can't cool, turning fan off")
    return "off"


def main():
    """Connect to the MQTT broker and run the fan control loop forever.

    The loop acts whenever ``next_fan_action_time`` has passed: it reads the
    temperatures (unless the mode is ``"off"``), applies
    ``decide_fan_state`` and then waits ``desired_seconds_to_sleep`` seconds
    before the next decision. An MQTT message resets
    ``next_fan_action_time`` and so triggers an immediate decision.
    """
    global last_state, next_fan_action_time

    client = mqttClient.Client("window-fan-client")
    client.on_connect = on_connect
    client.on_message = on_message
    client.on_subscribe = on_subscribe
    client.connect(mqtt_host, mqtt_port)
    client.subscribe(climate_topic)
    # Network traffic and callbacks are handled on a background thread.
    client.loop_start()

    while True:
        if datetime.datetime.now() <= next_fan_action_time:
            time.sleep(0.25)
            continue

        logging.info("fan action time")
        # Work on a snapshot: the MQTT callback may change current_state
        # while this cycle is running.
        state = current_state
        if state != "off":
            get_and_set_temperatures()
        new_fan_state = decide_fan_state(
            state, current_temp, desired_temp, outside_temp,
            delta=delta_temp, max_cool_limit=max_cool_outdoor_temp_limit)
        if new_fan_state is not None:
            set_fan_state(new_fan_state)

        last_state = state
        if current_state != state:
            # A new mode arrived mid-cycle. Keep the deadline the callback
            # set so it is applied on the next pass rather than after the
            # full interval.
            logging.info("state changed during the cycle, re-evaluating immediately")
            continue

        next_fan_action_time = datetime.datetime.now() \
            + datetime.timedelta(seconds=desired_seconds_to_sleep)
        logging.info(f"next fan action in {desired_seconds_to_sleep} seconds")
        logging.info("---------loop end-------------------")


if __name__ == "__main__":
    main()