Browse Source

Merge branch 'feature/lifespan-scoped-dependencies' of github.com:UltimateLobster/fastapi into feature/lifespan-scoped-dependencies

pull/12529/head
Nir Schulman 8 months ago
parent
commit
17143bb3ac
  1. 52
      docs/en/docs/tutorial/dependencies/lifespan-scoped-dependencies.md
  2. 9
      docs_src/dependencies/tutorial013a.py
  3. 6
      docs_src/dependencies/tutorial013a_an_py39.py
  4. 23
      docs_src/dependencies/tutorial013b.py
  5. 16
      docs_src/dependencies/tutorial013b_an_py39.py
  6. 13
      docs_src/dependencies/tutorial013c.py
  7. 18
      docs_src/dependencies/tutorial013c_an_py39.py
  8. 9
      docs_src/dependencies/tutorial013d.py
  9. 13
      docs_src/dependencies/tutorial013d_an_py39.py
  10. 10
      tests/test_tutorial/test_dependencies/test_tutorial013a.py
  11. 10
      tests/test_tutorial/test_dependencies/test_tutorial013a_an_py39.py
  12. 29
      tests/test_tutorial/test_dependencies/test_tutorial013b.py
  13. 29
      tests/test_tutorial/test_dependencies/test_tutorial013b_an_py39.py
  14. 10
      tests/test_tutorial/test_dependencies/test_tutorial013c.py
  15. 10
      tests/test_tutorial/test_dependencies/test_tutorial013c_an_py39.py
  16. 10
      tests/test_tutorial/test_dependencies/test_tutorial013d.py
  17. 10
      tests/test_tutorial/test_dependencies/test_tutorial013d_an_py39.py

52
docs/en/docs/tutorial/dependencies/lifespan-scoped-dependencies.md

