164 lines
6.3 KiB
Python
164 lines
6.3 KiB
Python
#!/usr/bin/env python3
|
|
import base64
|
|
import signal
|
|
import time
|
|
import argparse
|
|
import requests
|
|
|
|
import cv2
|
|
import numpy as np
|
|
from fastapi import Response
|
|
from playsound3 import playsound
|
|
from definitions import app_title, app_version
|
|
|
|
from nicegui import Client, app, core, run, ui
|
|
|
|
class Commandline_Header:
|
|
message_string = f"{app_title} {app_version}"
|
|
underline = ""
|
|
for i in range(len(message_string)):
|
|
underline += "-"
|
|
print(message_string)
|
|
print(underline)
|
|
|
|
def visual_interface(port=9000):
|
|
# In case you don't have a webcam, this will provide a black placeholder image.
|
|
black_1px = 'iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAAAXNSR0IArs4c6QAAAA1JREFUGFdjYGBg+A8AAQQBAHAgZQsAAAAASUVORK5CYII='
|
|
placeholder = Response(content=base64.b64decode(black_1px.encode('ascii')), media_type='image/png')
|
|
|
|
global convert
|
|
def convert(frame: np.ndarray) -> bytes:
|
|
"""Converts a frame from OpenCV to a JPEG image.
|
|
|
|
This is a free function (not in a class or inner-function),
|
|
to allow run.cpu_bound to pickle it and send it to a separate process.
|
|
"""
|
|
_, imencode_image = cv2.imencode('.jpg', frame)
|
|
return imencode_image.tobytes()
|
|
|
|
global setup
|
|
def setup() -> None:
|
|
|
|
url_string = ""
|
|
for i in list(app.urls):
|
|
url_string += f"{i}, "
|
|
url_string = url_string[0:-2]
|
|
print("Weboberfläche erreichbar unter: " + url_string)
|
|
|
|
# OpenCV is used to access the webcam.
|
|
video_capture = cv2.VideoCapture(0)
|
|
detector = cv2.QRCodeDetector()
|
|
|
|
blocker = False
|
|
blockset = 0
|
|
|
|
|
|
@app.get('/video/frame')
|
|
# Thanks to FastAPI's `app.get` it is easy to create a web route which always provides the latest image from OpenCV.
|
|
async def grab_video_frame() -> Response:
|
|
nonlocal blocker
|
|
if time.time() - blockset > 5:
|
|
blocker = False
|
|
|
|
if not video_capture.isOpened():
|
|
return placeholder
|
|
# The `video_capture.read` call is a blocking function.
|
|
# So we run it in a separate thread (default executor) to avoid blocking the event loop.
|
|
_, frame = await run.io_bound(video_capture.read)
|
|
if frame is None:
|
|
return placeholder
|
|
# `convert` is a CPU-intensive function, so we run it in a separate process to avoid blocking the event loop and GIL.
|
|
jpeg = await run.cpu_bound(convert, frame)
|
|
|
|
# QR-Handling
|
|
|
|
def function_call():
|
|
r = requests.get(str(a))
|
|
print(r.content())
|
|
print("Inside Function_call")
|
|
#b = webbrowser.open(str(a))
|
|
if r.status_code == 200:
|
|
print('Erkannt')
|
|
if r.json()["stampstatus"]:
|
|
playsound('ui-on.mp3')
|
|
elif not r.json()["stampstatus"]:
|
|
playsound('ui-off.mp3')
|
|
else:
|
|
playsound('ui-sound.mp3')
|
|
nonlocal blocker
|
|
nonlocal blockset
|
|
blocker = True
|
|
blockset = time.time()
|
|
|
|
if not blocker:
|
|
_, img = video_capture.read()
|
|
# detect and decode
|
|
data, bbox, _ = detector.detectAndDecode(img)
|
|
# check if there is a QRCode in the image
|
|
if data:
|
|
a = data
|
|
function_call()
|
|
# cv2.imshow("QRCODEscanner", img)
|
|
if cv2.waitKey(1) == ord("q"):
|
|
function_call()
|
|
|
|
return Response(content=jpeg, media_type='image/jpeg')
|
|
|
|
# For non-flickering image updates and automatic bandwidth adaptation an interactive image is much better than `ui.image()`.
|
|
video_image = ui.interactive_image().classes('w-full h-full')
|
|
# A timer constantly updates the source of the image.
|
|
# Because data from same paths is cached by the browser,
|
|
# we must force an update by adding the current timestamp to the source.
|
|
|
|
ui.timer(interval=0.1, callback=lambda: video_image.set_source(f'/video/frame?{time.time()}'))
|
|
|
|
async def disconnect() -> None:
|
|
"""Disconnect all clients from current running server."""
|
|
for client_id in Client.instances:
|
|
await core.sio.disconnect(client_id)
|
|
|
|
def handle_sigint(signum, frame) -> None:
|
|
# `disconnect` is async, so it must be called from the event loop; we use `ui.timer` to do so.
|
|
ui.timer(0.1, disconnect, once=True)
|
|
# Delay the default handler to allow the disconnect to complete.
|
|
ui.timer(1, lambda: signal.default_int_handler(signum, frame), once=True)
|
|
|
|
async def cleanup() -> None:
|
|
# This prevents ugly stack traces when auto-reloading on code change,
|
|
# because otherwise disconnected clients try to reconnect to the newly started server.
|
|
await disconnect()
|
|
# Release the webcam hardware so it can be used by other applications again.
|
|
video_capture.release()
|
|
|
|
app.on_shutdown(cleanup)
|
|
# We also need to disconnect clients when the app is stopped with Ctrl+C,
|
|
# because otherwise they will keep requesting images which lead to unfinished subprocesses blocking the shutdown.
|
|
signal.signal(signal.SIGINT, handle_sigint)
|
|
|
|
|
|
# All the setup is only done when the server starts. This avoids the webcam being accessed
|
|
# by the auto-reload main process (see https://github.com/zauberzeug/nicegui/discussions/2321).
|
|
app.on_startup(setup)
|
|
ui.run(favicon="favicon.svg", port=port, language='de-DE', show_welcome_message=False)
|
|
|
|
if __name__ in ("__main__", "__mp_main__"):
|
|
parser = argparse.ArgumentParser(description=f'{app_title}-QR-Scanner {app_version}')
|
|
parser.add_argument('--webgui', help='Web-GUI starten', action="store_true")
|
|
parser.add_argument('-p', help="Port, über den die Weboberfläche erreichbar ist")
|
|
args = parser.parse_args()
|
|
|
|
Commandline_Header()
|
|
print("QR-Scanner")
|
|
|
|
if args.webgui:
|
|
try:
|
|
port = int(args.p)
|
|
except:
|
|
port = False
|
|
if not port == False:
|
|
visual_interface(port)
|
|
else:
|
|
print("Ungültiger Port")
|
|
print("Beende")
|
|
quit()
|