581 lines
21 KiB
Python
581 lines
21 KiB
Python
# Zeiterfassung
|
|
# User-related functions

import copy
import datetime
import hashlib
import json
import os
import re
import secrets
import shutil
import time
from calendar import monthrange
from stat import S_IREAD, S_IWUSR

from nicegui import ui

from lib.definitions import (
    userfolder,
    scriptpath,
    usersettingsfilename,
    photofilename,
    status_in,
    status_out,
    standard_adminsettings,
    standard_usersettings,
    va_file,
    is_docker,
)
|
|
|
|
# User class
class user:
    """A time-tracking user backed by a folder of per-month files.

    For every month the user folder may contain ``<year>-<month>.txt`` (one
    Unix timestamp per line, alternating stamp-in / stamp-out) and
    ``<year>-<month>.json`` (archive flag, overtime, absences, notes).
    Month numbers in file names are not zero-padded.
    """

    def __init__(self, name):
        """Resolve the folder of user *name* and load the settings file.

        Raises whatever ``open``/``json.load`` raise when the settings file is
        missing or invalid, and ``KeyError`` when a required field is absent.
        """
        if not is_docker():
            self.userfolder = os.path.join(scriptpath, userfolder, name)
        else:
            self.userfolder = os.path.join("/users", name)
        self.settingsfile = os.path.join(self.userfolder, usersettingsfilename)
        self.photofile = os.path.join(self.userfolder, photofilename)

        # Read master data
        with open(self.settingsfile) as json_file:
            data = json.load(json_file)

        self.password = data["password"]
        self.workhours = data["workhours"]
        self.username = data["username"]
        self.fullname = data["fullname"]
        self.api_key = data["api_key"]

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _read_json(path):
        """Return the parsed content of the JSON file at *path*."""
        with open(path, "r") as json_file:
            return json.load(json_file)

    @staticmethod
    def _write_json(path, data):
        """Serialize *data* with 4-space indentation into *path*."""
        with open(path, "w") as json_file:
            json_file.write(json.dumps(data, indent=4))

    def _month_json(self, year, month):
        """Return the path of the month JSON file (year/month coerced to int)."""
        return os.path.join(self.userfolder, f"{int(year)}-{int(month)}.json")

    @staticmethod
    def _fill_today(year, month, day=None):
        """Replace every ``None`` among *year*, *month*, *day* by today's value."""
        now = datetime.datetime.now()
        return (
            now.year if year is None else year,
            now.month if month is None else month,
            now.day if day is None else day,
        )

    def _active_workhours_key(self, day_to_check):
        """Return the latest workhours key dated on/before *day_to_check*, or None."""
        for entry in sorted(self.workhours, reverse=True):
            parts = entry.split("-")
            entry_dt = datetime.datetime(int(parts[0]), int(parts[1]), int(parts[2]))
            if entry_dt <= day_to_check:
                return entry
        return None

    # ------------------------------------------------------------------
    # Stamping
    # ------------------------------------------------------------------

    def get_stamp_file(self, time_stamp=None):
        """Return the extension-less path ``<userfolder>/<year>-<month>``.

        The month is taken from *time_stamp* (Unix seconds) or, when it is
        ``None``, from the current local time.
        """
        if time_stamp is None:
            # A single now() call keeps year and month consistent.
            moment = datetime.datetime.now()
        else:
            moment = datetime.datetime.fromtimestamp(time_stamp)
        return os.path.join(self.userfolder, f"{moment.year}-{moment.month}")

    def timestamp(self, stamptime=-1):
        """Append a stamp to the month file that *stamptime* belongs to.

        *stamptime* is a Unix timestamp; ``-1`` means "now". The companion
        JSON file of the same month is created with default content when it
        does not exist yet.
        """
        if stamptime == -1:
            stamptime = time.time()
        stamp = int(stamptime)
        base = self.get_stamp_file(time_stamp=stamptime)
        filename = f"{base}.txt"

        try:
            with open(filename, "a") as file:
                file.write(f"{stamp}\n")
        except FileNotFoundError:
            # Create missing directories, then retry with the SAME stamp
            # (retrying without it would record the current time instead).
            os.makedirs(os.path.dirname(filename), exist_ok=True)
            with open(filename, "a") as file:
                file.write(f"{stamp}\n")

        # The JSON file has to match the month of the stamp, not the
        # current month.
        json_filename = f"{base}.json"
        if not os.path.exists(json_filename):
            self._write_json(json_filename, {"archived": 0, "total_hours": 0})

    def stamp_status(self):
        """Return the stamp state derived from the current month's file.

        Returns ``status_out`` for an even, ``status_in`` for an odd number of
        stamps and ``None`` when the file holds no stamps at all. A missing
        file is created empty.
        """
        path = f"{self.get_stamp_file()}.txt"
        try:
            with open(path, "r") as file:
                lines = file.readlines()
        except FileNotFoundError:
            print(f"Die Datei {path} wurde nicht gefunden.")
            print("Lege die Datei an.")
            with open(path, "w") as file:
                file.write("")
            lines = []

        if not lines:
            # Kept from the original: no stamps yields None, not status_out.
            return None
        return status_out if len(lines) % 2 == 0 else status_in

    def last_2_timestmaps(self):
        """Return the most recent stamps of the current month.

        Returns ``[second_last, last]`` when at least two stamps exist, the
        single stamp as ``int`` when exactly one exists, and ``-1`` for an
        empty file.
        """
        with open(f"{self.get_stamp_file()}.txt", "r") as file:
            lines = file.readlines()

        # ">= 2": exactly two stamps form a valid pair as well.
        if len(lines) >= 2:
            return [int(lines[-2]), int(lines[-1])]
        if len(lines) == 1:
            return int(lines[0])
        return -1

    # ------------------------------------------------------------------
    # Settings / account
    # ------------------------------------------------------------------

    def write_settings(self):
        """Persist the master data and rename the folder after a username change.

        After a rename ``userfolder``, ``settingsfile`` and ``photofile`` point
        to the new location.
        """
        self._write_json(
            self.settingsfile,
            {
                "username": self.username,
                "fullname": self.fullname,
                "password": self.password,
                "workhours": self.workhours,
                "api_key": self.api_key,
            },
        )

        # Compare the folder's own name with the username; the parent folder
        # is the same root that __init__ used (docker or not).
        parent, current_name = os.path.split(self.userfolder)
        if current_name != self.username:
            new_folder = os.path.join(parent, self.username)
            os.rename(self.userfolder, new_folder)
            self.settingsfile = os.path.join(new_folder, os.path.basename(self.settingsfile))
            self.photofile = os.path.join(new_folder, os.path.basename(self.photofile))
            self.userfolder = new_folder

    def del_user(self):
        """Delete the user's folder including all files in it."""
        shutil.rmtree(self.userfolder)

    def delete_photo(self):
        """Delete the user's photo file."""
        os.remove(self.photofile)

    # ------------------------------------------------------------------
    # Available periods
    # ------------------------------------------------------------------

    def get_starting_day(self):
        """Return ``[year, month, day]`` (strings) of the earliest workhours key.

        The key is sliced positionally, i.e. it is expected in the
        zero-padded ``YYYY-MM-DD`` form.
        """
        first = str(min(self.workhours))
        return [first[:4], first[5:7], first[8:10]]

    def get_years(self):
        """Return the sorted list of years (strings) that may hold data.

        Covers every year from the starting year up to the current one plus
        any year for which a month JSON file exists.
        """
        year_now = datetime.datetime.now().year
        years = {str(year) for year in range(int(self.get_starting_day()[0]), year_now + 1)}

        for file in os.listdir(self.userfolder):
            if re.match(r"\d{4}-\d{1,2}\.json", file):
                years.add(file.split("-")[0])

        return sorted(years)

    def get_months(self, year):
        """Return the sorted month numbers of *year* that may hold data.

        Months run from the starting month (in the starting year) or January
        up to the current month (in the current year) or December (in past
        years); months with an existing JSON file are always included.
        """
        start_year, start_month = (int(part) for part in self.get_starting_day()[:2])
        now = datetime.datetime.now()
        requested = int(year)

        first = start_month if requested == start_year else 1
        if requested == now.year:
            last = now.month
        elif requested < now.year:
            last = 12
        else:
            last = 0  # future year: nothing from the calendar range
        available_months = set(range(first, last + 1))

        for file in os.listdir(self.userfolder):
            if re.match(r"\d{4}-\d{1,2}\.json", file) and file.split("-")[0] == str(year):
                available_months.add(int(file.split("-")[1].split(".")[0]))

        return sorted(available_months)

    # ------------------------------------------------------------------
    # Stamps of a month
    # ------------------------------------------------------------------

    def get_timestamps(self, year, month):
        """Return the sorted raw lines of the month's stamp file ([] on failure)."""
        try:
            with open(os.path.join(self.userfolder, f"{year}-{month}.txt"), "r") as file:
                return sorted(file.readlines())
        except (OSError, ValueError):
            return []

    def write_edited_timestamps(self, timestamps, year, month):
        """Overwrite the month's stamp file with the given raw lines."""
        with open(os.path.join(self.userfolder, f"{year}-{month}.txt"), "w") as file:
            file.write("".join(timestamps))

    def get_day_timestamps(self, year=None, month=None, day=None):
        """Return the sorted integer stamps of one day (default: today)."""
        year, month, day = self._fill_today(year, month, day)
        wanted = datetime.datetime(year, month, day).date()
        stamps = [
            int(raw)
            for raw in self.get_timestamps(year, month)
            if datetime.datetime.fromtimestamp(int(raw)).date() == wanted
        ]
        stamps.sort()
        return stamps

    def get_worked_time(self, year=None, month=None, day=None):
        """Return ``[seconds_worked, open_stamp]`` for one day (default: today).

        Only completed in/out pairs are summed. ``open_stamp`` is the pending
        stamp-in when the day has an odd number of stamps, otherwise ``-1``.
        """
        stamps = self.get_day_timestamps(year, month, day)

        if len(stamps) % 2 == 0:
            in_time_stamp = -1
            paired = stamps
        else:
            in_time_stamp = int(stamps[-1])
            paired = stamps[:-1]

        total_time = sum(end - start for start, end in zip(paired[::2], paired[1::2]))
        return [total_time, in_time_stamp]

    # ------------------------------------------------------------------
    # Archiving
    # ------------------------------------------------------------------

    def get_archive_status(self, year, month):
        """Return the "archived" value of the month.

        Returns ``False`` when the JSON file does not exist and ``-1`` when it
        cannot be read or lacks the key.
        """
        try:
            return self._read_json(os.path.join(self.userfolder, f"{year}-{month}.json"))["archived"]
        except FileNotFoundError:
            return False
        except (OSError, ValueError, KeyError, TypeError):
            return -1

    def archiving_validity_check(self, year: int, month: int):
        """Return the days of the month that have an odd number of stamps.

        Returns an empty list when the stamp file cannot be read or parsed.
        """
        timestampfilename = os.path.join(self.userfolder, f"{year}-{month}.txt")
        try:
            with open(timestampfilename) as timestampfile:
                raw_stamps = timestampfile.readlines()

            days_in_month = monthrange(year, month)[1]

            # Count the stamps per day in one pass over the file.
            stamps_per_day = {}
            for raw in raw_stamps:
                moment = datetime.datetime.fromtimestamp(int(raw))
                if moment.year == year and moment.month == month:
                    stamps_per_day[moment.day] = stamps_per_day.get(moment.day, 0) + 1

            return [
                day
                for day in range(1, days_in_month + 1)
                if stamps_per_day.get(day, 0) % 2 != 0
            ]
        except (OSError, ValueError, TypeError, OverflowError):
            return []

    def archive_hours(self, year, month, overtime: int):
        """Mark the month as archived, store *overtime* and make both files read-only."""
        filename = os.path.join(self.userfolder, f"{year}-{month}.json")
        data = self._read_json(filename)
        data["archived"] = 1
        data["overtime"] = overtime
        self._write_json(filename, data)

        # Set files to read-only
        os.chmod(filename, S_IREAD)
        os.chmod(os.path.join(self.userfolder, f"{year}-{month}.txt"), S_IREAD)

    def get_last_months_overtime(self, year=None, month=None):
        """Return the archived overtime of the month before *year*/*month*.

        *year*/*month* default to the current date, resolved at call time.
        Returns 0 when the previous month is not archived or cannot be read.
        """
        year, month, _ = self._fill_today(year, month)
        try:
            if int(month) == 1:
                year = str(int(year) - 1)
                month = str(12)
            else:
                month = str(int(month) - 1)
            json_data = self._read_json(os.path.join(self.userfolder, f"{year}-{month}.json"))
            if json_data["archived"] == 1:
                return int(json_data["overtime"])
            return 0
        except (OSError, ValueError, KeyError, TypeError):
            return 0

    # ------------------------------------------------------------------
    # Absences and notes
    # ------------------------------------------------------------------

    def get_absence(self, year, month):
        """Return the month's "absence" mapping ({} when unavailable)."""
        try:
            return self._read_json(self._month_json(year, month))["absence"]
        except (OSError, ValueError, KeyError, TypeError):
            return {}

    def get_day_notes(self, year, month, day):
        """Return the notes stored for one day ({} when unavailable)."""
        try:
            return self._read_json(self._month_json(year, month))["notes"][str(day)]
        except (OSError, ValueError, KeyError, TypeError):
            return {}

    def write_notes(self, year, month, day, note_dict):
        """Store *note_dict* as the notes of one day.

        A single-entry *note_dict* replaces the day's notes by that entry
        (an empty string removes it); any other size replaces the day's notes
        by *note_dict* as given. A missing month file is created.
        """
        path = self._month_json(year, month)
        try:
            json_data = self._read_json(path)
        except FileNotFoundError:
            json_data = {"archived": 0, "total_hours": 0, "notes": {}}

        # Files created by timestamp() have no "notes" key yet.
        notes = json_data.setdefault("notes", {})

        if len(note_dict) == 1:
            user_info = list(note_dict)[0]
            notes[str(day)] = {}
            if note_dict[user_info] != "":
                notes[str(day)][user_info] = note_dict[user_info]
        else:
            notes[str(day)] = note_dict

        self._write_json(path, json_data)

    def update_absence(self, year, month, day, absence_type):
        """Set the absence type of one day; the key is ``str(int(day))``.

        An unreadable or missing month file is replaced by a fresh one.
        """
        path = self._month_json(year, month)
        try:
            json_data = self._read_json(path)
        except (OSError, ValueError):
            json_data = {"archived": 0, "overtime": 0}

        absence = json_data.get("absence") if isinstance(json_data, dict) else None
        if not isinstance(absence, dict):
            absence = {}
            json_data["absence"] = absence
        absence[str(int(day))] = absence_type

        self._write_json(path, json_data)

    def del_absence(self, year, month, day):
        """Remove the absence entry of one day.

        Raises ``KeyError`` when the day has no entry.
        """
        path = self._month_json(year, month)
        json_data = self._read_json(path)

        key = str(day)
        if key not in json_data["absence"]:
            # update_absence() stores str(int(day)); accept e.g. "07" too.
            key = str(int(day))
        del json_data["absence"][key]

        self._write_json(path, json_data)

    def count_absence_days(self, absence_code: str, year=None):
        """Count the days of *year* (default: current year) with *absence_code*."""
        year, _, _ = self._fill_today(year, None)
        absence_days = 0
        for month in range(1, 13):
            absence_dict = self.get_absence(year, month)
            if not isinstance(absence_dict, dict):
                continue
            absence_days += sum(
                1 for absence_type in absence_dict.values() if absence_type == absence_code
            )
        return absence_days

    # ------------------------------------------------------------------
    # Target hours and vacation
    # ------------------------------------------------------------------

    def get_day_workhours(self, year, month, day):
        """Return the target hours of one day.

        Returns 0 on a holiday listed in the admin settings and ``-1`` when
        the day lies before the first workhours entry (or none exists).
        """
        day_to_check = datetime.datetime(int(year), int(month), int(day))

        # Check holidays
        settings = load_adminsettings()
        for holiday in settings["holidays"]:
            parts = holiday.split("-")
            if datetime.datetime(int(parts[0]), int(parts[1]), int(parts[2])) == day_to_check:
                return 0

        # Search the weekly working hours
        entry = self._active_workhours_key(day_to_check)
        if entry is None:
            # Before the date of employment
            return -1
        # Weekday keys are "1" (Monday) .. "7" (Sunday).
        return self.workhours[entry][str(day_to_check.isoweekday())]

    def get_vacation_claim(self, year=None, month=None, day=None):
        """Return the vacation claim valid on the given day (default: today).

        Returns ``-1`` when no workhours entry applies to that day.
        """
        year, month, day = self._fill_today(year, month, day)
        day_to_check = datetime.datetime(int(year), int(month), int(day))

        entry = self._active_workhours_key(day_to_check)
        claim = -1 if entry is None else self.workhours[entry]["vacation"]
        return int(claim)

    def vacation_application(self, startdate, enddate):
        """Append a vacation application (*startdate*, *enddate*) to the application file."""
        application_file = os.path.join(self.userfolder, va_file)
        try:
            applications = self._read_json(application_file)
        except FileNotFoundError:
            applications = {}
        applications[str(len(applications))] = (startdate, enddate)
        self._write_json(application_file, applications)

    def get_open_vacation_applications(self):
        """Return the stored vacation applications ({} when the file is missing)."""
        application_file = os.path.join(self.userfolder, va_file)
        try:
            return self._read_json(application_file)
        except FileNotFoundError:
            return {}

    def revoke_vacation_application(self, index):
        """Remove application *index* and renumber the remaining ones from 0.

        Returns 0 on success. When the index does not exist a UI notification
        is shown and ``-1`` is returned.
        """
        application_file = os.path.join(self.userfolder, va_file)
        applications = self._read_json(application_file)

        try:
            # Keys loaded from JSON are strings; accept an int index as well.
            del applications[str(index)]
        except KeyError:
            ui.notify("Urlaubsantrag wurde schon bearbeitet")
            return -1

        renumbered = {
            str(position): dates for position, dates in enumerate(applications.values())
        }
        self._write_json(application_file, renumbered)
        return 0
