|
|
@ -62,20 +62,15 @@ from inspect import isawaitable as _isawaitable, signature as _signature |
|
|
|
from operator import attrgetter |
|
|
|
import json |
|
|
|
import logging |
|
|
|
import os |
|
|
|
import platform |
|
|
|
import random |
|
|
|
import re |
|
|
|
import string |
|
|
|
import subprocess |
|
|
|
import sys |
|
|
|
import tempfile |
|
|
|
from threading import Timer |
|
|
|
import types |
|
|
|
import warnings |
|
|
|
|
|
|
|
import yarl |
|
|
|
from .enums import BrowserEnum |
|
|
|
|
|
|
|
try: |
|
|
|
import orjson # type: ignore |
|
|
@ -1168,7 +1163,11 @@ def format_dt(dt: datetime.datetime, /, style: Optional[TimestampStyle] = None) |
|
|
|
|
|
|
|
|
|
|
|
def set_target( |
|
|
|
items: Iterable[ApplicationCommand], *, channel: Messageable = None, message: Message = None, user: Snowflake = None |
|
|
|
items: Iterable[ApplicationCommand], |
|
|
|
*, |
|
|
|
channel: Optional[Messageable] = MISSING, |
|
|
|
message: Optional[Message] = MISSING, |
|
|
|
user: Optional[Snowflake] = MISSING, |
|
|
|
) -> None: |
|
|
|
"""A helper function to set the target for a list of items. |
|
|
|
|
|
|
@ -1181,24 +1180,26 @@ def set_target( |
|
|
|
----------- |
|
|
|
items: Iterable[:class:`ApplicationCommand`] |
|
|
|
A list of items to set the target for. |
|
|
|
channel: :class:`Messageable` |
|
|
|
channel: Optional[:class:`Messageable`] |
|
|
|
The channel to target. |
|
|
|
message: :class:`Message` |
|
|
|
message: Optional[:class:`Message`] |
|
|
|
The message to target. |
|
|
|
user: :class:`Snowflake` |
|
|
|
user: Optional[:class:`~abc.Snowflake`] |
|
|
|
The user to target. |
|
|
|
""" |
|
|
|
attrs = { |
|
|
|
'target_channel': channel, |
|
|
|
'target_message': message, |
|
|
|
'target_user': user, |
|
|
|
} |
|
|
|
attrs = {} |
|
|
|
if channel is not MISSING: |
|
|
|
attrs['target_channel'] = channel |
|
|
|
if message is not MISSING: |
|
|
|
attrs['target_message'] = message |
|
|
|
if user is not MISSING: |
|
|
|
attrs['target_user'] = user |
|
|
|
|
|
|
|
for item in items: |
|
|
|
for k, v in attrs.items(): |
|
|
|
if v is not None: |
|
|
|
try: |
|
|
|
setattr(item, k, v) # type: ignore |
|
|
|
setattr(item, k, v) |
|
|
|
except AttributeError: |
|
|
|
pass |
|
|
|
|
|
|
@ -1207,39 +1208,6 @@ def _generate_session_id() -> str: |
|
|
|
return ''.join(random.choices(string.ascii_letters + string.digits, k=16)) |
|
|
|
|
|
|
|
|
|
|
|
class ExpiringQueue(asyncio.Queue): # Inspired from https://github.com/NoahCardoza/CaptchaHarvester |
|
|
|
def __init__(self, timeout: int, maxsize: int = 0) -> None: |
|
|
|
super().__init__(maxsize) |
|
|
|
self.timeout = timeout |
|
|
|
self.timers: asyncio.Queue = asyncio.Queue() |
|
|
|
|
|
|
|
async def put(self, item: str) -> None: |
|
|
|
thread: Timer = Timer(self.timeout, self.expire) |
|
|
|
thread.start() |
|
|
|
await self.timers.put(thread) |
|
|
|
await super().put(item) |
|
|
|
|
|
|
|
async def get(self, block: bool = True) -> str: |
|
|
|
if block: |
|
|
|
thread = await self.timers.get() |
|
|
|
else: |
|
|
|
thread = self.timers.get_nowait() |
|
|
|
thread.cancel() |
|
|
|
if block: |
|
|
|
return await super().get() |
|
|
|
else: |
|
|
|
return self.get_nowait() |
|
|
|
|
|
|
|
def expire(self) -> None: |
|
|
|
try: |
|
|
|
self._queue.popleft() |
|
|
|
except: |
|
|
|
pass |
|
|
|
|
|
|
|
def to_list(self) -> List[str]: |
|
|
|
return list(self._queue) |
|
|
|
|
|
|
|
|
|
|
|
class ExpiringString(collections.UserString): |
|
|
|
def __init__(self, data: str, timeout: int) -> None: |
|
|
|
super().__init__(data) |
|
|
@ -1263,133 +1231,6 @@ class ExpiringString(collections.UserString): |
|
|
|
self._timer.cancel() |
|
|
|
|
|
|
|
|
|
|
|
class Browser: # Inspired from https://github.com/NoahCardoza/CaptchaHarvester |
|
|
|
def __init__(self, browser: Union[BrowserEnum, str] = None) -> None: |
|
|
|
if isinstance(browser, (BrowserEnum, type(None))): |
|
|
|
try: |
|
|
|
browser = self.get_browser(browser) |
|
|
|
except Exception: |
|
|
|
raise RuntimeError('Could not find browser. Please pass browser path manually.') |
|
|
|
|
|
|
|
if browser is None: |
|
|
|
raise RuntimeError('Could not find browser. Please pass browser path manually.') |
|
|
|
|
|
|
|
self.browser: str = browser |
|
|
|
self.proc: subprocess.Popen = MISSING |
|
|
|
|
|
|
|
def get_mac_browser(pkg: str, binary: str) -> Optional[os.PathLike]: |
|
|
|
import plistlib as plist |
|
|
|
|
|
|
|
pfile: str = f'{os.environ["HOME"]}/Library/Preferences/{pkg}.plist' |
|
|
|
if os.path.exists(pfile): |
|
|
|
with open(pfile, 'rb') as f: |
|
|
|
binary_path: Optional[str] = plist.load(f).get('LastRunAppBundlePath') |
|
|
|
if binary_path is not None: |
|
|
|
return os.path.join(binary_path, 'Contents', 'MacOS', binary) |
|
|
|
|
|
|
|
def get_windows_browser(browser: str) -> Optional[str]: |
|
|
|
import winreg as reg |
|
|
|
|
|
|
|
reg_path: str = f'SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\App Paths\\{browser}.exe' |
|
|
|
exe_path: Optional[str] = None |
|
|
|
for install_type in reg.HKEY_CURRENT_USER, reg.HKEY_LOCAL_MACHINE: |
|
|
|
try: |
|
|
|
reg_key: str = reg.OpenKey(install_type, reg_path, 0, reg.KEY_READ) |
|
|
|
exe_path: Optional[str] = reg.QueryValue(reg_key, None) |
|
|
|
reg_key.Close() |
|
|
|
if not os.path.isfile(exe_path): |
|
|
|
continue |
|
|
|
except reg.WindowsError: |
|
|
|
pass |
|
|
|
else: |
|
|
|
break |
|
|
|
return exe_path |
|
|
|
|
|
|
|
def get_linux_browser(browser: str) -> Optional[str]: |
|
|
|
from shutil import which as exists |
|
|
|
|
|
|
|
possibilities: List[str] = [browser + channel for channel in ('', '-beta', '-dev', '-developer', '-canary')] |
|
|
|
for browser in possibilities: |
|
|
|
if exists(browser): |
|
|
|
return browser |
|
|
|
|
|
|
|
registry: Dict[str, Dict[str, functools.partial]] = { |
|
|
|
'Windows': { |
|
|
|
'chrome': functools.partial(get_windows_browser, 'chrome'), |
|
|
|
'chromium': functools.partial(get_windows_browser, 'chromium'), |
|
|
|
'microsoft-edge': functools.partial(get_windows_browser, 'msedge'), |
|
|
|
'opera': functools.partial(get_windows_browser, 'opera'), |
|
|
|
}, |
|
|
|
'Darwin': { |
|
|
|
'chrome': functools.partial(get_mac_browser, 'com.google.Chrome', 'Google Chrome'), |
|
|
|
'chromium': functools.partial(get_mac_browser, 'org.chromium.Chromium', 'Chromium'), |
|
|
|
'microsoft-edge': functools.partial(get_mac_browser, 'com.microsoft.Edge', 'Microsoft Edge'), |
|
|
|
'opera': functools.partial(get_mac_browser, 'com.operasoftware.Opera', 'Opera'), |
|
|
|
}, |
|
|
|
'Linux': { |
|
|
|
'chrome': functools.partial(get_linux_browser, 'chrome'), |
|
|
|
'chromium': functools.partial(get_linux_browser, 'chromium'), |
|
|
|
'microsoft-edge': functools.partial(get_linux_browser, 'microsoft-edge'), |
|
|
|
'opera': functools.partial(get_linux_browser, 'opera'), |
|
|
|
}, |
|
|
|
} |
|
|
|
|
|
|
|
def get_browser(self, browser: Optional[BrowserEnum] = None) -> Optional[str]: |
|
|
|
if browser is not None: |
|
|
|
return self.registry.get(platform.system(), {})[browser.value]() |
|
|
|
|
|
|
|
for browser in self.registry.get(platform.system(), {}).values(): |
|
|
|
browser = browser() |
|
|
|
if browser is not None: |
|
|
|
return browser |
|
|
|
|
|
|
|
@property |
|
|
|
def running(self) -> bool: |
|
|
|
try: |
|
|
|
return self.proc.poll() is None |
|
|
|
except: |
|
|
|
return False |
|
|
|
|
|
|
|
def launch( |
|
|
|
self, |
|
|
|
domain: Optional[str] = None, |
|
|
|
server: Tuple[Optional[str], Optional[int]] = (None, None), |
|
|
|
width: int = 400, |
|
|
|
height: int = 500, |
|
|
|
browser_args: List[str] = [], |
|
|
|
extensions: Optional[str] = None, |
|
|
|
) -> None: |
|
|
|
browser_command: List[str] = [self.browser, *browser_args] |
|
|
|
|
|
|
|
if extensions: |
|
|
|
browser_command.append(f'--load-extension={extensions}') |
|
|
|
|
|
|
|
browser_command.extend( |
|
|
|
( |
|
|
|
'--disable-default-apps', |
|
|
|
'--no-default-browser-check', |
|
|
|
'--no-check-default-browser', |
|
|
|
'--no-first-run', |
|
|
|
'--ignore-certificate-errors', |
|
|
|
'--disable-background-networking', |
|
|
|
'--disable-component-update', |
|
|
|
'--disable-domain-reliability', |
|
|
|
f'--user-data-dir={os.path.join(tempfile.TemporaryDirectory().name, "Profiles")}', |
|
|
|
f'--host-rules=MAP {domain} {server[0]}:{server[1]}', |
|
|
|
f'--window-size={width},{height}', |
|
|
|
f'--app=https://{domain}', |
|
|
|
) |
|
|
|
) |
|
|
|
|
|
|
|
self.proc = subprocess.Popen(browser_command, stdout=-1, stderr=-1) |
|
|
|
|
|
|
|
def stop(self) -> None: |
|
|
|
try: |
|
|
|
self.proc.terminate() |
|
|
|
except: |
|
|
|
pass |
|
|
|
|
|
|
|
|
|
|
|
async def _get_info(session: ClientSession) -> Tuple[str, str, int]: |
|
|
|
for _ in range(3): |
|
|
|
try: |
|
|
|