52 lines
1.2 KiB
Python
52 lines
1.2 KiB
Python
import asyncio
|
|
import functools
|
|
from contextlib import asynccontextmanager, contextmanager
|
|
from io import BytesIO
|
|
from typing import AsyncContextManager, AsyncIterator, ContextManager, Iterator
|
|
|
|
|
|
def get_buf_sync() -> ContextManager[BytesIO]:
|
|
@functools.lru_cache
|
|
def inner() -> BytesIO:
|
|
with open(__file__, "rb") as file:
|
|
return BytesIO(file.readline())
|
|
|
|
@contextmanager
|
|
def ctx() -> Iterator[BytesIO]:
|
|
buf = inner()
|
|
buf.seek(0)
|
|
yield buf
|
|
|
|
return ctx()
|
|
|
|
|
|
def main_sync() -> None:
|
|
for _ in range(2):
|
|
with get_buf_sync() as buffer:
|
|
print(buffer.read())
|
|
|
|
|
|
async def get_buf_async() -> AsyncContextManager[BytesIO]:
|
|
@functools.lru_cache
|
|
async def inner() -> BytesIO:
|
|
with open(__file__, "rb") as file:
|
|
return BytesIO(file.readline())
|
|
|
|
@asynccontextmanager
|
|
async def ctx() -> AsyncIterator[BytesIO]:
|
|
buf = await inner()
|
|
buf.seek(0)
|
|
yield buf
|
|
|
|
return ctx()
|
|
|
|
|
|
async def main_async() -> None:
|
|
for _ in range(2):
|
|
async with await get_buf_async() as buffer:
|
|
print(buffer.read())
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main_sync()
|
|
asyncio.run(main_async())
|