Merge branch 'master' into styling

This commit is contained in:
Alexander Malzkuhn 2025-06-04 06:27:03 +02:00
commit 0ebf563f21
32 changed files with 149 additions and 463 deletions

3
.gitignore vendored
View File

@ -4,3 +4,6 @@
.venv
users/
backup/
Archiv/
Docker/
docker-work/

3
.idea/.gitignore generated vendored
View File

@ -1,3 +0,0 @@
# Default ignored files
/shelf/
/workspace.xml

View File

@ -1,10 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/.venv" />
</content>
<orderEntry type="jdk" jdkName="Python 3.11 (Zeiterfassung)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

View File

@ -1,6 +0,0 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

7
.idea/misc.xml generated
View File

@ -1,7 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Black">
<option name="sdkName" value="Python 3.11" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.11 (Zeiterfassung)" project-jdk-type="Python SDK" />
</project>

8
.idea/modules.xml generated
View File

@ -1,8 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/Zeiterfassung.iml" filepath="$PROJECT_DIR$/.idea/Zeiterfassung.iml" />
</modules>
</component>
</project>

6
.idea/vcs.xml generated
View File

@ -1,6 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="" vcs="Git" />
</component>
</project>

24
Dockerfile Normal file
View File

@ -0,0 +1,24 @@
FROM debian:latest
RUN apt update && apt upgrade -y
RUN apt install python3 python3-pip python3.11-venv locales -y
RUN mkdir /app
RUN mkdir /.venv
RUN mkdir /backup
RUN mkdir /settings
RUN python3 -m venv /.venv
RUN /.venv/bin/pip install nicegui
RUN /.venv/bin/pip install segno
RUN /.venv/bin/pip install python-dateutil
RUN sed -i '/de_DE.UTF-8/s/^# //g' /etc/locale.gen && \
locale-gen
ENV LANG de_DE.UTF-8
ENV LANGUAGE de_DE:de
ENV LC_ALL de_DE.UTF-8
COPY main.py /app/main.py
COPY favicon.svg /app/favicon.svg
COPY lib /app/lib/
EXPOSE 8090
ENTRYPOINT ["/.venv/bin/python", "/app/main.py"]
#ENTRYPOINT exec /app/.venv/bin/python /app/main.py --docker

11
docker-compose.yml Normal file
View File

@ -0,0 +1,11 @@
services:
test:
image: test:0
restart: always
ports:
- 8090:8090
volumes:
#- /home/alexander/Dokumente/Python/Zeiterfassung:/app
- /home/alexander/Dokumente/Python/Zeiterfassung/docker-work/users:/users
- /home/alexander/Dokumente/Python/Zeiterfassung/docker-work/backup:/backup
- /home/alexander/Dokumente/Python/Zeiterfassung/docker-work/settings:/settings

View File

