Browse Source

use Optional type hint to be compatible with python < 3.10

add limiter option to contextmanager_in_threadpool()
pull/11895/head
Alexander 'Leo' Bergolth 1 year ago
parent
commit
100b0f9507
  1. 16
      fastapi/concurrency.py

16
fastapi/concurrency.py

@ -1,5 +1,5 @@
from contextlib import asynccontextmanager as asynccontextmanager from contextlib import asynccontextmanager as asynccontextmanager
from typing import AsyncGenerator, ContextManager, TypeVar from typing import AsyncGenerator, ContextManager, TypeVar, Optional
import functools import functools
import sys import sys
import typing import typing
@ -21,7 +21,7 @@ _T = TypeVar("_T")
async def run_in_threadpool( async def run_in_threadpool(
func: typing.Callable[_P, _T], *args: _P.args, func: typing.Callable[_P, _T], *args: _P.args,
_limiter: anyio.CapacityLimiter | None = None, _limiter: Optional[anyio.CapacityLimiter] = None,
**kwargs: _P.kwargs **kwargs: _P.kwargs
) -> _T: ) -> _T:
if kwargs: # pragma: no cover if kwargs: # pragma: no cover
@ -31,7 +31,7 @@ async def run_in_threadpool(
@asynccontextmanager @asynccontextmanager
async def contextmanager_in_threadpool( async def contextmanager_in_threadpool(
cm: ContextManager[_T], cm: ContextManager[_T], limiter: Optional[anyio.CapacityLimiter] = None,
) -> AsyncGenerator[_T, None]: ) -> AsyncGenerator[_T, None]:
# blocking __exit__ from running waiting on a free thread # blocking __exit__ from running waiting on a free thread
# can create race conditions/deadlocks if the context manager itself # can create race conditions/deadlocks if the context manager itself
@ -41,16 +41,16 @@ async def contextmanager_in_threadpool(
# works (1 is arbitrary) # works (1 is arbitrary)
exit_limiter = CapacityLimiter(1) exit_limiter = CapacityLimiter(1)
try: try:
yield await run_in_threadpool(cm.__enter__) yield await run_in_threadpool(cm.__enter__, _limiter=limiter)
except Exception as e: except Exception as e:
ok = bool( ok = bool(
await anyio.to_thread.run_sync( await run_in_threadpool(
cm.__exit__, type(e), e, None, limiter=exit_limiter cm.__exit__, type(e), e, None, _limiter=exit_limiter
) )
) )
if not ok: if not ok:
raise e raise e
else: else:
await anyio.to_thread.run_sync( await run_in_threadpool(
cm.__exit__, None, None, None, limiter=exit_limiter cm.__exit__, None, None, None, _limiter=exit_limiter
) )

Loading…
Cancel
Save