advent22/api/advent22_api/core/webdav.py
Jörn-Michael Miehe af00dafb6c router integration: stuck
apparently, a @staticmethod that Depends on another @staticmethod in the same class is bad
2023-09-08 19:08:13 +00:00

81 lines
2.2 KiB
Python

import re
from io import BytesIO
from cache import AsyncTTL
from webdav3.client import Client as WebDAVclient
from .settings import SETTINGS
class WebDAV:
_webdav_client = WebDAVclient(
{
"webdav_hostname": SETTINGS.webdav.url,
"webdav_login": SETTINGS.webdav.username,
"webdav_password": SETTINGS.webdav.password,
"disable_check": SETTINGS.webdav.disable_check,
}
)
@classmethod
@AsyncTTL(time_to_live=SETTINGS.cache_ttl)
async def list_files(
cls,
directory: str = "",
*,
regex: re.Pattern[str] = re.compile(""),
) -> list[str]:
"""
Liste aller Dateien im Ordner `directory`, die zur RegEx `regex` passen
"""
ls = cls._webdav_client.list(directory)
return [f"{directory}/{path}" for path in ls if regex.search(path)]
@classmethod
@AsyncTTL(time_to_live=SETTINGS.cache_ttl)
async def file_exists(cls, path: str) -> bool:
"""
`True`, wenn an Pfad `path` eine Datei existiert
"""
return cls._webdav_client.check(path)
@classmethod
@AsyncTTL(time_to_live=SETTINGS.cache_ttl)
async def read_bytes(cls, path: str) -> bytes:
"""
Datei aus Pfad `path` als bytes laden
"""
buffer = BytesIO()
cls._webdav_client.resource(path).write_to(buffer)
buffer.seek(0)
return buffer.read()
@classmethod
@AsyncTTL(time_to_live=SETTINGS.cache_ttl)
async def read_str(cls, path: str, encoding="utf-8") -> str:
"""
Datei aus Pfad `path` als string laden
"""
return (await cls.read_bytes(path)).decode(encoding=encoding).strip()
@classmethod
async def write_bytes(cls, path: str, buffer: bytes) -> None:
"""
Bytes `buffer` in Datei in Pfad `path` schreiben
"""
cls._webdav_client.resource(path).read_from(buffer)
@classmethod
async def write_str(cls, path: str, content: str, encoding="utf-8") -> None:
"""
String `content` in Datei in Pfad `path` schreiben
"""
await cls.write_bytes(path, content.encode(encoding=encoding))