# Zeiterfassung (time tracking)
# User-related functions

import copy
import datetime
import hashlib
import json
import os
import re
import secrets
import shutil
import time
from calendar import monthrange
from collections import Counter
from stat import S_IREAD, S_IWUSR

from nicegui import ui

from lib.definitions import userfolder, scriptpath, usersettingsfilename, photofilename, status_in, status_out, \
    standard_adminsettings, standard_usersettings, va_file, is_docker

# Matches the per-month metadata files "<year>-<month>.json" inside a user folder.
_MONTH_JSON_PATTERN = re.compile(r"\d{4}-\d{1,2}\.json")


def _resolve_date(year=None, month=None, day=None):
    """Return ``(year, month, day)`` with every ``None`` replaced by today's value.

    Resolving at call time (instead of in the signature) keeps the defaults
    correct in a long-running process.
    """
    now = datetime.datetime.now()
    return (
        now.year if year is None else year,
        now.month if month is None else month,
        now.day if day is None else day,
    )


def _parse_date(text):
    """Parse a ``"year-month-day"`` string into a ``datetime`` at midnight."""
    parts = text.split("-")
    return datetime.datetime(int(parts[0]), int(parts[1]), int(parts[2]))


def _read_json(path):
    """Load and return the JSON document stored at *path*."""
    with open(path, "r") as json_file:
        return json.load(json_file)


def _write_json(path, data):
    """Serialize *data* with 4-space indentation and write it to *path*."""
    # Serialize first so a serialization error cannot leave a truncated file.
    payload = json.dumps(data, indent=4)
    with open(path, "w") as json_file:
        json_file.write(payload)


