136 lines
3.6 KiB
Python
136 lines
3.6 KiB
Python
import itertools
|
|
import random
|
|
import re
|
|
from datetime import date, datetime, timedelta
|
|
from io import BytesIO
|
|
from typing import Any, Self, Sequence, TypeVar
|
|
|
|
from fastapi.responses import StreamingResponse
|
|
from PIL import Image
|
|
|
|
from .config import get_config
|
|
from .webdav import WebDAV
|
|
|
|
T = TypeVar("T")
|
|
|
|
|
|
class Random(random.Random):
|
|
@classmethod
|
|
async def get(cls, bonus_salt: Any = "") -> Self:
|
|
cfg = await get_config()
|
|
return cls(f"{cfg.puzzle.solution}{cfg.puzzle.random_seed}{bonus_salt}")
|
|
|
|
def shuffled(self, population: Sequence[T]) -> Sequence[T]:
|
|
return self.sample(population, k=len(population))
|
|
|
|
|
|
def set_len(seq: Sequence[T], len: int) -> Sequence[T]:
|
|
# `seq` unendlich wiederholen
|
|
infinite = itertools.cycle(seq)
|
|
|
|
# Die ersten `length` einträge nehmen
|
|
return list(itertools.islice(infinite, len))
|
|
|
|
|
|
async def list_images_auto() -> list[str]:
|
|
"""
|
|
Finde alle Bilddateien im "automatisch"-Verzeichnis
|
|
"""
|
|
|
|
return await WebDAV.list_files(
|
|
directory="/images_auto",
|
|
regex=re.compile(r"\.(gif|jpe?g|tiff?|png|bmp)$", flags=re.IGNORECASE),
|
|
)
|
|
|
|
|
|
async def load_image(file_name: str) -> Image.Image:
|
|
"""
|
|
Versuche, Bild aus Datei zu laden
|
|
"""
|
|
|
|
if not await WebDAV.file_exists(file_name):
|
|
raise RuntimeError(f"DAV-File {file_name} does not exist!")
|
|
|
|
return Image.open(BytesIO(await WebDAV.read_bytes(file_name)))
|
|
|
|
|
|
async def api_return_image(img: Image.Image) -> StreamingResponse:
|
|
"""
|
|
Bild mit API zurückgeben
|
|
"""
|
|
|
|
# JPEG-Daten in Puffer speichern
|
|
img_buffer = BytesIO()
|
|
img.save(img_buffer, format="JPEG", quality=85)
|
|
img_buffer.seek(0)
|
|
|
|
# zurückgeben
|
|
return StreamingResponse(
|
|
media_type="image/jpeg",
|
|
content=img_buffer,
|
|
)
|
|
|
|
|
|
class EventDates:
|
|
"""
|
|
Events in einem Ereigniszeitraum
|
|
"""
|
|
|
|
__overall_duration: timedelta
|
|
dates: list[date]
|
|
|
|
@property
|
|
def first(self) -> date:
|
|
"""Datum des ersten Ereignisses"""
|
|
return self.dates[0]
|
|
|
|
def get_next(self, *, today: date) -> date | None:
|
|
"""Datum des nächsten Ereignisses"""
|
|
return next((event for event in self.dates if event > today), None)
|
|
|
|
@property
|
|
def next(self) -> date | None:
|
|
"""Datum des nächsten Ereignisses"""
|
|
return self.get_next(today=datetime.today().date())
|
|
|
|
@property
|
|
def last(self) -> date:
|
|
"""Datum des letzten Ereignisses"""
|
|
return self.dates[-1]
|
|
|
|
@property
|
|
def end(self) -> date:
|
|
"""Letztes Datum des Ereigniszeitraums"""
|
|
return self.first + self.__overall_duration
|
|
|
|
def __init__(
|
|
self,
|
|
*,
|
|
# current date
|
|
today: date,
|
|
# month/day when events begin
|
|
begin_month: int,
|
|
begin_day: int,
|
|
# events: e.g. a 2 means there is an event on the 2nd day
|
|
# i.e. 1 day after begin
|
|
# - assume sorted (ascending)
|
|
events: list[int],
|
|
# countdown to closing begins after last event
|
|
close_after: int,
|
|
) -> None:
|
|
# account for the last event, then add closing period
|
|
self.__overall_duration = timedelta(days=events[-1] - 1 + close_after)
|
|
|
|
# the events may begin last year, this year or next year
|
|
maybe_begin = (
|
|
datetime(today.year + year_diff, begin_month, begin_day).date()
|
|
for year_diff in (-1, 0, +1)
|
|
)
|
|
|
|
# find the first begin where the end date is in the future
|
|
begin = next(
|
|
begin for begin in maybe_begin if today <= (begin + self.__overall_duration)
|
|
)
|
|
|
|
# all event dates
|
|
self.dates = [begin + timedelta(days=event - 1) for event in events]
|