190 lines
5 KiB
Python
190 lines
5 KiB
Python
import itertools
|
|
import random
|
|
import re
|
|
from datetime import date, datetime, timedelta
|
|
from io import BytesIO
|
|
from typing import Any, Iterable, Self, Sequence, TypeVar
|
|
|
|
from fastapi.responses import StreamingResponse
|
|
from PIL import Image
|
|
|
|
from .config import get_config
|
|
from .dav.webdav import WebDAV
|
|
|
|
# Generic element type used in the signatures of `Random.shuffled` and `set_len`
T = TypeVar("T")
|
|
|
|
|
|
class Random(random.Random):
|
|
@classmethod
|
|
async def get(cls, bonus_salt: Any = "") -> Self:
|
|
cfg = await get_config()
|
|
return cls(f"{cfg.solution.clean}{cfg.random_seed}{bonus_salt}")
|
|
|
|
def shuffled(self, population: Sequence[T]) -> Sequence[T]:
|
|
return self.sample(population, k=len(population))
|
|
|
|
|
|
def set_len(seq: Sequence[T], len: int) -> Sequence[T]:
    """
    Repeat or truncate `seq` so that the result has `len` elements.

    The elements of `seq` are cycled through in order; the first `len` of
    them are returned as a new list. An empty `seq` yields an empty list.
    """
    # cycle through `seq` endlessly and cut off after `len` elements
    endless = itertools.cycle(seq)
    return [*itertools.islice(endless, len)]
|
|
|
|
|
|
def spread(
|
|
given: Iterable[int],
|
|
n: int,
|
|
rnd: Random | None = None,
|
|
) -> list[int]:
|
|
"""
|
|
Zu `given` ganzen Zahlen `n` zusätzliche Zahlen hinzunehmen.
|
|
|
|
- Die neuen Werte sind im selben Zahlenbereich wie `given`
|
|
- Zuerst werden alle Werte "zwischen" den `given` Werten genommen
|
|
"""
|
|
|
|
if n == 0:
|
|
return []
|
|
|
|
if len(set(given)) > 1:
|
|
range_given = range(min(given), max(given) + 1)
|
|
first_round = set(range_given) - set(given)
|
|
|
|
elif len(set(given)) == 1:
|
|
if (a := next(iter(given))) > 0:
|
|
range_given = range(1, a + 1)
|
|
else:
|
|
range_given = range(1, n + 1)
|
|
|
|
first_round = set(range_given) - set(given)
|
|
|
|
else:
|
|
range_given = range(1, n + 1)
|
|
first_round = range_given
|
|
|
|
result = sorted(first_round)[: min(n, len(first_round))]
|
|
|
|
full_rounds = (n - len(result)) // len(range_given)
|
|
result += list(range_given) * full_rounds
|
|
|
|
remain = n - len(result)
|
|
if rnd is None:
|
|
result += list(range_given)[:remain]
|
|
|
|
else:
|
|
result += rnd.sample(range_given, remain)
|
|
rnd.shuffle(result)
|
|
|
|
return result
|
|
|
|
|
|
async def list_images_auto() -> list[str]:
    """
    List all image files in the "automatic" directory.

    Asks `WebDAV.list_files` for the entries of `/images_auto`, passing a
    case-insensitive pattern for common image file extensions, and returns
    each reported name prefixed with the directory.
    """

    directory = "/images_auto"
    image_suffix = re.compile(r"\.(gif|jpe?g|tiff?|png|bmp)$", flags=re.IGNORECASE)

    file_names = await WebDAV.list_files(
        directory=directory,
        regex=image_suffix,
    )

    return [f"{directory}/{name}" for name in file_names]
|
|
|
|
|
|
async def load_image(file_name: str) -> Image.Image:
    """
    Try to load an image from a WebDAV file.

    Raises a `RuntimeError` if `WebDAV.exists` reports that `file_name`
    does not exist; otherwise the file's bytes are opened with Pillow.
    """

    file_exists = await WebDAV.exists(file_name)
    if not file_exists:
        raise RuntimeError(f"DAV-File {file_name} does not exist!")

    raw_data = await WebDAV.read_bytes(file_name)
    return Image.open(BytesIO(raw_data))
|
|
|
|
|
|
async def api_return_image(img: Image.Image) -> StreamingResponse:
    """
    Return an image through the API.

    The image is encoded as JPEG (quality 85) into an in-memory buffer, which
    is then streamed with media type `image/jpeg`. Images in a mode the JPEG
    writer cannot store (e.g. "RGBA", "LA" or "P", as commonly found in PNG
    and GIF files) are converted to "RGB" first instead of failing.
    """

    # modes Pillow's JPEG writer accepts as-is; for anything else `save`
    # would raise an `OSError` ("cannot write mode ... as JPEG")
    jpeg_modes = ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr")
    if img.mode not in jpeg_modes:
        img = img.convert("RGB")

    # store the JPEG data in a buffer
    img_buffer = BytesIO()
    img.save(img_buffer, format="JPEG", quality=85)
    # rewind so the response streams from the beginning
    img_buffer.seek(0)

    # hand it back
    return StreamingResponse(
        media_type="image/jpeg",
        content=img_buffer,
    )
|
|
|
|
|
|
class EventDates:
|
|
"""
|
|
Events in einem Ereigniszeitraum
|
|
"""
|
|
|
|
__overall_duration: timedelta
|
|
dates: dict[int, date]
|
|
|
|
@property
|
|
def first(self) -> date:
|
|
"""Datum des ersten Ereignisses"""
|
|
return self.dates[min(self.dates.keys())]
|
|
|
|
def get_next(self, *, today: date) -> date | None:
|
|
"""Datum des nächsten Ereignisses"""
|
|
return next(
|
|
(event for event in sorted(self.dates.values()) if event > today), None
|
|
)
|
|
|
|
@property
|
|
def next(self) -> date | None:
|
|
"""Datum des nächsten Ereignisses"""
|
|
return self.get_next(today=datetime.today().date())
|
|
|
|
@property
|
|
def last(self) -> date:
|
|
"""Datum des letzten Ereignisses"""
|
|
return self.dates[max(self.dates.keys())]
|
|
|
|
@property
|
|
def end(self) -> date:
|
|
"""Letztes Datum des Ereigniszeitraums"""
|
|
return self.first + self.__overall_duration
|
|
|
|
def __init__(
|
|
self,
|
|
*,
|
|
# current date
|
|
today: date,
|
|
# month/day when events begin
|
|
begin_month: int,
|
|
begin_day: int,
|
|
# events: e.g. a 2 means there is an event on the 2nd day
|
|
# i.e. 1 day after begin
|
|
# - assume sorted (ascending)
|
|
events: list[int],
|
|
# countdown to closing begins after last event
|
|
close_after: int,
|
|
) -> None:
|
|
# account for the last event, then add closing period
|
|
self.__overall_duration = timedelta(days=events[-1] - 1 + close_after)
|
|
|
|
# the events may begin last year, this year or next year
|
|
maybe_begin = (
|
|
datetime(today.year + year_diff, begin_month, begin_day).date()
|
|
for year_diff in (-1, 0, +1)
|
|
)
|
|
|
|
# find the first begin where the end date is in the future
|
|
begin = next(
|
|
begin for begin in maybe_begin if today <= (begin + self.__overall_duration)
|
|
)
|
|
|
|
# all event dates
|
|
self.dates = {event: begin + timedelta(days=event - 1) for event in events}
|