# User class
class user:
    """A user whose settings, time stamps and month data live in one folder."""

    def __init__(self, name):
        """Load the master data of user *name* from the settings file.

        Raises whatever ``open``/``json.load`` raise when the settings file
        is missing or invalid, and ``KeyError`` when a required key is absent.
        """
        # Inside Docker the user folders are looked up under "/users".
        base_folder = userfolder if not is_docker() else "/users"
        self.userfolder = os.path.join(base_folder, name)
        self.settingsfile = os.path.join(self.userfolder, usersettingsfilename)
        self.photofile = os.path.join(self.userfolder, photofilename)

        # Read master data
        data = _read_json(self.settingsfile)

        self.password = data["password"]
        self.workhours = data["workhours"]
        self.username = data["username"]
        self.fullname = data["fullname"]
        self.api_key = data["api_key"]

    def _month_path(self, year, month, extension):
        """Return the path of the ``<year>-<month>.<extension>`` file of this user."""
        return os.path.join(self.userfolder, f"{year}-{month}.{extension}")

    def get_stamp_file(self, time_stamp=None):
        """Return the stamp file path (without extension) for a point in time.

        *time_stamp* is a Unix timestamp; ``None`` selects the current month.
        """
        if time_stamp is None:
            moment = datetime.datetime.now()
        else:
            moment = datetime.datetime.fromtimestamp(time_stamp)
        return os.path.join(self.userfolder, f"{moment.year}-{moment.month}")

    def timestamp(self, stamptime=-1):
        """Append a time stamp to the stamp file of the month it belongs to.

        *stamptime* is a Unix timestamp; the sentinel ``-1`` means "now".
        The JSON companion file of that month is created if it does not exist.
        """
        if stamptime == -1:
            stamptime = time.time()

        stamp_base = self.get_stamp_file(time_stamp=stamptime)
        filename = f"{stamp_base}.txt"

        # Create missing directories up front; this replaces the former
        # retry, which re-stamped with the current time and thereby lost the
        # requested stamptime.
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        with open(filename, 'a') as file:
            file.write(f"{int(stamptime)}\n")

        # Create the JSON file belonging to the stamped month if needed.
        json_filename = f"{stamp_base}.json"
        if not os.path.exists(json_filename):
            _write_json(json_filename, {"archived": 0, "total_hours": 0})

    def stamp_status(self):
        """Return the stamp state derived from the current month's stamp file.

        Returns ``status_in`` for an odd number of lines, ``status_out`` for
        an even, non-zero number, and ``None`` when the file is empty. A
        missing file is created empty.
        """
        stamp_file = f"{self.get_stamp_file()}.txt"
        try:
            with open(stamp_file, 'r') as file:
                lines = file.readlines()
        except FileNotFoundError:
            print(f"Die Datei {stamp_file} wurde nicht gefunden.")
            print("Lege die Datei an.")
            with open(stamp_file, 'w') as file:
                file.write("")
            lines = []

        if not lines:
            # NOTE(review): an empty file yields None, as before; confirm
            # whether callers would prefer status_out here.
            return None
        return status_out if len(lines) % 2 == 0 else status_in

    def last_2_timestmaps(self):
        """Return the most recent stamps of the current month.

        Returns ``[second_last, last]`` when at least two stamps exist, the
        single stamp as ``int`` when exactly one exists, and ``-1`` otherwise.
        """
        with open(f"{self.get_stamp_file()}.txt", 'r') as file:
            lines = file.readlines()

        # ">= 2": a file with exactly two stamps used to fall through to -1.
        if len(lines) >= 2:
            return [int(lines[-2]), int(lines[-1])]
        if len(lines) == 1:
            return int(lines[0])
        return -1

    def write_settings(self):
        """Write the master data to the settings file and rename the folder if needed.

        When ``self.username`` no longer matches the folder name, the folder
        is renamed next to its current location and the stored paths are
        updated accordingly.
        """
        settings = {
            "username": self.username,
            "fullname": self.fullname,
            "password": self.password,
            "workhours": self.workhours,
            "api_key": self.api_key,
        }
        _write_json(self.settingsfile, settings)

        # Compare the bare folder name; stripping the base path as a prefix
        # left a leading separator, so the names never compared equal.
        current_folder = os.path.normpath(self.userfolder)
        if os.path.basename(current_folder) != self.username:
            new_folder = os.path.join(os.path.dirname(current_folder), self.username)
            os.rename(self.userfolder, new_folder)
            # Keep the instance usable after the rename.
            self.userfolder = new_folder
            self.settingsfile = os.path.join(new_folder, usersettingsfilename)
            self.photofile = os.path.join(new_folder, photofilename)

    def del_user(self):
        """Delete the user's folder including everything in it."""
        shutil.rmtree(self.userfolder)

    def get_starting_day(self):
        """Return ``[year, month, day]`` (strings) of the earliest workhours entry.

        The values are sliced from the key, so keys are expected in the
        zero-padded ``YYYY-MM-DD`` form. Raises ``IndexError`` when no
        workhours entry exists.
        """
        first_entry = str(sorted(self.workhours)[0])
        return [first_entry[:4], first_entry[5:7], first_entry[8:10]]

    def get_years(self):
        """Return the sorted list of years (strings) that can hold data.

        Covers every year from the starting year up to the current year plus
        any year for which a month JSON file exists in the user folder.
        """
        # Determine the current year
        year_now = int(datetime.datetime.fromtimestamp(time.time()).strftime('%Y'))
        first_year = int(self.get_starting_day()[0])
        years = [str(year) for year in range(first_year, year_now + 1)]

        for file in os.listdir(self.userfolder):
            if _MONTH_JSON_PATTERN.match(file):
                year = file.split("-")[0]
                if year not in years:
                    years.append(year)

        years.sort()
        return years

    def get_months(self, year):
        """Return the sorted month numbers (ints) available for *year*.

        Combines the months between the starting date and today that fall
        into *year* with the months for which a JSON file exists.
        """
        # Determine the starting date
        starting_day = self.get_starting_day()
        start_year = int(starting_day[0])
        start_month = int(starting_day[1])

        now = datetime.datetime.now()
        requested_year = int(year)

        first_month = start_month if requested_year == start_year else 1
        if requested_year == now.year:
            available_months = list(range(first_month, now.month + 1))
        elif requested_year < now.year:
            available_months = list(range(first_month, 13))
        else:
            # Years in the future have no calendar-derived months.
            available_months = []

        for file in os.listdir(self.userfolder):
            if _MONTH_JSON_PATTERN.match(file) and file.split("-")[0] == str(year):
                month = int(file.split("-")[1].split(".")[0])
                if month not in available_months:
                    available_months.append(month)

        available_months.sort()
        return available_months

    def get_timestamps(self, year, month):
        """Return the sorted raw lines of a month's stamp file, or ``[]`` on any error."""
        try:
            with open(self._month_path(year, month, "txt"), "r") as file:
                timestamps = file.readlines()
        except Exception:
            return []
        timestamps.sort()
        return timestamps

    def write_edited_timestamps(self, timestamps, year, month):
        """Overwrite a month's stamp file with the concatenated *timestamps* strings."""
        with open(f"{self.userfolder}/{year}-{month}.txt", "w") as file:
            file.write(''.join(timestamps))

    def get_archive_status(self, year, month):
        """Return the "archived" value of a month's JSON file.

        Returns ``False`` when the file does not exist and ``-1`` on any
        other error.
        """
        try:
            return _read_json(self._month_path(year, month, "json"))["archived"]
        except FileNotFoundError:
            return False
        except Exception:
            return -1

    def archiving_validity_check(self, year: int, month: int):
        """Return the days of a month that have an odd number of time stamps.

        Returns ``[]`` when the stamp file cannot be read or parsed.
        """
        try:
            with open(self._month_path(year, month, "txt")) as timestampfile:
                timestamps = timestampfile.readlines()

            days_in_month = monthrange(year, month)[1]

            # Count the stamps per day in a single pass.
            stamps_per_day = Counter()
            for line in timestamps:
                stamp_dt = datetime.datetime.fromtimestamp(int(line))
                if stamp_dt.year == year and stamp_dt.month == month:
                    stamps_per_day[stamp_dt.day] += 1

            return [day for day in range(1, days_in_month + 1) if stamps_per_day[day] % 2 != 0]
        except Exception:
            return []

    def archive_hours(self, year, month, overtime: int):
        """Mark a month as archived, store *overtime* and make its files read-only."""
        filename = self._month_path(year, month, "json")
        data = _read_json(filename)
        data["archived"] = 1
        data["overtime"] = overtime
        _write_json(filename, data)

        # Set the files to read-only
        os.chmod(filename, S_IREAD)
        filename_txt = self._month_path(year, month, "txt")
        # A month without any stamps has no stamp file to protect.
        if os.path.exists(filename_txt):
            os.chmod(filename_txt, S_IREAD)

    def get_last_months_overtime(self, year=None, month=None):
        """Return the archived overtime of the month before *year*/*month*.

        Omitted arguments default to the current date. Returns ``0`` when the
        previous month is not archived or its file cannot be read.
        """
        year, month, _ = _resolve_date(year, month)
        try:
            if int(month) == 1:
                year = str(int(year) - 1)
                month = str(12)
            else:
                month = str(int(month) - 1)

            json_data = _read_json(self._month_path(year, month, "json"))
            if json_data["archived"] == 1:
                return int(json_data["overtime"])
            return 0
        except Exception:
            return 0

    def get_absence(self, year, month):
        """Return the "absence" entry of a month's JSON file, or ``{}`` on any error."""
        try:
            return _read_json(self._month_path(int(year), int(month), "json"))["absence"]
        except Exception:
            return {}

    def get_day_notes(self, year, month, day):
        """Return the notes stored for *day*, or ``{}`` when none can be read."""
        try:
            json_data = _read_json(self._month_path(int(year), int(month), "json"))
            return json_data["notes"][str(day)]
        except Exception:
            return {}

    def write_notes(self, year, month, day, note_dict):
        """Store *note_dict* as the notes of *day* in the month's JSON file.

        A *note_dict* with exactly one key replaces the day's notes with that
        single entry; if its value is the empty string the day's notes become
        empty. Any other *note_dict* is stored as given. A missing month file
        is created.
        """
        path = self._month_path(int(year), int(month), "json")
        try:
            json_data = _read_json(path)
        except FileNotFoundError:
            json_data = {"archived": 0, "total_hours": 0, "notes": {}}

        # Month files created while stamping have no "notes" key yet.
        notes = json_data.setdefault("notes", {})

        if len(note_dict) == 1:
            user_info, text = next(iter(note_dict.items()))
            notes[str(day)] = {} if text == "" else {user_info: text}
        else:
            notes[str(day)] = note_dict

        _write_json(path, json_data)

    def update_absence(self, year, month, day, absence_type):
        """Set the absence type of *day* in the month's JSON file.

        A missing or unparsable month file is replaced by a fresh one.
        """
        path = self._month_path(int(year), int(month), "json")
        try:
            json_data = _read_json(path)
        except (FileNotFoundError, json.JSONDecodeError):
            json_data = {"archived": 0, "overtime": 0}

        json_data.setdefault("absence", {})[str(int(day))] = absence_type
        _write_json(path, json_data)

    def del_absence(self, year, month, day):
        """Remove the absence entry of *day* from the month's JSON file.

        Raises ``KeyError`` when no absence is stored for that day.
        """
        path = self._month_path(int(year), int(month), "json")
        json_data = _read_json(path)
        # Same key normalization as update_absence uses when storing.
        del json_data["absence"][str(int(day))]
        _write_json(path, json_data)

    def get_day_workhours(self, year, month, day):
        """Return the hours to work on the given day.

        Returns ``0`` on a holiday from the admin settings and ``-1`` when
        the day lies before the first workhours entry (or none exists).
        """
        day_to_check = datetime.datetime(int(year), int(month), int(day))

        # Check holidays
        settings = load_adminsettings()
        holiday_dates = {_parse_date(holiday) for holiday in settings["holidays"]}
        if day_to_check in holiday_dates:
            return 0

        # Search the weekly working hours, newest entry first.
        # -1 signals "before the date of employment".
        hours_to_work = -1
        for entry in sorted(self.workhours, reverse=True):
            if _parse_date(entry) <= day_to_check:
                # Weekday keys run from "1" (Monday) to "7" (Sunday).
                weekday = str(day_to_check.isoweekday())
                hours_to_work = self.workhours[entry][weekday]
                break
        return hours_to_work

    def get_vacation_claim(self, year=None, month=None, day=None):
        """Return the "vacation" value of the workhours entry valid on a date.

        Omitted arguments default to the current date. Returns ``-1`` when no
        entry starts on or before that date.
        """
        year, month, day = _resolve_date(year, month, day)
        day_to_check = datetime.datetime(int(year), int(month), int(day))

        claim = -1
        for entry in sorted(self.workhours, reverse=True):
            if _parse_date(entry) <= day_to_check:
                claim = self.workhours[entry]["vacation"]
                break

        return int(claim)

    def count_absence_days(self, absence_code: str, year=None):
        """Count the days of *year* whose absence type equals *absence_code*.

        *year* defaults to the current year.
        """
        year, _, _ = _resolve_date(year)
        absence_days = 0
        for month in range(1, 13):
            absence_dict = self.get_absence(year, month)
            if not isinstance(absence_dict, dict):
                # Skip unexpected data instead of failing.
                continue
            absence_days += sum(1 for absence_type in absence_dict.values() if absence_type == absence_code)
        return absence_days

    def delete_photo(self):
        """Delete the user's photo file."""
        os.remove(self.photofile)

    def get_day_timestamps(self, year=None, month=None, day=None):
        """Return the sorted time stamps (ints) that fall on the given day.

        Omitted arguments default to the current date.
        """
        year, month, day = _resolve_date(year, month, day)
        check_date = datetime.date(int(year), int(month), int(day))

        todays_timestamps = [
            int(line)
            for line in self.get_timestamps(year, month)
            if datetime.datetime.fromtimestamp(int(line)).date() == check_date
        ]
        todays_timestamps.sort()
        return todays_timestamps

    def get_worked_time(self, year=None, month=None, day=None):
        """Return ``[total_seconds, in_time_stamp]`` for the given day.

        ``total_seconds`` sums the differences of consecutive stamp pairs.
        ``in_time_stamp`` is the trailing unpaired stamp, or ``-1`` when the
        number of stamps is even. Omitted arguments default to the current
        date.
        """
        todays_timestamps = self.get_day_timestamps(year, month, day)

        if len(todays_timestamps) % 2 == 0:
            paired_stamps = todays_timestamps
            in_time_stamp = -1
        else:
            paired_stamps = todays_timestamps[:-1]
            in_time_stamp = int(todays_timestamps[-1])

        total_time = sum(end - start for start, end in zip(paired_stamps[0::2], paired_stamps[1::2]))
        return [total_time, in_time_stamp]

    def vacation_application(self, startdate, enddate):
        """Append a vacation application ``(startdate, enddate)`` to the application file."""
        application_file = os.path.join(self.userfolder, va_file)
        applications = self.get_open_vacation_applications()
        applications[str(len(applications))] = (startdate, enddate)
        _write_json(application_file, applications)

    def get_open_vacation_applications(self):
        """Return the stored vacation applications, or ``{}`` when the file is missing."""
        application_file = os.path.join(self.userfolder, va_file)
        try:
            return _read_json(application_file)
        except FileNotFoundError:
            return {}

    def revoke_vacation_application(self, index):
        """Remove the application stored under *index* and renumber the rest.

        Returns ``0`` on success. When no such application exists, the user
        is notified and ``-1`` is returned.
        """
        application_file = os.path.join(self.userfolder, va_file)
        applications = _read_json(application_file)
        try:
            # Keys loaded from JSON are strings, so accept int indexes too.
            del applications[str(index)]
        except KeyError:
            ui.notify("Urlaubsantrag wurde schon bearbeitet")
            return -1

        new_applications = {str(new_index): dates for new_index, dates in enumerate(applications.values())}
        _write_json(application_file, new_applications)
        return 0


