import json
import os
import sched
import threading
import time
from datetime import datetime, timezone, timedelta
from pprint import pprint
from typing import Literal, Union

import bs4
import mysql.connector
import psycopg2
import pytz
import requests
from mysql.connector.aio.charsets import charsets


class Spritpreise:
    """Scrape fuel prices from clever-tanken.de and store them in a database.

    Results are collected in ``self.fuelInfos``, a dict keyed by
    ``"<city> <street>"``. Each entry holds the station's ``name``,
    ``street``, ``city``, the ``time`` of the last scrape and one price per
    fuel code (``"3"``, ``"5"``, ``"6"``, ``"7"``; see ``self.fuelTypes``).
    """

    def __init__(self, location: str, radius: int,
                 fuelType: Literal["diesel", "E5", "E10", "super plus"] = "E10"):
        """Store the search parameters and initialise empty DB settings.

        Args:
            location: Value sent as the ``ort`` query parameter.
            radius: Value sent as the ``r`` query parameter.
            fuelType: Stored on the instance; the methods of this class take
                the fuel type explicitly and do not read this attribute.
        """
        self.location = location
        self.radius = radius
        self.fuelType = fuelType
        self.url = "https://www.clever-tanken.de/tankstelle_liste"
        # Fuel name -> numeric code sent as the `spritsorte` query parameter.
        self.fuelTypes = {
            "diesel": 3,
            "super E10": 5,
            "super plus": 6,
            "super E5": 7,
        }
        self.fuelInfos = {}
        # The log file is written next to this source file.
        self.currentFolder = os.path.dirname(os.path.realpath(__file__))
        self.dbType = ""
        self.dbName = ""
        self.dbHost = ""
        self.dbPort = ""
        self.dbUser = ""
        self.dbPassword = ""

    def getCurrentTime(self) -> str:
        """Return the current local time as ``YYYY/MM/DD HH:MM:SS``."""
        return datetime.now().strftime("%Y/%m/%d %H:%M:%S")

    def setDbConnection(self, dbHost: str, dbPort: str, dbUser, dbPassword: str,
                        dbType: Literal["mysql", "pgsql"]):
        """Remember the database credentials used by ``insertIntoDb``.

        The database name is always set to ``"Spritpreise"``.
        """
        self.dbType = dbType
        self.dbName = "Spritpreise"
        self.dbHost = dbHost
        self.dbPort = dbPort
        self.dbUser = dbUser
        self.dbPassword = dbPassword

    def __getDbConnection(self):
        """Open a connection for the configured backend.

        Returns:
            A ``(connection, cursor)`` tuple.

        Raises:
            ValueError: If ``self.dbType`` is neither ``"mysql"`` nor
                ``"pgsql"`` (for example when ``setDbConnection`` was never
                called).
        """
        if self.dbType == "mysql":
            connection = mysql.connector.connect(
                host=self.dbHost,
                user=self.dbUser,
                password=self.dbPassword,
                database=self.dbName,
                port=self.dbPort,
            )
        elif self.dbType == "pgsql":
            connection = psycopg2.connect(
                dbname=self.dbName,
                user=self.dbUser,
                password=self.dbPassword,
                host=self.dbHost,
                port=self.dbPort,
            )
        else:
            # Previously this fell through and failed with an
            # UnboundLocalError on `connection`.
            raise ValueError(
                f"Unsupported database type: {self.dbType!r} "
                "(expected 'mysql' or 'pgsql')"
            )
        try:
            cursor = connection.cursor()
        except Exception:
            # Do not leak the connection when no cursor can be created.
            connection.close()
            raise
        return connection, cursor

    def __writeLog(self, text: str, printOnConsole=True):
        """Append ``text`` to ``spritpreise.log`` and optionally print it.

        Logging is best-effort: a failure to write the file is reported on
        the console and never propagates to the caller.
        """
        now = self.getCurrentTime()
        logPath = os.path.join(self.currentFolder, "spritpreise.log")
        try:
            # Explicit encoding, consistent with exportAsJson/getDictFromJson.
            with open(logPath, "a", encoding="utf-8") as logs:
                logs.write(f"[{now}] {text}\n")
        except (OSError, ValueError) as e:
            print(f"Error occurred: {e}")
        if printOnConsole:
            print(f"[{now}] {text}\n")

    def convertType(self, fuelType: Union[int, str]) -> int:
        """Translate a fuel name into its numeric code.

        Integers are returned unchanged. Strings are compared
        case-insensitively against each full name in ``self.fuelTypes`` and
        against each single word of it, so ``"E10"``, ``"e10"`` and
        ``"super E10"`` all yield ``5``. The first match in dict order wins,
        which means the ambiguous ``"super"`` resolves to ``5``.

        Raises:
            ValueError: If no fuel name matches.
        """
        if isinstance(fuelType, int):
            return fuelType
        wanted = fuelType.casefold()
        for name, value in self.fuelTypes.items():
            # Fold the table key as well: the keys contain upper-case
            # letters ("super E10"), so folding only the input could never
            # match "E10" or "E5".
            folded = name.casefold()
            if wanted in (folded, *folded.split()):
                return value
        raise ValueError(f"Unknown type: {fuelType!r}")

    def iterResults(self, markup):
        """Yield ``(price, location)`` for every result card in ``markup``.

        ``price`` is the stripped text of the card's ``price`` element and
        ``location`` is a dict with the keys ``name``, ``street`` and
        ``city``. A card missing one of these elements raises
        ``AttributeError``.
        """
        soup = bs4.BeautifulSoup(markup, "html.parser")
        for card in soup.find_all(class_="list-card-container"):
            price = card.find(class_="price").text.strip()
            location = {
                part: card.find(class_=f"fuel-station-location-{part}").text.strip()
                for part in ("name", "street", "city")
            }
            yield price, location

    def getPrices(self, fuelType, maxRetries: int = 5, timeout: float = 30):
        """Fetch every result page for one fuel type into ``self.fuelInfos``.

        Paging stops at the first response that is not HTTP 200 or that
        contains no result cards. A failing page is retried after five
        seconds; after ``maxRetries`` consecutive failures the fuel type is
        abandoned so the caller cannot hang forever.

        Args:
            fuelType: Fuel name or numeric code (see ``convertType``).
            maxRetries: Consecutive failures tolerated before giving up.
            timeout: Seconds passed as ``timeout`` to ``requests.get``.

        Raises:
            ValueError: If ``fuelType`` is an unknown name.
        """
        # Resolve once, outside the retry loop: an unknown fuel type is a
        # caller error and must not be retried endlessly.
        fuelCode = self.convertType(fuelType)
        berlin = pytz.timezone('Europe/Berlin')
        page = 1
        failures = 0
        while True:
            try:
                print(f"fuelType: {fuelType} page: {page}")
                query = dict(ort=self.location, r=self.radius,
                             spritsorte=fuelCode, page=page)
                response = requests.get(self.url, query, timeout=timeout)
                if response.status_code != 200:
                    break
                found = 0
                for price, location in self.iterResults(response.text):
                    found += 1
                    price = str(price).replace(",", ".")
                    entryName = f'{location["city"]} {location["street"]}'
                    if entryName not in self.fuelInfos:
                        self.fuelInfos[entryName] = {
                            str(code): None for code in self.fuelTypes.values()
                        }
                    entry = self.fuelInfos[entryName]
                    entry["street"] = location["street"]
                    entry["city"] = location["city"]
                    entry["name"] = location["name"]
                    # Dropping the last two characters shortens the UTC
                    # offset from "+HHMM" to "+HH".
                    entry["time"] = datetime.now(berlin).strftime('%Y-%m-%d %H:%M:%S%z')[:-2]
                    # Anything that is not a plain number is stored as None.
                    isNumber = price.replace(".", "").isnumeric()
                    entry[str(fuelCode)] = price if isNumber else None
                if not found:
                    # A 200 response without result cards: nothing left to page.
                    break
                page += 1
                failures = 0
            except Exception as e:
                failures += 1
                self.__writeLog(f"Error occurred: {e}")
                if failures >= maxRetries:
                    self.__writeLog(
                        f"Giving up on fuelType {fuelType} page {page} "
                        f"after {failures} failed attempts"
                    )
                    break
                time.sleep(5)

    def getAllPrices(self):
        """Scrape every fuel type in ``self.fuelTypes`` and store the result.

        Returns:
            ``self``, so calls can be chained.
        """
        start = time.time()
        for fuelCode in self.fuelTypes.values():
            self.getPrices(fuelCode)
        self.insertIntoDb()
        now = time.time()
        print(f'[{self.getCurrentTime()}] Total time: {timedelta(seconds=now - start)}')
        return self

    def getAllPricesSchedule(self, intervalInSeconds: int = 900):
        """Run ``getAllPrices`` forever, once every ``intervalInSeconds``.

        The interval is measured from the start of a run. When a run takes
        longer than the interval, the next one starts immediately.
        """
        while True:
            nextRun = time.time() + intervalInSeconds
            print(f"[{self.getCurrentTime()}] Starting")
            self.getAllPrices()
            # time.sleep() raises ValueError on a negative argument, so
            # clamp the delay when the run overshot the interval.
            sleepTime = max(0.0, nextRun - time.time())
            print(f"[{self.getCurrentTime()}] Finished, sleeping for: {sleepTime} ({datetime.fromtimestamp(nextRun).strftime('%Y/%m/%d %H:%M:%S')})")
            time.sleep(sleepTime)

    def insertIntoDb(self):
        """Write every entry of ``self.fuelInfos`` to the database.

        A station is looked up in ``spritpreis_header`` by city and street
        and created when missing; its prices are then added as a new row in
        ``spritpreis_position``. Everything is committed in one transaction,
        which is rolled back if any statement fails.

        Returns:
            ``self``, so calls can be chained.
        """
        selectHeader = 'SELECT id FROM spritpreis_header WHERE city = %s and street = %s'
        connection, cursor = self.__getDbConnection()
        try:
            for value in self.fuelInfos.values():
                station = (value['city'], value["street"])
                cursor.execute(selectHeader, station)
                row = cursor.fetchone()
                if row is None:
                    cursor.execute(
                        "INSERT INTO spritpreis_header (city, street, name) VALUES (%s, %s, %s)",
                        (*station, value["name"]),
                    )
                    # Read the id of the row that was just created.
                    cursor.execute(selectHeader, station)
                    row = cursor.fetchone()
                headerId = row[0]
                # Fuel codes: 3 = diesel, 5 = E10, 7 = E5, 6 = super plus.
                cursor.execute(
                    "INSERT INTO spritpreis_position (of_spritpreis_header, time, diesel_price, e10_price, e5_price, super_plus_price) VALUES (%s, %s, %s, %s, %s, %s)",
                    (headerId, value["time"], value["3"], value["5"], value["7"], value["6"]),
                )
            connection.commit()
        except Exception:
            connection.rollback()
            raise
        finally:
            # Release the cursor and connection on failure as well.
            cursor.close()
            connection.close()
        return self

    def exportAsJson(self, outputFile):
        """Dump ``self.fuelInfos`` to ``outputFile`` as indented JSON.

        Returns:
            ``self``, so calls can be chained.
        """
        start = time.time()
        with open(outputFile, "w", encoding="utf-8") as f:
            json.dump(self.fuelInfos, f, indent=4)
        end = time.time()
        self.__writeLog(f"Wrote JSON in {timedelta(seconds=end - start)}")
        return self

    def getDictFromJson(self, inputFile):
        """Replace ``self.fuelInfos`` with the JSON content of ``inputFile``.

        Returns:
            ``self``, so calls can be chained.
        """
        with open(inputFile, "r", encoding="utf-8") as f:
            self.fuelInfos = json.load(f)
        return self