"core" module alpha state

This commit is contained in:
Jörn-Michael Miehe 2023-09-08 18:17:18 +00:00
parent d51db8b836
commit b1748ea0fb
14 changed files with 240 additions and 414 deletions

View file

@ -2,8 +2,8 @@ from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles from fastapi.staticfiles import StaticFiles
from .core.settings import SETTINGS
from .routers import router from .routers import router
from .settings import SETTINGS
app = FastAPI( app = FastAPI(
title="Advent22 API", title="Advent22 API",

View file

@ -1,53 +0,0 @@
import tomllib
from typing import TypeAlias
import tomli_w
from pydantic import BaseModel
from .config import get_config
from .dav_common import dav_get_textfile_content, dav_write_textfile_content
from .settings import SETTINGS
class DoorSaved(BaseModel):
# Tag, an dem die Tür aufgeht
day: int
# Koordinaten für zwei Eckpunkte
x1: int
y1: int
x2: int
y2: int
DoorsSaved: TypeAlias = list[DoorSaved]
class CalendarConfig(BaseModel):
# Dateiname Hintergrundbild
background: str = "adventskalender.jpg"
# Türen für die UI
doors: DoorsSaved = []
async def get_calendar_config() -> CalendarConfig:
cfg = await get_config()
txt = await dav_get_textfile_content(
path=f"files/{cfg.puzzle.calendar}",
)
return CalendarConfig.model_validate(tomllib.loads(txt))
async def set_calendar_config(cal_cfg: CalendarConfig) -> None:
await dav_write_textfile_content(
path=SETTINGS.config_filename,
content=tomli_w.dumps(
cal_cfg.model_dump(
exclude_defaults=True,
exclude_unset=True,
)
),
)

View file

@ -48,7 +48,7 @@ class AdventImage:
self, self,
xy: _XY, xy: _XY,
text: str | bytes, text: str | bytes,
font: "ImageFont._Font", font: ImageFont._Font,
anchor: str | None = "mm", anchor: str | None = "mm",
**text_kwargs, **text_kwargs,
) -> Image._Box | None: ) -> Image._Box | None:
@ -92,7 +92,7 @@ class AdventImage:
self, self,
xy: _XY, xy: _XY,
text: str | bytes, text: str | bytes,
font: "ImageFont._Font", font: ImageFont._Font,
anchor: str | None = "mm", anchor: str | None = "mm",
**text_kwargs, **text_kwargs,
) -> None: ) -> None:

View file

@ -0,0 +1,25 @@
from typing import TypeAlias
from pydantic import BaseModel
class DoorSaved(BaseModel):
# Tag, an dem die Tür aufgeht
day: int
# Koordinaten für zwei Eckpunkte
x1: int
y1: int
x2: int
y2: int
DoorsSaved: TypeAlias = list[DoorSaved]
class CalendarConfig(BaseModel):
# Dateiname Hintergrundbild
background: str = "adventskalender.jpg"
# Türen für die UI
doors: DoorsSaved = []

View file

@ -1,10 +1,5 @@
import tomllib
from pydantic import BaseModel from pydantic import BaseModel
from .dav_common import dav_get_textfile_content
from .settings import SETTINGS
class User(BaseModel): class User(BaseModel):
name: str name: str
@ -43,9 +38,3 @@ class Config(BaseModel):
admin: User admin: User
server: Server server: Server
puzzle: Puzzle puzzle: Puzzle
async def get_config() -> Config:
txt = await dav_get_textfile_content(path=SETTINGS.config_filename)
return Config.model_validate(tomllib.loads(txt))

View file

