From 57c152bd7f9576f6fa9be54732da7555d9c92df9 Mon Sep 17 00:00:00 2001 From: Alexander Malzkuhn Date: Sun, 18 May 2025 22:06:26 +0200 Subject: [PATCH 1/2] Erste Version QR-Scanner mit UI --- qr_scanner.py | 155 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 155 insertions(+) create mode 100644 qr_scanner.py diff --git a/qr_scanner.py b/qr_scanner.py new file mode 100644 index 0000000..dea233b --- /dev/null +++ b/qr_scanner.py @@ -0,0 +1,155 @@ +#!/usr/bin/env python3 +import base64 +import signal +import time +import webbrowser +import argparse +import requests + +import cv2 +import numpy as np +from fastapi import Response +from definitions import app_title, app_version + +from nicegui import Client, app, core, run, ui + +class Commandline_Header: + message_string = f"{app_title} {app_version}" + underline = "" + for i in range(len(message_string)): + underline += "-" + print(message_string) + print(underline) + +def visual_interface(port=9000): + # In case you don't have a webcam, this will provide a black placeholder image. + black_1px = 'iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAAAXNSR0IArs4c6QAAAA1JREFUGFdjYGBg+A8AAQQBAHAgZQsAAAAASUVORK5CYII=' + placeholder = Response(content=base64.b64decode(black_1px.encode('ascii')), media_type='image/png') + + global convert + def convert(frame: np.ndarray) -> bytes: + """Converts a frame from OpenCV to a JPEG image. + + This is a free function (not in a class or inner-function), + to allow run.cpu_bound to pickle it and send it to a separate process. + """ + _, imencode_image = cv2.imencode('.jpg', frame) + return imencode_image.tobytes() + + global setup + def setup() -> None: + + url_string = "" + for i in list(app.urls): + url_string += f"{i}, " + url_string = url_string[0:-2] + print("Weboberfläche erreichbar unter: " + url_string) + + # OpenCV is used to access the webcam. + video_capture = cv2.VideoCapture(0) + detector = cv2.QRCodeDetector() + + blocker = False + blockset = 0 + + + @app.get('/video/frame') + # Thanks to FastAPI's `app.get` it is easy to create a web route which always provides the latest image from OpenCV. + async def grab_video_frame() -> Response: + nonlocal blocker + if time.time() - blockset > 5: + blocker = False + + if not video_capture.isOpened(): + return placeholder + # The `video_capture.read` call is a blocking function. + # So we run it in a separate thread (default executor) to avoid blocking the event loop. + _, frame = await run.io_bound(video_capture.read) + if frame is None: + return placeholder + # `convert` is a CPU-intensive function, so we run it in a separate process to avoid blocking the event loop and GIL. + jpeg = await run.cpu_bound(convert, frame) + + # QR-Handling + + def function_call(): + r = requests.get(str(a)) + #b = webbrowser.open(str(a)) + if r.status_code == 200: + print(f'Request geglückt: {a}') + nonlocal blocker + nonlocal blockset + blocker = True + blockset = time.time() + + if not blocker: + _, img = video_capture.read() + # detect and decode + data, bbox, _ = detector.detectAndDecode(img) + # check if there is a QRCode in the image + if data: + a = data + function_call() + # cv2.imshow("QRCODEscanner", img) + if cv2.waitKey(1) == ord("q"): + function_call() + + return Response(content=jpeg, media_type='image/jpeg') + + # For non-flickering image updates and automatic bandwidth adaptation an interactive image is much better than `ui.image()`. + video_image = ui.interactive_image().classes('w-full h-full') + # A timer constantly updates the source of the image. + # Because data from same paths is cached by the browser, + # we must force an update by adding the current timestamp to the source. + + ui.timer(interval=0.1, callback=lambda: video_image.set_source(f'/video/frame?{time.time()}')) + + async def disconnect() -> None: + """Disconnect all clients from current running server.""" + for client_id in Client.instances: + await core.sio.disconnect(client_id) + + def handle_sigint(signum, frame) -> None: + # `disconnect` is async, so it must be called from the event loop; we use `ui.timer` to do so. + ui.timer(0.1, disconnect, once=True) + # Delay the default handler to allow the disconnect to complete. + ui.timer(1, lambda: signal.default_int_handler(signum, frame), once=True) + + async def cleanup() -> None: + # This prevents ugly stack traces when auto-reloading on code change, + # because otherwise disconnected clients try to reconnect to the newly started server. + await disconnect() + # Release the webcam hardware so it can be used by other applications again. + video_capture.release() + + app.on_shutdown(cleanup) + # We also need to disconnect clients when the app is stopped with Ctrl+C, + # because otherwise they will keep requesting images which lead to unfinished subprocesses blocking the shutdown. + signal.signal(signal.SIGINT, handle_sigint) + + + # All the setup is only done when the server starts. This avoids the webcam being accessed + # by the auto-reload main process (see https://github.com/zauberzeug/nicegui/discussions/2321). + app.on_startup(setup) + ui.run(favicon="favicon.svg", port=port, language='de-DE', show_welcome_message=False) + +if __name__ in ("__main__", "__mp_main__"): + parser = argparse.ArgumentParser(description=f'{app_title}-QR-Scanner {app_version}') + parser.add_argument('--webgui', help='Web-GUI starten', action="store_true") + parser.add_argument('-p', help="Port, über den die Weboberfläche erreichbar ist") + args = parser.parse_args() + + Commandline_Header() + print("QR-Scanner") + + if args.webgui: + try: + port = int(args.p) + except: + port = False + if not port == False: + visual_interface(port) + else: + print("Ungültiger Port") + print("Beende") + quit() From 16dfddfc799931cc956a221a5f27f5bd5323e805 Mon Sep 17 00:00:00 2001 From: Alexander Malzkuhn Date: Wed, 21 May 2025 14:53:09 +0200 Subject: [PATCH 2/2] Umstellung auf JSON Antwort --- api.py | 28 +++++++++++++++------------- qr_scanner.py | 12 ++++++++++-- 2 files changed, 25 insertions(+), 15 deletions(-) diff --git a/api.py b/api.py index ca2816b..db2cafd 100644 --- a/api.py +++ b/api.py @@ -428,10 +428,10 @@ def page_overview_absence(username: str, year: int): else: login = login_mask(target=f'/api/absence/{username}/{year}') -@ui.page('/api/stamp/{api_key}') -def page_api_stamp(api_key: str): +@app.get('/api/stamp/{api_key}') +def json_stamp(api_key: str): userlist = list_users() - user_dict = { } + user_dict = {} # Dictionary mit Usernamen befüllen for i in userlist: user_dict[i] = "" @@ -442,20 +442,23 @@ def page_api_stamp(api_key: str): except: pass - found_key = False - - ui.page_title(f'{app_title} {app_version}') - + returndata = {} for user_key, api_value in user_dict.items(): if api_key == api_value: current_user = user(user_key) current_user.timestamp() - found_key = True - ui.label(f'Zeitstempel {datetime.now().strftime("%H:%M")} für {current_user.fullname} eingetragen') - break - if found_key == False: - ui.label("Keinen passenden Benutzer gefunden") + + returndata["username"] = current_user.username + if current_user.stamp_status() == status_in: + returndata["stampstatus"] = True + else: + returndata["stampstatus"] = False + break + else: + returndata["username"] = None + + return returndata @app.get("/api/json/{api_key}") def json_info(api_key: str): @@ -481,7 +484,6 @@ def json_info(api_key: str): month = now_dt.month day = now_dt.day - found_key = True data = { } data["user"] = current_user.username if current_user.stamp_status() == status_in: diff --git a/qr_scanner.py b/qr_scanner.py index dea233b..3f4a21e 100644 --- a/qr_scanner.py +++ b/qr_scanner.py @@ -2,13 +2,13 @@ import base64 import signal import time -import webbrowser import argparse import requests import cv2 import numpy as np from fastapi import Response +from playsound3 import playsound from definitions import app_title, app_version from nicegui import Client, app, core, run, ui @@ -74,9 +74,17 @@ def visual_interface(port=9000): def function_call(): r = requests.get(str(a)) + print(r.content()) + print("Inside Function_call") #b = webbrowser.open(str(a)) if r.status_code == 200: - print(f'Request geglückt: {a}') + print('Erkannt') + if r.json()["stampstatus"]: + playsound('ui-on.mp3') + elif not r.json()["stampstatus"]: + playsound('ui-off.mp3') + else: + playsound('ui-sound.mp3') nonlocal blocker nonlocal blockset blocker = True