advent22/api/advent22_api/dav_common.py

45 lines
1.2 KiB
Python
Raw Normal View History

2022-10-14 21:42:05 +00:00
import re
2022-10-14 23:29:05 +00:00
from io import BytesIO, TextIOWrapper
2022-10-14 21:42:05 +00:00
2022-10-27 23:32:45 +00:00
from cache import AsyncTTL
2022-10-14 21:42:05 +00:00
from webdav3.client import Client as WebDAVclient
from .settings import SETTINGS
2023-09-03 16:44:18 +00:00
# Module-wide WebDAV client shared by the helper coroutines in this module.
# Connection options are read from the `webdav` section of SETTINGS.
_WEBDAV_CLIENT = WebDAVclient(
    dict(
        webdav_hostname=SETTINGS.webdav.url,
        webdav_login=SETTINGS.webdav.username,
        webdav_password=SETTINGS.webdav.password,
        disable_check=SETTINGS.webdav.disable_check,
    )
)
2022-10-14 21:42:05 +00:00
2022-10-27 23:32:45 +00:00
@AsyncTTL(time_to_live=SETTINGS.cache_ttl)
async def dav_list_files(regex: re.Pattern, directory: str = "") -> list[str]:
    """
    List the entries of a WebDAV directory whose names match `regex`.

    Every name returned by the client's `list(directory)` call is tested
    with `regex.search`; names that match are returned prefixed with
    `directory` and a slash. The result is cached by `AsyncTTL` using
    `SETTINGS.cache_ttl` as its time-to-live.
    """

    matching: list[str] = []

    for entry in _WEBDAV_CLIENT.list(directory):
        if not regex.search(entry):
            continue

        matching.append(f"{directory}/{entry}")

    return matching
2022-10-14 21:42:05 +00:00
2022-11-04 20:00:43 +00:00
@AsyncTTL(time_to_live=SETTINGS.cache_ttl)
async def dav_file_exists(path: str) -> bool:
    """
    Return the result of the WebDAV client's `check` call for `path`.

    The answer is cached by `AsyncTTL` using `SETTINGS.cache_ttl` as its
    time-to-live, so it may lag behind the server until the entry expires.
    """

    exists = _WEBDAV_CLIENT.check(path)
    return exists
2022-10-27 23:32:45 +00:00
@AsyncTTL(time_to_live=SETTINGS.cache_ttl)
async def _dav_get_bytes(path: str) -> bytes:
    """Download the resource at `path` and return its raw content (cached)."""

    buffer = BytesIO()
    _WEBDAV_CLIENT.resource(path).write_to(buffer)
    return buffer.getvalue()


async def dav_get_file(path: str) -> BytesIO:
    """
    Fetch the content of the WebDAV resource at `path` as an in-memory stream.

    The download itself is cached (as immutable `bytes`) by `AsyncTTL`
    using `SETTINGS.cache_ttl` as its time-to-live. Each call returns a
    *new* `BytesIO`, positioned at the start of the data.

    Why not cache the `BytesIO` directly: a cached stream object would be
    shared by every caller within the TTL, so one caller moving the cursor
    or closing the stream (e.g. via a `TextIOWrapper` being finalized)
    would break all the others.
    """

    return BytesIO(await _dav_get_bytes(path))
2022-10-14 23:29:05 +00:00
2022-10-27 23:32:45 +00:00
@AsyncTTL(time_to_live=SETTINGS.cache_ttl)
async def dav_get_textfile_content(path: str, encoding: str = "utf-8") -> str:
    """
    Return the decoded text of the WebDAV resource at `path`.

    The file is fetched through `dav_get_file`, decoded with `encoding`
    and stripped of leading/trailing whitespace. The result is cached by
    `AsyncTTL` using `SETTINGS.cache_ttl` as its time-to-live.
    """

    shared = await dav_get_file(path)

    # Decode a private copy of the data: the stream handed out by
    # `dav_get_file` may be shared with other callers, and a TextIOWrapper
    # closes the stream it wraps when it is closed or finalized. The copy
    # starts at position 0, so no explicit seek is needed.
    with TextIOWrapper(BytesIO(shared.getvalue()), encoding=encoding) as reader:
        return reader.read().strip()