@ -0,0 +1,152 @@
import tomllib
from typing import cast
import tomli_w
from fastapi import Depends
from PIL import Image, ImageFont
from .advent_image import _XY, AdventImage
from .calendar_config import CalendarConfig
from .config import Config
from .image_helpers import list_images_auto, load_image
from .sequence_helpers import Random, set_len, shuffle
from .settings import SETTINGS
from .webdav import WebDAV
class AllTime:
@staticmethod
async def get_config() -> Config:
"""
Globale Konfiguration lesen
"""
txt = await WebDAV.read_str(path=SETTINGS.config_filename)
return Config.model_validate(tomllib.loads(txt))
@staticmethod
async def get_calendar_config(
cfg: Config = Depends(get_config),
) -> CalendarConfig:
"""
Kalender Konfiguration lesen
"""
txt = await WebDAV.read_str(path=f"files/{cfg.puzzle.calendar}")
return CalendarConfig.model_validate(tomllib.loads(txt))
@staticmethod
async def set_calendar_config(
cal_cfg: CalendarConfig,
cfg: Config = Depends(get_config),
) -> None:
"""
Kalender Konfiguration ändern
"""
await WebDAV.write_str(
path=f"files/{cfg.puzzle.calendar}",
content=tomli_w.dumps(
cal_cfg.model_dump(
exclude_defaults=True,
exclude_unset=True,
)
),
)
@staticmethod
async def shuffle_solution(
cfg: Config = Depends(get_config),
) -> str:
"""
Lösung: Reihenfolge zufällig bestimmen
"""
return "".join(await shuffle(cfg.puzzle.solution))
@staticmethod
async def shuffle_images_auto(
images: list[str] = Depends(list_images_auto),
) -> list[str]:
"""
Bilder: Reihenfolge zufällig bestimmen
"""
ls = set_len(images, 24)
return await shuffle(ls)
class Today:
@staticmethod
async def get_part(
day: int,
shuffled_solution: str = Depends(AllTime.shuffle_solution),
) -> str:
"""
Heute angezeigter Teil der Lösung
"""
return shuffled_solution[day]
@staticmethod
async def get_random(
day: int,
) -> Random:
"""
Tagesabhängige Zufallszahlen
"""
return await Random.get(day)
@staticmethod
async def gen_auto_image(
day: int,
images: list[str] = Depends(AllTime.shuffle_images_auto),
cfg: Config = Depends(AllTime.get_config),
rnd: Random = Depends(get_random),
part: str = Depends(get_part),
) -> Image.Image:
"""
Automatisch generiertes Bild erstellen
"""
# Datei existiert garantiert!
img = await load_image(images[day])
image = await AdventImage.from_img(img)
font = ImageFont.truetype(
font=await WebDAV.read_bytes(f"files/{cfg.server.font}"),
size=50,
)
# Buchstaben verstecken
for letter in part:
await image.hide_text(
xy=cast(_XY, tuple(rnd.choices(range(30, 470), k=2))),
text=letter,
font=font,
)
return image.img
@staticmethod
async def get_image(
day: int,
images: list[str] = Depends(AllTime.shuffle_images_auto),
cfg: Config = Depends(AllTime.get_config),
rnd: Random = Depends(get_random),
part: str = Depends(get_part),
) -> Image.Image:
"""
Bild für einen Tag abrufen
"""
try:
# Versuche, aus "manual"-Ordner zu laden
return await load_image(f"images_manual/{day}.jpg")
except RuntimeError:
# Erstelle automatisch generiertes Bild
return await Today.gen_auto_image(
day=day, images=images, cfg=cfg, rnd=rnd, part=part
)

View file

@ -0,0 +1,46 @@
import re
from io import BytesIO
from fastapi.responses import StreamingResponse
from PIL import Image
from .webdav import WebDAV
async def list_images_auto() -> list[str]:
"""
Finde alle Bilddateien im "automatisch"-Verzeichnis
"""
return await WebDAV.list_files(
directory="/images_auto",
regex=re.compile(r"\.(gif|jpe?g|tiff?|png|bmp)$", flags=re.IGNORECASE),
)
async def load_image(file_name: str) -> Image.Image:
"""
Versuche, Bild aus Datei zu laden
"""
if not await WebDAV.file_exists(file_name):
raise RuntimeError(f"DAV-File {file_name} does not exist!")
return Image.open(await WebDAV.read_bytes(file_name))
async def api_return_image(img: Image.Image) -> StreamingResponse:
"""
Bild mit API zurückgeben
"""
# JPEG-Daten in Puffer speichern
img_buffer = BytesIO()
img.save(img_buffer, format="JPEG", quality=85)
img_buffer.seek(0)
# zurückgeben
return StreamingResponse(
media_type="image/jpeg",
content=img_buffer,
)

View file

@ -1,13 +1,14 @@
import itertools
import random import random
from typing import Any, Self, Sequence from typing import Any, Self, Sequence
from ..config import get_config from .depends import AllTime
class Random(random.Random): class Random(random.Random):
@classmethod @classmethod
async def get(cls, bonus_salt: Any = "") -> Self: async def get(cls, bonus_salt: Any = "") -> Self:
cfg = await get_config() cfg = await AllTime.get_config()
return cls(f"{cfg.puzzle.solution}{bonus_salt}{cfg.puzzle.random_pepper}") return cls(f"{cfg.puzzle.solution}{bonus_salt}{cfg.puzzle.random_pepper}")
@ -17,3 +18,11 @@ async def shuffle(seq: Sequence, rnd: random.Random | None = None) -> list:
# Elemente mischen # Elemente mischen
return rnd.sample(seq, len(seq)) return rnd.sample(seq, len(seq))
def set_len(seq: Sequence, length: int) -> list:
# `seq` unendlich wiederholen
infinite = itertools.cycle(seq)
# Die ersten `length` einträge nehmen
return list(itertools.islice(infinite, length))

View file

