581 lines
21 KiB
Python

# Zeiterfassung
import hashlib
# User bezogene Funktionen
import os
from calendar import monthrange
from stat import S_IREAD, S_IWUSR
from nicegui import ui
import datetime
import time
import json
import shutil
import re
from lib.definitions import userfolder, scriptpath, usersettingsfilename, photofilename, status_in, status_out, \
standard_adminsettings, standard_usersettings, va_file, is_docker
# Benutzerklasse
class user:
    """One time-tracking user, backed by a folder of per-month files.

    Inside the user's folder this class reads and writes:

    * the settings file (master data: username, full name, password,
      work hours, API key),
    * ``<year>-<month>.txt`` stamp files holding one epoch timestamp per line,
    * ``<year>-<month>.json`` files holding month metadata such as
      ``archived``, ``overtime``, ``absence`` and ``notes``,
    * the vacation application file.

    The month in file names is not zero-padded when it is derived from a
    timestamp (e.g. ``2024-5.txt``).
    """

    # File names of month metadata files, e.g. "2024-5.json".
    _MONTH_JSON_PATTERN = re.compile(r"\d{4}-\d{1,2}\.json")

    # Errors that can occur while opening, parsing and indexing a JSON file.
    _DATA_ERRORS = (OSError, ValueError, KeyError, TypeError)

    def __init__(self, name):
        """Load the master data of user *name* from its settings file.

        Raises:
            OSError: if the settings file cannot be opened.
            ValueError: if the settings file is not valid JSON.
            KeyError: if a required field is missing from the settings.
        """
        base_folder = "/users" if is_docker() else userfolder
        self.userfolder = os.path.join(base_folder, name)
        self.settingsfile = os.path.join(self.userfolder, usersettingsfilename)
        self.photofile = os.path.join(self.userfolder, photofilename)

        # Read the master data.
        with open(self.settingsfile) as json_file:
            data = json.load(json_file)
        self.password = data["password"]
        self.workhours = data["workhours"]
        self.username = data["username"]
        self.fullname = data["fullname"]
        self.api_key = data["api_key"]

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _read_json(path):
        """Return the parsed content of the JSON file at *path*."""
        with open(path, "r") as json_file:
            return json.load(json_file)

    @staticmethod
    def _write_json(path, data):
        """Write *data* to *path* as JSON indented by four spaces."""
        # Serialize first so a serialization error cannot truncate the file.
        json_text = json.dumps(data, indent=4)
        with open(path, "w") as json_file:
            json_file.write(json_text)

    @staticmethod
    def _parse_date_key(date_key):
        """Convert a ``"year-month-day"`` key into a ``datetime``."""
        parts = date_key.split("-")
        return datetime.datetime(int(parts[0]), int(parts[1]), int(parts[2]))

    @staticmethod
    def _resolve_date(year, month, day):
        """Replace every ``None`` component with the matching part of today.

        Resolving at call time matters: a default written as
        ``datetime.datetime.now().year`` in the signature is evaluated only
        once, when the module is imported.
        """
        if year is None or month is None or day is None:
            today = datetime.datetime.now()
            year = today.year if year is None else year
            month = today.month if month is None else month
            day = today.day if day is None else day
        return year, month, day

    def _month_path(self, year, month, suffix):
        """Return the path of the month file ``<year>-<month><suffix>``."""
        return os.path.join(self.userfolder, f"{year}-{month}{suffix}")

    def _latest_workhours_entry(self, day_dt):
        """Return the newest work-hours key dated on or before *day_dt*.

        Keys are ordered as strings, like everywhere else in this class.
        Returns ``None`` when no entry applies (including empty work hours).
        """
        for entry in sorted(self.workhours, reverse=True):
            if self._parse_date_key(entry) <= day_dt:
                return entry
        return None

    # ------------------------------------------------------------------
    # Stamping
    # ------------------------------------------------------------------
    def get_stamp_file(self, time_stamp=None):
        """Return the month file path without extension.

        Args:
            time_stamp: epoch seconds selecting the month; ``None`` means now.

        Returns:
            ``<userfolder>/<year>-<month>`` as a string.
        """
        # Take one snapshot so year and month cannot come from two
        # different instants around a month change.
        if time_stamp is None:
            moment = datetime.datetime.now()
        else:
            moment = datetime.datetime.fromtimestamp(time_stamp)
        return os.path.join(self.userfolder, f"{moment.year}-{moment.month}")

    def timestamp(self, stamptime=-1):
        """Append a stamp to the stamp file of the month it belongs to.

        Args:
            stamptime: epoch seconds to record; ``-1`` records the current
                time.

        A missing user folder is created. The month's JSON file is created
        with default content when it does not exist yet.
        """
        if stamptime == -1:
            stamptime = time.time()
        stamp_base = self.get_stamp_file(time_stamp=stamptime)

        # Create missing directories up front; retrying after a failure
        # would lose the requested stamp time.
        os.makedirs(self.userfolder, exist_ok=True)
        with open(f"{stamp_base}.txt", "a") as file:
            file.write(f"{int(stamptime)}\n")

        # The metadata file must belong to the same month as the stamp.
        json_filename = f"{stamp_base}.json"
        if not os.path.exists(json_filename):
            self._write_json(json_filename, {"archived": 0, "total_hours": 0})

    def stamp_status(self):
        """Return the stamp state derived from the current month's stamps.

        Returns:
            ``status_in`` for an odd number of stamps, ``status_out`` for an
            even, non-zero number, and ``None`` when there are no stamps.
            A missing stamp file is created empty.
        """
        stamp_filename = f"{self.get_stamp_file()}.txt"
        try:
            with open(stamp_filename, "r") as file:
                lines = file.readlines()
        except FileNotFoundError:
            print(f"Die Datei {stamp_filename} wurde nicht gefunden.")
            print("Lege die Datei an.")
            with open(stamp_filename, "w") as file:
                file.write("")
            lines = []

        # NOTE(review): an empty file yields None rather than status_out;
        # kept as is because callers may depend on it.
        if not lines:
            return None
        return status_out if len(lines) % 2 == 0 else status_in

    def last_2_timestmaps(self):
        """Return the most recent stamps of the current month.

        Returns:
            ``[second_last, last]`` when at least two stamps exist, the
            single stamp as an ``int`` when exactly one exists, otherwise
            ``-1``.

        Raises:
            FileNotFoundError: if the current month has no stamp file.
        """
        with open(f"{self.get_stamp_file()}.txt", "r") as file:
            lines = file.readlines()
        # ">= 2": exactly two stamps are a valid pair as well.
        if len(lines) >= 2:
            return [int(lines[-2]), int(lines[-1])]
        if len(lines) == 1:
            return int(lines[0])
        return -1

    # ------------------------------------------------------------------
    # Master data
    # ------------------------------------------------------------------
    def write_settings(self):
        """Persist the master data and rename the folder if the name changed.

        The folder is renamed inside its current parent directory whenever
        its name differs from ``self.username``; the instance paths are
        updated so the object stays usable afterwards.
        """
        settings = {
            "username": self.username,
            "fullname": self.fullname,
            "password": self.password,
            "workhours": self.workhours,
            "api_key": self.api_key,
        }
        self._write_json(self.settingsfile, settings)

        parent_folder, current_name = os.path.split(self.userfolder)
        if current_name != self.username:
            renamed_folder = os.path.join(parent_folder, self.username)
            os.rename(self.userfolder, renamed_folder)
            self.userfolder = renamed_folder
            self.settingsfile = os.path.join(
                renamed_folder, os.path.basename(self.settingsfile))
            self.photofile = os.path.join(
                renamed_folder, os.path.basename(self.photofile))

    def del_user(self):
        """Delete the user's folder with everything in it."""
        shutil.rmtree(self.userfolder)

    def delete_photo(self):
        """Delete the user's photo file."""
        os.remove(self.photofile)

    # ------------------------------------------------------------------
    # Available periods
    # ------------------------------------------------------------------
    def get_starting_day(self):
        """Return ``[year, month, day]`` strings of the first work-hours key.

        The components are sliced from the smallest key, so they are only
        meaningful for keys shaped like ``YYYY-MM-DD``.
        """
        first_entry = str(sorted(self.workhours)[0])
        return [first_entry[:4], first_entry[5:7], first_entry[8:10]]

    def get_years(self):
        """Return the sorted year strings that can hold data for the user.

        These are all years from the starting year up to the current year
        plus every year for which a month JSON file exists.
        """
        current_year = datetime.datetime.now().year
        first_year = int(self.get_starting_day()[0])
        years = [str(number) for number in range(first_year, current_year + 1)]
        for file in os.listdir(self.userfolder):
            if self._MONTH_JSON_PATTERN.fullmatch(file):
                file_year = file.split("-")[0]
                if file_year not in years:
                    years.append(file_year)
        years.sort()
        return years

    def get_months(self, year):
        """Return the sorted month numbers of *year* that can hold data.

        The range starts at the starting month when *year* is the starting
        year (else January) and ends at the current month when *year* is the
        current year (else December). Future years contribute no range.
        Months that have a JSON file are always included.
        """
        starting_day = self.get_starting_day()
        start_year, start_month = int(starting_day[0]), int(starting_day[1])
        now = datetime.datetime.now()
        requested_year = int(year)

        first_month = start_month if requested_year == start_year else 1
        if requested_year == now.year:
            last_month = now.month
        elif requested_year < now.year:
            last_month = 12
        else:
            last_month = 0  # future year: empty range
        available_months = list(range(first_month, last_month + 1))

        for file in os.listdir(self.userfolder):
            if not self._MONTH_JSON_PATTERN.fullmatch(file):
                continue
            file_year, _, remainder = file.partition("-")
            if file_year == str(year):
                month = int(remainder.split(".")[0])
                if month not in available_months:
                    available_months.append(month)
        available_months.sort()
        return available_months

    # ------------------------------------------------------------------
    # Stamp files
    # ------------------------------------------------------------------
    def get_timestamps(self, year, month):
        """Return the sorted raw lines of a month's stamp file.

        Returns an empty list when the file cannot be read.
        """
        try:
            with open(self._month_path(year, month, ".txt"), "r") as file:
                timestamps = file.readlines()
        except (OSError, UnicodeDecodeError):
            return []
        timestamps.sort()
        return timestamps

    def write_edited_timestamps(self, timestamps, year, month):
        """Overwrite a month's stamp file with the given raw lines."""
        with open(self._month_path(year, month, ".txt"), "w") as file:
            file.write("".join(timestamps))

    def get_archive_status(self, year, month):
        """Return the ``archived`` value of a month.

        Returns:
            The stored value, ``False`` when the month has no JSON file, or
            ``-1`` when the file cannot be read or has no such field.
        """
        try:
            return self._read_json(self._month_path(year, month, ".json"))["archived"]
        except FileNotFoundError:
            return False
        except self._DATA_ERRORS:
            return -1

    def archiving_validity_check(self, year: int, month: int):
        """Return the days of a month that have an odd number of stamps.

        Returns:
            Sorted day numbers with unpaired stamps; an empty list when all
            days are paired or the stamp file cannot be evaluated.
        """
        try:
            target_year, target_month = int(year), int(month)
            with open(self._month_path(year, month, ".txt")) as timestampfile:
                lines = timestampfile.readlines()
            # Toggle each day in and out of the set: days left over at the
            # end were seen an odd number of times.
            odd_days = set()
            for line in lines:
                if not line.strip():
                    continue  # a blank line is not a stamp
                stamp_dt = datetime.datetime.fromtimestamp(int(line))
                if stamp_dt.year == target_year and stamp_dt.month == target_month:
                    odd_days ^= {stamp_dt.day}
            return sorted(odd_days)
        except (OSError, ValueError, OverflowError, TypeError):
            return []

    def archive_hours(self, year, month, overtime: int):
        """Mark a month as archived and make its files read-only.

        Stores ``archived = 1`` and the given *overtime* in the month's JSON
        file, then sets the JSON and the stamp file to read-only.
        """
        filename = self._month_path(year, month, ".json")
        data = self._read_json(filename)
        data["archived"] = 1
        data["overtime"] = overtime
        self._write_json(filename, data)

        # Set the files to read-only.
        os.chmod(filename, S_IREAD)
        os.chmod(self._month_path(year, month, ".txt"), S_IREAD)

    def get_last_months_overtime(self, year=None, month=None):
        """Return the archived overtime of the month before *year*/*month*.

        Args:
            year, month: reference month; ``None`` means the current one.

        Returns:
            The stored overtime as ``int`` when the previous month is
            archived, otherwise ``0`` (also when its file is unreadable).
        """
        year, month, _ = self._resolve_date(year, month, None)
        try:
            if int(month) == 1:
                year, month = str(int(year) - 1), "12"
            else:
                month = str(int(month) - 1)
            json_data = self._read_json(self._month_path(year, month, ".json"))
            if json_data["archived"] == 1:
                return int(json_data["overtime"])
            return 0
        except self._DATA_ERRORS:
            return 0

    # ------------------------------------------------------------------
    # Absences and notes
    # ------------------------------------------------------------------
    def get_absence(self, year, month):
        """Return the ``absence`` entry of a month, or ``{}`` if unavailable."""
        try:
            filename = self._month_path(int(year), int(month), ".json")
            return self._read_json(filename)["absence"]
        except self._DATA_ERRORS:
            return {}

    def get_day_notes(self, year, month, day):
        """Return the notes stored for a day, or ``{}`` if unavailable."""
        try:
            filename = self._month_path(int(year), int(month), ".json")
            return self._read_json(filename)["notes"][str(day)]
        except self._DATA_ERRORS:
            return {}

    def write_notes(self, year, month, day, note_dict):
        """Store the notes of a day in the month's JSON file.

        Args:
            note_dict: with exactly one entry, that entry becomes the day's
                only note (an empty-string value leaves the day without
                notes); otherwise the dict is stored as given.

        A missing JSON file is created with default content.
        """
        filename = self._month_path(int(year), int(month), ".json")
        try:
            json_data = self._read_json(filename)
        except FileNotFoundError:
            json_data = {"archived": 0, "total_hours": 0, "notes": {}}

        # Month files created by stamping have no "notes" key yet.
        notes = json_data.setdefault("notes", {})
        day_key = str(day)
        if len(note_dict) == 1:
            author, text = next(iter(note_dict.items()))
            # NOTE(review): a single-entry write replaces all notes of the
            # day, including those of other authors - confirm this is wanted.
            notes[day_key] = {} if text == "" else {author: text}
        else:
            notes[day_key] = note_dict
        self._write_json(filename, json_data)

    def update_absence(self, year, month, day, absence_type):
        """Set the absence type of a day in the month's JSON file.

        A missing or unparsable JSON file is replaced by default content.
        The day is stored under ``str(int(day))``.
        """
        filename = self._month_path(int(year), int(month), ".json")
        try:
            json_data = self._read_json(filename)
        except (FileNotFoundError, ValueError):
            json_data = {"archived": 0, "overtime": 0}

        absence = json_data.get("absence")
        if not isinstance(absence, dict):
            absence = json_data["absence"] = {}
        absence[str(int(day))] = absence_type
        self._write_json(filename, json_data)

    def del_absence(self, year, month, day):
        """Remove the absence entry of a day.

        Raises:
            FileNotFoundError: if the month has no JSON file.
            KeyError: if no absence is stored for the day.
        """
        filename = self._month_path(int(year), int(month), ".json")
        json_data = self._read_json(filename)
        # Same key normalization as update_absence, so "05" finds "5".
        del json_data["absence"][str(int(day))]
        self._write_json(filename, json_data)

    def count_absence_days(self, absence_code: str, year=None):
        """Count the days of *year* whose absence type equals *absence_code*.

        Args:
            year: year to evaluate; ``None`` means the current year.
        """
        if year is None:
            year = datetime.datetime.now().year
        absence_days = 0
        for month in range(1, 13):
            absence_dict = self.get_absence(year, month)
            if not isinstance(absence_dict, dict):
                continue
            absence_days += sum(
                1 for absence_type in absence_dict.values()
                if absence_type == absence_code)
        return absence_days

    # ------------------------------------------------------------------
    # Work hours and vacation claim
    # ------------------------------------------------------------------
    def get_day_workhours(self, year, month, day):
        """Return the hours the user has to work on the given day.

        Returns:
            ``0`` on a holiday listed in the admin settings, the value
            stored for the weekday (keys ``"1"`` Monday .. ``"7"`` Sunday) in
            the newest applicable work-hours entry, or ``-1`` when the day
            lies before the first entry.
        """
        day_to_check = datetime.datetime(int(year), int(month), int(day))

        # Check public holidays.
        settings = load_adminsettings()
        holidays = {self._parse_date_key(key) for key in settings["holidays"]}
        if day_to_check in holidays:
            return 0

        # Search the weekly work hours.
        entry = self._latest_workhours_entry(day_to_check)
        if entry is None:
            return -1  # before the starting date
        return self.workhours[entry][str(day_to_check.isoweekday())]

    def get_vacation_claim(self, year=None, month=None, day=None):
        """Return the vacation claim valid on the given day.

        Args:
            year, month, day: day to check; ``None`` parts mean today.

        Returns:
            The ``vacation`` value of the newest applicable work-hours entry
            as ``int``, or ``-1`` when no entry applies.
        """
        year, month, day = self._resolve_date(year, month, day)
        day_to_check = datetime.datetime(int(year), int(month), int(day))
        entry = self._latest_workhours_entry(day_to_check)
        if entry is None:
            return -1
        return int(self.workhours[entry]["vacation"])

    # ------------------------------------------------------------------
    # Worked time
    # ------------------------------------------------------------------
    def get_day_timestamps(self, year=None, month=None, day=None):
        """Return the sorted stamps (``int``) that fall on the given day.

        Args:
            year, month, day: day to evaluate; ``None`` parts mean today.
        """
        year, month, day = self._resolve_date(year, month, day)
        wanted_date = datetime.date(int(year), int(month), int(day))
        stamps = [
            int(line) for line in self.get_timestamps(year, month)
            if line.strip()  # a blank line is not a stamp
        ]
        return sorted(
            stamp for stamp in stamps
            if datetime.datetime.fromtimestamp(stamp).date() == wanted_date)

    def get_worked_time(self, year=None, month=None, day=None):
        """Return the time worked on a day and a still-open stamp.

        Args:
            year, month, day: day to evaluate; ``None`` parts mean today.

        Returns:
            ``[total_seconds, in_time_stamp]``: the sum over completed
            in/out pairs, and the last stamp when the number of stamps is
            odd (otherwise ``-1``).
        """
        stamps = self.get_day_timestamps(year, month, day)
        has_open_stamp = len(stamps) % 2 != 0
        paired_count = len(stamps) - 1 if has_open_stamp else len(stamps)
        total_time = sum(
            stamps[position + 1] - stamps[position]
            for position in range(0, paired_count, 2))
        in_time_stamp = int(stamps[-1]) if has_open_stamp else -1
        return [total_time, in_time_stamp]

    # ------------------------------------------------------------------
    # Vacation applications
    # ------------------------------------------------------------------
    def vacation_application(self, startdate, enddate):
        """Append a vacation application to the user's application file.

        The application is stored under the next running index (as string).
        """
        application_file = os.path.join(self.userfolder, va_file)
        try:
            applications = self._read_json(application_file)
        except FileNotFoundError:
            applications = {}
        applications[str(len(applications))] = (startdate, enddate)
        self._write_json(application_file, applications)

    def get_open_vacation_applications(self):
        """Return the stored vacation applications, ``{}`` if none exist."""
        application_file = os.path.join(self.userfolder, va_file)
        try:
            return self._read_json(application_file)
        except FileNotFoundError:
            return {}

    def revoke_vacation_application(self, index):
        """Remove an application and renumber the remaining ones from 0.

        Args:
            index: key of the application; compared as string because JSON
                object keys are strings.

        Returns:
            ``0`` on success; ``-1`` after notifying the UI when no
            application with that index exists.
        """
        application_file = os.path.join(self.userfolder, va_file)
        try:
            applications = self._read_json(application_file)
        except FileNotFoundError:
            applications = {}
        try:
            del applications[str(index)]
        except KeyError:
            ui.notify("Urlaubsantrag wurde schon bearbeitet")
            return -1
        renumbered = {
            str(position): dates
            for position, dates in enumerate(applications.values())
        }
        self._write_json(application_file, renumbered)
        return 0
