Merge branch 'feature/api-refactoring' into develop

This commit is contained in:
Jörn-Michael Miehe 2023-09-08 20:02:24 +00:00
commit 3153f57613
18 changed files with 438 additions and 349 deletions

View file

@ -14,6 +14,7 @@
],
"env": {
"PYDEVD_DISABLE_FILE_VALIDATION": "1",
"CACHE_TTL": "30",
},
"justMyCode": true,
}

View file

@ -2,8 +2,8 @@ from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from .core.settings import SETTINGS
from .routers import router
from .settings import SETTINGS
app = FastAPI(
title="Advent22 API",

View file

@ -1,52 +0,0 @@
import tomllib
from typing import TypeAlias
import tomli_w
from pydantic import BaseModel
from .config import Config, get_config
from .dav_common import dav_get_textfile_content, dav_write_textfile_content
class DoorSaved(BaseModel):
# Tag, an dem die Tür aufgeht
day: int
# Koordinaten für zwei Eckpunkte
x1: int
y1: int
x2: int
y2: int
DoorsSaved: TypeAlias = list[DoorSaved]
class CalendarConfig(BaseModel):
# Dateiname Hintergrundbild
background: str = "adventskalender.jpg"
# Türen für die UI
doors: DoorsSaved = []
async def get_calendar_config() -> CalendarConfig:
cfg = await get_config()
txt = await dav_get_textfile_content(
path=f"files/{cfg.puzzle.calendar}",
)
return CalendarConfig.model_validate(tomllib.loads(txt))
async def set_calendar_config(cfg: Config, cal_cfg: CalendarConfig) -> None:
await dav_write_textfile_content(
path=f"files/{cfg.puzzle.calendar}",
content=tomli_w.dumps(
cal_cfg.model_dump(
exclude_defaults=True,
exclude_unset=True,
)
),
)

View file

@ -9,25 +9,24 @@ _RGB: TypeAlias = tuple[int, int, int]
_XY: TypeAlias = tuple[float, float]
@dataclass
@dataclass(slots=True, frozen=True)
class AdventImage:
img: Image.Image
@classmethod
async def load_standard(cls, fp) -> Self:
async def from_img(cls, img: Image.Image) -> Self:
"""
Bild laden und einen quadratischen Ausschnitt
aus der Mitte nehmen
Einen quadratischen Ausschnitt aus der Mitte des Bilds nehmen
"""
# Bild laden
img = Image.open(fp=fp)
# Farbmodell festlegen
img = img.convert(mode="RGB")
# Größen bestimmen
width, height = img.size
square = min(width, height)
# Bild zuschneiden und skalieren
# zuschneiden
img = img.crop(
box=(
int((width - square) / 2),
@ -37,13 +36,13 @@ class AdventImage:
)
)
img = img.resize(
# skalieren
return cls(
img.resize(
size=(500, 500),
resample=Image.LANCZOS,
)
# Farbmodell festlegen
return cls(img=img.convert("RGB"))
)
async def get_text_box(
self,
@ -52,7 +51,7 @@ class AdventImage:
font: "ImageFont._Font",
anchor: str | None = "mm",
**text_kwargs,
) -> tuple[int, int, int, int] | None:
) -> "Image._Box | None":
"""
Koordinaten (links, oben, rechts, unten) des betroffenen
Rechtecks bestimmen, wenn das Bild mit einem Text
@ -77,8 +76,8 @@ class AdventImage:
async def get_average_color(
self,
box: tuple[int, int, int, int],
) -> _RGB:
box: "Image._Box",
) -> tuple[int, int, int]:
"""
Durchschnittsfarbe eines rechteckigen Ausschnitts in
einem Bild berechnen

View file

@ -0,0 +1,57 @@
import tomllib
from typing import TypeAlias
import tomli_w
from fastapi import Depends
from pydantic import BaseModel
from .config import Config, get_config
from .webdav import WebDAV
class DoorSaved(BaseModel):
# Tag, an dem die Tür aufgeht
day: int
# Koordinaten für zwei Eckpunkte
x1: int
y1: int
x2: int
y2: int
DoorsSaved: TypeAlias = list[DoorSaved]
class CalendarConfig(BaseModel):
# Dateiname Hintergrundbild
background: str = "adventskalender.jpg"
# Türen für die UI
doors: DoorsSaved = []
async def change(self, cfg: Config) -> None:
"""
Kalender Konfiguration ändern
"""
await WebDAV.write_str(
path=f"files/{cfg.puzzle.calendar}",
content=tomli_w.dumps(
self.model_dump(
exclude_defaults=True,
exclude_unset=True,
)
),
)
async def get_calendar_config(
cfg: Config = Depends(get_config),
) -> CalendarConfig:
"""
Kalender Konfiguration lesen
"""
txt = await WebDAV.read_str(path=f"files/{cfg.puzzle.calendar}")
return CalendarConfig.model_validate(tomllib.loads(txt))

View file

@ -2,8 +2,8 @@ import tomllib
from pydantic import BaseModel
from .dav_common import dav_get_textfile_content
from .settings import SETTINGS
from .webdav import WebDAV
class User(BaseModel):
@ -45,7 +45,10 @@ class Config(BaseModel):
puzzle: Puzzle
async def get_config() -> Config:
txt = await dav_get_textfile_content(path=SETTINGS.config_filename)
async def get_config() -> "Config":
"""
Globale Konfiguration lesen
"""
txt = await WebDAV.read_str(path=SETTINGS.config_filename)
return Config.model_validate(tomllib.loads(txt))

View file

@ -0,0 +1,106 @@
from io import BytesIO
from typing import cast
from fastapi import Depends
from PIL import Image, ImageFont
from .advent_image import _XY, AdventImage
from .config import Config, get_config
from .image_helpers import list_images_auto, load_image
from .sequence_helpers import Random, set_len, shuffle
from .webdav import WebDAV
async def shuffle_solution(
cfg: Config = Depends(get_config),
) -> str:
"""
Lösung: Reihenfolge zufällig bestimmen
"""
return "".join(await shuffle(cfg.puzzle.solution))
async def shuffle_images_auto(
images: list[str] = Depends(list_images_auto),
) -> list[str]:
"""
Bilder: Reihenfolge zufällig bestimmen
"""
ls = set_len(images, 24)
return await shuffle(ls)
async def get_part(
day: int,
shuffled_solution: str = Depends(shuffle_solution),
) -> str:
"""
Heute angezeigter Teil der Lösung
"""
return shuffled_solution[day]
async def get_random(
day: int,
) -> Random:
"""
Tagesabhängige Zufallszahlen
"""
return await Random.get(day)
async def gen_auto_image(
day: int,
auto_images: list[str] = Depends(shuffle_images_auto),
cfg: Config = Depends(get_config),
rnd: Random = Depends(get_random),
part: str = Depends(get_part),
) -> Image.Image:
"""
Automatisch generiertes Bild erstellen
"""
# Datei existiert garantiert!
img = await load_image(auto_images[day])
image = await AdventImage.from_img(img)
font = ImageFont.truetype(
font=BytesIO(await WebDAV.read_bytes(f"files/{cfg.server.font}")),
size=50,
)
# Buchstaben verstecken
for letter in part:
await image.hide_text(
xy=cast(_XY, tuple(rnd.choices(range(30, 470), k=2))),
text=letter,
font=font,
)
return image.img
async def get_image(
day: int,
auto_images: list[str] = Depends(shuffle_images_auto),
cfg: Config = Depends(get_config),
rnd: Random = Depends(get_random),
part: str = Depends(get_part),
) -> Image.Image:
"""
Bild für einen Tag abrufen
"""
try:
# Versuche, aus "manual"-Ordner zu laden
return await load_image(f"images_manual/{day}.jpg")
except RuntimeError:
# Erstelle automatisch generiertes Bild
return await gen_auto_image(
day=day, auto_images=auto_images, cfg=cfg, rnd=rnd, part=part
)

View file

@ -0,0 +1,46 @@
import re
from io import BytesIO
from fastapi.responses import StreamingResponse
from PIL import Image
from .webdav import WebDAV
async def list_images_auto() -> list[str]:
"""
Finde alle Bilddateien im "automatisch"-Verzeichnis
"""
return await WebDAV.list_files(
directory="/images_auto",
regex=re.compile(r"\.(gif|jpe?g|tiff?|png|bmp)$", flags=re.IGNORECASE),
)
async def load_image(file_name: str) -> Image.Image:
"""
Versuche, Bild aus Datei zu laden
"""
if not await WebDAV.file_exists(file_name):
raise RuntimeError(f"DAV-File {file_name} does not exist!")
return Image.open(BytesIO(await WebDAV.read_bytes(file_name)))
async def api_return_image(img: Image.Image) -> StreamingResponse:
"""
Bild mit API zurückgeben
"""
# JPEG-Daten in Puffer speichern
img_buffer = BytesIO()
img.save(img_buffer, format="JPEG", quality=85)
img_buffer.seek(0)
# zurückgeben
return StreamingResponse(
media_type="image/jpeg",
content=img_buffer,
)

View file

@ -0,0 +1,28 @@
import itertools
import random
from typing import Any, Self, Sequence
from .config import get_config
class Random(random.Random):
@classmethod
async def get(cls, bonus_salt: Any = "") -> Self:
cfg = await get_config()
return cls(f"{cfg.puzzle.solution}{cfg.puzzle.random_pepper}{bonus_salt}")
async def shuffle(seq: Sequence, rnd: random.Random | None = None) -> list:
# Zufallsgenerator
rnd = rnd or await Random.get()
# Elemente mischen
return rnd.sample(seq, len(seq))
def set_len(seq: Sequence, length: int) -> list:
# `seq` unendlich wiederholen
infinite = itertools.cycle(seq)
# Die ersten `length` einträge nehmen
return list(itertools.islice(infinite, length))

View file

@ -58,7 +58,7 @@ class Settings(BaseSettings):
webdav: DavSettings = DavSettings()
cache_ttl: int = 30
cache_ttl: int = 60 * 30
config_filename: str = "config.toml"

View file

@ -0,0 +1,81 @@
import re
from io import BytesIO
from cache import AsyncTTL
from webdav3.client import Client as WebDAVclient
from .settings import SETTINGS
class WebDAV:
_webdav_client = WebDAVclient(
{
"webdav_hostname": SETTINGS.webdav.url,
"webdav_login": SETTINGS.webdav.username,
"webdav_password": SETTINGS.webdav.password,
"disable_check": SETTINGS.webdav.disable_check,
}
)
@classmethod
@AsyncTTL(time_to_live=SETTINGS.cache_ttl)
async def list_files(
cls,
directory: str = "",
*,
regex: re.Pattern[str] = re.compile(""),
) -> list[str]:
"""
Liste aller Dateien im Ordner `directory`, die zur RegEx `regex` passen
"""
ls = cls._webdav_client.list(directory)
return [f"{directory}/{path}" for path in ls if regex.search(path)]
@classmethod
@AsyncTTL(time_to_live=SETTINGS.cache_ttl)
async def file_exists(cls, path: str) -> bool:
"""
`True`, wenn an Pfad `path` eine Datei existiert
"""
return cls._webdav_client.check(path)
@classmethod
@AsyncTTL(time_to_live=SETTINGS.cache_ttl)
async def read_bytes(cls, path: str) -> bytes:
"""
Datei aus Pfad `path` als bytes laden
"""
buffer = BytesIO()
cls._webdav_client.resource(path).write_to(buffer)
buffer.seek(0)
return buffer.read()
@classmethod
@AsyncTTL(time_to_live=SETTINGS.cache_ttl)
async def read_str(cls, path: str, encoding="utf-8") -> str:
"""
Datei aus Pfad `path` als string laden
"""
return (await cls.read_bytes(path)).decode(encoding=encoding).strip()
@classmethod
async def write_bytes(cls, path: str, buffer: bytes) -> None:
"""
Bytes `buffer` in Datei in Pfad `path` schreiben
"""
cls._webdav_client.resource(path).read_from(buffer)
@classmethod
async def write_str(cls, path: str, content: str, encoding="utf-8") -> None:
"""
String `content` in Datei in Pfad `path` schreiben
"""
await cls.write_bytes(path, content.encode(encoding=encoding))

View file

@ -1,55 +0,0 @@
import re
from io import BytesIO, TextIOWrapper
from cache import AsyncTTL
from webdav3.client import Client as WebDAVclient
from .settings import SETTINGS
_WEBDAV_CLIENT = WebDAVclient(
{
"webdav_hostname": SETTINGS.webdav.url,
"webdav_login": SETTINGS.webdav.username,
"webdav_password": SETTINGS.webdav.password,
"disable_check": SETTINGS.webdav.disable_check,
}
)
@AsyncTTL(time_to_live=SETTINGS.cache_ttl)
async def dav_list_files(regex: re.Pattern, directory: str = "") -> list[str]:
ls = _WEBDAV_CLIENT.list(directory)
return [f"{directory}/{path}" for path in ls if regex.search(path)]
@AsyncTTL(time_to_live=SETTINGS.cache_ttl)
async def dav_file_exists(path: str) -> bool:
return _WEBDAV_CLIENT.check(path)
@AsyncTTL(time_to_live=SETTINGS.cache_ttl)
async def dav_get_file(path: str) -> BytesIO:
resource = _WEBDAV_CLIENT.resource(path)
buffer = BytesIO()
resource.write_to(buffer)
return buffer
@AsyncTTL(time_to_live=SETTINGS.cache_ttl)
async def dav_get_textfile_content(path: str, encoding="utf-8") -> str:
buffer = await dav_get_file(path)
tio = TextIOWrapper(buffer, encoding=encoding)
tio.seek(0)
return tio.read().strip()
async def dav_write_file(path: str, buffer: BytesIO) -> None:
resource = _WEBDAV_CLIENT.resource(path)
resource.read_from(buffer)
async def dav_write_textfile_content(path: str, content: str, encoding="utf-8") -> None:
buffer = BytesIO(content.encode(encoding=encoding))
buffer.seek(0)
await dav_write_file(path, buffer)

View file

@ -2,7 +2,7 @@
import uvicorn
from .settings import SETTINGS
from .core.settings import SETTINGS
def main() -> None:

View file

@ -1,152 +0,0 @@
import itertools
import random
import re
from io import BytesIO
from typing import Any, Self, Sequence, cast
from fastapi import Depends
from fastapi.responses import StreamingResponse
from PIL import Image, ImageFont
from ..config import Config, get_config
from ..dav_common import dav_file_exists, dav_get_file, dav_list_files
from ._image import _XY, AdventImage
##########
# RANDOM #
##########
class Random(random.Random):
@classmethod
async def get(cls, bonus_salt: Any = "") -> Self:
cfg = await get_config()
return cls(f"{cfg.puzzle.solution}{bonus_salt}{cfg.puzzle.random_pepper}")
async def set_length(seq: Sequence, length: int) -> list:
# `seq` unendlich wiederholen
infinite = itertools.cycle(seq)
# Die ersten `length` einträge nehmen
return list(itertools.islice(infinite, length))
async def shuffle(seq: Sequence, rnd: random.Random | None = None) -> list:
# Zufallsgenerator
rnd = rnd or await Random.get()
# Elemente mischen
return rnd.sample(seq, len(seq))
#########
# IMAGE #
#########
async def get_letter(
index: int,
cfg: Config = Depends(get_config),
) -> str:
return (await shuffle(cfg.puzzle.solution))[index]
_RE_IMAGE_FILE = re.compile(
r"\.(gif|jpe?g|tiff?|png|bmp)$",
flags=re.IGNORECASE,
)
async def list_images_auto() -> list[str]:
"""
Finde alle Bilder im "automatisch"-Verzeichnis
"""
ls = await dav_list_files(_RE_IMAGE_FILE, "/images_auto")
ls = await set_length(ls, 24)
return await shuffle(ls)
async def load_image(
file_name: str,
) -> AdventImage:
"""
Versuche, Bild aus Datei zu laden
"""
if not await dav_file_exists(file_name):
raise RuntimeError(f"DAV-File {file_name} does not exist!")
img_buffer = await dav_get_file(file_name)
img_buffer.seek(0)
return await AdventImage.load_standard(img_buffer)
async def get_auto_image(
index: int,
letter: str,
images: list[str],
cfg: Config,
) -> AdventImage:
"""
Erstelle automatisch generiertes Bild
"""
# hier niemals RuntimeError!
image = await load_image(images[index])
rnd = await Random.get(index)
font = await dav_get_file(f"files/{cfg.server.font}")
font.seek(0)
# Buchstabe verstecken
await image.hide_text(
xy=cast(_XY, tuple(rnd.choices(range(30, 470), k=2))),
text=letter,
font=ImageFont.truetype(font, 50),
)
return image
async def get_image(
index: int,
letter: str = Depends(get_letter),
images: list[str] = Depends(list_images_auto),
cfg: Config = Depends(get_config),
) -> AdventImage:
"""
Bild für einen Tag erstellen
"""
try:
# Versuche, aus "manual"-Ordner zu laden
return await load_image(f"images_manual/{index}.jpg")
except RuntimeError:
# Erstelle automatisch generiertes Bild
return await get_auto_image(
index=index,
letter=letter,
images=images,
cfg=cfg,
)
async def api_return_image(
img: Image.Image,
) -> StreamingResponse:
"""
Bild mit API zurückgeben
"""
# Bilddaten in Puffer laden
img_buffer = BytesIO()
img.save(img_buffer, format="JPEG", quality=85)
img_buffer.seek(0)
# zurückgeben
return StreamingResponse(
content=img_buffer,
media_type="image/jpeg",
)

View file

@ -0,0 +1,60 @@
import secrets
from datetime import date
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from ..core.config import Config, get_config
security = HTTPBasic()
async def user_is_admin(
credentials: HTTPBasicCredentials = Depends(security),
cfg: Config = Depends(get_config),
) -> bool:
"""
True iff der user "admin" ist
"""
username_correct = secrets.compare_digest(credentials.username, cfg.admin.name)
password_correct = secrets.compare_digest(credentials.password, cfg.admin.password)
return username_correct and password_correct
async def require_admin(
is_admin: bool = Depends(user_is_admin),
) -> None:
"""
HTTP 401 iff der user nicht "admin" ist
"""
if not is_admin:
raise HTTPException(status.HTTP_401_UNAUTHORIZED)
async def user_visible_doors() -> int:
"""
Anzahl der user-sichtbaren Türchen
"""
today = date.today()
if today.month == 12:
return today.day
if today.month in (1, 2, 3):
return 24
return 0
async def user_can_view_door(
day: int,
) -> bool:
"""
True iff das Türchen von Tag `day` user-sichtbar ist
"""
return day < await user_visible_doors()

View file

@ -2,11 +2,12 @@ from datetime import date
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.responses import StreamingResponse
from PIL import Image
from ..config import Config, get_config
from ._image import AdventImage
from ._misc import api_return_image, get_image, shuffle
from .user import user_is_admin
from ..core.config import get_config
from ..core.depends import get_image, get_part, shuffle_solution
from ..core.image_helpers import api_return_image
from ._security import user_can_view_door, user_is_admin, user_visible_doors
router = APIRouter(prefix="/days", tags=["days"])
@ -15,48 +16,47 @@ router = APIRouter(prefix="/days", tags=["days"])
async def startup() -> None:
cfg = await get_config()
print(cfg.puzzle.solution)
print("".join(await shuffle(cfg.puzzle.solution)))
@router.get("/letter/{index}")
async def get_letter(
index: int,
cfg: Config = Depends(get_config),
) -> str:
return (await shuffle(cfg.puzzle.solution))[index]
shuffled_solution = await shuffle_solution(cfg)
print(shuffled_solution)
@router.get("/date")
async def get_date() -> str:
"""
Aktuelles Server-Datum
"""
return date.today().isoformat()
@router.get("/visible_days")
async def get_visible_days() -> int:
today = date.today()
"""
Sichtbare Türchen
"""
if today.month == 12:
return today.day
if today.month in (1, 2, 3):
return 24
return 0
return await user_visible_doors()
async def user_can_view(
index: int,
) -> bool:
return index < await get_visible_days()
@router.get("/part/{day}")
async def get_part_for_day(
part: str = Depends(get_part),
) -> str:
"""
Heutiger Lösungsteil
"""
return part
@router.get(
"/image/{index}",
"/image/{day}",
response_class=StreamingResponse,
)
async def get_image_for_day(
image: AdventImage = Depends(get_image),
can_view: bool = Depends(user_can_view),
image: Image.Image = Depends(get_image),
can_view: bool = Depends(user_can_view_door),
is_admin: bool = Depends(user_is_admin),
) -> StreamingResponse:
"""
@ -66,4 +66,4 @@ async def get_image_for_day(
if not (can_view or is_admin):
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Wie unhöflich!!!")
return await api_return_image(image.img)
return await api_return_image(image)

View file

@ -1,16 +1,9 @@
from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse
from PIL import Image
from ..calendar_config import (
CalendarConfig,
DoorsSaved,
get_calendar_config,
set_calendar_config,
)
from ..config import Config, get_config
from ..dav_common import dav_get_file
from ._misc import api_return_image
from ..core.calendar_config import CalendarConfig, DoorsSaved, get_calendar_config
from ..core.config import Config, get_config
from ..core.image_helpers import api_return_image, load_image
router = APIRouter(prefix="/general", tags=["general"])
@ -26,9 +19,7 @@ async def get_image_for_day(
Hintergrundbild laden
"""
return await api_return_image(
Image.open(await dav_get_file(f"files/{cal_cfg.background}"))
)
return await api_return_image(await load_image(f"files/{cal_cfg.background}"))
@router.get("/doors")
@ -56,4 +47,4 @@ async def put_doors(
doors,
key=lambda door: door.day,
)
await set_calendar_config(cfg, cal_cfg)
await cal_cfg.change(cfg)

View file

@ -1,32 +1,8 @@
import secrets
from fastapi import APIRouter, Depends
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from ..config import Config, get_config
from ._security import require_admin
router = APIRouter(prefix="/user", tags=["user"])
security = HTTPBasic()
async def user_is_admin(
credentials: HTTPBasicCredentials = Depends(security),
config: Config = Depends(get_config),
) -> bool:
username_correct = secrets.compare_digest(credentials.username, config.admin.name)
password_correct = secrets.compare_digest(
credentials.password, config.admin.password
)
return username_correct and password_correct
async def require_admin(
is_admin: bool = Depends(user_is_admin),
) -> None:
if not is_admin:
raise HTTPException(status.HTTP_401_UNAUTHORIZED)
@router.get("/admin")