advent22/api/advent22_api/core/helpers.py

130 lines
3.5 KiB
Python

import itertools
import random
import re
from datetime import date, datetime, timedelta
from io import BytesIO
from typing import Any, Self, Sequence, TypeVar
from fastapi.responses import StreamingResponse
from PIL import Image
from pydantic import BaseModel
from .config import get_config
from .webdav import WebDAV
# Generic element type shared by the sequence helpers in this module
# (`Random.shuffled`, `set_len`)
T = TypeVar("T")
class Random(random.Random):
@classmethod
async def get(cls, bonus_salt: Any = "") -> Self:
cfg = await get_config()
return cls(f"{cfg.puzzle.solution}{cfg.puzzle.random_seed}{bonus_salt}")
def shuffled(self, population: Sequence[T]) -> Sequence[T]:
return self.sample(population, k=len(population))
def set_len(seq: Sequence[T], len: int) -> Sequence[T]:
    """
    Return a list of exactly `len` items taken from `seq`.

    `seq` is repeated cyclically, so a shorter `seq` wraps around and a
    longer one is truncated. An empty `seq` yields an empty list.

    Note: the parameter `len` shadows the builtin of the same name inside
    this function.
    """
    return list(itertools.islice(itertools.cycle(seq), len))
async def list_images_auto() -> list[str]:
    """
    Find all image files in the "automatic" directory.
    """
    # file names ending in a known image extension (case-insensitive)
    image_suffix = re.compile(r"\.(gif|jpe?g|tiff?|png|bmp)$", flags=re.IGNORECASE)
    found = await WebDAV.list_files(directory="/images_auto", regex=image_suffix)
    return found
async def load_image(file_name: str) -> Image.Image:
    """
    Try to load an image from a WebDAV file.

    Raises `RuntimeError` if the file does not exist.
    """
    exists = await WebDAV.file_exists(file_name)
    if not exists:
        raise RuntimeError(f"DAV-File {file_name} does not exist!")

    # wrap the raw bytes in a file-like object for the image decoder
    raw = await WebDAV.read_bytes(file_name)
    return Image.open(BytesIO(raw))
async def api_return_image(img: Image.Image) -> StreamingResponse:
    """
    Return an image as a JPEG API response.

    Images in a mode the JPEG writer cannot store (e.g. `RGBA`, `LA` or `P`,
    as commonly found in PNG/GIF sources) are converted to `RGB` first
    instead of failing with an `OSError`. The conversion drops any alpha
    channel.
    """
    # modes Pillow's JPEG writer accepts as-is; anything else is converted
    if img.mode not in ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr"):
        img = img.convert("RGB")

    # store the JPEG data in an in-memory buffer
    img_buffer = BytesIO()
    img.save(img_buffer, format="JPEG", quality=85)
    # rewind so the response streams the buffer from its start
    img_buffer.seek(0)

    # hand the buffer to the response
    return StreamingResponse(
        media_type="image/jpeg",
        content=img_buffer,
    )
class EventDays(BaseModel):
    """
    Key dates of an event period:

    - `first`: date of the first event
    - `next`: date of the next event (`None` if no event lies after today)
    - `last`: date of the last event
    - `end`: final date of the event period
    """

    first: date
    next: date | None
    last: date
    end: date

    @classmethod
    def get(
        cls,
        *,
        # current date
        today: date,
        # month/day when events begin
        begin_month: int,
        begin_day: int,
        # events: e.g. a 2 means there is an event 2 days after begin
        # -> assume sorted (asc)
        events_after: list[int],
        # countdown to closing begins after last event
        closing_after: int,
    ) -> Self:
        """
        Determine the key dates of the current (running or upcoming) event period.
        """
        # the period lasts until the last event plus the closing countdown
        duration = timedelta(days=events_after[-1] + closing_after)

        # the period may have begun last year, or begin this year or next year
        candidates = (
            date(today.year + offset, begin_month, begin_day)
            for offset in (-1, 0, +1)
        )
        # earliest candidate whose period has not ended before today
        begin = next(start for start in candidates if today <= (start + duration))
        end = begin + duration

        # absolute dates of all events
        event_dates = [begin + timedelta(days=offset) for offset in events_after]
        # first event strictly after today, if there is one
        upcoming = next((day for day in event_dates if day > today), None)

        return cls(
            first=event_dates[0],
            next=upcoming,
            last=event_dates[-1],
            end=end,
        )