import json
import os
import re
import sched
import threading
import time
from datetime import datetime, timezone, timedelta
from pprint import pprint
from typing import Literal

import bs4
import pytz
import requests
from mysql.connector.aio.charsets import charsets

from src.webScraper.DbEntity import DbEntity


class Spritpreise(DbEntity):
    """Scrape fuel prices from clever-tanken.de and store them.

    Results are collected in ``self.fuelInfos``, a dict keyed by
    ``"<city text> <street text>"`` as shown on the site. Each entry holds
    the parsed address parts, the station name, a timestamp, and one price
    per numeric fuel id (``"3"``, ``"5"``, ``"6"``, ``"7"``; see
    ``self.fuelTypes``), with ``None`` where no price was found.
    """

    # Seconds to wait for the site before a request counts as failed.
    REQUEST_TIMEOUT = 30
    # Consecutive failures tolerated for one page before giving up.
    MAX_RETRIES = 5
    # Seconds to wait between two attempts for the same page.
    RETRY_DELAY = 5

    # "<5-digit zip> <city name>"; greedy so the city name is captured.
    _CITY_PATTERN = re.compile(r"(\d{5}) (.*)")
    # "<street name> <house number with optional letter>".
    _ADDRESS_PATTERN = re.compile(r"(.+?)\s+(\d+[a-zA-Z]?)$")

    def __init__(self, location: str, radius: int,
                 fuelType: Literal["diesel", "E5", "E10", "super plus"] = "E10"):
        """Store the search parameters and initialise the base entity.

        Args:
            location: Sent to the site as the ``ort`` query parameter.
            radius: Sent to the site as the ``r`` query parameter.
            fuelType: Default fuel type kept on the instance.
        """
        self.location = location
        self.radius = radius
        self.fuelType = fuelType
        self.url = "https://www.clever-tanken.de/tankstelle_liste"
        # Fuel name -> numeric id sent as the ``spritsorte`` query parameter.
        self.fuelTypes = {
            "diesel": 3,
            "super E10": 5,
            "super plus": 6,
            "super E5": 7,
        }
        self.fuelInfos = {}
        super().__init__()

    def getCurrentTime(self) -> str:
        """Return the local time formatted as ``YYYY/MM/DD HH:MM:SS``."""
        return datetime.now().strftime("%Y/%m/%d %H:%M:%S")

    def __writeLog(self, text: str, printOnConsole=True):
        """Append *text* to ``spritpreise.log`` and optionally echo it.

        Logging is best effort: a failure to write the file is printed
        and otherwise ignored so that it never interrupts scraping.
        """
        now = ""
        try:
            now = datetime.now().strftime("%Y/%m/%d %H:%M:%S")
            logPath = os.path.join(self.currentFolder, "spritpreise.log")
            with open(logPath, "a") as logs:
                logs.write(f"[{now}] {text}\n")
        except Exception as e:
            print(f"Error occurred: {e}")
        if printOnConsole:
            print(f"[{now}] {text}\n")

    def convertType(self, fuelType):
        """Translate a fuel type into the site's numeric id.

        Integers are returned unchanged. Strings are compared
        case-insensitively against each full name in ``self.fuelTypes``
        and against each word of it, so ``"E10"``, ``"e10"`` and
        ``"super E10"`` all resolve to ``5``. The first matching table
        entry wins.

        Raises:
            ValueError: If the string matches no known fuel type.
        """
        if isinstance(fuelType, int):
            return fuelType
        wanted = fuelType.casefold()
        for name, value in self.fuelTypes.items():
            # Casefold the table key as well; comparing against the raw key
            # rejected "E10" and "E5" because the input is lower-cased.
            folded = name.casefold()
            if wanted in (folded, *folded.split()):
                return value
        raise ValueError(f"Unknown type: {fuelType!r}")

    def iterResults(self, markup):
        """Yield ``(price, location)`` for every station card in *markup*.

        ``price`` is the stripped text of the card's ``price`` element and
        ``location`` is a dict with the keys ``name``, ``street`` and
        ``city``. A card that lacks one of these elements raises
        ``AttributeError``.
        """
        soup = bs4.BeautifulSoup(markup, "html.parser")
        for result in soup.find_all(class_="list-card-container"):
            price = result.find(class_="price").text.strip()
            location = {
                typ: result.find(class_=f"fuel-station-location-{typ}").text.strip()
                for typ in ("name", "street", "city")
            }
            yield price, location

    def _storeResult(self, typeId: int, price: str, location: dict) -> None:
        """Merge one scraped station card into ``self.fuelInfos``."""
        price = str(price).replace(",", ".")
        entryName = f'{location["city"]} {location["street"]}'
        entry = self.fuelInfos.setdefault(
            entryName, {"3": None, "5": None, "6": None, "7": None}
        )

        # Fall back to the raw text with an empty zip code / house number when
        # the pattern does not fit (e.g. "Hauptstr. 12-14"). Dereferencing a
        # failed match used to raise and send the page into an endless retry.
        # Empty strings rather than None: presumably safer for the database
        # columns, whose constraints are not visible here -- TODO confirm.
        cityMatch = self._CITY_PATTERN.match(location["city"])
        if cityMatch:
            zipCode, city = cityMatch.group(1), cityMatch.group(2).strip()
        else:
            zipCode, city = "", location["city"]

        addressMatch = self._ADDRESS_PATTERN.match(location["street"])
        if addressMatch:
            street, houseNumber = addressMatch.group(1), addressMatch.group(2)
        else:
            street, houseNumber = location["street"], ""

        entry["street"] = street
        entry["houseNumber"] = houseNumber
        entry["zipCode"] = zipCode
        # NOTE: the previous lazy pattern always captured an empty city name,
        # so rows written by earlier versions carry city == "".
        entry["city"] = city
        entry["name"] = location["name"]
        # "%z" yields "+HHMM"; dropping the last two characters keeps "+HH".
        entry["time"] = datetime.now(pytz.timezone('Europe/Berlin')).strftime('%Y-%m-%d %H:%M:%S%z')[:-2]
        entry[str(typeId)] = price if price.replace(".", "").isnumeric() else None

    def getPrices(self, fuelType):
        """Fetch every result page for *fuelType* into ``self.fuelInfos``.

        Paging stops when the site answers with a non-200 status or with a
        page that contains no station cards. A failing page is retried
        after ``RETRY_DELAY`` seconds; after ``MAX_RETRIES`` consecutive
        failures the method logs the problem and returns.

        Raises:
            ValueError: If *fuelType* is not a known fuel type.
        """
        # Resolved outside the retry loop: an unknown fuel type is a caller
        # error and must not be retried.
        typeId = self.convertType(fuelType)
        page = 1
        failures = 0
        while True:
            try:
                print(f"fuelType: {fuelType} page: {page}")
                query = dict(ort=self.location, r=self.radius,
                             spritsorte=typeId, page=page)
                response = requests.get(self.url, query,
                                        timeout=self.REQUEST_TIMEOUT)
                if response.status_code != 200:
                    break
                found = 0
                for price, location in self.iterResults(response.text):
                    self._storeResult(typeId, price, location)
                    found += 1
            except Exception as e:
                # Best-effort boundary: log, then retry a bounded number of
                # times instead of looping forever on a persistent error.
                failures += 1
                self.__writeLog(f"Error occurred: {e}")
                if failures >= self.MAX_RETRIES:
                    self.__writeLog(
                        f"Giving up on fuelType {fuelType} page {page} "
                        f"after {failures} failed attempts"
                    )
                    break
                time.sleep(self.RETRY_DELAY)
            else:
                if not found:
                    # An empty result page means there is nothing left.
                    break
                failures = 0
                page += 1

    def getAllPrices(self):
        """Scrape all fuel types, store them in the database, return self.

        NOTE(review): ``self.fuelInfos`` is not cleared first, so entries
        from earlier runs or from ``getDictFromJson`` are stored again.
        """
        start = time.time()
        for typeId in self.fuelTypes.values():
            self.getPrices(typeId)
        self.insertIntoDb()
        now = time.time()
        print(f'[{self.getCurrentTime()}] Total time: {timedelta(seconds=now - start)}')
        return self

    def getAllPricesSchedule(self, intervalInSeconds: int = 900):
        """Run ``getAllPrices`` forever, starting every *intervalInSeconds*.

        If a run takes longer than the interval, the next run starts
        immediately.
        """
        while True:
            nextRun = time.time() + intervalInSeconds
            print(f"[{self.getCurrentTime()}] Starting")
            self.getAllPrices()
            # Clamp at zero: time.sleep() raises ValueError for negatives.
            sleepTime = max(0.0, nextRun - time.time())
            print(f"[{self.getCurrentTime()}] Finished, sleeping for: {sleepTime} ({datetime.fromtimestamp(nextRun).strftime('%Y/%m/%d %H:%M:%S')})")
            time.sleep(sleepTime)

    def insertIntoDb(self):
        """Write ``self.fuelInfos`` to the database and return self.

        Each station gets a row in ``spritpreis_header`` (created on first
        sight) and one new row in ``spritpreis_position`` holding the
        current prices. Changes are committed only if every statement
        succeeds; cursor and connection are closed in any case.
        """
        connection, cursor = self._getSqlLiteConnection()
        headerQuery = 'SELECT id FROM spritpreis_header WHERE city = ? and street = ?'
        try:
            for value in self.fuelInfos.values():
                # NOTE(review): the lookup ignores houseNumber and zipCode, so
                # two stations on the same street of one city share a header.
                headerKey = (value['city'], value["street"])
                cursor.execute(headerQuery, headerKey)
                result = cursor.fetchone()
                if result is None:
                    cursor.execute("INSERT INTO spritpreis_header (city, street, name, houseNumber, zipCode) VALUES (?, ?, ?, ?, ?)", (value['city'], value["street"], value["name"], value["houseNumber"], value["zipCode"]))
                    cursor.execute(headerQuery, headerKey)
                    result = cursor.fetchone()
                # Columns follow the ids in self.fuelTypes:
                # 3 = diesel, 5 = E10, 7 = E5, 6 = super plus.
                cursor.execute("INSERT INTO spritpreis_position (of_spritpreis_header, time, diesel_price, e10_price, e5_price, super_plus_price) VALUES (?, ?, ?, ?, ?, ?)", (result[0], value["time"], value["3"], value["5"], value["7"], value["6"]))
            connection.commit()
        finally:
            cursor.close()
            connection.close()
        return self

    def exportAsJson(self, outputFile: str):
        """Dump ``self.fuelInfos`` as indented JSON to *outputFile*; return self."""
        start = time.time()
        with open(outputFile, "w", encoding="utf-8") as f:
            json.dump(self.fuelInfos, f, indent=4)
        end = time.time()
        self.__writeLog(f"Wrote JSON in {timedelta(seconds=end - start)}")
        return self

    def getDictFromJson(self, inputFile: str):
        """Replace ``self.fuelInfos`` with the JSON in *inputFile*; return self."""
        with open(inputFile, "r", encoding="utf-8") as f:
            self.fuelInfos = json.load(f)
        return self