@ -1,7 +1,6 @@
from datetime import datetime
import dateutil.easter
from PIL.SpiderImagePlugin import isInt
from dateutil.easter import *
from nicegui import ui, app, events
@ -45,6 +44,14 @@ def page_admin():
updates_available = ValueBinder()
updates_available.value = False
enabled_because_not_docker = ValueBinder
if is_docker():
enabled_because_not_docker.value = False
scriptpath = "/app"
backupfolder = "/backup"
else:
enabled_because_not_docker.value = True
with ui.tabs() as tabs:
time_overview = ui.tab('Zeitdaten')
@ -739,30 +746,6 @@ Dies kann nicht rückgängig gemacht werden!''')
with ui.card():
ui.markdown("**Administrationsbenutzer:**")
with ui.grid(columns=2):
def save_admin_settings():
write_adminsetting("admin_user", admin_user.value)
if admin_password.value != "":
write_adminsetting("admin_password", hash_password(admin_password.value))
else:
write_adminsetting("admin_password", data["admin_password"])
write_adminsetting("port", port.value)
write_adminsetting("secret", secret)
write_adminsetting("touchscreen", touchscreen_switch.value)
write_adminsetting("times_on_touchscreen", timestamp_switch.value)
write_adminsetting("photos_on_touchscreen", photo_switch.value)
write_adminsetting("picture_height", picture_height_input.value)
write_adminsetting("button_height", button_height_input.value)
write_adminsetting("user_notes", notes_switch.value)
write_adminsetting("holidays", data["holidays"])
write_adminsetting("vacation_application", va_switch.value)
if int(old_port) != int(port.value):
with ui.dialog() as dialog, ui.card():
ui.markdown("Damit die Porteinstellungen wirksam werden, muss der Server neu gestartet werden.")
ui.button("OK", on_click=lambda: dialog.close())
dialog.open()
ui.notify("Einstellungen gespeichert")
timetable.refresh()
ui.markdown("Benutzername des Adminstrators")
admin_user = ui.input().tooltip("Geben Sie hier den Benutzernamen für den Adminstationsnutzer ein")
@ -784,7 +767,9 @@ Dies kann nicht rückgängig gemacht werden!''')
ui.markdown("Port:")
port = ui.input(validation={"Nur ganzzahlige Portnummern erlaubt": lambda value: check_is_number(value),
"Portnummer zu klein": lambda value: len(value)>=2}).tooltip("Geben Sie hier die Portnummer ein, unter der die Zeiterfassung erreichbar ist.").props('size=5')
"Portnummer zu klein": lambda value: len(value)>=2}).tooltip("Geben Sie hier die Portnummer ein, unter der die Zeiterfassung erreichbar ist.").props('size=5').bind_enabled_from(enabled_because_not_docker, 'value')
if is_docker():
port.tooltip("Diese Einstellung ist beim Einsatz von Docker deaktiviert.")
old_port = data["port"]
port.value = old_port
@ -826,12 +811,12 @@ Dies kann nicht rückgängig gemacht werden!''')
ui.markdown("**Einstellungen für Benutzerfrontend**")
notes_switch = ui.switch("Notizfunktion aktiviert", value=data["user_notes"])
va_switch = ui.switch("Urlaubsanträge", value=data["vacation_application"])
reset_visibility = ValueBinder()
def holiday_section():
with ui.card():
ui.markdown('**Feiertage:**')
reset_visibility = ValueBinder()
reset_visibility.value = False
def new_holiday_entry():
@ -1031,6 +1016,33 @@ Dies kann nicht rückgängig gemacht werden!''')
holiday_section()
def save_admin_settings():
write_adminsetting("admin_user", admin_user.value)
if admin_password.value != "":
write_adminsetting("admin_password", hash_password(admin_password.value))
else:
write_adminsetting("admin_password", data["admin_password"])
write_adminsetting("port", port.value)
write_adminsetting("secret", secret)
write_adminsetting("touchscreen", touchscreen_switch.value)
write_adminsetting("times_on_touchscreen", timestamp_switch.value)
write_adminsetting("photos_on_touchscreen", photo_switch.value)
write_adminsetting("picture_height", picture_height_input.value)
write_adminsetting("button_height", button_height_input.value)
write_adminsetting("user_notes", notes_switch.value)
write_adminsetting("holidays", data["holidays"])
write_adminsetting("vacation_application", va_switch.value)
if int(old_port) != int(port.value):
with ui.dialog() as dialog, ui.card():
ui.markdown(
"Damit die Porteinstellungen wirksam werden, muss der Server neu gestartet werden.")
ui.button("OK", on_click=lambda: dialog.close())
dialog.open()
ui.notify("Einstellungen gespeichert")
reset_visibility.value = False
timetable.refresh()
ui.button("Speichern", on_click=save_admin_settings).tooltip("Hiermit werden sämtliche oben gemachten Einstellungen gespeichert.")
with ui.tab_panel(users):
@ -1393,7 +1405,7 @@ Dies kann nicht rückgängig gemacht werden!''')
ui.label("Backupeinstellungen").classes('font-bold')
with ui.grid(columns='auto auto auto'):
ui.markdown("Backupordner:")
backupfolder_input = ui.input(value=backupfolder).props(f"size={len(backupfolder)}")
backupfolder_input = ui.input(value=backupfolder).props(f"size={len(backupfolder)}").bind_enabled_from(enabled_because_not_docker, 'value')
def save_new_folder_name():
if os.path.exists(backupfolder_input.value):
write_adminsetting("backup_folder", backupfolder_input.value)
@ -1405,7 +1417,9 @@ Dies kann nicht rückgängig gemacht werden!''')
ui.label("exisitiert nicht und kann daher nicht verwendet werden.")
ui.button("OK", on_click=dialog.close)
dialog.open()
ui.button("Speichern", on_click=save_new_folder_name).tooltip("Hiermit können Sie das Backupverzeichnis ändeern")
save_backup_folder_button = ui.button("Speichern", on_click=save_new_folder_name).tooltip("Hiermit können Sie das Backupverzeichnis ändeern").bind_enabled_from(enabled_because_not_docker, 'value')
if is_docker():
save_backup_folder_button.tooltip("Diese Einstellung ist beim Einsatz von Docker deaktiviert.")
ui.markdown("API-Schlüssel:")
backup_api_key_input = ui.input(value=api_key).tooltip("Hier den API-Schlüssel eintragen, der für Backuperzeugung mittels API-Aufruf verwendet werden soll.")
@ -1465,7 +1479,10 @@ Dies kann nicht rückgängig gemacht werden!''')
except ValueError:
button_string = date_string
ui.markdown(button_string)
ui.markdown(f'{round(size/1_000_000,2)} MB')
if size > 1_000_000:
ui.markdown(f'{round(size/1_000_000,2)} MB')
else:
ui.markdown(f'{round(size / 1_000, 2)} kB')
ui.markdown(version)
ui.button(icon='download', on_click=lambda file=date_string: ui.download.file(
os.path.join(scriptpath, backupfolder, f'{file}.zip'))).tooltip(
@ -1526,7 +1543,7 @@ Dies kann nicht rückgängig gemacht werden!''')
for file in files:
add = os.path.join(root, file)
target.write(add)
target.write(usersettingsfilename)
target.write(os.path.join(scriptpath, usersettingsfilename), arcname=usersettingsfilename)
target.writestr("app_version.txt", data=app_version)
backup_list.refresh()
n.dismiss()