@ -4,68 +4,68 @@ So far we've used dependencies which are "endpoint scoped". Meaning, they are
called again and again for every incoming request to the endpoint. However, called again and again for every incoming request to the endpoint. However,
this is not ideal for all kinds of dependencies. this is not ideal for all kinds of dependencies.
Sometimes dependencies have a large setup/teardown time, or there is a need Sometimes dependencies have a large setup/teardown time, or there is a need
for their value to be shared throughout the lifespan of the application. An for their value to be shared throughout the lifespan of the application. An
example of this would be a connection to a database. Databases are typically example of this would be a connection to a database. Databases are typically
less efficient when working with lots of connections and would prefer that less efficient when working with lots of connections and would prefer that
clients would create a single connection for their operations. clients would create a single connection for their operations.
For such cases, you might want to use "lifespan scoped" dependencies. For such cases, you might want to use "lifespan scoped" dependencies.
## Intro ## Intro
Lifespan scoped dependencies work similarly to the dependencies we've worked Lifespan scoped dependencies work similarly to the dependencies we've worked
with so far (which are endpoint scoped). However, they are called once and only with so far (which are endpoint scoped). However, they are called once and only
once in the application's lifespan (instead of being called again and again for once in the application's lifespan (instead of being called again and again for
every request). The returned value will be shared across all requests that need every request). The returned value will be shared across all requests that need
it. it.
## Create a lifespan scoped dependency ## Create a lifespan scoped dependency
You may declare a dependency as a lifespan scoped dependency by passing You may declare a dependency as a lifespan scoped dependency by passing
`dependency_scope="lifespan"` to the `Depends` function: `dependency_scope="lifespan"` to the `Depends` function:
{* ../../docs_src/dependencies/tutorial013a_an_py39.py hl[16] *} {* ../../docs_src/dependencies/tutorial013a_an_py39.py hl[16] *}
/// tip /// tip
In the example above we saved the annotation to a separate variable, and then In the example above we saved the annotation to a separate variable, and then
reused it in our endpoints. This is not a requirement, we could also declare reused it in our endpoints. This is not a requirement, we could also declare
the exact same annotation in both endpoints. However, it is recommended that you the exact same annotation in both endpoints. However, it is recommended that you
do save the annotation to a variable so you won't accidentally forget to pass do save the annotation to a variable so you won't accidentally forget to pass
`dependency_scope="lifespan"` to some of the endpoints (Causing the endpoint `dependency_scope="lifespan"` to some of the endpoints (Causing the endpoint
to create a new database connection for every request). to create a new database connection for every request).
/// ///
In this example, the `get_database_connection` dependency will be executed once, In this example, the `get_database_connection` dependency will be executed once,
during the application's startup. **FastAPI** will internally save the resulting during the application's startup. **FastAPI** will internally save the resulting
connection object, and whenever the `read_users` and `read_items` endpoints are connection object, and whenever the `read_users` and `read_items` endpoints are
called, they will be using the previously saved connection. Once the application called, they will be using the previously saved connection. Once the application
shuts down, **FastAPI** will make sure to gracefully close the connection object. shuts down, **FastAPI** will make sure to gracefully close the connection object.
## The `use_cache` argument ## The `use_cache` argument
The `use_cache` argument works similarly to the way it worked with endpoint The `use_cache` argument works similarly to the way it worked with endpoint
scoped dependencies. Meaning as **FastAPI** gathers lifespan scoped dependencies, it scoped dependencies. Meaning as **FastAPI** gathers lifespan scoped dependencies, it
will cache dependencies it already encountered before. However, you can disable will cache dependencies it already encountered before. However, you can disable
this behavior by passing `use_cache=False` to `Depends`: this behavior by passing `use_cache=False` to `Depends`:
{* ../../docs_src/dependencies/tutorial013b_an_py39.py hl[16] *} {* ../../docs_src/dependencies/tutorial013b_an_py39.py hl[16] *}
In this example, the `read_users` and `read_groups` endpoints are using In this example, the `read_users` and `read_groups` endpoints are using
`use_cache=False` whereas the `read_items` and `read_item` are using `use_cache=False` whereas the `read_items` and `read_item` are using
`use_cache=True`. That means that we'll have a total of 3 connections created `use_cache=True`. That means that we'll have a total of 3 connections created
for the duration of the application's lifespan. One connection will be shared for the duration of the application's lifespan. One connection will be shared
across all requests for the `read_items` and `read_item` endpoints. A second across all requests for the `read_items` and `read_item` endpoints. A second
connection will be shared across all requests for the `read_users` endpoint. The connection will be shared across all requests for the `read_users` endpoint. The
third and final connection will be shared across all requests for the third and final connection will be shared across all requests for the
`read_groups` endpoint. `read_groups` endpoint.
## Lifespan Scoped Sub-Dependencies ## Lifespan Scoped Sub-Dependencies
Just like with endpoint scoped dependencies, lifespan scoped dependencies may Just like with endpoint scoped dependencies, lifespan scoped dependencies may
use other lifespan scoped sub-dependencies themselves: use other lifespan scoped sub-dependencies themselves:
{* ../../docs_src/dependencies/tutorial013c_an_py39.py hl[16] *} {* ../../docs_src/dependencies/tutorial013c_an_py39.py hl[16] *}
@ -87,8 +87,8 @@ sub-dependencies.
## Dependency Scope Conflicts ## Dependency Scope Conflicts
By definition, lifespan scoped dependencies are being setup in the application's By definition, lifespan scoped dependencies are being setup in the application's
startup process, before any request is ever being made to any endpoint. startup process, before any request is ever being made to any endpoint.
Therefore, it is not possible for a lifespan scoped dependency to use any Therefore, it is not possible for a lifespan scoped dependency to use any
parameters that require the scope of an endpoint. parameters that require the scope of an endpoint.
That includes but not limited to: That includes but not limited to:

9
docs_src/dependencies/tutorial013a.py

