advent22/api/advent22_api/core/helpers.py

198 lines
5.3 KiB
Python

import itertools
import random
import re
from datetime import date, datetime, timedelta
from io import BytesIO
from typing import Any, Awaitable, Callable, Iterable, Self, Sequence, TypeVar
from fastapi.responses import StreamingResponse
from PIL import Image
from .config import get_config
from .dav.webdav import WebDAV
# Generic element type used by the sequence helpers in this module.
T = TypeVar("T")
# Matches names ending in a common image file extension (case-insensitive).
RE_IMG = re.compile(r"\.(gif|jpe?g|tiff?|png|bmp)$", flags=re.IGNORECASE)
# Matches names ending in `_<digits>.ttf` (case-insensitive); group 1 captures the digits.
RE_TTF = re.compile(r"_(\d+)\.ttf$", flags=re.IGNORECASE)
class Random(random.Random):
    """Random number generator that is seeded from the application config."""

    @classmethod
    async def get(cls, bonus_salt: Any = "") -> Self:
        """
        Create a generator seeded from the loaded configuration.

        The seed is the string concatenation of `cfg.solution.clean`,
        `cfg.random_seed` and `bonus_salt`, so the same configuration and
        salt always produce the same random sequence.
        """
        config = await get_config()
        seed = f"{config.solution.clean}{config.random_seed}{bonus_salt}"
        return cls(seed)

    def shuffled(self, population: Sequence[T]) -> Sequence[T]:
        """Return the items of `population` in random order as a new list."""
        size = len(population)
        return self.sample(population, k=size)
def set_len(seq: Sequence[T], len: int) -> Sequence[T]:
    """
    Return a list of `len` items taken from `seq`, starting over at the
    beginning of `seq` whenever its end is reached.

    An empty `seq` yields an empty list regardless of `len`.
    """
    # Repeat `seq` endlessly and cut the stream off after `len` entries
    return list(itertools.islice(itertools.cycle(seq), len))
def spread(
given: Iterable[int],
n: int,
rnd: Random | None = None,
) -> list[int]:
"""
Zu `given` ganzen Zahlen `n` zusätzliche Zahlen hinzunehmen.
- Die neuen Werte sind im selben Zahlenbereich wie `given`
- Zuerst werden alle Werte "zwischen" den `given` Werten genommen
"""
if n == 0:
return []
if len(set(given)) > 1:
range_given = range(min(given), max(given) + 1)
first_round = set(range_given) - set(given)
elif len(set(given)) == 1:
if (a := next(iter(given))) > 0:
range_given = range(1, a + 1)
else:
range_given = range(1, n + 1)
first_round = set(range_given) - set(given)
else:
range_given = range(1, n + 1)
first_round = range_given
result = sorted(first_round)[: min(n, len(first_round))]
full_rounds = (n - len(result)) // len(range_given)
result += list(range_given) * full_rounds
remain = n - len(result)
if rnd is None:
result += list(range_given)[:remain]
else:
result += rnd.sample(range_given, remain)
rnd.shuffle(result)
return result
def list_helper(
    directory: str,
    regex: re.Pattern[str],
) -> Callable[[], Awaitable[list[str]]]:
    """
    Build an async function listing all files in `directory` that match `regex`.

    The returned function takes no arguments. Each call queries
    `WebDAV.list_files` and returns the reported names, each prefixed with
    `directory` and a "/".
    """

    async def _list_helper() -> list[str]:
        file_names = await WebDAV.list_files(directory=directory, regex=regex)
        return [f"{directory}/{file_name}" for file_name in file_names]

    return _list_helper
# Async listers built by `list_helper`; each call returns the matching file paths.
# Image files (names matching RE_IMG) in "/images_auto"
list_images_auto = list_helper("/images_auto", RE_IMG)
# Image files (names matching RE_IMG) in "/images_manual"
list_images_manual = list_helper("/images_manual", RE_IMG)
# Font files (names matching RE_TTF) in "/files"
list_fonts = list_helper("/files", RE_TTF)
async def load_image(file_name: str) -> Image.Image:
    """
    Try to load an image from a file on the WebDAV storage.

    Raises:
        RuntimeError: If `WebDAV.exists` reports that `file_name` is missing.
    """
    file_exists = await WebDAV.exists(file_name)
    if not file_exists:
        raise RuntimeError(f"DAV-File {file_name} does not exist!")

    raw_bytes = await WebDAV.read_bytes(file_name)
    return Image.open(BytesIO(raw_bytes))
async def api_return_image(img: Image.Image) -> StreamingResponse:
    """
    Return an image through the API as a JPEG stream.

    The image is encoded as JPEG (quality 85) into an in-memory buffer that
    becomes the body of a `StreamingResponse` with media type "image/jpeg".
    Images in a mode the JPEG writer cannot store (e.g. "RGBA", "LA" or "P",
    as commonly produced by PNG and GIF files) are converted to "RGB" first;
    any transparency is dropped by that conversion.
    """
    # Pillow's JPEG writer only accepts these modes and raises OSError
    # ("cannot write mode ... as JPEG") for everything else.
    if img.mode not in ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr"):
        img = img.convert("RGB")

    # store the JPEG data in a buffer
    img_buffer = BytesIO()
    img.save(img_buffer, format="JPEG", quality=85)
    # rewind so the response streams from the first byte
    img_buffer.seek(0)

    # hand the buffer over to the response
    return StreamingResponse(
        media_type="image/jpeg",
        content=img_buffer,
    )
class EventDates:
"""
Events in einem Ereigniszeitraum
"""
__overall_duration: timedelta
dates: dict[int, date]
@property
def first(self) -> date:
"""Datum des ersten Ereignisses"""
return self.dates[min(self.dates.keys())]
def get_next(self, *, today: date) -> date | None:
"""Datum des nächsten Ereignisses"""
return next(
(event for event in sorted(self.dates.values()) if event > today), None
)
@property
def next(self) -> date | None:
"""Datum des nächsten Ereignisses"""
return self.get_next(today=datetime.today().date())
@property
def last(self) -> date:
"""Datum des letzten Ereignisses"""
return self.dates[max(self.dates.keys())]
@property
def end(self) -> date:
"""Letztes Datum des Ereigniszeitraums"""
return self.first + self.__overall_duration
def __init__(
self,
*,
# current date
today: date,
# month/day when events begin
begin_month: int,
begin_day: int,
# events: e.g. a 2 means there is an event on the 2nd day
# i.e. 1 day after begin
# - assume sorted (ascending)
events: list[int],
# countdown to closing begins after last event
close_after: int,
) -> None:
# account for the last event, then add closing period
self.__overall_duration = timedelta(days=events[-1] - 1 + close_after)
# the events may begin last year, this year or next year
maybe_begin = (
datetime(today.year + year_diff, begin_month, begin_day).date()
for year_diff in (-1, 0, +1)
)
# find the first begin where the end date is in the future
begin = next(
begin for begin in maybe_begin if today <= (begin + self.__overall_duration)
)
# all event dates
self.dates = {event: begin + timedelta(days=event - 1) for event in events}