Browse Source

Switch to `Any` return types as requested

pull/43/head
Alex Nørgaard 2 years ago
parent
commit
e755db61ea
No known key found for this signature in database GPG Key ID: 94D54F6A3604E97
  1. 90
      a2s/a2s_async.py
  2. 86
      a2s/a2s_sync.py

90
a2s/a2s_async.py

@ -6,7 +6,7 @@ import logging
import time import time
from typing import ( from typing import (
TYPE_CHECKING, TYPE_CHECKING,
Dict, Any,
List, List,
NoReturn, NoReturn,
Optional, Optional,
@ -14,7 +14,6 @@ from typing import (
Type, Type,
TypeVar, TypeVar,
Union, Union,
overload,
) )
from a2s.a2s_fragment import A2SFragment, decode_fragment from a2s.a2s_fragment import A2SFragment, decode_fragment
@ -22,8 +21,8 @@ from a2s.byteio import ByteReader
from a2s.defaults import DEFAULT_RETRIES from a2s.defaults import DEFAULT_RETRIES
from a2s.exceptions import BrokenMessageError from a2s.exceptions import BrokenMessageError
from .info import GoldSrcInfo, InfoProtocol, SourceInfo from .info import InfoProtocol
from .players import Player, PlayersProtocol from .players import PlayersProtocol
from .rules import RulesProtocol from .rules import RulesProtocol
if TYPE_CHECKING: if TYPE_CHECKING:
@ -36,102 +35,29 @@ PROTOCOLS = Union[InfoProtocol, PlayersProtocol, RulesProtocol]
logger: logging.Logger = logging.getLogger("a2s") logger: logging.Logger = logging.getLogger("a2s")
T = TypeVar("T", InfoProtocol, PlayersProtocol, RulesProtocol) ProtocolT = TypeVar("ProtocolT", InfoProtocol, PlayersProtocol, RulesProtocol)
@overload
async def request_async( async def request_async(
address: Tuple[str, int], address: Tuple[str, int],
timeout: float, timeout: float,
encoding: str, encoding: str,
a2s_proto: Type[InfoProtocol], a2s_proto: Type[ProtocolT],
) -> Union[SourceInfo, GoldSrcInfo]: ) -> Any:
...
@overload
async def request_async(
address: Tuple[str, int],
timeout: float,
encoding: str,
a2s_proto: Type[PlayersProtocol],
) -> List[Player]:
...
@overload
async def request_async(
address: Tuple[str, int],
timeout: float,
encoding: str,
a2s_proto: Type[RulesProtocol],
) -> Dict[Union[str, bytes], Union[str, bytes]]:
...
async def request_async(
address: Tuple[str, int], timeout: float, encoding: str, a2s_proto: Type[T]
) -> Union[
SourceInfo,
GoldSrcInfo,
List[Player],
Dict[Union[str, bytes], Union[str, bytes]],
]:
conn = await A2SStreamAsync.create(address, timeout) conn = await A2SStreamAsync.create(address, timeout)
response = await request_async_impl(conn, encoding, a2s_proto) response = await request_async_impl(conn, encoding, a2s_proto)
conn.close() conn.close()
return response return response
@overload
async def request_async_impl(
conn: A2SStreamAsync,
encoding: str,
a2s_proto: Type[InfoProtocol],
challenge: int = ...,
retries: int = ...,
ping: Optional[float] = ...,
) -> Union[SourceInfo, GoldSrcInfo]:
...
@overload
async def request_async_impl(
conn: A2SStreamAsync,
encoding: str,
a2s_proto: Type[PlayersProtocol],
challenge: int = ...,
retries: int = ...,
ping: Optional[float] = ...,
) -> List[Player]:
...
@overload
async def request_async_impl(
conn: A2SStreamAsync,
encoding: str,
a2s_proto: Type[RulesProtocol],
challenge: int = ...,
retries: int = ...,
ping: Optional[float] = ...,
) -> Dict[Union[str, bytes], Union[str, bytes]]:
...
async def request_async_impl( async def request_async_impl(
conn: A2SStreamAsync, conn: A2SStreamAsync,
encoding: str, encoding: str,
a2s_proto: Type[T], a2s_proto: Type[ProtocolT],
challenge: int = 0, challenge: int = 0,
retries: int = 0, retries: int = 0,
ping: Optional[float] = None, ping: Optional[float] = None,
) -> Union[ ) -> Any:
SourceInfo,
GoldSrcInfo,
Dict[Union[str, bytes], Union[str, bytes]],
List[Player],
]:
send_time = time.monotonic() send_time = time.monotonic()
resp_data = await conn.request(a2s_proto.serialize_request(challenge)) resp_data = await conn.request(a2s_proto.serialize_request(challenge))
recv_time = time.monotonic() recv_time = time.monotonic()

86
a2s/a2s_sync.py

@ -4,15 +4,15 @@ import io
import logging import logging
import socket import socket
import time import time
from typing import Dict, List, Optional, Tuple, Type, TypeVar, Union, overload from typing import Any, Optional, Tuple, Type, TypeVar, Union
from a2s.a2s_fragment import decode_fragment from a2s.a2s_fragment import decode_fragment
from a2s.byteio import ByteReader from a2s.byteio import ByteReader
from a2s.defaults import DEFAULT_RETRIES from a2s.defaults import DEFAULT_RETRIES
from a2s.exceptions import BrokenMessageError from a2s.exceptions import BrokenMessageError
from .info import GoldSrcInfo, InfoProtocol, SourceInfo from .info import InfoProtocol
from .players import Player, PlayersProtocol from .players import PlayersProtocol
from .rules import RulesProtocol from .rules import RulesProtocol
HEADER_SIMPLE = b"\xFF\xFF\xFF\xFF" HEADER_SIMPLE = b"\xFF\xFF\xFF\xFF"
@ -25,86 +25,15 @@ logger: logging.Logger = logging.getLogger("a2s")
T = TypeVar("T", InfoProtocol, RulesProtocol, PlayersProtocol) T = TypeVar("T", InfoProtocol, RulesProtocol, PlayersProtocol)
@overload
def request_sync(
address: Tuple[str, int],
timeout: float,
encoding: str,
a2s_proto: Type[InfoProtocol],
) -> Union[SourceInfo, GoldSrcInfo]:
...
@overload
def request_sync(
address: Tuple[str, int],
timeout: float,
encoding: str,
a2s_proto: Type[PlayersProtocol],
) -> List[Player]:
...
@overload
def request_sync(
address: Tuple[str, int],
timeout: float,
encoding: str,
a2s_proto: Type[RulesProtocol],
) -> Dict[Union[str, bytes], Union[str, bytes]]:
...
def request_sync( def request_sync(
address: Tuple[str, int], timeout: float, encoding: str, a2s_proto: Type[T] address: Tuple[str, int], timeout: float, encoding: str, a2s_proto: Type[T]
) -> Union[ ) -> Any:
List[Player],
GoldSrcInfo,
SourceInfo,
Dict[Union[str, bytes], Union[str, bytes]],
]:
conn = A2SStream(address, timeout) conn = A2SStream(address, timeout)
response = request_sync_impl(conn, encoding, a2s_proto) response = request_sync_impl(conn, encoding, a2s_proto)
conn.close() conn.close()
return response return response
@overload
def request_sync_impl(
conn: A2SStream,
encoding: str,
a2s_proto: Type[InfoProtocol],
challenge: int = ...,
retries: int = ...,
ping: Optional[float] = ...,
) -> Union[SourceInfo, GoldSrcInfo]:
...
@overload
def request_sync_impl(
conn: A2SStream,
encoding: str,
a2s_proto: Type[PlayersProtocol],
challenge: int = ...,
retries: int = ...,
ping: Optional[float] = ...,
) -> List[Player]:
...
@overload
def request_sync_impl(
conn: A2SStream,
encoding: str,
a2s_proto: Type[RulesProtocol],
challenge: int = ...,
retries: int = ...,
ping: Optional[float] = ...,
) -> Dict[Union[str, bytes], Union[str, bytes]]:
...
def request_sync_impl( def request_sync_impl(
conn: A2SStream, conn: A2SStream,
encoding: str, encoding: str,
@ -112,12 +41,7 @@ def request_sync_impl(
challenge: int = 0, challenge: int = 0,
retries: int = 0, retries: int = 0,
ping: Optional[float] = None, ping: Optional[float] = None,
) -> Union[ ) -> Any:
SourceInfo,
GoldSrcInfo,
List[Player],
Dict[Union[str, bytes], Union[str, bytes]],
]:
send_time = time.monotonic() send_time = time.monotonic()
resp_data = conn.request(a2s_proto.serialize_request(challenge)) resp_data = conn.request(a2s_proto.serialize_request(challenge))
recv_time = time.monotonic() recv_time = time.monotonic()

Loading…
Cancel
Save