View File

@ -6,12 +6,21 @@ from pathlib import Path
import hashlib
app_title = "Zeiterfassung"
app_version = ("0.0.0")
app_version = "0.0.0"
# Standardpfade
scriptpath = str(Path(os.path.dirname(os.path.abspath(__file__))).parent.absolute())
def is_docker():
cgroup = Path('/proc/self/cgroup')
return Path('/.dockerenv').is_file() or (cgroup.is_file() and 'docker' in cgroup.read_text())
if is_docker():
scriptpath = "/settings"
backupfolder = "/backup"
else:
scriptpath = str(Path(os.path.dirname(os.path.abspath(__file__))).parent.absolute())
backupfolder = str(os.path.join(scriptpath, "backup"))
userfolder = "users"
backupfolder = str(os.path.join(scriptpath, "backup"))
# Dateinamen
@ -37,7 +46,7 @@ standard_adminsettings = { "admin_user": "admin",
"button_height": 300,
"user_notes": True,
"vacation_application": True,
"backupfolder": backupfolder,
"backup_folder": backupfolder,
"backup_api_key": hashlib.shake_256(bytes(backupfolder, 'utf-8')).hexdigest(20),
"holidays": { }
}

View File

@ -199,6 +199,7 @@ def homepage():
overviews = ui.tab('Übersichten')
absence = ui.tab('Urlaubsantrag')
absence.set_visibility(load_adminsettings()["vacation_application"])
pw_change = ui.tab("Passwort")
with ui.grid(columns='1fr auto 1fr').classes('w-full items-center'):
ui.space()
@ -281,6 +282,31 @@ def homepage():
ui.button("Zurückziehen", on_click=retract_va).tooltip("Hiermit wird der oben gewählte Urlaubsantrag zurückgezogen.").classes('w-full')
open_vacation_applications()
with ui.tab_panel(pw_change):
ui.label("Passwort ändern").classes('font-bold')
with ui.grid(columns='auto auto').classes('items-end'):
ui.label("Altes Passwort:")
old_pw_input = ui.input(password=True)
ui.label("Neues Passwort:")
new_pw_input = ui.input(password=True)
ui.label("Neues Passwort bestätigen:")
new_pw_confirm_input = ui.input(password=True)
def revert_pw_inputs():
old_pw_input.value = ""
new_pw_input.value = ""
new_pw_confirm_input.value = ""
def save_new_password():
if hash_password(old_pw_input.value) == current_user.password:
if new_pw_input.value == new_pw_confirm_input.value:
current_user.password = hash_password(new_pw_input.value)
current_user.write_settings()
ui.notify("Neues Passwort gespeichert")
else:
ui.notify("Passwortbestätigung stimmt nicht überein")
else:
ui.notify("Altes Passwort nicht korrekt")
ui.button("Speichern", on_click=save_new_password)
ui.button("Zurücksetzen", on_click=revert_pw_inputs)
ui.space()
else:

