tibber-influx/DataPoints.py
janolehuebner 0d62cf11e4
All checks were successful
Docker Build and Push / build-and-push (push) Successful in 59s
Update DataPoints.py
fix: udate price accessing to handle nulls
2025-07-19 02:16:18 +02:00

47 lines
1.6 KiB
Python
Executable file

from datetime import datetime
from influxdb_client import Point
import json
class Pulse:
label = str(__name__).lower()
def __init__(self,data):
measurement = data.cache
timestamp = measurement['timestamp']
power = measurement['power']
cost = measurement.get('accumulatedCost',None)
lastMeterConsumption = measurement['lastMeterConsumption']
tags = {self.label: ""}
fields = {
"power": float(power),
"lastMeterConsumption": float(lastMeterConsumption)
}
if cost is not None and float(cost) > 0:
fields.update({'accumulatedCost':cost})
self.datapoint = {"fields": fields,
"tags": tags,
"measurement": "pulse",
"time": timestamp
}
def get_datapoint(self):
return Point(measurement_name=str(self.label)).from_dict(self.datapoint)
class Price:
label = str(__name__).lower()
def __init__(self,data):
measurement = data.cache
current_time_utc = datetime.utcnow()
timestamp = current_time_utc.isoformat()
total = measurement.get("total", 0)
tax = measurement.get("tax", 0)
tags = {self.label: ""}
fields = {
"total": float(total),
"tax": float(tax)
}
self.datapoint = {"fields": fields,
"tags": tags,
"measurement": "pulse",
"time": timestamp
}
def get_datapoint(self):
return Point(measurement_name=str(self.label)).from_dict(self.datapoint)