Browse Source

Update context/super property management

pull/10109/head
dolfies 2 years ago
parent
commit
527329be61
  1. 110
      discord/http.py
  2. 238
      discord/tracking.py
  3. 27
      discord/utils.py

110
discord/http.py

@ -25,8 +25,6 @@ DEALINGS IN THE SOFTWARE.
from __future__ import annotations
import asyncio
from base64 import b64encode
import json
import logging
from random import choice, choices
import string
@ -527,7 +525,6 @@ class HTTPClient:
self.captcha_handler: Optional[CaptchaHandler] = captcha_handler
self.max_ratelimit_timeout: Optional[float] = max(30.0, max_ratelimit_timeout) if max_ratelimit_timeout else None
self.user_agent: str = MISSING
self.super_properties: Dict[str, Any] = {}
self.encoded_super_properties: str = MISSING
self._started: bool = False
@ -552,26 +549,8 @@ class HTTPClient:
connector=self.connector,
trace_configs=None if self.http_trace is None else [self.http_trace],
)
self.user_agent, self.browser_version, self.client_build_number = ua, bv, bn = await utils._get_info(session)
_log.info('Found user agent %s (%s), build number %s.', ua, bv, bn)
self.super_properties = sp = {
'os': 'Windows',
'browser': 'Chrome',
'device': '',
'browser_user_agent': ua,
'browser_version': bv,
'os_version': '10',
'referrer': '',
'referring_domain': '',
'referrer_current': '',
'referring_domain_current': '',
'release_channel': 'stable',
'system_locale': 'en-US',
'client_build_number': bn,
'client_event_source': None,
'design_id': 0,
}
self.encoded_super_properties = b64encode(json.dumps(sp).encode()).decode('utf-8')
self.super_properties, self.encoded_super_properties = sp, _ = await utils._get_info(session)
_log.info('Found user agent %s, build number %s.', sp.get('browser_user_agent'), sp.get('client_build_number'))
if self.captcha_handler is not None:
await self.captcha_handler.startup()
@ -605,6 +584,14 @@ class HTTPClient:
return await self.__session.ws_connect(url, **kwargs)
@property
def browser_version(self) -> str:
return self.super_properties['browser_version']
@property
def user_agent(self) -> str:
return self.super_properties['browser_user_agent']
def _try_clear_expired_ratelimits(self) -> None:
if len(self._buckets) < 256:
return
@ -1009,7 +996,7 @@ class HTTPClient:
payload = {
'recipients': recipients,
}
props = ContextProperties._from_new_group_dm() # New Group DM button
props = ContextProperties.from_new_group_dm() # New Group DM button
return self.request(Route('POST', '/users/@me/channels'), json=payload, context_properties=props)
@ -1017,9 +1004,12 @@ class HTTPClient:
payload = None
if nick:
payload = {'nick': nick}
props = ContextProperties.from_add_friends_to_dm()
return self.request(
Route('PUT', '/channels/{channel_id}/recipients/{user_id}', channel_id=channel_id, user_id=user_id), json=payload
Route('PUT', '/channels/{channel_id}/recipients/{user_id}', channel_id=channel_id, user_id=user_id),
json=payload,
context_properties=props,
)
def remove_group_recipient(self, channel_id: Snowflake, user_id: Snowflake) -> Response[None]:
@ -1034,7 +1024,7 @@ class HTTPClient:
payload = {
'recipients': [user_id],
}
props = ContextProperties._empty() # {}
props = ContextProperties.empty() # {}
return self.request(Route('POST', '/users/@me/channels'), json=payload, context_properties=props)
@ -1668,7 +1658,7 @@ class HTTPClient:
params['location'] = 'Guild%20Discovery'
if location is not MISSING:
params['location'] = location
props = ContextProperties._empty() if lurker else ContextProperties._from_lurking()
props = ContextProperties.empty() if lurker else ContextProperties.from_lurking()
return self.request(
Route('PUT', '/guilds/{guild_id}/members/@me', guild_id=guild_id),
@ -2092,7 +2082,7 @@ class HTTPClient:
message: Message = MISSING,
): # TODO: response type
if message is not MISSING: # Invite Button Embed
props = ContextProperties._from_invite_embed(
props = ContextProperties.from_invite_button_embed(
guild_id=getattr(message.guild, 'id', None),
channel_id=message.channel.id,
channel_type=getattr(message.channel, 'type', None),
@ -2101,12 +2091,12 @@ class HTTPClient:
elif type is InviteType.guild or type is InviteType.group_dm: # Join Guild, Accept Invite Page
props = choice(
(
ContextProperties._from_accept_invite_page,
ContextProperties._from_join_guild_popup,
ContextProperties.from_accept_invite_page,
ContextProperties.from_join_guild,
)
)(guild_id=guild_id, channel_id=channel_id, channel_type=channel_type)
else: # Accept Invite Page
props = ContextProperties._from_accept_invite_page(
props = ContextProperties.from_accept_invite_page(
guild_id=guild_id, channel_id=channel_id, channel_type=channel_type
)
return self.request(Route('POST', '/invites/{invite_id}', invite_id=invite_id), context_properties=props, json={})
@ -2140,8 +2130,8 @@ class HTTPClient:
payload['target_application_id'] = str(target_application_id)
props = choice(
(
ContextProperties._from_guild_header,
ContextProperties._from_context_menu,
ContextProperties.from_guild_header,
ContextProperties.from_context_menu,
)
)()
@ -2156,14 +2146,14 @@ class HTTPClient:
payload = {
'max_age': max_age,
}
props = ContextProperties._from_group_dm_invite()
props = ContextProperties.from_group_dm_invite_create()
return self.request(
Route('POST', '/channels/{channel_id}/invites', channel_id=channel_id), json=payload, context_properties=props
)
def create_friend_invite(self) -> Response[invite.Invite]:
return self.request(Route('POST', '/users/@me/invites'), json={}, context_properties=ContextProperties._empty())
return self.request(Route('POST', '/users/@me/invites'), json={}, context_properties=ContextProperties.empty())
def get_invite(
self,
@ -2192,13 +2182,13 @@ class HTTPClient:
return self.request(Route('GET', '/channels/{channel_id}/invites', channel_id=channel_id))
def get_friend_invites(self) -> Response[List[invite.Invite]]:
return self.request(Route('GET', '/users/@me/invites'), context_properties=ContextProperties._empty())
return self.request(Route('GET', '/users/@me/invites'), context_properties=ContextProperties.empty())
def delete_invite(self, invite_id: str, *, reason: Optional[str] = None) -> Response[invite.Invite]:
return self.request(Route('DELETE', '/invites/{invite_id}', invite_id=invite_id), reason=reason)
def delete_friend_invites(self) -> Response[List[invite.Invite]]:
return self.request(Route('DELETE', '/users/@me/invites'), context_properties=ContextProperties._empty())
return self.request(Route('DELETE', '/users/@me/invites'), context_properties=ContextProperties.empty())
# Role management
@ -2560,9 +2550,9 @@ class HTTPClient:
if action is RelationshipAction.deny_request: # User Profile, Friends, DM Channel
props = choice(
(
ContextProperties._from_friends_page,
ContextProperties._from_user_profile,
ContextProperties._from_dm_channel,
ContextProperties.from_friends,
ContextProperties.from_user_profile,
ContextProperties.from_dm_channel,
)
)()
elif action in (
@ -2571,16 +2561,16 @@ class HTTPClient:
): # Friends, ContextMenu, User Profile, DM Channel
props = choice(
(
ContextProperties._from_contextmenu,
ContextProperties._from_user_profile,
ContextProperties._from_friends_page,
ContextProperties._from_dm_channel,
ContextProperties.from_contextmenu,
ContextProperties.from_user_profile,
ContextProperties.from_friends,
ContextProperties.from_dm_channel,
)
)()
elif action == RelationshipAction.remove_pending_request: # Friends
props = ContextProperties._from_friends_page()
props = ContextProperties.from_friends()
else:
props = ContextProperties._empty()
props = ContextProperties.empty()
return self.request(r, context_properties=props)
@ -2591,36 +2581,36 @@ class HTTPClient:
if action is RelationshipAction.accept_request: # User Profile, Friends, DM Channel
props = choice(
(
ContextProperties._from_friends_page,
ContextProperties._from_user_profile,
ContextProperties._from_dm_channel,
ContextProperties.from_friends,
ContextProperties.from_user_profile,
ContextProperties.from_dm_channel,
)
)()
elif action is RelationshipAction.block: # Friends, ContextMenu, User Profile, DM Channel.
props = choice(
(
ContextProperties._from_contextmenu,
ContextProperties._from_user_profile,
ContextProperties._from_friends_page,
ContextProperties._from_dm_channel,
ContextProperties.from_contextmenu,
ContextProperties.from_user_profile,
ContextProperties.from_friends,
ContextProperties.from_dm_channel,
)
)()
elif action is RelationshipAction.send_friend_request: # ContextMenu, User Profile, DM Channel
props = choice(
(
ContextProperties._from_contextmenu,
ContextProperties._from_user_profile,
ContextProperties._from_dm_channel,
ContextProperties.from_contextmenu,
ContextProperties.from_user_profile,
ContextProperties.from_dm_channel,
)
)()
else:
props = ContextProperties._empty()
props = ContextProperties.empty()
return self.request(r, context_properties=props, json={'type': type} if type else None)
def send_friend_request(self, username: str, discriminator: Snowflake) -> Response[None]:
r = Route('POST', '/users/@me/relationships')
props = choice((ContextProperties._from_add_friend_page, ContextProperties._from_group_dm))() # Friends, Group DM
props = choice((ContextProperties.from_add_friend, ContextProperties.from_group_dm))() # Friends, Group DM
payload = {'username': username, 'discriminator': int(discriminator)}
return self.request(r, json=payload, context_properties=props)
@ -3453,7 +3443,7 @@ class HTTPClient:
return self.request(
Route('GET', '/store/skus/{sku_id}/purchase', sku_id=sku_id),
params=params,
context_properties=ContextProperties._empty(),
context_properties=ContextProperties.empty(),
)
def purchase_sku(
@ -3496,7 +3486,7 @@ class HTTPClient:
return self.request(
Route('POST', '/store/skus/{sku_id}/purchase', sku_id=sku_id),
json=payload,
context_properties=ContextProperties._empty(),
context_properties=ContextProperties.empty(),
)
def create_sku_discount(self, sku_id: Snowflake, user_id: Snowflake, percent_off: int, ttl: int = 600) -> Response[None]:

238
discord/tracking.py

@ -28,7 +28,7 @@ from base64 import b64encode
import json
from random import choice
from typing import Dict, Optional, TYPE_CHECKING
from typing import Any, Callable, Dict, Optional, Tuple, TYPE_CHECKING
from .utils import MISSING
@ -45,178 +45,111 @@ __all__ = (
# fmt: on
class ContextProperties: # Thank you Discord-S.C.U.M
"""Represents the Discord X-Context-Properties header.
class ContextPropertiesMeta(type):
if TYPE_CHECKING:
This header is essential for certain actions (e.g. joining guilds, friend requesting).
def __getattribute__(self, name: str) -> Callable[[], Self]:
...
.. versionadded:: 1.9
def __new__(cls, name: str, bases: Tuple[type, ...], attrs: Dict[str, Any]):
cls = super().__new__(cls, name, bases, attrs)
locations = attrs.get('LOCATIONS', {})
sources = attrs.get('SOURCES', {})
.. container:: operations
def build_location(location: str) -> classmethod:
def f(cls) -> Self:
data = {'location': location}
return cls(data)
.. describe:: x == y
return classmethod(f)
Checks if two context properties are equal.
def build_source(source: str) -> classmethod:
def f(cls) -> Self:
data = {'source': source}
return cls(data)
.. describe:: x != y
return classmethod(f)
Checks if two context properties are not equal.
for location in locations:
if location:
setattr(cls, f'from_{location.lower().replace(" ", "_").replace("/", "")}', build_location(location))
.. describe:: hash(x)
for source in sources:
if source:
setattr(cls, f'from_{source.lower().replace(" ", "_")}', build_source(source))
Return the context property's hash.
return cls
.. describe:: str(x)
Returns the context property's name.
class ContextProperties(metaclass=ContextPropertiesMeta):
"""Represents the Discord X-Context-Properties header.
Attributes
----------
value: :class:`str`
The encoded header value.
This header is essential for certain actions (e.g. joining guilds, friend requesting).
"""
__slots__ = ('_data', 'value')
def __init__(self, data) -> None:
__slots__ = ('_data',)
LOCATIONS = {
None: 'e30=',
'Friends': 'eyJsb2NhdGlvbiI6IkZyaWVuZHMifQ==',
'ContextMenu': 'eyJsb2NhdGlvbiI6IkNvbnRleHRNZW51In0=',
'Context Menu': 'eyJsb2NhdGlvbiI6IkNvbnRleHQgTWVudSJ9',
'User Profile': 'eyJsb2NhdGlvbiI6IlVzZXIgUHJvZmlsZSJ9',
'Add Friend': 'eyJsb2NhdGlvbiI6IkFkZCBGcmllbmQifQ==',
'Guild Header': 'eyJsb2NhdGlvbiI6Ikd1aWxkIEhlYWRlciJ9',
'Group DM': 'eyJsb2NhdGlvbiI6Ikdyb3VwIERNIn0=',
'DM Channel': 'eyJsb2NhdGlvbiI6IkRNIENoYW5uZWwifQ==',
'/app': 'eyJsb2NhdGlvbiI6ICIvYXBwIn0=',
'Login': 'eyJsb2NhdGlvbiI6IkxvZ2luIn0=',
'Register': 'eyJsb2NhdGlvbiI6IlJlZ2lzdGVyIn0=',
'Verify Email': 'eyJsb2NhdGlvbiI6IlZlcmlmeSBFbWFpbCJ9',
'New Group DM': 'eyJsb2NhdGlvbiI6Ik5ldyBHcm91cCBETSJ9',
'Add Friends to DM': 'eyJsb2NhdGlvbiI6IkFkZCBGcmllbmRzIHRvIERNIn0=',
'Group DM Invite Create': 'eyJsb2NhdGlvbiI6Ikdyb3VwIERNIEludml0ZSBDcmVhdGUifQ==',
'Stage Channel': 'eyJsb2NhdGlvbiI6IlN0YWdlIENoYW5uZWwifQ==',
}
SOURCES = {
None: 'e30=',
'Chat Input Blocker - Lurker Mode': 'eyJzb3VyY2UiOiJDaGF0IElucHV0IEJsb2NrZXIgLSBMdXJrZXIgTW9kZSJ9',
'Notice - Lurker Mode': 'eyJzb3VyY2UiOiJOb3RpY2UgLSBMdXJrZXIgTW9kZSJ9',
}
def __init__(self, data: dict) -> None:
self._data: Dict[str, Snowflake] = data
self.value: str = self._encode_data(data)
def _encode_data(self, data) -> str:
library = {
None: 'e30=',
# Locations
'Friends': 'eyJsb2NhdGlvbiI6IkZyaWVuZHMifQ==',
'ContextMenu': 'eyJsb2NhdGlvbiI6IkNvbnRleHRNZW51In0=',
'Context Menu': 'eyJsb2NhdGlvbiI6IkNvbnRleHQgTWVudSJ9',
'User Profile': 'eyJsb2NhdGlvbiI6IlVzZXIgUHJvZmlsZSJ9',
'Add Friend': 'eyJsb2NhdGlvbiI6IkFkZCBGcmllbmQifQ==',
'Guild Header': 'eyJsb2NhdGlvbiI6Ikd1aWxkIEhlYWRlciJ9',
'Group DM': 'eyJsb2NhdGlvbiI6Ikdyb3VwIERNIn0=',
'DM Channel': 'eyJsb2NhdGlvbiI6IkRNIENoYW5uZWwifQ==',
'/app': 'eyJsb2NhdGlvbiI6ICIvYXBwIn0=',
'Login': 'eyJsb2NhdGlvbiI6IkxvZ2luIn0=',
'Register': 'eyJsb2NhdGlvbiI6IlJlZ2lzdGVyIn0=',
'Verify Email': 'eyJsb2NhdGlvbiI6IlZlcmlmeSBFbWFpbCJ9',
'New Group DM': 'eyJsb2NhdGlvbiI6Ik5ldyBHcm91cCBETSJ9',
'Add Friends to DM': 'eyJsb2NhdGlvbiI6IkFkZCBGcmllbmRzIHRvIERNIn0=',
'Group DM Invite Create': 'eyJsb2NhdGlvbiI6Ikdyb3VwIERNIEludml0ZSBDcmVhdGUifQ==',
'Stage Channel': 'eyJsb2NhdGlvbiI6IlN0YWdlIENoYW5uZWwifQ==',
# Sources
'Chat Input Blocker - Lurker Mode': 'eyJzb3VyY2UiOiJDaGF0IElucHV0IEJsb2NrZXIgLSBMdXJrZXIgTW9kZSJ9',
'Notice - Lurker Mode': 'eyJzb3VyY2UiOiJOb3RpY2UgLSBMdXJrZXIgTW9kZSJ9',
}
def _encode_data(self) -> str:
try:
return library[self.target]
target = self.target
return self.LOCATIONS.get(target, self.SOURCES[target])
except KeyError:
return b64encode(json.dumps(data, separators=(',', ':')).encode()).decode('utf-8')
return b64encode(json.dumps(self._data, separators=(',', ':')).encode()).decode('utf-8')
@classmethod
def _empty(cls) -> Self:
def empty(cls) -> Self:
return cls({})
@classmethod
def _from_friends_page(cls) -> Self:
data = {'location': 'Friends'}
return cls(data)
@classmethod
def _from_contextmenu(cls) -> Self:
data = {'location': 'ContextMenu'}
return cls(data)
@classmethod
def _from_context_menu(cls) -> Self:
data = {'location': 'Context Menu'}
return cls(data)
@classmethod
def _from_user_profile(cls) -> Self:
data = {'location': 'User Profile'}
return cls(data)
@classmethod
def _from_add_friend_page(cls) -> Self:
data = {'location': 'Add Friend'}
return cls(data)
@classmethod
def _from_guild_header(cls) -> Self:
data = {'location': 'Guild Header'}
return cls(data)
@classmethod
def _from_group_dm(cls) -> Self:
data = {'location': 'Group DM'}
return cls(data)
@classmethod
def _from_new_group_dm(cls) -> Self:
data = {'location': 'New Group DM'}
return cls(data)
@classmethod
def _from_dm_channel(cls) -> Self:
data = {'location': 'DM Channel'}
return cls(data)
@classmethod
def _from_add_to_dm(cls) -> Self:
data = {'location': 'Add Friends to DM'}
return cls(data)
@classmethod
def _from_group_dm_invite(cls) -> Self:
data = {'location': 'Group DM Invite Create'}
return cls(data)
@classmethod
def _from_app(cls) -> Self:
data = {'location': '/app'}
return cls(data)
@classmethod
def _from_login(cls) -> Self:
data = {'location': 'Login'}
return cls(data)
@classmethod
def _from_register(cls) -> Self:
data = {'location': 'Register'}
return cls(data)
@classmethod
def _from_verification(cls) -> Self:
data = {'location': 'Verify Email'}
return cls(data)
@classmethod
def _from_stage_channel(cls) -> Self:
data = {'location': 'Stage Channel'}
return cls(data)
@classmethod
def _from_accept_invite_page(
def from_accept_invite_page(
cls,
*,
guild_id: Snowflake = MISSING,
channel_id: Snowflake = MISSING,
channel_type: ChannelType = MISSING,
guild_id: Optional[Snowflake] = None,
channel_id: Optional[Snowflake] = None,
channel_type: Optional[ChannelType] = None,
) -> Self:
data: Dict[str, Snowflake] = {
'location': 'Accept Invite Page',
}
if guild_id is not MISSING:
if guild_id:
data['location_guild_id'] = str(guild_id)
if channel_id is not MISSING:
if channel_id:
data['location_channel_id'] = str(channel_id)
if channel_type is not MISSING:
if channel_type:
data['location_channel_type'] = int(channel_type)
return cls(data)
@classmethod
def _from_join_guild_popup(
def from_join_guild(
cls,
*,
guild_id: Snowflake = MISSING,
@ -235,7 +168,7 @@ class ContextProperties: # Thank you Discord-S.C.U.M
return cls(data)
@classmethod
def _from_invite_embed(
def from_invite_button_embed(
cls,
*,
guild_id: Optional[Snowflake],
@ -253,7 +186,7 @@ class ContextProperties: # Thank you Discord-S.C.U.M
return cls(data)
@classmethod
def _from_lurking(cls, source: str = MISSING) -> Self:
def from_lurking(cls, source: str = MISSING) -> Self:
data = {'source': source or choice(('Chat Input Blocker - Lurker Mode', 'Notice - Lurker Mode'))}
return cls(data)
@ -262,26 +195,8 @@ class ContextProperties: # Thank you Discord-S.C.U.M
return self._data.get('location', self._data.get('source')) # type: ignore
@property
def guild_id(self) -> Optional[int]:
data = self._data.get('location_guild_id')
if data is not None:
return int(data)
@property
def channel_id(self) -> Optional[int]:
data = self._data.get('location_channel_id')
if data is not None:
return int(data)
@property
def channel_type(self) -> Optional[int]:
return self._data.get('location_channel_type') # type: ignore
@property
def message_id(self) -> Optional[int]:
data = self._data.get('location_message_id')
if data is not None:
return int(data)
def value(self) -> str:
return self._encode_data()
def __str__(self) -> str:
return self.target or 'None'
@ -296,6 +211,3 @@ class ContextProperties: # Thank you Discord-S.C.U.M
if isinstance(other, ContextProperties):
return self.value != other.value
return True
def __hash__(self) -> int:
return hash(self.value)

27
discord/utils.py

@ -1431,19 +1431,38 @@ class ExpiringString(collections.UserString):
self._timer.cancel()
async def _get_info(session: ClientSession) -> Tuple[str, str, int]:
async def _get_info(session: ClientSession) -> Tuple[Dict[str, Any], str]:
for _ in range(3):
try:
async with session.get('https://cordapi.dolfi.es/api/v1/properties/web', timeout=5) as resp:
async with session.post('https://cordapi.dolfi.es/api/v2/properties/web', timeout=5) as resp:
json = await resp.json()
return json['chrome_user_agent'], json['chrome_version'], json['client_build_number']
return json['properties'], json['encoded']
except Exception:
continue
_log.warning('Info API down. Falling back to manual fetching...')
ua = await _get_user_agent(session)
bn = await _get_build_number(session)
bv = _get_browser_version(ua)
return ua, bv, bn
properties = {
'os': 'Windows',
'browser': 'Chrome',
'device': '',
'browser_user_agent': ua,
'browser_version': bv,
'os_version': '10',
'referrer': '',
'referring_domain': '',
'referrer_current': '',
'referring_domain_current': '',
'release_channel': 'stable',
'system_locale': 'en-US',
'client_build_number': bn,
'client_event_source': None,
'design_id': 0,
}
return properties, b64encode(_to_json(properties).encode()).decode('utf-8')
async def _get_build_number(session: ClientSession) -> int: # Thank you Discord-S.C.U.M

Loading…
Cancel
Save