View File

@ -1,13 +0,0 @@
{
"admin_user": "admin",
"admin_password": "8c6976e5b5410415bde908bd4dee15dfb167a9c873fc4bb8a81f6f2ab448a918",
"port": "8090",
"secret": "ftgzuhjikg,mt5jn46uzer8sfi9okrmtzjhndfierko5zltjhdgise",
"times_on_touchscreen": true,
"photos_on_touchscreen": true,
"touchscreen": true,
"picure_height": 200,
"button_height": 300,
"user_notes": true,
"holidays": {}
}

View File

@ -14,25 +14,26 @@ import shutil
import re
from lib.definitions import userfolder, scriptpath, usersettingsfilename, photofilename, status_in, status_out, \
standard_adminsettings, standard_usersettings, va_file
standard_adminsettings, standard_usersettings, va_file, is_docker
# Benutzerklasse
class user:
def __init__(self, name):
self.userfolder = os.path.join(scriptpath, userfolder, name)
if not is_docker():
self.userfolder = os.path.join(scriptpath, userfolder, name)
else:
self.userfolder = os.path.join("/users", name)
self.settingsfile = os.path.join(self.userfolder, usersettingsfilename)
self.photofile = os.path.join(self.userfolder, photofilename)
# Stammdaten einlesen
try:
with open(self.settingsfile) as json_file:
#try:
with open(self.settingsfile) as json_file:
data = json.load(json_file)
except:
print("Fehler beim Erstellen des Datenarrays.")
#Hier muss noch Fehlerbehandlungcode hin
#except:
# print("Fehler beim Erstellen des Datenarrays.")
# #TODO Hier muss noch Fehlerbehandlungcode hin
self.password = data["password"]
self.workhours = data["workhours"]
@ -301,10 +302,8 @@ class user:
return { }
def write_notes(self, year, month, day, note_dict):
print(os.path.join(self.userfolder, f"{int(year)}-{int(month)}.json"))
with open(os.path.join(self.userfolder, f"{int(year)}-{int(month)}.json"), "r") as json_file:
json_data = json.load(json_file)
print(json_data)
if len(note_dict) == 1:
user_info = list(note_dict)[0]
json_data["notes"][str(day)] = { }

View File

@ -15,8 +15,7 @@ import argparse
from lib.web_ui import hash_password
class Commandline_Header:
def commandline_header():
message_string = f"{app_title} {app_version}"
underline = ""
for i in range(len(message_string)):
@ -37,8 +36,7 @@ def main():
def startup_message():
Commandline_Header()
commandline_header()
url_string = ""
for i in list(app.urls):
url_string += f"{i}, "
@ -60,8 +58,13 @@ if __name__ in ("__main__", "__mp_main__"):
parser = argparse.ArgumentParser(description=f'{app_title} {app_version}')
parser.add_argument('--admin-access', help='Zugangsdaten für Administrator einstellen', action="store_true")
args = parser.parse_args()
if is_docker():
scriptpath = "/app"
backupfolder = "/backup"
if args.admin_access:
Commandline_Header()
commandline_header()
print("Lade Administrationseinstellungen")
admin_settings = load_adminsettings()
print("Geben Sie den neuen Benutzernamen für den Administrationsbenutzer an:")

