dav_common module

This commit is contained in:
Jörn-Michael Miehe 2022-10-14 21:42:05 +00:00
parent 0c51199215
commit f7f3fec891
2 changed files with 56 additions and 53 deletions

View file

@ -0,0 +1,31 @@
import re
from io import BytesIO
from webdav3.client import Client as WebDAVclient
from .settings import SETTINGS
_WEBDAV_CLIENT = WebDAVclient({
"webdav_hostname": SETTINGS.webdav.url,
"webdav_login": SETTINGS.webdav.username,
"webdav_password": SETTINGS.webdav.password,
"disable_check": SETTINGS.webdav.disable_check,
})
async def list_files(regex: re.Pattern, path: str = "") -> list[str]:
ls = _WEBDAV_CLIENT.list(path)
return [
path
for path in ls
if regex.search(path)
]
async def get_file(path: str) -> BytesIO:
resource = _WEBDAV_CLIENT.resource(path)
buffer = BytesIO()
resource.write_to(buffer)
buffer.seek(0)
return buffer

View file

@ -1,28 +1,20 @@
import colorsys
import random
import re
from datetime import date
# from datetime import date
from io import BytesIO
import numpy as np
from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse
from PIL import Image, ImageDraw, ImageFont
from webdav3.client import Client as WebDAVclient
from ..settings import SETTINGS
from ..dav_common import get_file, list_files
router = APIRouter(prefix="/days", tags=["days"])
loesungswort = "ABCDEFGHIJKLMNOPQRSTUVWX"
_WEBDAV_CLIENT = WebDAVclient({
"webdav_hostname": SETTINGS.webdav.url,
"webdav_login": SETTINGS.webdav.username,
"webdav_password": SETTINGS.webdav.password,
"disable_check": SETTINGS.webdav.disable_check,
})
async def shuffle(string: str) -> str:
rnd = random.Random(loesungswort)
@ -30,7 +22,7 @@ async def shuffle(string: str) -> str:
@router.on_event("startup")
async def narf() -> None:
async def startup() -> None:
print(loesungswort)
print(await shuffle(loesungswort))
@ -42,30 +34,30 @@ async def get_letter(
return (await shuffle(loesungswort))[index]
@router.get("/date")
def get_date() -> int:
return date.today().day
# @router.get("/date")
# def get_date() -> int:
# return date.today().day
@router.get(
"/picture",
response_class=StreamingResponse,
)
async def get_picture():
img = Image.open("hand.png").convert("RGBA")
d1 = ImageDraw.Draw(img)
font = ImageFont.truetype("Lena.ttf", 50)
d1.text((260, 155), "W", font=font, fill=(0, 0, 255))
# d1.text(xy=(400, 210), text="Deine Hände auch?",
# font=Font, fill=(255, 0, 0))
img_buffer = BytesIO()
img.save(img_buffer, format="PNG", quality=85)
img_buffer.seek(0)
# @router.get(
# "/picture",
# response_class=StreamingResponse,
# )
# async def get_picture():
# img = Image.open("hand.png").convert("RGBA")
# d1 = ImageDraw.Draw(img)
# font = ImageFont.truetype("Lena.ttf", 50)
# d1.text((260, 155), "W", font=font, fill=(0, 0, 255))
# # d1.text(xy=(400, 210), text="Deine Hände auch?",
# # font=Font, fill=(255, 0, 0))
# img_buffer = BytesIO()
# img.save(img_buffer, format="PNG", quality=85)
# img_buffer.seek(0)
return StreamingResponse(
content=img_buffer,
media_type="image/png",
)
# return StreamingResponse(
# content=img_buffer,
# media_type="image/png",
# )
_RE_IMAGE_FILE = re.compile(
r"\.(gif|jpe?g|tiff?|png|bmp)$",
@ -73,24 +65,6 @@ _RE_IMAGE_FILE = re.compile(
)
async def list_images(path: str) -> list[str]:
ls = _WEBDAV_CLIENT.list(path)
return [
path
for path in ls
if _RE_IMAGE_FILE.search(path)
]
async def get_file(path: str) -> BytesIO:
resource = _WEBDAV_CLIENT.resource(path)
buffer = BytesIO()
resource.write_to(buffer)
buffer.seek(0)
return buffer
async def load_picture_standard(
index: int,
) -> Image.Image:
@ -99,11 +73,9 @@ async def load_picture_standard(
aus der Mitte nehmen
"""
print(await list_images(""))
# Bild laden
rnd = random.Random(f"{loesungswort}{index}")
dat = rnd.choice(await list_images(""))
dat = rnd.choice(await list_files(_RE_IMAGE_FILE))
img_buffer = await get_file(dat)
img = Image.open(img_buffer)