# List users
def list_users():
    """Return the sorted names of all user folders.

    Creates the user directory when it is missing and a user named
    "default" when no user exists.
    """
    def find_user_folders():
        return [d for d in os.listdir(userfolder) if os.path.isdir(os.path.join(userfolder, d))]

    if not os.path.exists(userfolder):
        print("Kein Benutzerverzeichnis gefunden. Lege es an.")
        os.makedirs(userfolder)

    users = find_user_folders()
    if not users:
        print("Keine Benutzer gefunden. Lege Standardbenutzer an.")
        new_user("default")
        users = find_user_folders()

    users.sort()
    return users


def new_user(username: str):
    """Create the folder and settings file for a new user *username*.

    The settings start from ``standard_usersettings`` with a workhours entry
    for today (all weekdays and vacation set to 0) and a fresh API key.
    """
    os.makedirs(os.path.join(userfolder, username), exist_ok=True)

    start_date = datetime.datetime.now().strftime("%Y-%m-%d")

    # Work on a copy: mutating the shared defaults leaked the start dates of
    # previously created users into every later user.
    settings_to_write = copy.deepcopy(standard_usersettings)
    settings_to_write["workhours"][start_date] = {str(weekday): 0 for weekday in range(1, 8)}
    settings_to_write["workhours"][start_date]["vacation"] = 0
    settings_to_write["fullname"] = username
    settings_to_write["username"] = username

    # Generate the API key from a cryptographically secure source
    # (40 hex characters, same format as before).
    settings_to_write["api_key"] = secrets.token_hex(20)

    _write_json(os.path.join(userfolder, username, usersettingsfilename), settings_to_write)


# Read admin settings
def load_adminsettings():
    """Return the admin settings, creating the file from the defaults if missing.

    Returns ``-1`` when the settings file cannot be read or parsed.
    """
    settings_filename = os.path.join(scriptpath, usersettingsfilename)
    if not os.path.exists(settings_filename):
        print("Keine Einstellungsdatei gefunden. Lege Standarddatei an.")
        _write_json(settings_filename, standard_adminsettings)

    try:
        return _read_json(settings_filename)
    except Exception:
        return -1


# Save a specific admin setting
def write_adminsetting(key: str, value):
    """Set *key* to *value* in the admin settings file."""
    settings_filename = os.path.join(scriptpath, usersettingsfilename)
    admin_settings = load_adminsettings()
    try:
        admin_settings[key] = value
        _write_json(settings_filename, admin_settings)
    except KeyError:
        # NOTE(review): assigning to a dict key does not raise KeyError, so
        # this branch looks unreachable; kept to leave the behavior untouched.
        print(f"Kein Einstellungsschlüssel {key} vorhanden.")