View File

@ -1,23 +0,0 @@
import json
import urllib.request
from nicegui import ui, app
import segno
@app.get("/data")
async def deliver_data():
with open("settings.json") as json_file:
data = json.load(json_file)
return data
string = ""
for i in range(1000):
string += str(i)
qr_code = segno.make_qr(string).svg_data_uri()
#qr_code.save("qr_code.png", scale=5, border=0)
ui.image(qr_code)
ui.run(language="de-DE", port=9000)

View File

@ -1,163 +0,0 @@
#!/usr/bin/env python3
import base64
import signal
import time
import argparse
import requests
import cv2
import numpy as np
from fastapi import Response
from playsound3 import playsound
from definitions import app_title, app_version
from nicegui import Client, app, core, run, ui
class Commandline_Header:
message_string = f"{app_title} {app_version}"
underline = ""
for i in range(len(message_string)):
underline += "-"
print(message_string)
print(underline)
def visual_interface(port=9000):
# In case you don't have a webcam, this will provide a black placeholder image.
black_1px = 'iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAAAXNSR0IArs4c6QAAAA1JREFUGFdjYGBg+A8AAQQBAHAgZQsAAAAASUVORK5CYII='
placeholder = Response(content=base64.b64decode(black_1px.encode('ascii')), media_type='image/png')
global convert
def convert(frame: np.ndarray) -> bytes:
"""Converts a frame from OpenCV to a JPEG image.
This is a free function (not in a class or inner-function),
to allow run.cpu_bound to pickle it and send it to a separate process.
"""
_, imencode_image = cv2.imencode('.jpg', frame)
return imencode_image.tobytes()
global setup
def setup() -> None:
url_string = ""
for i in list(app.urls):
url_string += f"{i}, "
url_string = url_string[0:-2]
print("Weboberfläche erreichbar unter: " + url_string)
# OpenCV is used to access the webcam.
video_capture = cv2.VideoCapture(0)
detector = cv2.QRCodeDetector()
blocker = False
blockset = 0
@app.get('/video/frame')
# Thanks to FastAPI's `app.get` it is easy to create a web route which always provides the latest image from OpenCV.
async def grab_video_frame() -> Response:
nonlocal blocker
if time.time() - blockset > 5:
blocker = False
if not video_capture.isOpened():
return placeholder
# The `video_capture.read` call is a blocking function.
# So we run it in a separate thread (default executor) to avoid blocking the event loop.
_, frame = await run.io_bound(video_capture.read)
if frame is None:
return placeholder
# `convert` is a CPU-intensive function, so we run it in a separate process to avoid blocking the event loop and GIL.
jpeg = await run.cpu_bound(convert, frame)
# QR-Handling
def function_call():
r = requests.get(str(a))
print(r.content())
print("Inside Function_call")
#b = webbrowser.open(str(a))
if r.status_code == 200:
print('Erkannt')
if r.json()["stampstatus"]:
playsound('ui-on.mp3')
elif not r.json()["stampstatus"]:
playsound('ui-off.mp3')
else:
playsound('ui-sound.mp3')
nonlocal blocker
nonlocal blockset
blocker = True
blockset = time.time()
if not blocker:
_, img = video_capture.read()
# detect and decode
data, bbox, _ = detector.detectAndDecode(img)
# check if there is a QRCode in the image
if data:
a = data
function_call()
# cv2.imshow("QRCODEscanner", img)
if cv2.waitKey(1) == ord("q"):
function_call()
return Response(content=jpeg, media_type='image/jpeg')
# For non-flickering image updates and automatic bandwidth adaptation an interactive image is much better than `ui.image()`.
video_image = ui.interactive_image().classes('w-full h-full')
# A timer constantly updates the source of the image.
# Because data from same paths is cached by the browser,
# we must force an update by adding the current timestamp to the source.
ui.timer(interval=0.1, callback=lambda: video_image.set_source(f'/video/frame?{time.time()}'))
async def disconnect() -> None:
"""Disconnect all clients from current running server."""
for client_id in Client.instances:
await core.sio.disconnect(client_id)
def handle_sigint(signum, frame) -> None:
# `disconnect` is async, so it must be called from the event loop; we use `ui.timer` to do so.
ui.timer(0.1, disconnect, once=True)
# Delay the default handler to allow the disconnect to complete.
ui.timer(1, lambda: signal.default_int_handler(signum, frame), once=True)
async def cleanup() -> None:
# This prevents ugly stack traces when auto-reloading on code change,
# because otherwise disconnected clients try to reconnect to the newly started server.
await disconnect()
# Release the webcam hardware so it can be used by other applications again.
video_capture.release()
app.on_shutdown(cleanup)
# We also need to disconnect clients when the app is stopped with Ctrl+C,
# because otherwise they will keep requesting images which lead to unfinished subprocesses blocking the shutdown.
signal.signal(signal.SIGINT, handle_sigint)
# All the setup is only done when the server starts. This avoids the webcam being accessed
# by the auto-reload main process (see https://github.com/zauberzeug/nicegui/discussions/2321).
app.on_startup(setup)
ui.run(favicon="favicon.svg", port=port, language='de-DE', show_welcome_message=False)
if __name__ in ("__main__", "__mp_main__"):
parser = argparse.ArgumentParser(description=f'{app_title}-QR-Scanner {app_version}')
parser.add_argument('--webgui', help='Web-GUI starten', action="store_true")
parser.add_argument('-p', help="Port, über den die Weboberfläche erreichbar ist")
args = parser.parse_args()
Commandline_Header()
print("QR-Scanner")
if args.webgui:
try:
port = int(args.p)
except:
port = False
if not port == False:
visual_interface(port)
else:
print("Ungültiger Port")
print("Beende")
quit()

