import itertools
import random
import re
from datetime import date, datetime, timedelta
from io import BytesIO
from typing import Any, Self, Sequence, TypeVar

from fastapi.responses import StreamingResponse
from PIL import Image

from .config import get_config
from .webdav import WebDAV

T = TypeVar("T")

# Image modes that are handed to Pillow's JPEG encoder unchanged.  Any other
# mode (e.g. "RGBA", "LA", "P" as produced by PNG/GIF files) is converted to
# "RGB" first, because the encoder refuses to write it.
_JPEG_WRITABLE_MODES = frozenset({"1", "L", "RGB", "RGBX", "CMYK", "YCbCr"})


class Random(random.Random):
    """`random.Random` seeded deterministically from the puzzle configuration."""

    @classmethod
    async def get(cls, bonus_salt: Any = "") -> Self:
        """
        Create a generator seeded from the loaded configuration.

        The seed is the concatenation of `cfg.puzzle.solution`,
        `cfg.puzzle.random_seed` and `bonus_salt`, so equal configuration and
        salt give the same random sequence.
        """

        cfg = await get_config()
        return cls(f"{cfg.puzzle.solution}{cfg.puzzle.random_seed}{bonus_salt}")

    def shuffled(self, population: Sequence[T]) -> Sequence[T]:
        """Return a shuffled copy of `population`; the input is not modified."""

        return self.sample(population, k=len(population))


def set_len(seq: Sequence[T], len: int) -> Sequence[T]:
    """
    Return a list of exactly `len` items taken from `seq`.

    `seq` is repeated cyclically when it is shorter than `len` and truncated
    when it is longer.  An empty `seq` yields an empty list.

    NOTE: the parameter `len` shadows the builtin inside this function; it is
    kept because renaming it would break keyword callers.
    """

    # repeat `seq` endlessly
    infinite = itertools.cycle(seq)

    # take the first `len` entries
    return list(itertools.islice(infinite, len))


async def list_images_auto() -> list[str]:
    """
    Find all image files in the "automatic" directory (`/images_auto`).

    Files are matched by extension (gif, jpg/jpeg, tif/tiff, png, bmp),
    ignoring case.
    """

    return await WebDAV.list_files(
        directory="/images_auto",
        regex=re.compile(r"\.(gif|jpe?g|tiff?|png|bmp)$", flags=re.IGNORECASE),
    )


async def load_image(file_name: str) -> Image.Image:
    """
    Try to load an image from a WebDAV file.

    Raises:
        RuntimeError: if `WebDAV.file_exists` reports that the file is missing.
    """

    if not await WebDAV.file_exists(file_name):
        raise RuntimeError(f"DAV-File {file_name} does not exist!")

    return Image.open(BytesIO(await WebDAV.read_bytes(file_name)))


async def api_return_image(img: Image.Image) -> StreamingResponse:
    """
    Return an image from the API as a JPEG (quality 85) streaming response.

    Images whose mode is not in `_JPEG_WRITABLE_MODES` are converted to "RGB"
    before encoding; the conversion discards any alpha channel.
    """

    # JPEG has no alpha channel or palette; saving e.g. an "RGBA" or "P" image
    # directly raises OSError, so normalise such images first
    if img.mode not in _JPEG_WRITABLE_MODES:
        img = img.convert("RGB")

    # store the JPEG data in a buffer
    img_buffer = BytesIO()
    img.save(img_buffer, format="JPEG", quality=85)

    # rewind so the response streams from the start of the data
    img_buffer.seek(0)

    return StreamingResponse(
        media_type="image/jpeg",
        content=img_buffer,
    )


class EventDates:
    """
    Events within an event period.

    The period starts at a fixed month/day, contains events at given day
    offsets and ends `closing_after` days after the last event.
    """

    # days from the period's begin date to its final day (see `end`)
    __overall_duration: timedelta

    # event dates, in the order of the `events` offsets given to `__init__`
    dates: list[date]

    @property
    def first(self) -> date:
        """Date of the first event"""

        return self.dates[0]

    def get_next(self, *, today: date) -> date | None:
        """
        Date of the next event, i.e. the first entry of `dates` that lies
        strictly after `today`, or `None` if there is none.
        """

        return next((event for event in self.dates if event > today), None)

    @property
    def next(self) -> date | None:
        """Date of the next event, relative to the current system date"""

        return self.get_next(today=datetime.today().date())

    @property
    def last(self) -> date:
        """Date of the last event"""

        return self.dates[-1]

    @property
    def end(self) -> date:
        """Last date of the event period (first event date + overall duration)"""

        return self.first + self.__overall_duration

    def __init__(
        self,
        *,
        # current date
        today: date,
        # month/day when events begin
        begin_month: int,
        begin_day: int,
        # events: e.g. a 2 means there is an event on the 2nd day
        # i.e. 1 day after begin
        # - assume sorted (ascending)
        events: list[int],
        # countdown to closing begins after last event
        closing_after: int,
    ) -> None:
        """
        Compute the event dates of the period that is relevant for `today`.

        The begin date is tried in the previous, the current and the next
        year (in that order); the first candidate whose period has not yet
        ended on `today` is used.

        Raises:
            IndexError: if `events` is empty.
            ValueError: if `begin_month`/`begin_day` is not a valid date in a
                candidate year that gets evaluated (e.g. Feb 29).
            StopIteration: if no candidate period ends on or after `today`,
                which requires a negative overall duration.
        """

        # account for the last event, then add closing period
        self.__overall_duration = timedelta(days=events[-1] - 1 + closing_after)

        # the events may begin last year, this year or next year
        # (lazy: a candidate is only constructed when it is actually checked)
        maybe_begin = (
            date(today.year + year_diff, begin_month, begin_day)
            for year_diff in (-1, 0, +1)
        )

        # find the first begin where the end date is in the future
        begin = next(
            candidate
            for candidate in maybe_begin
            if today <= (candidate + self.__overall_duration)
        )

        # all event dates (offset 1 is the begin date itself)
        self.dates = [begin + timedelta(days=event - 1) for event in events]