advent22/api/advent22_api/core/helpers.py

131 lines
3.5 KiB
Python
Raw Normal View History

import itertools
import random
2023-09-08 18:17:18 +00:00
import re
from datetime import date, datetime, timedelta
2023-09-08 18:17:18 +00:00
from io import BytesIO
from typing import Any, Self, Sequence, TypeVar
2023-09-08 18:17:18 +00:00
from fastapi.responses import StreamingResponse
from PIL import Image
from pydantic import BaseModel
2023-09-08 18:17:18 +00:00
from .config import get_config
2023-09-08 18:17:18 +00:00
from .webdav import WebDAV
T = TypeVar("T")
class Random(random.Random):
@classmethod
async def get(cls, bonus_salt: Any = "") -> Self:
cfg = await get_config()
return cls(f"{cfg.puzzle.solution}{cfg.puzzle.random_seed}{bonus_salt}")
def shuffled(self, population: Sequence[T]) -> Sequence[T]:
return self.sample(population, k=len(population))
def set_len(seq: Sequence[T], len: int) -> Sequence[T]:
# `seq` unendlich wiederholen
infinite = itertools.cycle(seq)
# Die ersten `length` einträge nehmen
return list(itertools.islice(infinite, len))
2023-09-08 18:17:18 +00:00
async def list_images_auto() -> list[str]:
"""
Finde alle Bilddateien im "automatisch"-Verzeichnis
"""
return await WebDAV.list_files(
directory="/images_auto",
regex=re.compile(r"\.(gif|jpe?g|tiff?|png|bmp)$", flags=re.IGNORECASE),
)
async def load_image(file_name: str) -> Image.Image:
"""
Versuche, Bild aus Datei zu laden
"""
if not await WebDAV.file_exists(file_name):
raise RuntimeError(f"DAV-File {file_name} does not exist!")
return Image.open(BytesIO(await WebDAV.read_bytes(file_name)))
2023-09-08 18:17:18 +00:00
async def api_return_image(img: Image.Image) -> StreamingResponse:
"""
Bild mit API zurückgeben
"""
# JPEG-Daten in Puffer speichern
img_buffer = BytesIO()
img.save(img_buffer, format="JPEG", quality=85)
img_buffer.seek(0)
# zurückgeben
return StreamingResponse(
media_type="image/jpeg",
content=img_buffer,
)
class EventDays(BaseModel):
2023-09-15 16:54:29 +00:00
"""
Kenndaten eines Ereigniszeitraums:
- `first`: Datum des ersten Ereignisses
- `next`: Datum des nächsten Ereignisses
- `last`: Datum des letzten Ereignisses
- `end`: Letztes Datum Ereigniszeitraums
"""
first: date
next: date | None
last: date
end: date
@classmethod
def get(
cls,
*,
# current date
today: date,
# month/day when events begin
begin_month: int,
begin_day: int,
# events: e.g. a 2 means there is an event 2 days after begin
# -> assume sorted (asc)
events_after: list[int],
# countdown to closing begins after last event
closing_after: int,
) -> Self:
"""
2023-09-15 16:54:29 +00:00
Kenndaten des aktuellen (laufenden oder zukünftigen) Ereigniszeitraums bestimmen
"""
# account for the last event, then add closing period
duration = timedelta(days=events_after[-1] + closing_after)
# the events may begin last year, this year or next year
maybe_begin = (
datetime(today.year + year_diff, begin_month, begin_day).date()
for year_diff in (-1, 0, +1)
)
# find the first begin where the end date is in the future
begin = next(begin for begin in maybe_begin if today <= (begin + duration))
# all event dates
events = [begin + timedelta(days=event_after) for event_after in events_after]
# return relevant event dates
return cls(
first=events[0],
next=next((event for event in events if event > today), None),
last=events[-1],
end=begin + duration,
)