View File

@ -1,22 +0,0 @@
import cv2
import webbrowser
cap = cv2.VideoCapture(0)
# initialize the cv2 QRCode detector
detector = cv2.QRCodeDetector()
while True:
_, img = cap.read()
# detect and decode
data, bbox, _ = detector.detectAndDecode(img)
# check if there is a QRCode in the image
if data:
a = data
break
cv2.imshow("QRCODEscanner", img)
if cv2.waitKey(1) == ord("q"):
break
b = webbrowser.open(str(a))
cap.release()
cv2.destroyAllWindows()

View File

@ -73,6 +73,7 @@
"2030-10-30": "Reformationstag",
"2030-12-25": "1. Weihnachtsfeiertag",
"2030-12-26": "2. Weihnachtsfeiertag",
"2025-06-11": "Testeintrag"
"2025-06-11": "Testeintrag",
"2025-05-31": "Testeintrag"
}
}

View File

@ -1,12 +0,0 @@
{
"admin_user": "admin",
"admin_password": "8d969eef6ecad3c29a3a629280e686cf0c3f5d5a86aff3ca12020c923adc6c92",
"port": "8090",
"secret": "ftgzuhjikg,mt5jn46uzer8sfi9okrmtzjhndfierko5zltjhdgise",
"holidays": {
"2024-05-01": "Tag der Arbeit",
"2024-12-25": "1. Weihnachtsfeiertag",
"2025-01-01": "Neujahr",
"2025-05-01": "Tag der Arbeit"
}
}

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -1,22 +0,0 @@
{
"2024-04-01": {
"0": "0",
"1": "8",
"2": "8",
"3": "8",
"4": "8",
"5": "8",
"6": "0",
"vacation": "30"
},
"2024-04-07": {
"0": "0",
"1": "6",
"2": "6",
"3": "6",
"4": "8",
"5": "6",
"6": "0",
"vacation": "28"
}
}

View File

