advent22/api/advent22_api/core/helpers.py

186 lines
5.1 KiB
Python
Raw Normal View History

import itertools
import random
2023-09-08 18:17:18 +00:00
import re
from datetime import date, datetime, timedelta
2023-09-08 18:17:18 +00:00
from io import BytesIO
from typing import Any, Self, Sequence, TypeVar
2023-09-08 18:17:18 +00:00
from fastapi.responses import StreamingResponse
from PIL import Image
from pydantic import BaseModel
2023-09-08 18:17:18 +00:00
from .config import get_config
2023-09-08 18:17:18 +00:00
from .webdav import WebDAV
T = TypeVar("T")
class Random(random.Random):
@classmethod
async def get(cls, bonus_salt: Any = "") -> Self:
cfg = await get_config()
return cls(f"{cfg.puzzle.solution}{cfg.puzzle.random_seed}{bonus_salt}")
def shuffled(self, population: Sequence[T]) -> Sequence[T]:
return self.sample(population, k=len(population))
def set_len(seq: Sequence[T], len: int) -> Sequence[T]:
    """
    Return a list of exactly `len` items taken from `seq`.

    `seq` is repeated from its start as often as needed when it is shorter
    than `len`, and cut off when it is longer. An empty `seq` yields an
    empty list.

    NOTE: the parameter name `len` shadows the builtin inside this function;
    it is kept because it is part of the signature.
    """

    # endless repetition of `seq`
    endless = itertools.cycle(seq)

    # keep only the first `len` entries
    return [*itertools.islice(endless, len)]
2023-09-08 18:17:18 +00:00
async def list_images_auto() -> list[str]:
    """
    Find all image files in the "automatic" directory.

    Asks `WebDAV.list_files` for the entries of `/images_auto`, passing a
    case-insensitive pattern for names ending in one of the extensions
    gif, jpg/jpeg, tif/tiff, png or bmp.
    """

    image_suffix = re.compile(r"\.(gif|jpe?g|tiff?|png|bmp)$", flags=re.IGNORECASE)

    return await WebDAV.list_files(directory="/images_auto", regex=image_suffix)
async def load_image(file_name: str) -> Image.Image:
    """
    Try to load an image from a WebDAV file.

    Raises:
        RuntimeError: if `WebDAV.file_exists` reports that `file_name`
            does not exist.
    """

    exists = await WebDAV.file_exists(file_name)
    if not exists:
        raise RuntimeError(f"DAV-File {file_name} does not exist!")

    # fetch the raw bytes, then let Pillow open them from memory
    raw = await WebDAV.read_bytes(file_name)
    return Image.open(BytesIO(raw))
2023-09-08 18:17:18 +00:00
async def api_return_image(img: Image.Image) -> StreamingResponse:
    """
    Return an image through the API as a JPEG response.

    The image is encoded as JPEG (quality 85) into an in-memory buffer,
    which is then streamed with media type `image/jpeg`.

    Images whose mode the JPEG encoder cannot write (for example "RGBA",
    "LA" or palette mode "P", as commonly produced by PNG/GIF sources) are
    converted to "RGB" first instead of failing with `OSError`. Images in
    a mode the encoder supports are saved unchanged.
    """

    # modes accepted by Pillow's JPEG writer; any other mode makes
    # `Image.save(..., format="JPEG")` raise "cannot write mode ... as JPEG"
    jpeg_modes = ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr")
    if img.mode not in jpeg_modes:
        img = img.convert("RGB")

    # store JPEG data in a buffer
    img_buffer = BytesIO()
    img.save(img_buffer, format="JPEG", quality=85)

    # rewind, so the response streams from the first byte
    img_buffer.seek(0)

    return StreamingResponse(
        media_type="image/jpeg",
        content=img_buffer,
    )
2023-09-19 16:49:10 +00:00
class EventDays2:
    """
    Events within an event period.

    The period begins on a fixed month/day and spans the last event offset
    plus a closing period. The instance describes the first such period
    (starting last year, this year or next year) that has not yet ended
    relative to `today`.
    """

    # first day of the selected event period
    __begin: date
    # length of the period: last event offset plus closing period
    __overall_duration: timedelta
    # event number (as given in `events_after`) -> date of that event
    __events: dict[int, date]

    def __init__(
        self,
        *,
        # current date
        today: date,
        # month/day when events begin
        begin_month: int,
        begin_day: int,
        # events: e.g. a 2 means there is an event 2 days after begin
        # -> assume sorted (asc)
        events_after: list[int],
        # countdown to closing begins after last event
        closing_after: int,
    ) -> None:
        """
        Determine the current (running or upcoming) event period.

        Raises:
            IndexError: if `events_after` is empty.
            ValueError: if `begin_month`/`begin_day` is not a valid date in
                a candidate year that has to be examined.
        """

        # account for the last event, then add closing period
        self.__overall_duration = timedelta(days=events_after[-1] + closing_after)

        # the events may begin last year, this year or next year
        # (lazy: later candidates are only built if earlier ones are over)
        maybe_begin = (
            date(today.year + year_diff, begin_month, begin_day)
            for year_diff in (-1, 0, +1)
        )

        # find the first begin where the end date is not in the past
        self.__begin = next(
            begin for begin in maybe_begin if today <= (begin + self.__overall_duration)
        )

        # all event dates
        # NOTE(review): event `n` is placed on `begin + (n - 1)` days, while
        # the parameter comment ("2 days after begin") and `EventDays.get`
        # use `begin + n` -- confirm which offset is intended
        self.__events = {
            event_after: self.__begin + timedelta(days=event_after - 1)
            for event_after in events_after
        }

    def first(self) -> date:
        """Date of the first event (the one with the smallest number)"""

        return self.__events[min(self.__events)]

    def last(self) -> date:
        """Date of the last event (the one with the largest number)"""

        return self.__events[max(self.__events)]

    def end(self) -> date:
        """Last date of the event period (begin plus overall duration)"""

        # includes the closing period, so this is generally later than `last()`
        return self.__begin + self.__overall_duration
class EventDays(BaseModel):
    """
    Key dates of an event period:

    - `first`: date of the first event
    - `next`: date of the next event
    - `last`: date of the last event
    - `end`: last date of the event period
    """

    first: date
    next: date | None
    last: date
    end: date

    @classmethod
    def get(
        cls,
        *,
        # current date
        today: date,
        # month/day when events begin
        begin_month: int,
        begin_day: int,
        # events: e.g. a 2 means there is an event 2 days after begin
        # -> assume sorted (asc)
        events_after: list[int],
        # countdown to closing begins after last event
        closing_after: int,
    ) -> Self:
        """
        Determine the key dates of the current (running or upcoming) event
        period.

        `next` is the first event date strictly after `today`, or `None`
        when no such event is left in the selected period.
        """

        # length of a period: offset of the last event plus closing period
        span = timedelta(days=events_after[-1] + closing_after)

        # a period may have begun last year, begin this year or next year
        # (lazy: later candidates are only built if earlier ones are over)
        candidates = (
            date(today.year + offset, begin_month, begin_day)
            for offset in (-1, 0, +1)
        )

        # first candidate whose period has not ended before today
        start = next(
            filter(lambda candidate: today <= (candidate + span), candidates)
        )

        # dates of all events in the selected period
        event_dates = [start + timedelta(days=offset) for offset in events_after]

        # earliest event that is still ahead, if any
        upcoming = next((day for day in event_dates if day > today), None)

        return cls(
            first=event_dates[0],
            next=upcoming,
            last=event_dates[-1],
            end=start + span,
        )