advent22/api/advent22_api/core/helpers.py

217 lines
5.7 KiB
Python
Raw Permalink Normal View History

import itertools
import random
2023-09-08 18:17:18 +00:00
import re
from datetime import date, datetime, timedelta
2023-09-08 18:17:18 +00:00
from io import BytesIO
from typing import Any, Awaitable, Callable, Iterable, Self, Sequence, TypeVar
2023-09-08 18:17:18 +00:00
from fastapi.responses import StreamingResponse
from PIL import Image
from .config import get_config
2023-10-29 16:08:16 +00:00
from .dav.webdav import WebDAV
2023-09-08 18:17:18 +00:00
T = TypeVar("T")
RE_IMG = re.compile(r"\.(gif|jpe?g|tiff?|png|bmp)$", flags=re.IGNORECASE)
2023-11-01 00:30:33 +00:00
RE_TTF = re.compile(r"_(\d+)\.ttf$", flags=re.IGNORECASE)
class Random(random.Random):
@classmethod
async def get(cls, bonus_salt: Any = "") -> Self:
cfg = await get_config()
2023-10-31 19:18:18 +00:00
return cls(f"{cfg.solution.clean}{cfg.random_seed}{bonus_salt}")
def shuffled(self, population: Sequence[T]) -> Sequence[T]:
return self.sample(population, k=len(population))
def set_len(seq: Sequence[T], len: int) -> Sequence[T]:
# `seq` unendlich wiederholen
infinite = itertools.cycle(seq)
# Die ersten `length` einträge nehmen
return list(itertools.islice(infinite, len))
2023-09-08 18:17:18 +00:00
2023-09-22 23:45:27 +00:00
def spread(
given: Iterable[int],
n: int,
rnd: Random | None = None,
) -> list[int]:
"""
Zu `given` ganzen Zahlen `n` zusätzliche Zahlen hinzunehmen.
- Die neuen Werte sind im selben Zahlenbereich wie `given`
- Zuerst werden alle Werte "zwischen" den `given` Werten genommen
"""
if n == 0:
return []
if len(set(given)) > 1:
range_given = range(min(given), max(given) + 1)
first_round = set(range_given) - set(given)
elif len(set(given)) == 1:
if (a := next(iter(given))) > 0:
range_given = range(1, a + 1)
else:
range_given = range(1, n + 1)
first_round = set(range_given) - set(given)
else:
range_given = range(1, n + 1)
first_round = range_given
result = sorted(first_round)[: min(n, len(first_round))]
full_rounds = (n - len(result)) // len(range_given)
result += list(range_given) * full_rounds
remain = n - len(result)
if rnd is None:
result += list(range_given)[:remain]
else:
result += rnd.sample(range_given, remain)
rnd.shuffle(result)
return result
def list_helper(
directory: str,
regex: re.Pattern[str],
) -> Callable[[], Awaitable[list[str]]]:
2023-09-08 18:17:18 +00:00
"""
Finde alle Dateien im Verzeichnis `dir`, passend zu `re`
2023-09-08 18:17:18 +00:00
"""
async def _list_helper() -> list[str]:
return [
f"{directory}/{file}"
for file in await WebDAV.list_files(directory=directory, regex=regex)
]
2023-10-27 21:12:28 +00:00
return _list_helper
list_images_auto = list_helper("/images_auto", RE_IMG)
list_images_manual = list_helper("/images_manual", RE_IMG)
list_fonts = list_helper("/files", RE_TTF)
2023-09-08 18:17:18 +00:00
async def load_image(file_name: str) -> Image.Image:
"""
Versuche, Bild aus Datei zu laden
"""
2023-10-27 21:12:28 +00:00
if not await WebDAV.exists(file_name):
2023-09-08 18:17:18 +00:00
raise RuntimeError(f"DAV-File {file_name} does not exist!")
return Image.open(BytesIO(await WebDAV.read_bytes(file_name)))
2023-09-08 18:17:18 +00:00
2023-11-02 12:48:52 +00:00
async def api_return_ico(img: Image.Image) -> StreamingResponse:
2023-09-08 18:17:18 +00:00
"""
2023-11-02 12:48:52 +00:00
ICO-Bild mit API zurückgeben
"""
# JPEG-Daten in Puffer speichern
img_buffer = BytesIO()
img.resize(size=(256, 256), resample=Image.LANCZOS)
img.save(img_buffer, format="ICO")
img_buffer.seek(0)
# zurückgeben
return StreamingResponse(
media_type="image/x-icon",
content=img_buffer,
)
async def api_return_jpeg(img: Image.Image) -> StreamingResponse:
"""
JPEG-Bild mit API zurückgeben
2023-09-08 18:17:18 +00:00
"""
# JPEG-Daten in Puffer speichern
img_buffer = BytesIO()
img.save(img_buffer, format="JPEG", quality=85)
img_buffer.seek(0)
# zurückgeben
return StreamingResponse(
media_type="image/jpeg",
content=img_buffer,
)
class EventDates:
2023-09-19 16:49:10 +00:00
"""
Events in einem Ereigniszeitraum
"""
__overall_duration: timedelta
2023-09-21 11:26:02 +00:00
dates: dict[int, date]
2023-09-19 16:49:10 +00:00
@property
2023-09-19 16:49:10 +00:00
def first(self) -> date:
"""Datum des ersten Ereignisses"""
2023-09-21 11:26:02 +00:00
return self.dates[min(self.dates.keys())]
2023-09-19 16:49:10 +00:00
def get_next(self, *, today: date) -> date | None:
"""Datum des nächsten Ereignisses"""
2023-09-21 11:26:02 +00:00
return next(
(event for event in sorted(self.dates.values()) if event > today), None
)
@property
def next(self) -> date | None:
"""Datum des nächsten Ereignisses"""
2023-11-22 18:34:38 +00:00
return self.get_next(today=date.today())
@property
2023-09-19 16:49:10 +00:00
def last(self) -> date:
"""Datum des letzten Ereignisses"""
2023-09-21 11:26:02 +00:00
return self.dates[max(self.dates.keys())]
2023-09-19 16:49:10 +00:00
@property
2023-09-19 16:49:10 +00:00
def end(self) -> date:
"""Letztes Datum des Ereigniszeitraums"""
return self.first + self.__overall_duration
2023-09-19 16:49:10 +00:00
def __init__(
self,
*,
# current date
today: date,
# month/day when events begin
begin_month: int,
begin_day: int,
# events: e.g. a 2 means there is an event on the 2nd day
# i.e. 1 day after begin
# - assume sorted (ascending)
events: list[int],
2023-09-19 16:49:10 +00:00
# countdown to closing begins after last event
2023-09-20 16:14:58 +00:00
close_after: int,
2023-09-19 16:49:10 +00:00
) -> None:
# account for the last event, then add closing period
2023-09-20 16:14:58 +00:00
self.__overall_duration = timedelta(days=events[-1] - 1 + close_after)
2023-09-19 16:49:10 +00:00
# the events may begin last year, this year or next year
maybe_begin = (
datetime(today.year + year_diff, begin_month, begin_day).date()
for year_diff in (-1, 0, +1)
)
# find the first begin where the end date is in the future
begin = next(
begin for begin in maybe_begin if today <= (begin + self.__overall_duration)
)
# all event dates
2023-09-21 11:26:02 +00:00
self.dates = {event: begin + timedelta(days=event - 1) for event in events}