@ -4,7 +4,7 @@ from io import BytesIO
from cache import AsyncTTL from cache import AsyncTTL
from webdav3.client import Client as WebDAVclient from webdav3.client import Client as WebDAVclient
from ..settings import SETTINGS from .settings import SETTINGS
class WebDAV: class WebDAV:
@ -21,8 +21,8 @@ class WebDAV:
@classmethod @classmethod
async def list_files( async def list_files(
cls, cls,
*,
directory: str = "", directory: str = "",
*,
regex: re.Pattern[str] = re.compile(""), regex: re.Pattern[str] = re.compile(""),
) -> list[str]: ) -> list[str]:
""" """

View file

@ -1,55 +0,0 @@
import re
from io import BytesIO, TextIOWrapper
from cache import AsyncTTL
from webdav3.client import Client as WebDAVclient
from .settings import SETTINGS
_WEBDAV_CLIENT = WebDAVclient(
{
"webdav_hostname": SETTINGS.webdav.url,
"webdav_login": SETTINGS.webdav.username,
"webdav_password": SETTINGS.webdav.password,
"disable_check": SETTINGS.webdav.disable_check,
}
)
@AsyncTTL(time_to_live=SETTINGS.cache_ttl)
async def dav_list_files(regex: re.Pattern[str], directory: str = "") -> list[str]:
ls = _WEBDAV_CLIENT.list(directory)
return [f"{directory}/{path}" for path in ls if regex.search(path)]
@AsyncTTL(time_to_live=SETTINGS.cache_ttl)
async def dav_file_exists(path: str) -> bool:
return _WEBDAV_CLIENT.check(path)
@AsyncTTL(time_to_live=SETTINGS.cache_ttl)
async def dav_get_file(path: str) -> BytesIO:
resource = _WEBDAV_CLIENT.resource(path)
buffer = BytesIO()
resource.write_to(buffer)
return buffer
@AsyncTTL(time_to_live=SETTINGS.cache_ttl)
async def dav_get_textfile_content(path: str, encoding="utf-8") -> str:
buffer = await dav_get_file(path)
tio = TextIOWrapper(buffer, encoding=encoding)
tio.seek(0)
return tio.read().strip()
async def dav_write_file(path: str, buffer: BytesIO) -> None:
resource = _WEBDAV_CLIENT.resource(path)
resource.read_from(buffer)
async def dav_write_textfile_content(path: str, content: str, encoding="utf-8") -> None:
buffer = BytesIO(content.encode(encoding=encoding))
buffer.seek(0)
await dav_write_file(path, buffer)

View file

@ -2,7 +2,7 @@
import uvicorn import uvicorn
from .settings import SETTINGS from .core.settings import SETTINGS
def main() -> None: def main() -> None:

View file

@ -1,137 +0,0 @@
import colorsys
from dataclasses import dataclass
from typing import Self, TypeAlias, cast
import numpy as np
from PIL import Image, ImageDraw, ImageFont
_RGB: TypeAlias = tuple[int, int, int]
_XY: TypeAlias = tuple[float, float]
@dataclass
class AdventImage:
img: Image.Image
@classmethod
async def load_standard(cls, fp) -> Self:
"""
Bild laden und einen quadratischen Ausschnitt
aus der Mitte nehmen
"""
# Bild laden
img = Image.open(fp=fp)
# Größen bestimmen
width, height = img.size
square = min(width, height)
# Bild zuschneiden und skalieren
img = img.crop(
box=(
int((width - square) / 2),
int((height - square) / 2),
int((width + square) / 2),
int((height + square) / 2),
)
)
img = img.resize(
size=(500, 500),
resample=Image.LANCZOS,
)
# Farbmodell festlegen
return cls(img=img.convert("RGB"))
async def get_text_box(
self,
xy: _XY,
text: str | bytes,
font: "ImageFont._Font",
anchor: str | None = "mm",
**text_kwargs,
) -> tuple[int, int, int, int] | None:
"""
Koordinaten (links, oben, rechts, unten) des betroffenen
Rechtecks bestimmen, wenn das Bild mit einem Text
versehen wird
"""
# Neues 1-Bit Bild, gleiche Größe
mask = Image.new(mode="1", size=self.img.size, color=0)
# Text auf Maske auftragen
ImageDraw.Draw(mask).text(
xy=xy,
text=text,
font=font,
anchor=anchor,
fill=1,
**text_kwargs,
)
# betroffenen Pixelbereich bestimmen
return mask.getbbox()
async def get_average_color(
self,
box: tuple[int, int, int, int],
) -> _RGB:
"""
Durchschnittsfarbe eines rechteckigen Ausschnitts in
einem Bild berechnen
"""
pixel_data = self.img.crop(box).getdata()
mean_color: np.ndarray = np.mean(pixel_data, axis=0)
return cast(_RGB, tuple(mean_color.astype(int)))
async def hide_text(
self,
xy: _XY,
text: str | bytes,
font: "ImageFont._Font",
anchor: str | None = "mm",
**text_kwargs,
) -> None:
"""
Text `text` in Bild an Position `xy` verstecken.
Weitere Parameter wie bei `ImageDraw.text()`.
"""
# betroffenen Bildbereich bestimmen
text_box = await self.get_text_box(
xy=xy, text=text, font=font, anchor=anchor, **text_kwargs
)
if text_box is not None:
# Durchschnittsfarbe bestimmen
text_color = await self.get_average_color(
box=text_box,
)
# etwas heller/dunkler machen
tc_h, tc_s, tc_v = colorsys.rgb_to_hsv(*text_color)
tc_v = int((tc_v - 127) * 0.97) + 127
if tc_v < 127:
tc_v += 3
else:
tc_v -= 3
text_color = colorsys.hsv_to_rgb(tc_h, tc_s, tc_v)
text_color = tuple(int(val) for val in text_color)
# Buchstaben verstecken
ImageDraw.Draw(self.img).text(
xy=xy,
text=text,
font=font,
fill=cast(_RGB, text_color),
anchor=anchor,
**text_kwargs,
)

