This commit is contained in:
2024-11-03 18:16:23 +01:00
commit ced5d8c5ca
5 changed files with 223 additions and 0 deletions

5
.gitignore vendored Normal file
View File

@@ -0,0 +1,5 @@
.idea
test.*
*.db
db.*
*.log

0
main.py Normal file
View File

206
src/Spritpreise.py Normal file
View File

@@ -0,0 +1,206 @@
import json
from datetime import datetime, timezone, timedelta
from pprint import pprint
from typing import Literal
import os
import mysql.connector
import bs4
import pytz
import requests
import psycopg2
import time
import threading
import sched
from mysql.connector.aio.charsets import charsets
class Spritpreise:
def __init__(self, location:str, radius:int, fuelType:Literal["diesel", "E5", "E10", "super plus"] = "E10"):
self.location = location
self.radius = radius
self.fuelType = fuelType
self.url = "https://www.clever-tanken.de/tankstelle_liste"
self.fuelTypes = {
"diesel": 3,
"super E10": 5,
"super plus": 6,
"super E5": 7,
}
self.fuelInfos = {}
self.currentFolder = os.path.dirname(os.path.realpath(__file__))
self.dbType = ""
self.dbName = ""
self.dbHost = ""
self.dbPort = ""
self.dbUser = ""
self.dbPassword = ""
def getCurrentTime(self):
return datetime.now().strftime("%Y/%m/%d %H:%M:%S")
def setDbConnection(self, dbHost:str, dbPort:str, dbUser, dbPassword:str, dbType:Literal["mysql", "pgsql"]):
self.dbType = dbType
self.dbName = "Spritpreise"
self.dbHost = dbHost
self.dbPort = dbPort
self.dbUser = dbUser
self.dbPassword = dbPassword
def __getDbConnection(self):
if self.dbType == "mysql":
connection = mysql.connector.connect(host=self.dbHost, user=self.dbUser, password=self.dbPassword,
database=self.dbName, port=self.dbPort)
elif self.dbType == "pgsql":
connection = psycopg2.connect(dbname=self.dbName, user=self.dbUser, password=self.dbPassword,
host=self.dbHost, port=self.dbPort)
cursor = connection.cursor()
return connection, cursor
def __writeLog(self, text: str, printOnConsole=True):
now = ""
try:
now = datetime.now().strftime("%Y/%m/%d %H:%M:%S")
with open(f"{self.currentFolder}\\spritpreise.log", "a") as logs:
logs.write(f"[{now}] {text}\n")
except Exception as e:
print(f"Error occurred: {e}")
if printOnConsole:
print(f"[{now}] {text}\n")
def convertType(self, fuelType:str):
if isinstance(fuelType, int):
return fuelType
for name, value in self.fuelTypes.items():
if fuelType.casefold() in (name, *name.split()):
return value
raise ValueError(f"Unknown type: {fuelType!r}")
def iterResults(self, markup):
soup = bs4.BeautifulSoup(markup, "html.parser")
for result in soup.find_all(class_="list-card-container"):
price = result.find(class_="price").text.strip()
location = {
typ: result.find(class_=f"fuel-station-location-{typ}").text.strip()
for typ in ("name", "street", "city")
}
yield price, location
def getPrices(self, fuelType):
page = 1
while True:
try:
print(f"fuelType: {fuelType} page: {page}")
query = dict(ort=self.location, r=self.radius,
spritsorte=self.convertType(fuelType),
page=page)
response = requests.get(self.url, query)
if response.status_code != 200:
break
for price, location in self.iterResults(response.text):
price = str(price).replace(",", ".")
entryName = f'{location["city"]} {location["street"]}'
if entryName not in self.fuelInfos:
self.fuelInfos[entryName] = {"3": None, "5": None, "6": None, "7": None}
self.fuelInfos[entryName]["street"] = location["street"]
self.fuelInfos[entryName]["city"] = location["city"]
self.fuelInfos[entryName]["name"] = location["name"]
self.fuelInfos[entryName]["time"] = datetime.now(pytz.timezone('Europe/Berlin')).strftime('%Y-%m-%d %H:%M:%S%z')[:-2]
self.fuelInfos[entryName][str(self.convertType(fuelType))] = price if price.replace(".", "").isnumeric() else None
page += 1
except Exception as e:
self.__writeLog(f"Error occurred: {e}")
time.sleep(5)
def getAllPrices(self):
start = time.time()
"""
dieselThread = threading.Thread(target=self.getPrices, args=(3,))
e10Thread = threading.Thread(target=self.getPrices, args=(5,))
e5Thread = threading.Thread(target=self.getPrices, args=(6,))
superPlusThread = threading.Thread(target=self.getPrices, args=(7,))
dieselThread.start()
e10Thread.start()
e5Thread.start()
superPlusThread.start()
dieselThread.join()
e10Thread.join()
e5Thread.join()
superPlusThread.join()"""
self.getPrices(3)
self.getPrices(5)
self.getPrices(6)
self.getPrices(7)
self.insertIntoDb()
now = time.time()
print(f'[{self.getCurrentTime()}] Total time: {timedelta(seconds=now - start)}')
return self
def getAllPricesSchedule(self, intervalInSeconds:int=900):
while True:
nextRun = time.time() + intervalInSeconds
print(f"[{self.getCurrentTime()}] Starting")
self.getAllPrices()
sleepTime = nextRun - time.time()
print(f"[{self.getCurrentTime()}] Finished, sleeping for: {sleepTime} ({datetime.fromtimestamp(nextRun).strftime('%Y/%m/%d %H:%M:%S')})")
time.sleep(sleepTime)
def insertIntoDb(self):
connection, cursor = self.__getDbConnection()
for key, value in self.fuelInfos.items():
cursor.execute('SELECT id FROM spritpreis_header WHERE city = %s and street = %s', (value['city'], value["street"]))
result = cursor.fetchone()
if result is None:
cursor.execute("INSERT INTO spritpreis_header (city, street, name) VALUES (%s, %s, %s)", (value['city'], value["street"], value["name"]))
cursor.execute('SELECT id FROM spritpreis_header WHERE city = %s and street = %s', (value['city'], value["street"]))
result = cursor.fetchone()
print(result)
id = result[0]
cursor.execute("INSERT INTO spritpreis_position (of_spritpreis_header, time, diesel_price, e10_price, e5_price, super_plus_price) VALUES (%s, %s, %s, %s, %s, %s)",
(id, value["time"], value["3"], value["5"], value["7"], value["6"]))
cursor.close()
connection.commit()
connection.close()
return self
def exportAsJson(self, outputFile):
start = time.time()
with open(outputFile, "w", encoding="utf-8") as f:
json.dump(self.fuelInfos, f, indent=4)
end = time.time()
self.__writeLog(f"Wrote JSON in {timedelta(seconds=end - start)}")
return self
def getDictFromJson(self, inputFile):
with open(inputFile, "r", encoding="utf-8") as f:
self.fuelInfos = json.load(f)
return self

7
src/main.py Normal file
View File

@@ -0,0 +1,7 @@
from Spritpreise import Spritpreise
from db import *
if __name__ == "__main__":
sprit = Spritpreise(location="Linnich", radius=30)
sprit.setDbConnection(dbHost, dbPort, dbUser, dbPassword, dbType)
sprit.getAllPricesSchedule()

5
src/requerments.txt Normal file
View File

@@ -0,0 +1,5 @@
beautifulsoup4
requests
psycopg2
mysql-connector-python
pytz