@ -18,6 +18,7 @@ class MyDatabaseConnection:
async def get_records(self, table_name: str) -> List[dict]: async def get_records(self, table_name: str) -> List[dict]:
pass pass
app = FastAPI() app = FastAPI()
@ -30,10 +31,14 @@ GlobalDatabaseConnection = Depends(get_database_connection, dependency_scope="li
@app.get("/users/") @app.get("/users/")
async def read_users(database_connection: MyDatabaseConnection = GlobalDatabaseConnection): async def read_users(
database_connection: MyDatabaseConnection = GlobalDatabaseConnection,
):
return await database_connection.get_records("users") return await database_connection.get_records("users")
@app.get("/items/") @app.get("/items/")
async def read_items(database_connection: MyDatabaseConnection = GlobalDatabaseConnection): async def read_items(
database_connection: MyDatabaseConnection = GlobalDatabaseConnection,
):
return await database_connection.get_records("items") return await database_connection.get_records("items")

6
docs_src/dependencies/tutorial013a_an_py39.py

@ -8,6 +8,7 @@ class MyDatabaseConnection:
""" """
This is a mock just for example purposes. This is a mock just for example purposes.
""" """
async def __aenter__(self) -> Self: async def __aenter__(self) -> Self:
return self return self
@ -26,7 +27,10 @@ async def get_database_connection():
yield connection yield connection
GlobalDatabaseConnection = Annotated[MyDatabaseConnection, Depends(get_database_connection, dependency_scope="lifespan")] GlobalDatabaseConnection = Annotated[
MyDatabaseConnection, Depends(get_database_connection, dependency_scope="lifespan")
]
@app.get("/users/") @app.get("/users/")
async def read_users(database_connection: GlobalDatabaseConnection): async def read_users(database_connection: GlobalDatabaseConnection):

23
docs_src/dependencies/tutorial013b.py

@ -21,6 +21,7 @@ class MyDatabaseConnection:
async def get_record(self, table_name: str, record_id: str) -> dict: async def get_record(self, table_name: str, record_id: str) -> dict:
pass pass
app = FastAPI() app = FastAPI()
@ -29,26 +30,36 @@ async def get_database_connection():
yield connection yield connection
GlobalDatabaseConnection = Depends(get_database_connection, dependency_scope="lifespan") GlobalDatabaseConnection = Depends(get_database_connection, dependency_scope="lifespan")
DedicatedDatabaseConnection = Depends(get_database_connection, dependency_scope="lifespan", use_cache=False) DedicatedDatabaseConnection = Depends(
get_database_connection, dependency_scope="lifespan", use_cache=False
)
@app.get("/groups/") @app.get("/groups/")
async def read_groups(database_connection: MyDatabaseConnection = DedicatedDatabaseConnection): async def read_groups(
database_connection: MyDatabaseConnection = DedicatedDatabaseConnection,
):
return await database_connection.get_records("groups") return await database_connection.get_records("groups")
@app.get("/users/") @app.get("/users/")
async def read_users(database_connection: MyDatabaseConnection = DedicatedDatabaseConnection): async def read_users(
database_connection: MyDatabaseConnection = DedicatedDatabaseConnection,
):
return await database_connection.get_records("users") return await database_connection.get_records("users")
@app.get("/items/") @app.get("/items/")
async def read_items(database_connection: MyDatabaseConnection = GlobalDatabaseConnection): async def read_items(
database_connection: MyDatabaseConnection = GlobalDatabaseConnection,
):
return await database_connection.get_records("items") return await database_connection.get_records("items")
@app.get("/items/{item_id}") @app.get("/items/{item_id}")
async def read_item( async def read_item(
item_id: str = Path(), item_id: str = Path(),
database_connection: MyDatabaseConnection = GlobalDatabaseConnection database_connection: MyDatabaseConnection = GlobalDatabaseConnection,
): ):
return await database_connection.get_record("items", item_id) return await database_connection.get_record("items", item_id)

16
docs_src/dependencies/tutorial013b_an_py39.py

@ -21,6 +21,7 @@ class MyDatabaseConnection:
async def get_record(self, table_name: str, record_id: str) -> dict: async def get_record(self, table_name: str, record_id: str) -> dict:
pass pass
app = FastAPI() app = FastAPI()
@ -29,13 +30,20 @@ async def get_database_connection():
yield connection yield connection
GlobalDatabaseConnection = Annotated[MyDatabaseConnection, Depends(get_database_connection, dependency_scope="lifespan")] GlobalDatabaseConnection = Annotated[
DedicatedDatabaseConnection = Annotated[MyDatabaseConnection, Depends(get_database_connection, dependency_scope="lifespan", use_cache=False)] MyDatabaseConnection, Depends(get_database_connection, dependency_scope="lifespan")
]
DedicatedDatabaseConnection = Annotated[
MyDatabaseConnection,
Depends(get_database_connection, dependency_scope="lifespan", use_cache=False),
]
@app.get("/groups/") @app.get("/groups/")
async def read_groups(database_connection: DedicatedDatabaseConnection): async def read_groups(database_connection: DedicatedDatabaseConnection):
return await database_connection.get_records("groups") return await database_connection.get_records("groups")
@app.get("/users/") @app.get("/users/")
async def read_users(database_connection: DedicatedDatabaseConnection): async def read_users(database_connection: DedicatedDatabaseConnection):
return await database_connection.get_records("users") return await database_connection.get_records("users")
@ -45,9 +53,9 @@ async def read_users(database_connection: DedicatedDatabaseConnection):
async def read_items(database_connection: GlobalDatabaseConnection): async def read_items(database_connection: GlobalDatabaseConnection):
return await database_connection.get_records("items") return await database_connection.get_records("items")
@app.get("/items/{item_id}") @app.get("/items/{item_id}")
async def read_item( async def read_item(
database_connection: GlobalDatabaseConnection, database_connection: GlobalDatabaseConnection, item_id: Annotated[str, Path()]
item_id: Annotated[str, Path()]
): ):
return await database_connection.get_record("items", item_id) return await database_connection.get_record("items", item_id)

13
docs_src/dependencies/tutorial013c.py

@ -9,6 +9,7 @@ class MyDatabaseConnection:
""" """
This is a mock just for example purposes. This is a mock just for example purposes.
""" """
connection_string: str connection_string: str
async def __aenter__(self) -> Self: async def __aenter__(self) -> Self:
@ -20,6 +21,7 @@ class MyDatabaseConnection:
async def get_record(self, table_name: str, record_id: str) -> dict: async def get_record(self, table_name: str, record_id: str) -> dict:
pass pass
app = FastAPI() app = FastAPI()
@ -28,20 +30,21 @@ async def get_configuration() -> dict:
"database_url": "sqlite:///database.db", "database_url": "sqlite:///database.db",
} }
GlobalConfiguration = Depends(get_configuration, dependency_scope="lifespan") GlobalConfiguration = Depends(get_configuration, dependency_scope="lifespan")
async def get_database_connection( async def get_database_connection(configuration: dict = GlobalConfiguration):
configuration: dict = GlobalConfiguration
):
async with MyDatabaseConnection(configuration["database_url"]) as connection: async with MyDatabaseConnection(configuration["database_url"]) as connection:
yield connection yield connection
GlobalDatabaseConnection = Depends(get_database_connection, dependency_scope="lifespan") GlobalDatabaseConnection = Depends(get_database_connection, dependency_scope="lifespan")
@app.get("/users/{user_id}") @app.get("/users/{user_id}")
async def read_user( async def read_user(
database_connection: MyDatabaseConnection = GlobalDatabaseConnection, database_connection: MyDatabaseConnection = GlobalDatabaseConnection,
user_id: str = Path() user_id: str = Path(),
): ):
return await database_connection.get_record("users", user_id) return await database_connection.get_record("users", user_id)