@ -1,5 +1,6 @@
{
"archived": 0,
"archived": 1,
"total_hours": 0,
"absence": {}
"absence": {},
"overtime": -406441
}

View File

@ -1,113 +0,0 @@
#!/usr/bin/env python3
import base64
import signal
import time
import webbrowser
import cv2
import numpy as np
from fastapi import Response
from nicegui import Client, app, core, run, ui
# In case you don't have a webcam, this will provide a black placeholder image.
black_1px = 'iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAAAXNSR0IArs4c6QAAAA1JREFUGFdjYGBg+A8AAQQBAHAgZQsAAAAASUVORK5CYII='
placeholder = Response(content=base64.b64decode(black_1px.encode('ascii')), media_type='image/png')
def convert(frame: np.ndarray) -> bytes:
"""Converts a frame from OpenCV to a JPEG image.
This is a free function (not in a class or inner-function),
to allow run.cpu_bound to pickle it and send it to a separate process.
"""
_, imencode_image = cv2.imencode('.jpg', frame)
return imencode_image.tobytes()
def setup() -> None:
# OpenCV is used to access the webcam.
video_capture = cv2.VideoCapture(0)
detector = cv2.QRCodeDetector()
blocker = False
blockset = 0
@app.get('/video/frame')
# Thanks to FastAPI's `app.get` it is easy to create a web route which always provides the latest image from OpenCV.
async def grab_video_frame() -> Response:
nonlocal blocker
if time.time() - blockset > 5:
blocker = False
if not video_capture.isOpened():
return placeholder
# The `video_capture.read` call is a blocking function.
# So we run it in a separate thread (default executor) to avoid blocking the event loop.
_, frame = await run.io_bound(video_capture.read)
if frame is None:
return placeholder
# `convert` is a CPU-intensive function, so we run it in a separate process to avoid blocking the event loop and GIL.
jpeg = await run.cpu_bound(convert, frame)
# QR-Handling
def function_call():
b = webbrowser.open(str(a))
print('\a')
nonlocal blocker
nonlocal blockset
blocker = True
blockset = time.time()
if not blocker:
_, img = video_capture.read()
# detect and decode
data, bbox, _ = detector.detectAndDecode(img)
# check if there is a QRCode in the image
if data:
a = data
function_call()
# cv2.imshow("QRCODEscanner", img)
if cv2.waitKey(1) == ord("q"):
function_call()
return Response(content=jpeg, media_type='image/jpeg')
# For non-flickering image updates and automatic bandwidth adaptation an interactive image is much better than `ui.image()`.
video_image = ui.interactive_image().classes('w-full h-full')
# A timer constantly updates the source of the image.
# Because data from same paths is cached by the browser,
# we must force an update by adding the current timestamp to the source.
ui.timer(interval=0.1, callback=lambda: video_image.set_source(f'/video/frame?{time.time()}'))
async def disconnect() -> None:
"""Disconnect all clients from current running server."""
for client_id in Client.instances:
await core.sio.disconnect(client_id)
def handle_sigint(signum, frame) -> None:
# `disconnect` is async, so it must be called from the event loop; we use `ui.timer` to do so.
ui.timer(0.1, disconnect, once=True)
# Delay the default handler to allow the disconnect to complete.
ui.timer(1, lambda: signal.default_int_handler(signum, frame), once=True)
async def cleanup() -> None:
# This prevents ugly stack traces when auto-reloading on code change,
# because otherwise disconnected clients try to reconnect to the newly started server.
await disconnect()
# Release the webcam hardware so it can be used by other applications again.
video_capture.release()
app.on_shutdown(cleanup)
# We also need to disconnect clients when the app is stopped with Ctrl+C,
# because otherwise they will keep requesting images which lead to unfinished subprocesses blocking the shutdown.
signal.signal(signal.SIGINT, handle_sigint)
# All the setup is only done when the server starts. This avoids the webcam being accessed
# by the auto-reload main process (see https://github.com/zauberzeug/nicegui/discussions/2321).
app.on_startup(setup)
ui.run(port=9005)