import itertools
import random
import re
from datetime import date, datetime, timedelta
from io import BytesIO
from typing import Any, Self, Sequence, TypeVar

from fastapi.responses import StreamingResponse
from PIL import Image
from pydantic import BaseModel

from .config import get_config
from .webdav import WebDAV

T = TypeVar("T")

# Image modes Pillow's JPEG encoder can write without conversion. Anything
# else (e.g. "RGBA", "LA" or "P" from PNG/GIF sources) must be converted first.
_JPEG_MODES = frozenset({"1", "L", "RGB", "RGBX", "CMYK", "YCbCr"})


class Random(random.Random):
    """Random number generator seeded from the puzzle configuration."""

    @classmethod
    async def get(cls, bonus_salt: Any = "") -> Self:
        """
        Create a generator seeded from the configuration.

        The seed is the string concatenation of `cfg.puzzle.solution`,
        `cfg.puzzle.random_seed` and `bonus_salt`, so identical inputs
        produce identical random sequences.
        """

        cfg = await get_config()
        return cls(f"{cfg.puzzle.solution}{cfg.puzzle.random_seed}{bonus_salt}")

    def shuffled(self, population: Sequence[T]) -> Sequence[T]:
        """
        Return the items of `population` as a new list in random order.

        The input sequence itself is not modified.
        """

        return self.sample(population, k=len(population))


def set_len(seq: Sequence[T], len: int) -> Sequence[T]:
    """
    Truncate or cyclically repeat `seq` to `len` items.

    An empty `seq` yields an empty list regardless of `len`.

    NOTE: the parameter `len` shadows the builtin inside this function; the
    name is kept so that existing keyword callers continue to work.
    """

    # repeat `seq` indefinitely
    infinite = itertools.cycle(seq)

    # take the first `len` entries
    return list(itertools.islice(infinite, len))


async def list_images_auto() -> list[str]:
    """
    Find all image files in the "automatic" directory (`/images_auto`).

    Files are selected by extension (gif, jpg/jpeg, tif/tiff, png, bmp),
    case-insensitively.
    """

    return await WebDAV.list_files(
        directory="/images_auto",
        regex=re.compile(r"\.(gif|jpe?g|tiff?|png|bmp)$", flags=re.IGNORECASE),
    )


async def load_image(file_name: str) -> Image.Image:
    """
    Try to load an image from a WebDAV file.

    Raises:
        RuntimeError: if `file_name` does not exist on the WebDAV share.
    """

    if not await WebDAV.file_exists(file_name):
        raise RuntimeError(f"DAV-File {file_name} does not exist!")

    return Image.open(BytesIO(await WebDAV.read_bytes(file_name)))


async def api_return_image(img: Image.Image) -> StreamingResponse:
    """
    Return an image through the API as a JPEG response.

    Images in a mode the JPEG encoder cannot write (e.g. "RGBA" or "P") are
    converted to "RGB" first; any alpha channel is dropped by that conversion.
    """

    # JPEG cannot store alpha or palette data; without this conversion
    # Pillow raises OSError ("cannot write mode ... as JPEG").
    if img.mode not in _JPEG_MODES:
        img = img.convert("RGB")

    # store the JPEG data in a buffer
    img_buffer = BytesIO()
    img.save(img_buffer, format="JPEG", quality=85)
    img_buffer.seek(0)

    # hand the buffer to the response
    return StreamingResponse(
        media_type="image/jpeg",
        content=img_buffer,
    )


class EventDays(BaseModel):
    """
    Key dates of an event period:

    - `first`: date of the first event
    - `next`: date of the next event (`None` if no event lies after today)
    - `last`: date of the last event
    - `end`: last date of the event period
    """

    first: date
    next: date | None
    last: date
    end: date

    @classmethod
    def get(
        cls,
        *,
        # current date
        today: date,
        # month/day when events begin
        begin_month: int,
        begin_day: int,
        # events: e.g. a 2 means there is an event 2 days after begin
        # -> assume sorted (asc)
        events_after: list[int],
        # countdown to closing begins after last event
        closing_after: int,
    ) -> Self:
        """
        Determine the key dates of the current (running or upcoming) event
        period.

        The period begins on `begin_month`/`begin_day` and ends
        `closing_after` days after its last event. Of the periods beginning
        last year, this year and next year, the earliest one that has not
        yet ended as of `today` is chosen.

        Raises:
            IndexError: if `events_after` is empty.
            ValueError: if `begin_month`/`begin_day` is not a valid date in
                one of the candidate years examined.
        """

        # account for the last event, then add closing period
        duration = timedelta(days=events_after[-1] + closing_after)

        # the events may begin last year, this year or next year
        # (lazy generator: later candidates are only built when needed)
        maybe_begin = (
            date(today.year + year_diff, begin_month, begin_day)
            for year_diff in (-1, 0, +1)
        )

        # find the first begin where the end date is not yet in the past;
        # for a non-negative duration next year's candidate always qualifies
        begin = next(begin for begin in maybe_begin if today <= (begin + duration))

        # all event dates
        events = [begin + timedelta(days=event_after) for event_after in events_after]

        # return relevant event dates
        return cls(
            first=events[0],
            # strictly after today: an event taking place today is not "next"
            next=next((event for event in events if event > today), None),
            last=events[-1],
            end=begin + duration,
        )