18
docs_src/dependencies/tutorial013c_an_py39.py

@ -10,6 +10,7 @@ class MyDatabaseConnection:
""" """
This is a mock just for example purposes. This is a mock just for example purposes.
""" """
connection_string: str connection_string: str
async def __aenter__(self) -> Self: async def __aenter__(self) -> Self:
@ -33,20 +34,25 @@ async def get_configuration() -> dict:
"database_url": "sqlite:///database.db", "database_url": "sqlite:///database.db",
} }
GlobalConfiguration = Annotated[dict, Depends(get_configuration, dependency_scope="lifespan")]
GlobalConfiguration = Annotated[
dict, Depends(get_configuration, dependency_scope="lifespan")
]
async def get_database_connection(configuration: GlobalConfiguration): async def get_database_connection(configuration: GlobalConfiguration):
async with MyDatabaseConnection( async with MyDatabaseConnection(configuration["database_url"]) as connection:
configuration["database_url"]) as connection:
yield connection yield connection
GlobalDatabaseConnection = Annotated[get_database_connection, Depends(get_database_connection, dependency_scope="lifespan")]
GlobalDatabaseConnection = Annotated[
get_database_connection,
Depends(get_database_connection, dependency_scope="lifespan"),
]
@app.get("/users/{user_id}") @app.get("/users/{user_id}")
async def read_user( async def read_user(
database_connection: GlobalDatabaseConnection, database_connection: GlobalDatabaseConnection, user_id: Annotated[str, Path()]
user_id: Annotated[str, Path()]
): ):
return await database_connection.get_record("users", user_id) return await database_connection.get_record("users", user_id)