|
|
|
|
# List users
def list_users():
    """Return the sorted names of all user directories.

    The user folder is created when it does not exist; when it contains no
    user directory, a user named "default" is created first.
    """

    def _user_dirs():
        # Only sub-directories of the user folder count as users.
        return [
            entry
            for entry in os.listdir(userfolder)
            if os.path.isdir(os.path.join(userfolder, entry))
        ]

    if not os.path.exists(userfolder):
        print("Kein Benutzerverzeichnis gefunden. Lege es an.")
        os.makedirs(userfolder)

    found = _user_dirs()
    if len(found) == 0:
        print("Keine Benutzer gefunden. Lege Standardbenutzer an.")
        new_user("default")
        found = _user_dirs()

    return sorted(found)
|
|
|
|
def new_user(username: str):
    """Create the folder and the settings file of a new user.

    Nothing happens when a folder for *username* already exists. The new
    settings start from ``standard_usersettings`` with one workhours entry
    dated today (0 hours for weekdays "1".."7", 0 vacation days) and a fresh
    random API key.
    """
    if not os.path.exists(userfolder):
        os.makedirs(userfolder)

    user_dir = os.path.join(userfolder, username)
    if os.path.exists(user_dir):
        return
    os.makedirs(user_dir)

    start_date = datetime.datetime.now().strftime("%Y-%m-%d")

    # Work on a deep copy: mutating the shared template would leak this
    # user's workhours entries into every user created afterwards.
    settings_to_write = copy.deepcopy(standard_usersettings)
    settings_to_write["fullname"] = username
    settings_to_write["username"] = username

    # Generate the API key: 20 random bytes as 40 hex characters. A key
    # derived from username and creation time would be guessable.
    settings_to_write["api_key"] = secrets.token_hex(20)

    day_hours = {str(weekday): 0 for weekday in range(1, 8)}
    day_hours["vacation"] = 0
    settings_to_write["workhours"][start_date] = day_hours

    with open(os.path.join(user_dir, usersettingsfilename), "w") as json_file:
        json_file.write(json.dumps(settings_to_write, indent=4))
