diff --git a/pulse.py b/pulse.py index 050444d..6e946cd 100644 --- a/pulse.py +++ b/pulse.py @@ -1,59 +1,3 @@ -import asyncio -import os -import signal -import sys -import logging - -import tibber -from influxdb_client import InfluxDBClient -from influxdb_client.client.write_api import SYNCHRONOUS -from DataPoints import Pulse, Price - -TOKEN = os.getenv('TOKEN', '') -TIBBERTOKEN = os.getenv('TIBBERTOKEN', '') -URL = os.getenv('URL', "") -BUCKET = os.getenv('BUCKET', "tibber") -ORG = os.getenv('ORG', "Default") - -# Logging configuration -logger = logging.getLogger("TibberInflux") -formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') -ch = logging.StreamHandler(sys.stdout) -ch.setFormatter(formatter) -logger.addHandler(ch) -logger.setLevel(logging.INFO) - -__version__ = "v0.4.0" -logger.info(__version__) - -# Initialize InfluxDB client -client = InfluxDBClient(url=URL, token=TOKEN, org=ORG) -write_api = client.write_api(write_options=SYNCHRONOUS) -query_api = client.query_api() - -# Initialize Tibber account -account = tibber.Account(TIBBERTOKEN) -home = account.homes[0] - - -def _incoming(data): - try: - p = Pulse(data).get_datapoint() - write_api.write(record=p, bucket=BUCKET) - logger.info(p) - except Exception as e: - logger.exception("Error in _incoming():") - raise e - - -def timeout_handler(signum, frame): - raise TypeError("Timeout occurred") - - -async def show_current_power(data): - signal.signal(signal.SIGALRM, timeout_handler) - signal.alarm(8) - _incoming(data) async def periodic_task(): @@ -69,36 +13,85 @@ async def periodic_task(): await asyncio.sleep(3) -async def main(): - loop = asyncio.get_event_loop() +import asyncio +import os +import signal +import sys + +import tibber +from influxdb_client import InfluxDBClient +from influxdb_client.client.write_api import SYNCHRONOUS +from DataPoints import Pulse +import logging + +TOKEN = os.getenv('TOKEN', '') +TIBBERTOKEN = os.getenv('TIBBERTOKEN', '') +URL = os.getenv('URL', "") +BUCKET = os.getenv('BUCKET', "tibber") +ORG = os.getenv('ORG', "Default") + +# Logging configuration +logger = logging.getLogger("TibberInflux") +formatter = logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - %(message)s') +ch = logging.StreamHandler(sys.stdout) +ch.setFormatter(formatter) +logger.addHandler(ch) +logger.setLevel(logging.INFO) + +__version__ = "v0.3.4" +logger.info(__version__) +loop = asyncio.get_event_loop() + +logger.info("connecting to db...") +client = InfluxDBClient(url=URL, token=TOKEN, org=ORG) +write_api = client.write_api(write_options=SYNCHRONOUS) +query_api = client.query_api() +logger.info("connecting to tibber...") +account = tibber.Account(TIBBERTOKEN) +home = account.homes[0] +logger.info("starting subscription...") + +def _incoming(data): + try: + p = Pulse(data).get_datapoint() + write_api.write(record=p, bucket=BUCKET) + logger.info(p) + except Exception as e: + logger.exception("Error in _incoming():") + raise e + +def timeout_handler(signum, frame): + raise TypeError("Timeout occurred") + +@home.event("live_measurement") +async def show_current_power(data): + signal.alarm(8) + _incoming(data) + +def stop(home): + return False + +try: signal.signal(signal.SIGALRM, timeout_handler) signal.alarm(15) + home.start_live_feed(user_agent="pulse.py/0.3.4",exit_condition=stop(home),retries=2,retry_interval=3.0) + periodic_task_task = loop.create_task(periodic_task()) + + # Wait for both tasks to complete +except TypeError: + logger.exception("Timeout occurred while executing start_live_feed()") + client.close() + exit(1) +except Exception: + logger.exception("Error in start_live_feed()") + client.close() + exit(41) + + + + + - try: - # Start the live feed in a separate task - home_feed_task = loop.create_task(home.start_live_feed( - user_agent="pulse.py/0.4.0", - exit_condition=lambda: False, - retries=2, - retry_interval=3.0, - callback=show_current_power - )) - - # Start the periodic task - periodic_task_task = loop.create_task(periodic_task()) - - # Wait for both tasks to complete - await asyncio.gather(home_feed_task, periodic_task_task) - - except TypeError: - logger.exception("Timeout occurred while executing start_live_feed()") - client.close() - sys.exit(1) - except Exception: - logger.exception("Error in start_live_feed()") - client.close() - sys.exit(41) -if __name__ == "__main__": - asyncio.run(main())