WIP: core module
This commit is contained in:
parent
30c5788d4d
commit
452040a0ae
3 changed files with 246 additions and 0 deletions
133
api/advent22_api/core/advent_image.py
Normal file
133
api/advent22_api/core/advent_image.py
Normal file
|
@ -0,0 +1,133 @@
|
|||
import colorsys
|
||||
from dataclasses import dataclass
|
||||
from typing import Self
|
||||
|
||||
import numpy as np
|
||||
from PIL import Image, ImageDraw, ImageFont
|
||||
|
||||
|
||||
@dataclass(slots=True, frozen=True)
|
||||
class AdventImage:
|
||||
img: Image.Image
|
||||
|
||||
@classmethod
|
||||
async def from_img(cls, img: Image.Image) -> Self:
|
||||
"""
|
||||
Einen quadratischen Ausschnitt aus der Mitte des Bilds nehmen
|
||||
"""
|
||||
|
||||
# Farbmodell festlegen
|
||||
img = img.convert(mode="RGB")
|
||||
|
||||
# Größen bestimmen
|
||||
width, height = img.size
|
||||
square = min(width, height)
|
||||
|
||||
# zuschneiden
|
||||
img = img.crop(
|
||||
box=(
|
||||
int((width - square) / 2),
|
||||
int((height - square) / 2),
|
||||
int((width + square) / 2),
|
||||
int((height + square) / 2),
|
||||
)
|
||||
)
|
||||
|
||||
# skalieren
|
||||
return cls(
|
||||
img.resize(
|
||||
size=(500, 500),
|
||||
resample=Image.LANCZOS,
|
||||
)
|
||||
)
|
||||
|
||||
async def get_text_box(
|
||||
self,
|
||||
xy: tuple[float, float],
|
||||
text: str | bytes,
|
||||
font: "ImageFont._Font",
|
||||
anchor: str | None = "mm",
|
||||
**text_kwargs,
|
||||
) -> Image._Box | None:
|
||||
"""
|
||||
Koordinaten (links, oben, rechts, unten) des betroffenen
|
||||
Rechtecks bestimmen, wenn das Bild mit einem Text
|
||||
versehen wird
|
||||
"""
|
||||
|
||||
# Neues 1-Bit Bild, gleiche Größe
|
||||
mask = Image.new(mode="1", size=self.img.size, color=0)
|
||||
|
||||
# Text auf Maske auftragen
|
||||
ImageDraw.Draw(mask).text(
|
||||
xy=xy,
|
||||
text=text,
|
||||
font=font,
|
||||
anchor=anchor,
|
||||
fill=1,
|
||||
**text_kwargs,
|
||||
)
|
||||
|
||||
# betroffenen Pixelbereich bestimmen
|
||||
return mask.getbbox()
|
||||
|
||||
async def get_average_color(
|
||||
self,
|
||||
box: Image._Box,
|
||||
) -> tuple[int, int, int]:
|
||||
"""
|
||||
Durchschnittsfarbe eines rechteckigen Ausschnitts in
|
||||
einem Bild berechnen
|
||||
"""
|
||||
|
||||
pixel_data = self.img.crop(box).getdata()
|
||||
mean_color: np.ndarray = np.mean(a=pixel_data, axis=0)
|
||||
|
||||
return tuple(mean_color.astype(int)[:3])
|
||||
|
||||
async def hide_text(
|
||||
self,
|
||||
xy: tuple[float, float],
|
||||
text: str | bytes,
|
||||
font: "ImageFont._Font",
|
||||
anchor: str | None = "mm",
|
||||
**text_kwargs,
|
||||
) -> None:
|
||||
"""
|
||||
Text `text` in Bild an Position `xy` verstecken.
|
||||
Weitere Parameter wie bei `ImageDraw.text()`.
|
||||
"""
|
||||
|
||||
# betroffenen Bildbereich bestimmen
|
||||
text_box = await self.get_text_box(
|
||||
xy=xy, text=text, font=font, anchor=anchor, **text_kwargs
|
||||
)
|
||||
|
||||
if text_box is not None:
|
||||
# Durchschnittsfarbe bestimmen
|
||||
text_color = await self.get_average_color(
|
||||
box=text_box,
|
||||
)
|
||||
|
||||
# etwas heller/dunkler machen
|
||||
tc_h, tc_s, tc_v = colorsys.rgb_to_hsv(*text_color)
|
||||
tc_v = int((tc_v - 127) * 0.97) + 127
|
||||
|
||||
if tc_v < 127:
|
||||
tc_v += 3
|
||||
|
||||
else:
|
||||
tc_v -= 3
|
||||
|
||||
text_color = colorsys.hsv_to_rgb(tc_h, tc_s, tc_v)
|
||||
text_color = tuple(int(val) for val in text_color)
|
||||
|
||||
# Buchstaben verstecken
|
||||
ImageDraw.Draw(self.img).text(
|
||||
xy=xy,
|
||||
text=text,
|
||||
font=font,
|
||||
fill=text_color,
|
||||
anchor=anchor,
|
||||
**text_kwargs,
|
||||
)
|
19
api/advent22_api/core/random.py
Normal file
19
api/advent22_api/core/random.py
Normal file
|
@ -0,0 +1,19 @@
|
|||
import random
|
||||
from typing import Any, Self, Sequence
|
||||
|
||||
from ..config import get_config
|
||||
|
||||
|
||||
class Random(random.Random):
|
||||
@classmethod
|
||||
async def get(cls, bonus_salt: Any = "") -> Self:
|
||||
cfg = await get_config()
|
||||
return cls(f"{cfg.puzzle.solution}{bonus_salt}{cfg.puzzle.random_pepper}")
|
||||
|
||||
|
||||
async def shuffle(seq: Sequence, rnd: random.Random | None = None) -> list:
|
||||
# Zufallsgenerator
|
||||
rnd = rnd or await Random.get()
|
||||
|
||||
# Elemente mischen
|
||||
return rnd.sample(seq, len(seq))
|
94
api/advent22_api/core/webdav.py
Normal file
94
api/advent22_api/core/webdav.py
Normal file
|
@ -0,0 +1,94 @@
|
|||
import re
|
||||
from contextlib import contextmanager
|
||||
from io import BytesIO, TextIOWrapper
|
||||
from typing import ContextManager, Iterator
|
||||
|
||||
from cache import AsyncTTL
|
||||
from webdav3.client import Client as WebDAVclient
|
||||
|
||||
from ..settings import SETTINGS
|
||||
|
||||
|
||||
class WebDAV:
|
||||
_webdav_client = WebDAVclient(
|
||||
{
|
||||
"webdav_hostname": SETTINGS.webdav.url,
|
||||
"webdav_login": SETTINGS.webdav.username,
|
||||
"webdav_password": SETTINGS.webdav.password,
|
||||
"disable_check": SETTINGS.webdav.disable_check,
|
||||
}
|
||||
)
|
||||
|
||||
@AsyncTTL(time_to_live=SETTINGS.cache_ttl)
|
||||
@classmethod
|
||||
async def list_files(
|
||||
cls,
|
||||
*,
|
||||
directory: str = "",
|
||||
regex: re.Pattern[str] = re.compile(""),
|
||||
) -> list[str]:
|
||||
"""
|
||||
Liste aller Dateien im Ordner `directory`, die zur RegEx `regex` passen
|
||||
"""
|
||||
|
||||
ls = cls._webdav_client.list(directory)
|
||||
|
||||
return [f"{directory}/{path}" for path in ls if regex.search(path)]
|
||||
|
||||
@AsyncTTL(time_to_live=SETTINGS.cache_ttl)
|
||||
@classmethod
|
||||
async def file_exists(cls, path: str) -> bool:
|
||||
"""
|
||||
`True`, wenn an Pfad `path` eine Datei existiert
|
||||
"""
|
||||
|
||||
return cls._webdav_client.check(path)
|
||||
|
||||
@AsyncTTL(time_to_live=SETTINGS.cache_ttl)
|
||||
@classmethod
|
||||
async def read_buffer(cls, path: str) -> ContextManager[BytesIO]:
|
||||
"""
|
||||
Datei aus Pfad `path` in einen `BytesIO` Puffer laden
|
||||
"""
|
||||
|
||||
buffer = BytesIO()
|
||||
cls._webdav_client.resource(path).write_to(buffer)
|
||||
|
||||
@contextmanager
|
||||
def ctx() -> Iterator[BytesIO]:
|
||||
buffer.seek(0)
|
||||
yield buffer
|
||||
|
||||
return ctx()
|
||||
|
||||
@AsyncTTL(time_to_live=SETTINGS.cache_ttl)
|
||||
@classmethod
|
||||
async def read_text(cls, path: str, encoding="utf-8") -> str:
|
||||
"""
|
||||
Datei aus Pfad `path` als string zurückgeben
|
||||
"""
|
||||
|
||||
with await cls.read_buffer(path) as buffer:
|
||||
tio = TextIOWrapper(buffer, encoding=encoding)
|
||||
tio.seek(0)
|
||||
|
||||
return tio.read().strip()
|
||||
|
||||
@classmethod
|
||||
async def write_buffer(cls, path: str, buffer: BytesIO) -> None:
|
||||
"""
|
||||
Puffer `buffer` in Datei in Pfad `path` schreiben
|
||||
"""
|
||||
|
||||
cls._webdav_client.resource(path).read_from(buffer)
|
||||
|
||||
@classmethod
|
||||
async def write_text(cls, path: str, content: str, encoding="utf-8") -> None:
|
||||
"""
|
||||
String `content` in Datei in Pfad `path` schreiben
|
||||
"""
|
||||
|
||||
buffer = BytesIO(content.encode(encoding=encoding))
|
||||
buffer.seek(0)
|
||||
|
||||
await cls.write_buffer(path, buffer)
|
Loading…
Reference in a new issue