|
|
|
|
# Read admin settings
def load_adminsettings():
    """Return the admin settings parsed from the settings file.

    A missing file is first created from ``standard_adminsettings``.
    Returns ``-1`` when the file cannot be read or does not contain valid
    JSON.
    """
    # Read the settings file
    settings_filename = os.path.join(scriptpath, usersettingsfilename)
    if not os.path.exists(settings_filename):
        print("Keine Einstellungsdatei gefunden. Lege Standarddatei an.")
        with open(settings_filename, "w") as json_file:
            json_file.write(json.dumps(standard_adminsettings, indent=4))

    try:
        with open(settings_filename) as json_file:
            return json.load(json_file)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and decoding errors.
        return -1
|
|
|
|
# Save a single admin setting
def write_adminsetting(key: str, value):
    """Set *key* to *value* in the admin settings and rewrite the settings file.

    NOTE(review): item assignment on a plain dict does not raise KeyError, so
    the handler below looks unreachable for dict settings and unknown keys
    are simply added - confirm whether unknown keys should be rejected.
    """
    target_path = os.path.join(scriptpath, usersettingsfilename)
    current_settings = load_adminsettings()
    try:
        current_settings[key] = value
        serialized = json.dumps(current_settings, indent=4)
        with open(target_path, 'w') as output_file:
            output_file.write(serialized)
    except KeyError:
        print(f"Kein Einstellungsschlüssel {key} vorhanden.")
|
|
|