Compare commits

..

1 Commits

Author SHA1 Message Date
I-Al-Istannen
4eab927899 Save stacktraces in context manager and print on error 2025-12-09 17:01:24 +01:00

View File

@@ -2,6 +2,7 @@ import asyncio
import getpass
import sys
import threading
import traceback
from abc import ABC, abstractmethod
from collections.abc import Callable
from contextlib import AsyncExitStack
@@ -114,13 +115,23 @@ class ReusableAsyncContextManager(ABC, Generic[T]):
def __init__(self) -> None:
self._active = False
self._stack = AsyncExitStack()
self._create_stacktrace = traceback.format_stack()
self._enter_stacktraces = []
@abstractmethod
async def _on_aenter(self) -> T:
pass
async def __aenter__(self) -> T:
self._enter_stacktraces.append(traceback.format_stack())
if self._active:
print("Context manager was already active. Created at:")
print("".join(self._create_stacktrace))
print("\n== Previous __aenter__ calls")
for i, stacktrace in enumerate(self._enter_stacktraces, start=1):
print(f"\n-- __aenter__ call #{i} at:")
print("".join(stacktrace))
raise RuntimeError("Nested or otherwise concurrent usage is not allowed")
self._active = True
@@ -144,6 +155,8 @@ class ReusableAsyncContextManager(ABC, Generic[T]):
if not self._active:
raise RuntimeError("__aexit__ called too many times")
self._enter_stacktraces.pop()
result = await self._stack.__aexit__(exc_type, exc_value, traceback)
self._active = False
return result