# Benutzer auflisten
def list_users():
    """Return the sorted names of all user folders.

    A missing user directory is created first. When no user folder exists,
    the user "default" is created via new_user() before listing again.
    """
    def scan_user_folders():
        # Only directories directly below the user directory count as users.
        return [
            entry for entry in os.listdir(userfolder)
            if os.path.isdir(os.path.join(userfolder, entry))
        ]

    if not os.path.exists(userfolder):
        print("Kein Benutzerverzeichnis gefunden. Lege es an.")
        os.makedirs(userfolder)

    found_users = scan_user_folders()
    if len(found_users) == 0:
        print("Keine Benutzer gefunden. Lege Standardbenutzer an.")
        new_user("default")
        found_users = scan_user_folders()
    return sorted(found_users)
def new_user(username: str):
    """Create the folder and the initial settings file of a new user.

    The settings start from a copy of ``standard_usersettings`` with:

    * ``username`` and ``fullname`` set to *username*,
    * a freshly generated ``api_key``,
    * one work-hours entry dated today with 0 hours for the weekdays
      ``"1"`` .. ``"7"`` and a vacation claim of 0.

    An already existing settings file is left untouched, so calling this
    for an existing user cannot reset that user's data.
    """
    # Function-scope import: the module does not import copy at file level.
    import copy

    user_path = os.path.join(userfolder, username)
    # Creates the user directory and any missing parent in one race-free call.
    os.makedirs(user_path, exist_ok=True)

    settings_path = os.path.join(user_path, usersettingsfilename)
    if os.path.exists(settings_path):
        return

    start_date = datetime.datetime.now().strftime("%Y-%m-%d")

    # Work on a deep copy: mutating the shared template would leak this
    # user's data into every user created later in the same process.
    settings_to_write = copy.deepcopy(standard_usersettings)
    settings_to_write["workhours"][start_date] = {}
    settings_to_write["fullname"] = username
    settings_to_write["username"] = username

    # Generate the API key.
    # NOTE(review): the key is derived from the user name and the current
    # time, both guessable; consider secrets.token_hex() instead.
    string_to_hash = f'{username}_{datetime.datetime.now().timestamp()}'
    hash_string = hashlib.shake_256(bytes(string_to_hash, 'utf-8')).hexdigest(20)
    settings_to_write["api_key"] = hash_string

    for weekday in range(1, 8):
        settings_to_write["workhours"][start_date][str(weekday)] = 0
    settings_to_write["workhours"][start_date]["vacation"] = 0

    json_dict = json.dumps(settings_to_write, indent=4)
    with open(settings_path, 'w') as json_file:
        json_file.write(json_dict)