9
docs_src/dependencies/tutorial013d.py

@ -21,6 +21,7 @@ class MyDatabaseConnection:
async def get_record(self, table_name: str, record_id: str) -> dict: async def get_record(self, table_name: str, record_id: str) -> dict:
pass pass
app = FastAPI() app = FastAPI()
@ -33,14 +34,12 @@ GlobalDatabaseConnection = Depends(get_database_connection, dependency_scope="li
async def get_user_record( async def get_user_record(
database_connection: MyDatabaseConnection = GlobalDatabaseConnection, database_connection: MyDatabaseConnection = GlobalDatabaseConnection,
user_id: str = Path() user_id: str = Path(),
) -> dict: ) -> dict:
return await database_connection.get_record("users", user_id) return await database_connection.get_record("users", user_id)
@app.get("/users/{user_id}") @app.get("/users/{user_id}")
async def read_user( async def read_user(user_record: dict = Depends(get_user_record)):
user_record: dict = Depends(get_user_record)
):
return user_record return user_record

13
docs_src/dependencies/tutorial013d_an_py39.py

@ -21,6 +21,7 @@ class MyDatabaseConnection:
async def get_record(self, table_name: str, record_id: str) -> dict: async def get_record(self, table_name: str, record_id: str) -> dict:
pass pass
app = FastAPI() app = FastAPI()
@ -29,17 +30,17 @@ async def get_database_connection():
yield connection yield connection
GlobalDatabaseConnection = Annotated[MyDatabaseConnection, Depends(get_database_connection, dependency_scope="lifespan")] GlobalDatabaseConnection = Annotated[
MyDatabaseConnection, Depends(get_database_connection, dependency_scope="lifespan")
]
async def get_user_record( async def get_user_record(
database_connection: GlobalDatabaseConnection, database_connection: GlobalDatabaseConnection, user_id: Annotated[str, Path()]
user_id: Annotated[str, Path()]
) -> dict: ) -> dict:
return await database_connection.get_record("users", user_id) return await database_connection.get_record("users", user_id)
@app.get("/users/{user_id}") @app.get("/users/{user_id}")
async def read_user( async def read_user(user_record: Annotated[dict, Depends(get_user_record)]):
user_record: Annotated[dict, Depends(get_user_record)]
):
return user_record return user_record

10
tests/test_tutorial/test_dependencies/test_tutorial013a.py

@ -34,11 +34,7 @@ class MockDatabaseConnection:
def database_connection_mock(monkeypatch) -> MockDatabaseConnection: def database_connection_mock(monkeypatch) -> MockDatabaseConnection:
mock = MockDatabaseConnection() mock = MockDatabaseConnection()
monkeypatch.setattr( monkeypatch.setattr(MyDatabaseConnection, "__new__", lambda *args, **kwargs: mock)
MyDatabaseConnection,
"__new__",
lambda *args, **kwargs: mock
)
return mock return mock
@ -50,13 +46,13 @@ def test_dependency_usage(database_connection_mock):
assert database_connection_mock.enter_count == 1 assert database_connection_mock.enter_count == 1
assert database_connection_mock.exit_count == 0 assert database_connection_mock.exit_count == 0
response = test_client.get('/users') response = test_client.get("/users")
assert response.status_code == 200 assert response.status_code == 200
assert response.json() == [] assert response.json() == []
assert database_connection_mock.get_records_count == 1 assert database_connection_mock.get_records_count == 1
response = test_client.get('/items') response = test_client.get("/items")
assert response.status_code == 200 assert response.status_code == 200
assert response.json() == [] assert response.json() == []

10
tests/test_tutorial/test_dependencies/test_tutorial013a_an_py39.py

@ -35,11 +35,7 @@ class MockDatabaseConnection:
def database_connection_mock(monkeypatch) -> MockDatabaseConnection: def database_connection_mock(monkeypatch) -> MockDatabaseConnection:
mock = MockDatabaseConnection() mock = MockDatabaseConnection()
monkeypatch.setattr( monkeypatch.setattr(MyDatabaseConnection, "__new__", lambda *args, **kwargs: mock)
MyDatabaseConnection,
"__new__",
lambda *args, **kwargs: mock
)
return mock return mock
@ -52,13 +48,13 @@ def test_dependency_usage(database_connection_mock):
assert database_connection_mock.enter_count == 1 assert database_connection_mock.enter_count == 1
assert database_connection_mock.exit_count == 0 assert database_connection_mock.exit_count == 0
response = test_client.get('/users') response = test_client.get("/users")
assert response.status_code == 200 assert response.status_code == 200
assert response.json() == [] assert response.json() == []
assert database_connection_mock.get_records_count == 1 assert database_connection_mock.get_records_count == 1
response = test_client.get('/items') response = test_client.get("/items")
assert response.status_code == 200 assert response.status_code == 200
assert response.json() == [] assert response.json() == []

29
tests/test_tutorial/test_dependencies/test_tutorial013b.py

@ -40,22 +40,17 @@ class MockDatabaseConnection:
} }
@pytest.fixture @pytest.fixture
def database_connection_mocks(monkeypatch) -> List[MockDatabaseConnection]: def database_connection_mocks(monkeypatch) -> List[MockDatabaseConnection]:
connections = [] connections = []
def _get_new_connection_mock(*args, **kwargs): def _get_new_connection_mock(*args, **kwargs):
mock = MockDatabaseConnection() mock = MockDatabaseConnection()
connections.append(mock) connections.append(mock)
return mock return mock
monkeypatch.setattr(MyDatabaseConnection, "__new__", _get_new_connection_mock)
monkeypatch.setattr(
MyDatabaseConnection,
"__new__",
_get_new_connection_mock
)
return connections return connections
@ -70,7 +65,7 @@ def test_dependency_usage(database_connection_mocks):
assert connection.get_records_count == 0 assert connection.get_records_count == 0
assert connection.get_record_count == 0 assert connection.get_record_count == 0
response = test_client.get('/users') response = test_client.get("/users")
assert response.status_code == 200 assert response.status_code == 200
assert response.json() == [] assert response.json() == []
@ -80,9 +75,11 @@ def test_dependency_usage(database_connection_mocks):
users_connection = connection users_connection = connection
break break
assert users_connection is not None, "No connection was found for users endpoint" assert (
users_connection is not None
), "No connection was found for users endpoint"
response = test_client.get('/groups') response = test_client.get("/groups")
assert response.status_code == 200 assert response.status_code == 200
assert response.json() == [] assert response.json() == []
@ -92,7 +89,9 @@ def test_dependency_usage(database_connection_mocks):
groups_connection = connection groups_connection = connection
break break
assert groups_connection is not None, "No connection was found for groups endpoint" assert (
groups_connection is not None
), "No connection was found for groups endpoint"
assert groups_connection.get_records_count == 1 assert groups_connection.get_records_count == 1
items_connection = None items_connection = None
@ -101,16 +100,18 @@ def test_dependency_usage(database_connection_mocks):
items_connection = connection items_connection = connection
break break
assert items_connection is not None, "No connection was found for items endpoint" assert (
items_connection is not None
), "No connection was found for items endpoint"
response = test_client.get('/items') response = test_client.get("/items")
assert response.status_code == 200 assert response.status_code == 200
assert response.json() == [] assert response.json() == []
assert items_connection.get_records_count == 1 assert items_connection.get_records_count == 1
assert items_connection.get_record_count == 0 assert items_connection.get_record_count == 0
response = test_client.get('/items/asd') response = test_client.get("/items/asd")
assert response.status_code == 200 assert response.status_code == 200
assert response.json() == { assert response.json() == {
"table_name": "items", "table_name": "items",

29
tests/test_tutorial/test_dependencies/test_tutorial013b_an_py39.py

@ -41,22 +41,17 @@ class MockDatabaseConnection:
} }
@pytest.fixture @pytest.fixture
def database_connection_mocks(monkeypatch) -> List[MockDatabaseConnection]: def database_connection_mocks(monkeypatch) -> List[MockDatabaseConnection]:
connections = [] connections = []
def _get_new_connection_mock(*args, **kwargs): def _get_new_connection_mock(*args, **kwargs):
mock = MockDatabaseConnection() mock = MockDatabaseConnection()
connections.append(mock) connections.append(mock)
return mock return mock
monkeypatch.setattr(MyDatabaseConnection, "__new__", _get_new_connection_mock)
monkeypatch.setattr(
MyDatabaseConnection,
"__new__",
_get_new_connection_mock
)
return connections return connections
@ -72,7 +67,7 @@ def test_dependency_usage(database_connection_mocks):
assert connection.get_records_count == 0 assert connection.get_records_count == 0
assert connection.get_record_count == 0 assert connection.get_record_count == 0
response = test_client.get('/users') response = test_client.get("/users")
assert response.status_code == 200 assert response.status_code == 200
assert response.json() == [] assert response.json() == []
@ -82,9 +77,11 @@ def test_dependency_usage(database_connection_mocks):
users_connection = connection users_connection = connection
break break
assert users_connection is not None, "No connection was found for users endpoint" assert (
users_connection is not None
), "No connection was found for users endpoint"
response = test_client.get('/groups') response = test_client.get("/groups")
assert response.status_code == 200 assert response.status_code == 200
assert response.json() == [] assert response.json() == []
@ -94,7 +91,9 @@ def test_dependency_usage(database_connection_mocks):
groups_connection = connection groups_connection = connection
break break
assert groups_connection is not None, "No connection was found for groups endpoint" assert (
groups_connection is not None
), "No connection was found for groups endpoint"
assert groups_connection.get_records_count == 1 assert groups_connection.get_records_count == 1
items_connection = None items_connection = None
@ -103,16 +102,18 @@ def test_dependency_usage(database_connection_mocks):
items_connection = connection items_connection = connection
break break
assert items_connection is not None, "No connection was found for items endpoint" assert (
items_connection is not None
), "No connection was found for items endpoint"
response = test_client.get('/items') response = test_client.get("/items")
assert response.status_code == 200 assert response.status_code == 200
assert response.json() == [] assert response.json() == []
assert items_connection.get_records_count == 1 assert items_connection.get_records_count == 1
assert items_connection.get_record_count == 0 assert items_connection.get_record_count == 0
response = test_client.get('/items/asd') response = test_client.get("/items/asd")
assert response.status_code == 200 assert response.status_code == 200
assert response.json() == { assert response.json() == {
"table_name": "items", "table_name": "items",

10
tests/test_tutorial/test_dependencies/test_tutorial013c.py

@ -34,21 +34,17 @@ class MockDatabaseConnection:
} }
@pytest.fixture @pytest.fixture
def database_connection_mocks(monkeypatch) -> List[MockDatabaseConnection]: def database_connection_mocks(monkeypatch) -> List[MockDatabaseConnection]:
connections = [] connections = []
def _get_new_connection_mock(cls, url): def _get_new_connection_mock(cls, url):
mock = MockDatabaseConnection(url) mock = MockDatabaseConnection(url)
connections.append(mock) connections.append(mock)
return mock return mock
monkeypatch.setattr( monkeypatch.setattr(MyDatabaseConnection, "__new__", _get_new_connection_mock)
MyDatabaseConnection,
"__new__",
_get_new_connection_mock
)
return connections return connections
@ -64,7 +60,7 @@ def test_dependency_usage(database_connection_mocks):
assert database_connection_mock.exit_count == 0 assert database_connection_mock.exit_count == 0
assert database_connection_mock.get_record_count == 0 assert database_connection_mock.get_record_count == 0
response = test_client.get('/users/user') response = test_client.get("/users/user")
assert response.status_code == 200 assert response.status_code == 200
assert response.json() == { assert response.json() == {
"table_name": "users", "table_name": "users",

10
tests/test_tutorial/test_dependencies/test_tutorial013c_an_py39.py

@ -35,21 +35,17 @@ class MockDatabaseConnection:
} }
@pytest.fixture @pytest.fixture
def database_connection_mocks(monkeypatch) -> List[MockDatabaseConnection]: def database_connection_mocks(monkeypatch) -> List[MockDatabaseConnection]:
connections = [] connections = []
def _get_new_connection_mock(cls, url): def _get_new_connection_mock(cls, url):
mock = MockDatabaseConnection(url) mock = MockDatabaseConnection(url)
connections.append(mock) connections.append(mock)
return mock return mock
monkeypatch.setattr( monkeypatch.setattr(MyDatabaseConnection, "__new__", _get_new_connection_mock)
MyDatabaseConnection,
"__new__",
_get_new_connection_mock
)
return connections return connections
@ -66,7 +62,7 @@ def test_dependency_usage(database_connection_mocks):
assert database_connection_mock.exit_count == 0 assert database_connection_mock.exit_count == 0
assert database_connection_mock.get_record_count == 0 assert database_connection_mock.get_record_count == 0
response = test_client.get('/users/user') response = test_client.get("/users/user")
assert response.status_code == 200 assert response.status_code == 200
assert response.json() == { assert response.json() == {
"table_name": "users", "table_name": "users",

10
tests/test_tutorial/test_dependencies/test_tutorial013d.py

@ -33,21 +33,17 @@ class MockDatabaseConnection:
} }
@pytest.fixture @pytest.fixture
def database_connection_mocks(monkeypatch) -> List[MockDatabaseConnection]: def database_connection_mocks(monkeypatch) -> List[MockDatabaseConnection]:
connections = [] connections = []
def _get_new_connection_mock(*args, **kwargs): def _get_new_connection_mock(*args, **kwargs):
mock = MockDatabaseConnection() mock = MockDatabaseConnection()
connections.append(mock) connections.append(mock)
return mock return mock
monkeypatch.setattr( monkeypatch.setattr(MyDatabaseConnection, "__new__", _get_new_connection_mock)
MyDatabaseConnection,
"__new__",
_get_new_connection_mock
)
return connections return connections
@ -62,7 +58,7 @@ def test_dependency_usage(database_connection_mocks):
assert database_connection_mock.exit_count == 0 assert database_connection_mock.exit_count == 0
assert database_connection_mock.get_record_count == 0 assert database_connection_mock.get_record_count == 0
response = test_client.get('/users/user') response = test_client.get("/users/user")
assert response.status_code == 200 assert response.status_code == 200
assert response.json() == { assert response.json() == {
"table_name": "users", "table_name": "users",

10
tests/test_tutorial/test_dependencies/test_tutorial013d_an_py39.py

@ -34,21 +34,17 @@ class MockDatabaseConnection:
} }
@pytest.fixture @pytest.fixture
def database_connection_mocks(monkeypatch) -> List[MockDatabaseConnection]: def database_connection_mocks(monkeypatch) -> List[MockDatabaseConnection]:
connections = [] connections = []
def _get_new_connection_mock(*args, **kwargs): def _get_new_connection_mock(*args, **kwargs):
mock = MockDatabaseConnection() mock = MockDatabaseConnection()
connections.append(mock) connections.append(mock)
return mock return mock
monkeypatch.setattr( monkeypatch.setattr(MyDatabaseConnection, "__new__", _get_new_connection_mock)
MyDatabaseConnection,
"__new__",
_get_new_connection_mock
)
return connections return connections
@ -64,7 +60,7 @@ def test_dependency_usage(database_connection_mocks):
assert database_connection_mock.exit_count == 0 assert database_connection_mock.exit_count == 0
assert database_connection_mock.get_record_count == 0 assert database_connection_mock.get_record_count == 0
response = test_client.get('/users/user') response = test_client.get("/users/user")
assert response.status_code == 200 assert response.status_code == 200
assert response.json() == { assert response.json() == {
"table_name": "users", "table_name": "users",

Loading…
Cancel
Save