|
|
@ -25,6 +25,7 @@ DEALINGS IN THE SOFTWARE. |
|
|
|
import array |
|
|
|
import asyncio |
|
|
|
import collections.abc |
|
|
|
from typing import Optional, overload |
|
|
|
import unicodedata |
|
|
|
from base64 import b64encode |
|
|
|
from bisect import bisect_left |
|
|
@ -103,9 +104,17 @@ class SequenceProxy(collections.abc.Sequence): |
|
|
|
def count(self, value): |
|
|
|
return self.__proxied.count(value) |
|
|
|
|
|
|
|
def parse_time(timestamp): |
|
|
|
@overload |
|
|
|
def parse_time(timestamp: None) -> None: |
|
|
|
... |
|
|
|
|
|
|
|
@overload |
|
|
|
def parse_time(timestamp: str) -> datetime.datetime: |
|
|
|
... |
|
|
|
|
|
|
|
def parse_time(timestamp: Optional[str]) -> Optional[datetime.datetime]: |
|
|
|
if timestamp: |
|
|
|
return datetime.datetime(*map(int, re.split(r'[^\d]', timestamp.replace('+00:00', '')))) |
|
|
|
return datetime.datetime.fromisoformat(timestamp) |
|
|
|
return None |
|
|
|
|
|
|
|
def copy_doc(original): |
|
|
@ -168,7 +177,7 @@ def oauth_url(client_id, permissions=None, guild=None, redirect_uri=None, scopes |
|
|
|
return url |
|
|
|
|
|
|
|
|
|
|
|
def snowflake_time(id): |
|
|
|
def snowflake_time(id: int) -> datetime.datetime: |
|
|
|
""" |
|
|
|
Parameters |
|
|
|
----------- |
|
|
@ -178,25 +187,34 @@ def snowflake_time(id): |
|
|
|
Returns |
|
|
|
-------- |
|
|
|
:class:`datetime.datetime` |
|
|
|
The creation date in UTC of a Discord snowflake ID.""" |
|
|
|
return datetime.datetime.utcfromtimestamp(((id >> 22) + DISCORD_EPOCH) / 1000) |
|
|
|
An aware datetime in UTC representing the creation time of the snowflake. |
|
|
|
""" |
|
|
|
timestamp = ((id >> 22) + DISCORD_EPOCH) / 1000 |
|
|
|
return datetime.datetime.utcfromtimestamp(timestamp).replace(tzinfo=datetime.timezone.utc) |
|
|
|
|
|
|
|
def time_snowflake(datetime_obj, high=False): |
|
|
|
def time_snowflake(dt: datetime.datetime, high: bool = False) -> int: |
|
|
|
"""Returns a numeric snowflake pretending to be created at the given date. |
|
|
|
|
|
|
|
When using as the lower end of a range, use ``time_snowflake(high=False) - 1`` to be inclusive, ``high=True`` to be exclusive |
|
|
|
When using as the higher end of a range, use ``time_snowflake(high=True)`` + 1 to be inclusive, ``high=False`` to be exclusive |
|
|
|
When using as the lower end of a range, use ``time_snowflake(high=False) - 1`` |
|
|
|
to be inclusive, ``high=True`` to be exclusive. |
|
|
|
|
|
|
|
When using as the higher end of a range, use ``time_snowflake(high=True) + 1`` |
|
|
|
to be inclusive, ``high=False`` to be exclusive |
|
|
|
|
|
|
|
Parameters |
|
|
|
----------- |
|
|
|
datetime_obj: :class:`datetime.datetime` |
|
|
|
A timezone-naive datetime object representing UTC time. |
|
|
|
dt: :class:`datetime.datetime` |
|
|
|
A datetime object to convert to a snowflake. |
|
|
|
If naive, the timezone is assumed to be local time. |
|
|
|
high: :class:`bool` |
|
|
|
Whether or not to set the lower 22 bit to high or low. |
|
|
|
""" |
|
|
|
unix_seconds = (datetime_obj - type(datetime_obj)(1970, 1, 1)).total_seconds() |
|
|
|
discord_millis = int(unix_seconds * 1000 - DISCORD_EPOCH) |
|
|
|
|
|
|
|
Returns |
|
|
|
-------- |
|
|
|
:class:`int` |
|
|
|
The snowflake representing the time given. |
|
|
|
""" |
|
|
|
discord_millis = int(dt.timestamp() * 1000 - DISCORD_EPOCH) |
|
|
|
return (discord_millis << 22) + (2**22-1 if high else 0) |
|
|
|
|
|
|
|
def find(predicate, seq): |
|
|
@ -374,12 +392,12 @@ async def sleep_until(when, result=None): |
|
|
|
----------- |
|
|
|
when: :class:`datetime.datetime` |
|
|
|
The timestamp in which to sleep until. If the datetime is naive then |
|
|
|
it is assumed to be in UTC. |
|
|
|
it is assumed to be local time. |
|
|
|
result: Any |
|
|
|
If provided is returned to the caller when the coroutine completes. |
|
|
|
""" |
|
|
|
if when.tzinfo is None: |
|
|
|
when = when.replace(tzinfo=datetime.timezone.utc) |
|
|
|
when = when.astimezone() |
|
|
|
now = datetime.datetime.now(datetime.timezone.utc) |
|
|
|
delta = (when - now).total_seconds() |
|
|
|
while delta > MAX_ASYNCIO_SECONDS: |
|
|
|