# Admineinstellungen auslesen
def load_adminsettings():
    """Read the admin settings, creating the default file when missing.

    Returns:
        The parsed settings, or ``-1`` when the file cannot be read or does
        not contain valid JSON.
    """
    # Read the settings file.
    settings_filename = os.path.join(scriptpath, usersettingsfilename)
    if not os.path.exists(settings_filename):
        print("Keine Einstellungsdatei gefunden. Lege Standarddatei an.")
        json_dict = json.dumps(standard_adminsettings, indent=4)
        with open(settings_filename, 'w') as json_file:
            json_file.write(json_dict)
    try:
        with open(settings_filename) as json_file:
            return json.load(json_file)
    except (OSError, ValueError):
        # Only I/O and parse failures map to the -1 error value; anything
        # else (e.g. KeyboardInterrupt) must not be swallowed.
        return -1
# bestimmte Admineinstellungen speichern
def write_adminsetting(key: str, value):
    """Store a single admin setting in the admin settings file.

    The current settings are loaded, ``key`` is set to ``value`` (a key
    that does not exist yet is added) and the file is rewritten. When the
    settings cannot be loaded, nothing is written, so a damaged file is
    not replaced by a file holding just this one setting.
    """
    settings_filename = os.path.join(scriptpath, usersettingsfilename)
    admin_settings = load_adminsettings()

    # load_adminsettings() signals failure with -1 instead of a dict.
    if not isinstance(admin_settings, dict):
        print(f"Einstellungen konnten nicht geladen werden. {key} wurde nicht gespeichert.")
        return

    # NOTE(review): the former "except KeyError" could never trigger because
    # assigning to a dict key does not raise; if unknown keys are meant to
    # be rejected, add an explicit membership check here.
    admin_settings[key] = value
    json_data = json.dumps(admin_settings, indent=4)
    with open(settings_filename, 'w') as output_file:
        output_file.write(json_data)