imp 3 days ago
committed by GitHub
parent
commit
69bbc97ba1
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 3
      discord/client.py
  2. 393
      discord/http.py

3
discord/client.py

@ -252,6 +252,8 @@ class Client:
sync your system clock to Google's NTP server. sync your system clock to Google's NTP server.
.. versionadded:: 1.3 .. versionadded:: 1.3
.. deprecated:: 2.7
This parameter is deprecated and will be removed in a future version as it is unused.
enable_debug_events: :class:`bool` enable_debug_events: :class:`bool`
Whether to enable events that are useful only for debugging gateway related information. Whether to enable events that are useful only for debugging gateway related information.
@ -314,7 +316,6 @@ class Client:
connector, connector,
proxy=proxy, proxy=proxy,
proxy_auth=proxy_auth, proxy_auth=proxy_auth,
unsync_clock=unsync_clock,
http_trace=http_trace, http_trace=http_trace,
max_ratelimit_timeout=max_ratelimit_timeout, max_ratelimit_timeout=max_ratelimit_timeout,
) )

393
discord/http.py

@ -26,6 +26,7 @@ from __future__ import annotations
import asyncio import asyncio
import logging import logging
import time
import sys import sys
from typing import ( from typing import (
Any, Any,
@ -46,7 +47,6 @@ from typing import (
Union, Union,
) )
from urllib.parse import quote as _uriquote from urllib.parse import quote as _uriquote
from collections import deque
import datetime import datetime
import aiohttp import aiohttp
@ -349,159 +349,171 @@ class Ratelimit:
This is similar to a semaphore except tailored to Discord's rate limits. This is aware of This is similar to a semaphore except tailored to Discord's rate limits. This is aware of
the expiry of a token window, along with the number of tokens available. The goal of this the expiry of a token window, along with the number of tokens available. The goal of this
design is to increase throughput of requests being sent concurrently rather than forcing design is to increase throughput of requests being sent concurrently by forcing everything
everything into a single lock queue per route. into a single lock queue per route.
""" """
__slots__ = ( __slots__ = (
'key',
'limit', 'limit',
'remaining', 'remaining',
'reset_at',
'pending',
'outgoing', 'outgoing',
'reset_after', 'one_shot',
'expires', 'http',
'dirty',
'_last_request', '_last_request',
'_max_ratelimit_timeout', '_lock',
'_loop', '_future',
'_pending_requests',
'_sleeping',
) )
def __init__(self, max_ratelimit_timeout: Optional[float]) -> None: def __init__(self, http: HTTPClient, key: str) -> None:
self.key = key
self.limit: int = 1 self.limit: int = 1
self.remaining: int = self.limit self.remaining: int = self.limit
self.reset_at: float = 0.0
self.pending: int = 0
self.outgoing: int = 0 self.outgoing: int = 0
self.reset_after: float = 0.0 self.one_shot: bool = False
self.expires: Optional[float] = None self.http: HTTPClient = http
self.dirty: bool = False self._last_request: float = 0.0
self._max_ratelimit_timeout: Optional[float] = max_ratelimit_timeout self._lock: asyncio.Lock = asyncio.Lock()
self._loop: asyncio.AbstractEventLoop = asyncio.get_running_loop() self._future: Optional[asyncio.Future[None]] = None
self._pending_requests: deque[asyncio.Future[Any]] = deque()
# Only a single rate limit object should be sleeping at a time.
# The object that is sleeping is ultimately responsible for freeing the semaphore
# for the requests currently pending.
self._sleeping: asyncio.Lock = asyncio.Lock()
self._last_request: float = self._loop.time()
def __repr__(self) -> str: def __repr__(self) -> str:
return ( return f'<RateLimitBucket limit={self.limit} remaining={self.remaining} pending={self.pending}>'
f'<RateLimitBucket limit={self.limit} remaining={self.remaining} pending_requests={len(self._pending_requests)}>'
) def no_headers(self) -> None:
self.one_shot = True
self._wake()
def reset(self): def reset(self) -> None:
self.remaining = self.limit - self.outgoing self.remaining = self.limit - self.outgoing
self.expires = None self.reset_at = 0.0
self.reset_after = 0.0
self.dirty = False
def update(self, response: aiohttp.ClientResponse, *, use_clock: bool = False) -> None: def update(self, response: aiohttp.ClientResponse, data: Union[Dict[str, Any], str]) -> bool:
headers = response.headers limit = int(response.headers['X-Ratelimit-Limit'])
self.limit = int(headers.get('X-Ratelimit-Limit', 1)) reset_at = self.http.loop.time() + (float(response.headers['X-Ratelimit-Reset']) - time.time())
if self.dirty: # Shared scope (which only appears on 429s) could have a longer "reset_at".
self.remaining = min(int(headers.get('X-Ratelimit-Remaining', 0)), self.limit - self.outgoing) if response.headers.get('X-RateLimit-Scope') == 'shared':
reset_at = max(reset_at, self.http.loop.time() + data['retry_after']) # type: ignore
remaining = 0
else: else:
self.remaining = int(headers.get('X-Ratelimit-Remaining', 0)) # Consider a lower remaining value because updates can be out of order, so self.outgoing is used
self.dirty = True remaining = min(int(response.headers['X-Ratelimit-Remaining']), limit - self.outgoing)
reset_after = headers.get('X-Ratelimit-Reset-After')
if use_clock or not reset_after:
utc = datetime.timezone.utc
now = datetime.datetime.now(utc)
reset = datetime.datetime.fromtimestamp(float(headers['X-Ratelimit-Reset']), utc)
self.reset_after = (reset - now).total_seconds()
else:
self.reset_after = float(reset_after)
self.expires = self._loop.time() + self.reset_after # The checks below combats out-of-order responses and alternating sub ratelimits.
# As a result, there will be lower throughput for routes with subratelimits.
# Unless we document and maintain the undocumented and unstable subratelimits ourselves.
def _wake_next(self) -> None: # 1. Completely ignore if the ratelimit window has expired; that data is useless
while self._pending_requests: if self.http.loop.time() >= reset_at:
future = self._pending_requests.popleft() return False
if not future.done():
future.set_result(None)
break
def _wake(self, count: int = 1, *, exception: Optional[RateLimited] = None) -> None: # 2. Always use the longest reset_at value
awaken = 0 update = False
while self._pending_requests: if reset_at > self.reset_at:
future = self._pending_requests.popleft() self.reset_at = reset_at
if not future.done(): update = True
if exception:
future.set_exception(exception)
else:
future.set_result(None)
awaken += 1
if awaken >= count: # 3. Always use the lowest remaining value
break if remaining < self.remaining:
self.remaining = remaining
update = True
async def _refresh(self) -> None: self.limit = limit
error = self._max_ratelimit_timeout and self.reset_after > self._max_ratelimit_timeout self.one_shot = False
exception = RateLimited(self.reset_after) if error else None
async with self._sleeping:
if not error:
await asyncio.sleep(self.reset_after)
self.reset() # Return whether this update was relevant or not.
self._wake(self.remaining, exception=exception) return update
def is_expired(self) -> bool:
return self.expires is not None and self._loop.time() > self.expires
def is_inactive(self) -> bool: def is_inactive(self) -> bool:
delta = self._loop.time() - self._last_request delta = self.http.loop.time() - self._last_request
return delta >= 300 and self.outgoing == 0 and len(self._pending_requests) == 0 return delta >= 300 and (self.one_shot or (self.outgoing == 0 and self.pending == 0))
def _wake(self) -> None:
if self._future is not None and not self._future.done():
self._future.set_result(None)
async def _wait_global(self, start_time: float):
# Sleep up to 3 times, to account for global reset at overwriting during sleeps
for i in range(3):
seconds = self.http.global_reset_at - start_time
if seconds > 0:
await asyncio.sleep(seconds)
continue
break
else:
raise RuntimeError('Global reset at changed more than 3 times')
async def _wait(self):
# Consider waiting if none is remaining
if self.remaining == 0:
# If reset_at is not set yet, wait for the last request, if outgoing, to finish first
# for up to 3 seconds instead of using aiohttp's default 5 min timeout.
if self.reset_at == 0.0 and (self._last_request == 0.0 or self.http.loop.time() - self._last_request < 3):
self._future = future = self.http.loop.create_future()
try:
await asyncio.wait_for(future, 3)
except asyncio.TimeoutError:
fmt = 'Initial request for rate limit bucket (%s) never finished. Skipping.'
_log.warning(fmt, self.key)
future.cancel()
async def acquire(self) -> None: # If none are still remaining then start sleeping
self._last_request = self._loop.time() if self.remaining == 0 and not self.one_shot:
if self.is_expired(): # Sleep up to 3 times, giving room for a bucket update and a bucket change
# or 2 sub-ratelimit bucket changes, prioritizing is handled in update()
for i in range(3):
seconds = self.reset_at - self.http.loop.time()
copy = self.reset_at
if seconds > 0:
if i:
fmt = 'A rate limit bucket (%s) has changed reset_at during sleep. Sleeping again for %.2f seconds.'
else:
fmt = 'A rate limit bucket (%s) has been exhausted. Sleeping for %.2f seconds.'
_log.warning(fmt, self.key, seconds)
await asyncio.sleep(seconds)
if copy == self.reset_at:
self.reset() self.reset()
elif self.remaining == 0 and not self.one_shot:
continue # sleep again
break
else:
raise RuntimeError('Reset at changed more than 3 times')
if self._max_ratelimit_timeout is not None and self.expires is not None: async def acquire(self):
# Check if we can pre-emptively block this request for having too large of a timeout start_time: float = self.http.loop.time()
current_reset_after = self.expires - self._loop.time()
if current_reset_after > self._max_ratelimit_timeout:
raise RateLimited(current_reset_after)
while self.remaining <= 0:
future = self._loop.create_future()
self._pending_requests.append(future)
try:
while not future.done():
# 30 matches the smallest allowed max_ratelimit_timeout
max_wait_time = self.expires - self._loop.time() if self.expires else 30
await asyncio.wait([future], timeout=max_wait_time)
if not future.done():
await self._refresh()
except:
future.cancel()
if self.remaining > 0 and not future.cancelled():
self._wake_next()
raise
self.remaining -= 1 # Resources confirmed to be "one shots" after the first request like interaction response
# don't have ratelimits and can skip the entire queue logic except global wait
if self.one_shot:
await self._wait_global(start_time)
else:
# Ensure only 1 request goes through the inner acquire logic at a time
self.pending += 1
async with self._lock:
await self._wait()
await self._wait_global(start_time)
self.remaining -= 1 # one shot changing this doesn't matter
self.pending -= 1
self.outgoing += 1 self.outgoing += 1
self._last_request = self.http.loop.time()
return self
async def release(self):
if not self.one_shot:
self.outgoing -= 1
self._wake()
async def __aenter__(self) -> Self: async def __aenter__(self) -> Self:
await self.acquire() await self.acquire()
return self return self
async def __aexit__(self, type: Type[BE], value: BE, traceback: TracebackType) -> None: async def __aexit__(self, type: Type[BE], value: BE, traceback: TracebackType) -> None:
self.outgoing -= 1 await self.release()
tokens = self.remaining - self.outgoing
# Check whether the rate limit needs to be pre-emptively slept on
# Note that this is a Lock to prevent multiple rate limit objects from sleeping at once
if not self._sleeping.locked():
if tokens <= 0:
await self._refresh()
elif self._pending_requests:
exception = (
RateLimited(self.reset_after)
if self._max_ratelimit_timeout and self.reset_after > self._max_ratelimit_timeout
else None
)
self._wake(tokens, exception=exception)
# For some reason, the Discord voice websocket expects this header to be # For some reason, the Discord voice websocket expects this header to be
@ -519,7 +531,6 @@ class HTTPClient:
*, *,
proxy: Optional[str] = None, proxy: Optional[str] = None,
proxy_auth: Optional[aiohttp.BasicAuth] = None, proxy_auth: Optional[aiohttp.BasicAuth] = None,
unsync_clock: bool = True,
http_trace: Optional[aiohttp.TraceConfig] = None, http_trace: Optional[aiohttp.TraceConfig] = None,
max_ratelimit_timeout: Optional[float] = None, max_ratelimit_timeout: Optional[float] = None,
) -> None: ) -> None:
@ -535,12 +546,11 @@ class HTTPClient:
# one shot requests that don't have a bucket hash # one shot requests that don't have a bucket hash
# When this reaches 256 elements, it will try to evict based off of expiry # When this reaches 256 elements, it will try to evict based off of expiry
self._buckets: Dict[str, Ratelimit] = {} self._buckets: Dict[str, Ratelimit] = {}
self._global_over: asyncio.Event = MISSING self.global_reset_at: float = 0.0
self.token: Optional[str] = None self.token: Optional[str] = None
self.proxy: Optional[str] = proxy self.proxy: Optional[str] = proxy
self.proxy_auth: Optional[aiohttp.BasicAuth] = proxy_auth self.proxy_auth: Optional[aiohttp.BasicAuth] = proxy_auth
self.http_trace: Optional[aiohttp.TraceConfig] = http_trace self.http_trace: Optional[aiohttp.TraceConfig] = http_trace
self.use_clock: bool = not unsync_clock
self.max_ratelimit_timeout: Optional[float] = max(30.0, max_ratelimit_timeout) if max_ratelimit_timeout else None self.max_ratelimit_timeout: Optional[float] = max(30.0, max_ratelimit_timeout) if max_ratelimit_timeout else None
user_agent = 'DiscordBot (https://github.com/Rapptz/discord.py {0}) Python/{1[0]}.{1[1]} aiohttp/{2}' user_agent = 'DiscordBot (https://github.com/Rapptz/discord.py {0}) Python/{1[0]}.{1[1]} aiohttp/{2}'
@ -577,7 +587,7 @@ class HTTPClient:
try: try:
value = self._buckets[key] value = self._buckets[key]
except KeyError: except KeyError:
self._buckets[key] = value = Ratelimit(self.max_ratelimit_timeout) self._buckets[key] = value = Ratelimit(self, key)
self._try_clear_expired_ratelimits() self._try_clear_expired_ratelimits()
return value return value
@ -631,14 +641,11 @@ class HTTPClient:
if self.proxy_auth is not None: if self.proxy_auth is not None:
kwargs['proxy_auth'] = self.proxy_auth kwargs['proxy_auth'] = self.proxy_auth
if not self._global_over.is_set():
# wait until the global lock is complete
await self._global_over.wait()
response: Optional[aiohttp.ClientResponse] = None response: Optional[aiohttp.ClientResponse] = None
data: Optional[Union[Dict[str, Any], str]] = None data: Optional[Union[Dict[str, Any], str]] = None
async with ratelimit:
for tries in range(5): for tries in range(5):
async with ratelimit:
if files: if files:
for f in files: for f in files:
f.reset(seek=tries) f.reset(seek=tries)
@ -654,116 +661,76 @@ class HTTPClient:
async with self.__session.request(method, url, **kwargs) as response: async with self.__session.request(method, url, **kwargs) as response:
_log.debug('%s %s with %s has returned %s', method, url, kwargs.get('data'), response.status) _log.debug('%s %s with %s has returned %s', method, url, kwargs.get('data'), response.status)
# even errors have text involved in them so this is safe to call # Errors have text involved in them so this is safe to call
data = await json_or_text(response) data = await json_or_text(response)
# Update and use rate limit information if the bucket header is present # Endpoint has ratelimit headers
discord_hash = response.headers.get('X-Ratelimit-Bucket') new_bucket_hash = response.headers.get('X-Ratelimit-Bucket')
# I am unsure if X-Ratelimit-Bucket is always available if new_bucket_hash:
# However, X-Ratelimit-Remaining has been a consistent cornerstone that worked # Ratelimit headers are up to date and relevant
has_ratelimit_headers = 'X-Ratelimit-Remaining' in response.headers if ratelimit.update(response, data):
if discord_hash is not None: # Adjust key if the bucket has changed. Either encountered a sub-ratelimit
# If the hash Discord has provided is somehow different from our current hash something changed # or Discord just wants to change ratelimit values for an update probably.
if bucket_hash != discord_hash: if new_bucket_hash != bucket_hash:
if bucket_hash is not None: if bucket_hash is not None:
# If the previous hash was an actual Discord hash then this means the
# hash has changed sporadically.
# This can be due to two reasons
# 1. It's a sub-ratelimit which is hard to handle
# 2. The rate limit information genuinely changed
# There is no good way to discern these, Discord doesn't provide a way to do so.
# At best, there will be some form of logging to help catch it.
# Alternating sub-ratelimits means that the requests oscillate between
# different underlying rate limits -- this can lead to unexpected 429s
# It is unavoidable.
fmt = 'A route (%s) has changed hashes: %s -> %s.' fmt = 'A route (%s) has changed hashes: %s -> %s.'
_log.debug(fmt, route_key, bucket_hash, discord_hash) _log.debug(fmt, route_key, bucket_hash, new_bucket_hash)
self._bucket_hashes[route_key] = discord_hash
self._buckets[f'{discord_hash}:{route.major_parameters}'] = ratelimit
self._buckets.pop(key, None) self._buckets.pop(key, None)
elif route_key not in self._bucket_hashes: elif route_key not in self._bucket_hashes:
fmt = '%s has found its initial rate limit bucket hash (%s).' fmt = '%s has found its initial rate limit bucket hash (%s).'
_log.debug(fmt, route_key, discord_hash) _log.debug(fmt, route_key, new_bucket_hash)
self._bucket_hashes[route_key] = discord_hash self._bucket_hashes[route_key] = new_bucket_hash
self._buckets[f'{discord_hash}:{route.major_parameters}'] = ratelimit ratelimit.key = f'{new_bucket_hash}:{route.major_parameters}'
self._buckets[ratelimit.key] = ratelimit
# Global rate limit 429 wont have ratelimit headers (also can't tell if it's one-shot)
elif response.headers.get('X-RateLimit-Global'):
retry_after: float = data['retry_after'] # type: ignore
_log.warning('Global rate limit has been hit. Retrying in %.2f seconds.', retry_after)
self.global_reset_at = self.loop.time() + retry_after
if has_ratelimit_headers: # Endpoint does not have ratelimit headers; it's one-shot.
if response.status != 429: elif not ratelimit.one_shot:
ratelimit.update(response, use_clock=self.use_clock) ratelimit.no_headers()
if ratelimit.remaining == 0:
_log.debug(
'A rate limit bucket (%s) has been exhausted. Pre-emptively rate limiting...',
discord_hash or route_key,
)
# the request was successful so just return the text/json # The request was successful so just return the text/json
if 300 > response.status >= 200: if 300 > response.status >= 200:
_log.debug('%s %s has received %s', method, url, data) _log.debug('%s %s has received %s', method, url, data)
return data return data
# we are being rate limited # We are being ratelimited
if response.status == 429: elif response.status == 429:
if not response.headers.get('Via') or isinstance(data, str): retry_after: float = float(response.headers['Retry-After']) # only in headers for cf ban
# Banned by Cloudflare more than likely.
# Hit Cloudflare ban for too many invalid requests (10,000 per 10 minutes)
# An invalid HTTP request is 401, 403, or 429 (excluding "shared" scope).
# This ban is the only one that is IP based, and not just token based.
if not response.headers.get('Via'):
fmt = 'We are Cloudflare banned for %.2f seconds.'
_log.warning(fmt, method, url, retry_after)
raise HTTPException(response, data) raise HTTPException(response, data)
if ratelimit.remaining > 0: elif self.max_ratelimit_timeout and retry_after > self.max_ratelimit_timeout:
# According to night fmt = 'We are being rate limited. %s %s responded with 429. Timeout of %.2f was too long, erroring instead.'
# https://github.com/discord/discord-api-docs/issues/2190#issuecomment-816363129 _log.warning(fmt, method, url, retry_after)
# Remaining > 0 and 429 means that a sub ratelimit was hit.
# It is unclear what should happen in these cases other than just using the retry_after
# value in the body.
_log.debug(
'%s %s received a 429 despite having %s remaining requests. This is a sub-ratelimit.',
method,
url,
ratelimit.remaining,
)
retry_after: float = data['retry_after']
if self.max_ratelimit_timeout and retry_after > self.max_ratelimit_timeout:
_log.warning(
'We are being rate limited. %s %s responded with 429. Timeout of %.2f was too long, erroring instead.',
method,
url,
retry_after,
)
raise RateLimited(retry_after) raise RateLimited(retry_after)
elif ratelimit.remaining > 0:
# According to night, remaining > 0 and 429 means that a sub ratelimit was hit.
# https://github.com/discord/discord-api-docs/issues/2190#issuecomment-816363129
fmt = '%s %s received a 429 despite having %s remaining requests. This is a sub-ratelimit.'
_log.debug(fmt, method, url, ratelimit.remaining)
fmt = 'We are being rate limited. %s %s responded with 429. Retrying in %.2f seconds.' fmt = 'We are being rate limited. %s %s responded with 429. Retrying in %.2f seconds.'
_log.warning(fmt, method, url, retry_after) _log.warning(fmt, method, url, retry_after)
_log.debug( # We've received a 500, 502, 504, or 524, unconditional retry
'Rate limit is being handled by bucket hash %s with %r major parameters', elif response.status in {500, 502, 504, 524} and tries < 4:
bucket_hash,
route.major_parameters,
)
# check if it's a global rate limit
is_global = data.get('global', False)
if is_global:
_log.warning('Global rate limit has been hit. Retrying in %.2f seconds.', retry_after)
self._global_over.clear()
await asyncio.sleep(retry_after)
_log.debug('Done sleeping for the rate limit. Retrying...')
# release the global lock now that the
# global rate limit has passed
if is_global:
self._global_over.set()
_log.debug('Global rate limit is now over.')
continue
# we've received a 500, 502, 504, or 524, unconditional retry
if response.status in {500, 502, 504, 524}:
await asyncio.sleep(1 + tries * 2) await asyncio.sleep(1 + tries * 2)
continue continue
# the usual error cases # The usual error cases
if response.status == 403: elif response.status == 403:
raise Forbidden(response, data) raise Forbidden(response, data)
elif response.status == 404: elif response.status == 404:
raise NotFound(response, data) raise NotFound(response, data)
@ -829,8 +796,6 @@ class HTTPClient:
trace_configs=None if self.http_trace is None else [self.http_trace], trace_configs=None if self.http_trace is None else [self.http_trace],
cookie_jar=aiohttp.DummyCookieJar(), cookie_jar=aiohttp.DummyCookieJar(),
) )
self._global_over = asyncio.Event()
self._global_over.set()
old_token = self.token old_token = self.token
self.token = token self.token = token

Loading…
Cancel
Save