Merge pull request #8 from janolehuebner/dev

getting prices via etra script
This commit is contained in:
Jan-Ole Hübner 2023-09-09 03:14:57 +02:00 committed by GitHub
commit 88c5603310
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 69 additions and 6 deletions

View file

@ -1,3 +1,4 @@
from datetime import datetime
from influxdb_client import Point from influxdb_client import Point
class Pulse: class Pulse:
label = str(__name__).lower() label = str(__name__).lower()
@ -22,3 +23,22 @@ class Pulse:
def get_datapoint(self): def get_datapoint(self):
return Point(measurement_name=str(self.label)).from_dict(self.datapoint) return Point(measurement_name=str(self.label)).from_dict(self.datapoint)
class Price:
label = str(__name__).lower()
def __init__(self,data):
measurement = data['data']['viewer']['homes'][0]['currentSubscription']['priceInfo']['current']
current_time_utc = datetime.utcnow()
timestamp = current_time_utc.isoformat()
total = measurement.get('total',0.34)
tags = {self.label: ""}
fields = {
"total": float(total),
}
self.datapoint = {"fields": fields,
"tags": tags,
"measurement": "pulse",
"time": timestamp
}
def get_datapoint(self):
return Point(measurement_name=str(self.label)).from_dict(self.datapoint)

View file

@ -12,5 +12,5 @@ ADD . /
# Chmod # Chmod
RUN chmod 755 /pulse.py RUN chmod 755 /pulse.py
CMD ["python3","/get_price.py"]
CMD ["python3","/pulse.py"] CMD ["python3","/pulse.py"]

46
get_price.py Normal file
View file

@ -0,0 +1,46 @@
import os
import requests
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS
from DataPoints import Price
TOKEN=os.getenv('TOKEN', '')
TIBBERTOKEN=os.getenv('TIBBERTOKEN', '')
URL = os.getenv('URL',"" )
BUCKET = os.getenv('BUCKET',"tibber" )
ORG = os.getenv('ORG',"Default" )
# URL of the Tibber API
url = 'https://api.tibber.com/v1-beta/gql'
# Request headers
headers = {
'authority': 'api.tibber.com',
'accept': 'application/json',
'authorization': f'Bearer {TIBBERTOKEN}',
'content-type': 'application/json',
'user-agent': 'python 3.10',
}
# GraphQL query
query = '{"query":"{ viewer { homes { currentSubscription { priceInfo { current { total } } } } } }"}'
client = InfluxDBClient(url=URL, token=TOKEN, org=ORG)
write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()
# Make the HTTP POST request
response = requests.post(url, headers=headers, data=query)
# Check for successful response
if response.status_code == 200:
# Print the response content (JSON data)
print(response.json())
else:
print(f'Error: HTTP {response.status_code}')
p = Price(response.json()).get_datapoint()
write_api.write(record=p, bucket=BUCKET)

View file

@ -10,15 +10,13 @@ from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS from influxdb_client.client.write_api import SYNCHRONOUS
from DataPoints import Pulse from DataPoints import Pulse
import logging import logging
import time
TOKEN=os.getenv('TOKEN', '') TOKEN=os.getenv('TOKEN', '')
TIBBERTOKEN=os.getenv('TIBBERTOKEN', '') TIBBERTOKEN=os.getenv('TIBBERTOKEN', '')
URL = os.getenv('URL',"" ) URL = os.getenv('URL',"" )
BUCKET = os.getenv('BUCKET',"tibber" ) BUCKET = os.getenv('BUCKET',"tibber" )
ORG = os.getenv('ORG',"Default" ) ORG = os.getenv('ORG',"Default" )
#logging #logging
logger = logging.getLogger("TibberInflux") logger = logging.getLogger("TibberInflux")
formatter = logging.Formatter( formatter = logging.Formatter(
@ -32,7 +30,7 @@ logger.addHandler(ch)
logger.setLevel(logging.INFO) logger.setLevel(logging.INFO)
__version__ = "v0.0.6" __version__ = "v0.1.1"
logger.info(__version__) logger.info(__version__)
client = InfluxDBClient(url=URL, token=TOKEN, org=ORG) client = InfluxDBClient(url=URL, token=TOKEN, org=ORG)
@ -70,4 +68,3 @@ async def run():
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
loop.run_until_complete(run()) loop.run_until_complete(run())