You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
268 lines
6.4 KiB
268 lines
6.4 KiB
import json
|
|
import requests
|
|
from datetime import datetime
|
|
|
|
from websocket import create_connection
|
|
import socketio
|
|
|
|
from .utils import Event, User, Data, Donations, DonationsData, CentrifugoResponse
|
|
|
|
DEFAULT_URL = "https://www.donationalerts.com/oauth/"
|
|
DEFAULT_API_LINK = "https://www.donationalerts.com/api/v1/"
|
|
|
|
|
|
class DonationAlertsAPI:
|
|
"""
|
|
This class describes work with Donation Alerts API
|
|
"""
|
|
|
|
def __init__(self, client_id, client_secret, redirect_uri, scopes):
|
|
symbols = [",", ", ", " ", "%20"]
|
|
|
|
if isinstance(scopes, list):
|
|
obj_scopes = []
|
|
for scope in scopes:
|
|
obj_scopes.append(scope)
|
|
|
|
scopes = " ".join(obj_scopes)
|
|
|
|
for symbol in symbols:
|
|
if symbol in scopes:
|
|
self.scope = scopes.replace(symbol, "%20").strip() # Replaces some symbols on '%20' for stable work
|
|
else:
|
|
self.scope = scopes
|
|
|
|
self.client_id = client_id
|
|
self.client_secret = client_secret
|
|
self.redirect_uri = redirect_uri
|
|
|
|
def login(self):
|
|
return f"{DEFAULT_URL}authorize?client_id={self.client_id}&redirect_uri={self.redirect_uri}&response_type=code&scope={self.scope}"
|
|
|
|
def get_access_token(self, code, *, full_json=False):
|
|
payload = {
|
|
"client_id": self.client_id,
|
|
"client_secret": self.client_secret,
|
|
"grant_type": "authorization_code",
|
|
"code": code,
|
|
"redirect_uri": self.redirect_uri,
|
|
"scope": self.scope
|
|
}
|
|
|
|
obj = requests.post(f"{DEFAULT_URL}token", data=payload).json()
|
|
|
|
return Data(
|
|
obj["access_token"],
|
|
obj["expires_in"],
|
|
obj["refresh_token"],
|
|
obj["token_type"],
|
|
obj
|
|
) if full_json else obj["access_token"]
|
|
|
|
def donations_list(self, access_token, *, page: int=1):
|
|
headers = {
|
|
"Authorization": f"Bearer {access_token}",
|
|
"Content-Type": "application/x-www-form-urlencoded"
|
|
}
|
|
|
|
objs = requests.get(f"{DEFAULT_API_LINK}alerts/donations?page={page}", headers=headers).json()
|
|
donations = Donations(objects=objs)
|
|
|
|
for obj in objs["data"]:
|
|
donation_object = DonationsData(
|
|
obj["amount"],
|
|
obj["amount_in_user_currency"],
|
|
datetime.strptime(obj["created_at"], "%Y-%m-%d %H:%M:%S"),
|
|
obj["currency"],
|
|
obj["id"],
|
|
obj["is_shown"],
|
|
obj["message"],
|
|
obj["message_type"],
|
|
obj["name"],
|
|
obj["payin_system"],
|
|
obj["recipient_name"],
|
|
obj["shown_at"],
|
|
obj["username"]
|
|
)
|
|
donations.donation.append(donation_object)
|
|
|
|
return donations
|
|
|
|
def user(self, access_token):
|
|
headers = {
|
|
"Authorization": f"Bearer {access_token}",
|
|
"Content-Type": "application/x-www-form-urlencoded"
|
|
}
|
|
obj = requests.get(f"{DEFAULT_API_LINK}user/oauth", headers=headers).json()
|
|
|
|
return User(
|
|
obj["data"]["avatar"],
|
|
obj["data"]["code"],
|
|
obj["data"]["email"],
|
|
obj["data"]["id"],
|
|
obj["data"]["language"],
|
|
obj["data"]["name"],
|
|
obj["data"]["socket_connection_token"],
|
|
obj["data"]
|
|
)
|
|
|
|
def send_custom_alert(self, access_token, external_id, headline, message, *, image_url=None, sound_url=None, is_shown=0):
|
|
headers = {
|
|
"Authorization": f"Bearer {access_token}",
|
|
"Content-Type": "application/x-www-form-urlencoded"
|
|
}
|
|
payload = {
|
|
"external_id": external_id,
|
|
"headline": headline,
|
|
"message": message,
|
|
"is_shown": is_shown,
|
|
"image_url": image_url,
|
|
"sound_url": sound_url
|
|
}
|
|
|
|
obj = requests.post(f"{DEFAULT_API_LINK}custom_alert", data=payload, headers=headers).json()
|
|
return obj
|
|
|
|
def get_refresh_token(self, access_token, refresh_token):
|
|
headers = {
|
|
"Content-Type": "application/x-www-form-urlencoded"
|
|
}
|
|
payload = {
|
|
"grant_type": "refresh_token",
|
|
"client_id": self.client_id,
|
|
"client_secret": self.client_secret,
|
|
"refresh_token": refresh_token,
|
|
"redirect_uri": self.redirect_uri,
|
|
"scope": self.scope
|
|
}
|
|
|
|
obj = requests.post(f"{DEFAULT_URL}token", data=payload, headers=headers).json()
|
|
return Data(
|
|
obj["access_token"],
|
|
obj["expires_in"],
|
|
obj["refresh_token"],
|
|
obj["token_type"],
|
|
obj
|
|
)
|
|
|
|
|
|
class Centrifugo:
|
|
|
|
def __init__(self, socket_connection_token, access_token, user_id):
|
|
self.socket_connection_token = socket_connection_token
|
|
self.access_token = access_token
|
|
self.user_id = user_id
|
|
|
|
self.uri = "wss://centrifugo.donationalerts.com/connection/websocket"
|
|
|
|
def subscribe(self, channels):
|
|
chnls = [f"{channels}{self.user_id}"]
|
|
if isinstance(channels, list):
|
|
chnls = []
|
|
for channel in channels:
|
|
chnls.append(f"{channel}{self.user_id}")
|
|
|
|
ws = create_connection(self.uri)
|
|
ws.send(json.dumps(
|
|
{
|
|
"params": {
|
|
"token": self.socket_connection_token
|
|
},
|
|
"id": self.user_id
|
|
}
|
|
))
|
|
|
|
ws_response = json.loads(ws.recv())
|
|
|
|
headers = {
|
|
"Authorization": f"Bearer {self.access_token}",
|
|
"Content-Type": "application/json"
|
|
}
|
|
data = {
|
|
"channels": chnls,
|
|
"client": ws_response["result"]["client"]
|
|
}
|
|
|
|
response = requests.post(f"{DEFAULT_API_LINK}centrifuge/subscribe", data=json.dumps(data), headers=headers).json()
|
|
for ch in response["channels"]:
|
|
self.ws.send(json.dumps(
|
|
{
|
|
"params": {
|
|
"channel": ch["channel"],
|
|
"token": ch["token"]
|
|
},
|
|
"method": 1,
|
|
"id": self.user_id
|
|
}
|
|
))
|
|
|
|
ws.recv()
|
|
ws.recv()
|
|
|
|
obj = json.loads(ws.recv())["result"]["data"]["data"]
|
|
return CentrifugoResponse(
|
|
obj["amount"],
|
|
obj["amount_in_user_currency"],
|
|
datetime.strptime(obj["created_at"], "%Y-%m-%d %H:%M:%S"),
|
|
obj["currency"],
|
|
obj["id"],
|
|
obj["is_shown"],
|
|
obj["message"],
|
|
obj["message_type"],
|
|
obj["name"],
|
|
obj["payin_system"],
|
|
obj["recipient_name"],
|
|
obj["shown_at"],
|
|
obj["username"],
|
|
obj["reason"],
|
|
obj
|
|
)
|
|
|
|
|
|
sio = socketio.Client()
|
|
|
|
|
|
class Alert:
|
|
|
|
def __init__(self, token):
|
|
self.token = token
|
|
|
|
def event(self):
|
|
def wrapper(function):
|
|
|
|
@sio.on("connect")
|
|
def on_connect():
|
|
sio.emit("add-user", {"token": self.token, "type": "alert_widget"})
|
|
|
|
@sio.on("donation")
|
|
def on_message(data):
|
|
data = json.loads(data)
|
|
|
|
function(
|
|
Event(
|
|
data["id"],
|
|
data["alert_type"],
|
|
data["is_shown"],
|
|
json.loads(data["additional_data"]),
|
|
data["billing_system"],
|
|
data["billing_system_type"],
|
|
data["username"],
|
|
data["amount"],
|
|
data["amount_formatted"],
|
|
data["amount_main"],
|
|
data["currency"],
|
|
data["message"],
|
|
data["header"],
|
|
datetime.strptime(data["date_created"], "%Y-%m-%d %H:%M:%S"),
|
|
data["emotes"],
|
|
data["ap_id"],
|
|
data["_is_test_alert"],
|
|
data["message_type"],
|
|
data["preset_id"],
|
|
data
|
|
)
|
|
)
|
|
|
|
sio.connect("wss://socket.donationalerts.ru:443", transports="websocket")
|
|
|
|
return wrapper
|