View file

@ -1,150 +0,0 @@
import itertools
import random
import re
from io import BytesIO
from typing import Any, Self, Sequence, cast
from fastapi import Depends
from fastapi.responses import StreamingResponse
from PIL import Image, ImageFont
from ..config import Config, get_config
from ..dav_common import dav_file_exists, dav_get_file, dav_list_files
from ._image import _XY, AdventImage
##########
# RANDOM #
##########
class Random(random.Random):
@classmethod
async def get(cls, bonus_salt: Any = "") -> Self:
cfg = await get_config()
return cls(f"{cfg.puzzle.solution}{bonus_salt}{cfg.puzzle.random_pepper}")
async def set_length(seq: Sequence, length: int) -> list:
# `seq` unendlich wiederholen
infinite = itertools.cycle(seq)
# Die ersten `length` einträge nehmen
return list(itertools.islice(infinite, length))
async def shuffle(seq: Sequence, rnd: random.Random | None = None) -> list:
# Zufallsgenerator
rnd = rnd or await Random.get()
# Elemente mischen
return rnd.sample(seq, len(seq))
#########
# IMAGE #
#########
async def get_letter(
index: int,
cfg: Config = Depends(get_config),
) -> str:
return (await shuffle(cfg.puzzle.solution))[index]
async def list_images_auto() -> list[str]:
"""
Finde alle Bilder im "automatisch"-Verzeichnis
"""
ls = await dav_list_files(
re.compile(r"\.(gif|jpe?g|tiff?|png|bmp)$", flags=re.IGNORECASE),
"/images_auto",
)
ls = await set_length(ls, 24)
return await shuffle(ls)
async def load_image(
file_name: str,
) -> AdventImage:
"""
Versuche, Bild aus Datei zu laden
"""
if not await dav_file_exists(file_name):
raise RuntimeError(f"DAV-File {file_name} does not exist!")
img_buffer = await dav_get_file(file_name)
img_buffer.seek(0)
return await AdventImage.load_standard(img_buffer)
async def get_auto_image(
index: int,
letter: str,
images: list[str],
cfg: Config,
) -> AdventImage:
"""
Erstelle automatisch generiertes Bild
"""
# hier niemals RuntimeError!
image = await load_image(images[index])
rnd = await Random.get(index)
font = await dav_get_file(f"files/{cfg.server.font}")
font.seek(0)
# Buchstabe verstecken
await image.hide_text(
xy=cast(_XY, tuple(rnd.choices(range(30, 470), k=2))),
text=letter,
font=ImageFont.truetype(font, 50),
)
return image
async def get_image(
index: int,
letter: str = Depends(get_letter),
images: list[str] = Depends(list_images_auto),
cfg: Config = Depends(get_config),
) -> AdventImage:
"""
Bild für einen Tag erstellen
"""
try:
# Versuche, aus "manual"-Ordner zu laden
return await load_image(f"images_manual/{index}.jpg")
except RuntimeError:
# Erstelle automatisch generiertes Bild
return await get_auto_image(
index=index,
letter=letter,
images=images,
cfg=cfg,
)
async def api_return_image(
img: Image.Image,
) -> StreamingResponse:
"""
Bild mit API zurückgeben
"""
# Bilddaten in Puffer laden
img_buffer = BytesIO()
img.save(img_buffer, format="JPEG", quality=85)
img_buffer.seek(0)
# zurückgeben
return StreamingResponse(
content=img_buffer,
media_type="image/jpeg",
)