Привет, разработчики! 👋
Вы когда-нибудь хотели мгновенно реагировать на получение доната через DonationAlerts? Может быть, запустить эффект на стриме, обновить дашборд или просто надежно логировать донаты без постоянных запросов к API? Опрос API (polling) работает, но он неэффективен и не обеспечивает реального времени.
DonationAlerts предлагает способ получать эти уведомления мгновенно с помощью WebSockets, но настройка включает в себя прохождение потока OAuth 2.0 и работу с их специфичным протоколом WebSocket (на базе Centrifugo).
В этом руководстве мы пройдемся по шагам:
- Регистрация приложения в DonationAlerts.
- Реализация потока авторизации OAuth 2.0 Authorization Code с использованием Python (на FastAPI).
- Подключение к WebSocket DonationAlerts (Centrifugo).
- Аутентификация, подписка и получение сообщений о донатах в реальном времени.
- Понимание формата данных о донатах.
Давайте откажемся от опросов и настроим получение донатов в реальном времени!
Предварительные требования
- Аккаунт DonationAlerts (аккаунт стримера, который вы хотите отслеживать).
- Установленный Python 3.8+.
- Базовое знакомство с Python. Знание FastAPI полезно, но не строго обязательно для понимания концепций.
-
pip
для установки пакетов.
(Мы будем использовать FastAPI для веб-сервера, обрабатывающего OAuth, но основную логику взаимодействия с API и WebSocket DonationAlerts можно адаптировать для других фреймворков.)
Шаг 1: Регистрация вашего приложения в DonationAlerts 🔑
Прежде чем писать код, нам нужно сообщить DonationAlerts о нашем приложении и получить учетные данные.
- Перейдите на страницу приложений DonationAlerts OAuth: https://www.donationalerts.com/application/clients
- Войдите в систему, используя свой аккаунт стримера (Twitch, YouTube и т.д.).
- На странице "OAuth API Applications" нажмите кнопку "+ СОЗДАТЬ НОВОЕ ПРИЛОЖЕНИЕ".
- Заполните форму "Новое приложение":
- Название приложения: Дайте ему описательное имя, например, "Мой Монитор Реального Времени". (Оно будет показано пользователям).
- URL перенаправления (Redirect URL): Это КРИТИЧЕСКИ ВАЖНО. Введите точный URL, на который DonationAlerts отправит пользователя после того, как он авторизует ваше приложение. Для локальной разработки с нашим примером используйте:
http://localhost:8000/api/auth/donationalerts/callback
. При развертывании это должен быть ваш публичный URL обратного вызова.
- Нажмите оранжевую кнопку "СОЗДАТЬ".
- Найдите созданное приложение в списке. Вам понадобятся две части информации:
- ID Приложения (App ID): Ваш уникальный идентификатор приложения. Будем называть его
APP_ID
. - API Ключ (API Key): Секрет вашего приложения. Храните его в безопасности и никогда не раскрывайте во фронтенд-коде! Будем называть его
API_KEY
.
- ID Приложения (App ID): Ваш уникальный идентификатор приложения. Будем называть его
Скопируйте эти два значения (APP_ID
и API_KEY
). Они скоро нам понадобятся.
Шаг 2: Понимание потока OAuth 2.0 🌊
Нам нужно, чтобы пользователь (стример) предоставил нашему приложению разрешение на доступ к своим данным (в частности, на просмотр профиля и подписку на события донатов), не передавая нам свой пароль. Именно для этого и нужен OAuth 2.0. Мы будем использовать поток "Authorization Code", стандартный для веб-приложений.
Вот суть:
- Инициация пользователем: Пользователь нажимает кнопку "Войти через DonationAlerts" в нашем приложении.
- Перенаправление на DonationAlerts: Наш бэкенд перенаправляет браузер пользователя на специальный URL DonationAlerts, включая наш
APP_ID
, запрошенныеscopes
(права доступа) иredirect_uri
. - Авторизация пользователем: Пользователь входит в DonationAlerts (если необходимо) и видит экран с запросом на одобрение разрешений, которые запросило наше приложение (
oauth-user-show
,oauth-donation-subscribe
). - Перенаправление обратно с кодом: Если одобрено, DonationAlerts перенаправляет браузер пользователя обратно на наш
redirect_uri
, добавляя временныйcode
к URL. - Обмен кода на токен: Наш бэкенд получает этот
code
, проверяет его, а затем безопасно выполняет запрос сервер-сервер к DonationAlerts, отправляяcode
, нашAPP_ID
и наш секретныйAPI_KEY
. - Получение токенов: DonationAlerts проверяет все и отправляет обратно
access_token
(используется для вызовов API) иrefresh_token
(используется для получения новогоaccess_token
, когда старый истечет).
Этот access_token
является ключом к взаимодействию с API DonationAlerts от имени пользователя.
Шаг 3: Реализация OAuth на Python (Пример с FastAPI) 🐍
Давайте настроим простое приложение FastAPI для обработки этого потока.
(Полный рабочий пример, включая хранение токенов, управление WebSocket и интерактивный помощник по настройке, смотрите в коде репозитория donationalerts-oauth-websocket-example.)
Конфигурация (config.py
/ .env
)
Лучшей практикой является хранение учетных данных и настроек вне кода. Мы можем использовать файл .env
и вспомогательный модуль config.py
(как показано в полном примере) для их загрузки. Ваш .env
будет выглядеть примерно так:
# .env
APP_ID="ВАШ_APP_ID_ЗДЕСЬ"
API_KEY="ВАШ_API_KEY_ЗДЕСЬ"
REDIRECT_URI="http://localhost:8000/api/auth/donationalerts/callback"
SESSION_SECRET_KEY="СГЕНЕРИРУЙТЕ_НАДЕЖНЫЙ_СЛУЧАЙНЫЙ_КЛЮЧ_ЗДЕСЬ" # Для безопасности сессии
# Обычно фиксированные значения
DA_SCOPES="oauth-user-show oauth-donation-subscribe"
DA_AUTHORIZATION_URL="https://www.donationalerts.com/oauth/authorize"
DA_TOKEN_URL="https://www.donationalerts.com/oauth/token"
DA_API_BASE_URL="https://www.donationalerts.com/api/v1"
# ... другие URL, если необходимо
Эндпоинты FastAPI (main.py
)
Нам нужны два основных эндпоинта: один для начала входа и один для обработки обратного вызова (callback).
# main.py (Упрощенные фрагменты)
import httpx
import secrets
import os # Для загрузки из .env
from fastapi import FastAPI, Request, Depends
from fastapi.responses import RedirectResponse
from starlette.middleware.sessions import SessionMiddleware # Для хранения state
from pydantic_settings import BaseSettings # Для загрузки конфига
# --- Загрузка конфигурации ---
class Settings(BaseSettings):
APP_ID: str
API_KEY: str
REDIRECT_URI: str
SESSION_SECRET_KEY: str
DA_SCOPES: str = "oauth-user-show oauth-donation-subscribe"
DA_AUTHORIZATION_URL: str = "https://www.donationalerts.com/oauth/authorize"
DA_TOKEN_URL: str = "https://www.donationalerts.com/oauth/token"
DA_API_BASE_URL: str = "https://www.donationalerts.com/api/v1"
DA_CENTRIFUGO_URL: str = "wss://centrifugo.donationalerts.com/connection/websocket"
class Config:
env_file = '.env' # Указываем файл .env
settings = Settings() # Загружаем настройки при старте
app = FastAPI()
# ВАЖНО: SessionMiddleware необходим для временного хранения состояния OAuth
app.add_middleware(SessionMiddleware, secret_key=settings.SESSION_SECRET_KEY)
# --- Эндпоинт входа ---
@app.get("/api/auth/donationalerts/login")
async def login_donationalerts(request: Request):
state = secrets.token_urlsafe(16)
request.session['oauth_state'] = state # Сохраняем state для предотвращения CSRF
params = {
"client_id": settings.APP_ID,
"redirect_uri": settings.REDIRECT_URI,
"response_type": "code",
"scope": settings.DA_SCOPES,
"state": state,
}
# Используем httpx для построения URL с параметрами
auth_request = httpx.Request('GET', settings.DA_AUTHORIZATION_URL, params=params)
authorization_url = str(auth_request.url)
print(f"Перенаправление на: {authorization_url}") # Для отладки
return RedirectResponse(authorization_url)
# --- Эндпоинт обратного вызова (Callback) ---
@app.get("/api/auth/donationalerts/callback")
async def auth_donationalerts_callback(request: Request, code: str = None, state: str = None, error: str = None):
if error:
# Обработка ошибки авторизации от DonationAlerts
print(f"Ошибка авторизации от DonationAlerts: {error}")
# В реальном приложении показать пользователю сообщение об ошибке
return {"error": error, "description": "Пользователь отклонил авторизацию или произошла ошибка."}
# --- Проверка безопасности: Сверка State ---
stored_state = request.session.pop('oauth_state', None)
if not state or state != stored_state:
# Несовпадение state, возможная CSRF-атака!
print("Ошибка: Неверный state параметр.")
return {"error": "invalid_state", "description": "Несовпадение параметра state. Возможна CSRF атака."}
if not code:
print("Ошибка: Отсутствует код авторизации.")
return {"error": "missing_code", "description": "В параметрах ответа отсутствует код авторизации."}
# --- Обмен кода на токены ---
token_data = await exchange_code_for_token(code) # См. функцию ниже
if not token_data or "access_token" not in token_data:
print("Ошибка: Не удалось обменять код на токен.")
return {"error": "token_exchange_failed", "description": "Не удалось получить токен доступа от DonationAlerts."}
# --- Успех! Безопасно сохраняем токены ---
# В реальном приложении: сохраните token_data['access_token'], token_data['refresh_token']
# и вычислите время истечения токена (token_data['expires_in']) в базе данных, связанной с пользователем.
# Для этого примера мы могли бы использовать простое файловое хранилище (например, token_storage.py).
# save_token(token_data) # Как в token_storage.py из полного примера
print("Авторизация OAuth прошла успешно, токены получены!")
print(f"Access Token: {token_data['access_token'][:10]}...") # Не логируйте полный токен
print(f"Refresh Token: {token_data.get('refresh_token', 'N/A')[:10]}...")
print(f"Expires in: {token_data.get('expires_in')} seconds")
# TODO: Сохранить токены (например, в файл или базу данных)
# store_tokens(token_data['access_token'], token_data['refresh_token'], token_data['expires_in'])
# Перенаправляем пользователя обратно на главную страницу вашего приложения
# status_code=303 (See Other) рекомендуется после POST или для перенаправления после завершения действия
return RedirectResponse(url="/?status=success", status_code=303)
# --- Вспомогательная функция: Обмен кода на токен ---
async def exchange_code_for_token(code: str) -> dict | None:
data = {
"grant_type": "authorization_code",
"client_id": settings.APP_ID,
"client_secret": settings.API_KEY, # Используем API Key как client_secret
"code": code,
"redirect_uri": settings.REDIRECT_URI,
}
async with httpx.AsyncClient() as client:
try:
print(f"Отправка запроса на обмен токена на {settings.DA_TOKEN_URL}")
response = await client.post(settings.DA_TOKEN_URL, data=data)
response.raise_for_status() # Проверяем наличие HTTP ошибок (4xx, 5xx)
token_info = response.json()
print("Обмен токена прошел успешно!")
return token_info
except httpx.HTTPStatusError as e:
# Логируем ошибку, если DonationAlerts вернул статус ошибки
print(f"Ошибка при обмене токена: Статус {e.response.status_code}")
try:
# Попытка прочитать тело ответа для получения деталей ошибки
error_details = e.response.json()
print(f"Тело ответа ошибки: {error_details}")
except Exception:
print(f"Тело ответа ошибки (не JSON): {e.response.text}")
return None
except httpx.RequestError as e:
# Логируем ошибки сети или соединения
print(f"Ошибка сети при запросе обмена токена: {e}")
return None
except Exception as e:
# Логируем другие непредвиденные ошибки
print(f"Произошла непредвиденная ошибка при обмене токена: {e}")
return None
Этот код обрабатывает основной поток OAuth. Не забудьте безопасно хранить полученные access_token
и refresh_token
.
Шаг 4: Подключение к WebSocket (Centrifugo) 🕸️
Теперь, когда у нас есть access_token
, мы можем подключиться к WebSocket для получения событий в реальном времени. Здесь становится немного сложнее, так как задействованы три разных типа токенов:
-
access_token
OAuth: Тот, который мы только что получили. Необходим для вызовов API. -
socket_connection_token
: Временный токен, предназначенный специально для аутентификации самого WebSocket-соединения. Мы получаем его из эндпоинта API/api/v1/user/oauth
, используя нашaccess_token
. -
subscription_token
: Еще один временный токен, необходимый для подписки на определенный приватный канал (например, канал донатов) после аутентификации WebSocket. Мы получаем его из эндпоинта/api/v1/centrifuge/subscribe
, используя нашaccess_token
иclient_id
, предоставленный WebSocket при аутентификации.
Последовательность подключения:
Вот последовательность, реализованная в donationalerts_client.py
нашего примера:
# donationalerts_client.py (Упрощенные фрагменты)
import websockets
import json
import asyncio
import httpx
import time
from datetime import datetime, timedelta
# Предполагается, что 'settings' загружены (как в main.py),
# и у нас есть валидный 'access_token' (полученный и сохраненный ранее).
# Также предполагается наличие функции `_make_api_request` для запросов к API
# и функции для управления/обновления токенов (get_valid_access_token, refresh_tokens).
# --- Глобальные переменные для управления состоянием ---
current_access_token: str | None = None
current_refresh_token: str | None = None
token_expires_at: datetime | None = None
# TODO: Реализовать функции загрузки/сохранения токенов (например, в файл)
# def load_tokens(): ...
# def save_tokens(access, refresh, expires_in): ...
async def get_valid_access_token() -> str | None:
"""Возвращает действительный access_token, при необходимости обновляя его."""
global current_access_token, token_expires_at
if not current_access_token or not token_expires_at:
# Попытка загрузить токены, если их нет в памяти
# loaded = load_tokens() # Реализация зависит от способа хранения
# if not loaded:
print("Нет сохраненных токенов.")
return None
# Проверяем, не истек ли токен (с небольшим запасом, например, 5 минут)
if datetime.now() >= (token_expires_at - timedelta(minutes=5)):
print("Access token истек или скоро истечет, пытаемся обновить...")
refreshed = await refresh_tokens()
if not refreshed:
print("Не удалось обновить токен.")
return None
# Обновляем глобальные переменные после успешного обновления
# (предполагается, что refresh_tokens обновляет их)
print("Токен успешно обновлен.")
return current_access_token
async def refresh_tokens() -> bool:
"""Обновляет access_token, используя refresh_token."""
global current_access_token, current_refresh_token, token_expires_at, settings
if not current_refresh_token:
print("Нет refresh token для обновления.")
return False
data = {
"grant_type": "refresh_token",
"refresh_token": current_refresh_token,
"client_id": settings.APP_ID,
"client_secret": settings.API_KEY,
}
async with httpx.AsyncClient() as client:
try:
print(f"Запрос на обновление токена на {settings.DA_TOKEN_URL}")
response = await client.post(settings.DA_TOKEN_URL, data=data)
response.raise_for_status()
new_token_data = response.json()
# --- Обновляем глобальные переменные и сохраняем ---
current_access_token = new_token_data['access_token']
# Некоторые API могут не возвращать refresh_token при обновлении
current_refresh_token = new_token_data.get('refresh_token', current_refresh_token)
expires_in = new_token_data['expires_in']
token_expires_at = datetime.now() + timedelta(seconds=expires_in)
# save_tokens(current_access_token, current_refresh_token, expires_in) # Сохраняем обновленные
print(f"Токен обновлен. Новый Access Token: {current_access_token[:10]}...")
return True
except httpx.HTTPStatusError as e:
print(f"Ошибка при обновлении токена: Статус {e.response.status_code}")
try:
print(f"Тело ошибки: {e.response.json()}")
except Exception:
print(f"Тело ошибки (не JSON): {e.response.text}")
# Если ошибка 400/401 с refresh_token, возможно, он отозван, требуется повторная авторизация
if e.response.status_code in [400, 401]:
print("Refresh token недействителен. Требуется повторная авторизация пользователя.")
# Сбросить токены, чтобы запросить новую авторизацию
current_access_token = None
current_refresh_token = None
token_expires_at = None
# remove_saved_tokens()
return False
except Exception as e:
print(f"Непредвиденная ошибка при обновлении токена: {e}")
return False
async def _make_api_request(method: str, endpoint: str, token: str | None = None, **kwargs) -> dict | None:
"""Вспомогательная функция для выполнения запросов к API DonationAlerts."""
global settings
access_token = token or await get_valid_access_token()
if not access_token:
print("Нет действительного access_token для выполнения запроса.")
return None
headers = {"Authorization": f"Bearer {access_token}"}
url = f"{settings.DA_API_BASE_URL}{endpoint}"
async with httpx.AsyncClient() as client:
try:
print(f"Выполнение {method} запроса к {url}")
response = await client.request(method, url, headers=headers, **kwargs)
# Ошибки 4xx/5xx вызовут исключение
response.raise_for_status()
# Успешные ответы без тела (204 No Content)
if response.status_code == 204:
return {} # Возвращаем пустой dict для согласованности
return response.json()
except httpx.HTTPStatusError as e:
print(f"Ошибка API запроса {method} {endpoint}: Статус {e.response.status_code}")
try:
print(f"Тело ошибки: {e.response.json()}")
except Exception:
print(f"Тело ошибки (не JSON): {e.response.text}")
# Если ошибка 401 Unauthorized, возможно токен протух несмотря на проверку
if e.response.status_code == 401:
print("Получена ошибка 401 Unauthorized. Возможно, токен доступа недействителен.")
# Можно попробовать обновить токен и повторить запрос, но это усложнит логику.
# Пока просто возвращаем None.
return None
except httpx.RequestError as e:
print(f"Ошибка сети при запросе к {url}: {e}")
return None
except Exception as e:
print(f"Непредвиденная ошибка при запросе к API: {e}")
return None
async def connect_and_listen():
"""Основная функция подключения к WebSocket и прослушивания сообщений."""
global settings
reconnect_delay = 5 # Начальная задержка перед переподключением (в секундах)
while True: # Цикл для автоматического переподключения
access_token = await get_valid_access_token()
if not access_token:
print("Не удалось получить действительный токен доступа. Невозможно подключиться к WebSocket.")
print(f"Повторная попытка через {reconnect_delay} секунд...")
await asyncio.sleep(reconnect_delay)
reconnect_delay = min(reconnect_delay * 2, 60) # Удваиваем задержку, максимум 60с
continue
ws: websockets.WebSocketClientProtocol | None = None # Для finally блока
try:
# --- Этап 1: Получение User ID и Socket Token (HTTP API) ---
print("Получение информации о пользователе и токена для WebSocket...")
user_info_response = await _make_api_request('GET', '/user/oauth', token=access_token)
if not user_info_response or 'data' not in user_info_response:
print("Не удалось получить информацию о пользователе или ответ API не содержит 'data'.")
# Не увеличиваем задержку, т.к. это может быть временная проблема API
await asyncio.sleep(reconnect_delay)
continue # Переходим к следующей итерации цикла переподключения
user_id = user_info_response['data'].get('id')
socket_token = user_info_response['data'].get('socket_connection_token')
if not user_id or not socket_token:
print("В ответе API отсутствует user_id или socket_connection_token.")
await asyncio.sleep(reconnect_delay)
continue
print(f"Получен User ID: {user_id}, Socket Token: {socket_token[:10]}...") # Не логируем полный токен
# --- Этап 2: Подключение и Аутентификация WebSocket ---
websocket_url = settings.DA_CENTRIFUGO_URL
print(f"Подключение к WebSocket: {websocket_url}")
# Устанавливаем таймаут на пинги, чтобы обнаруживать "мертвые" соединения
ws = await websockets.connect(websocket_url, ping_interval=30, ping_timeout=10)
print("Успешно подключено к WebSocket.")
reconnect_delay = 5 # Сбрасываем задержку при успешном подключении
# --- Этап 2a: Отправка Socket Token ---
# ВАЖНО: Формат специфичен! {"params": {"token": ...}, "id": 1}
auth_payload = {"params": {"token": socket_token}, "id": 1}
await ws.send(json.dumps(auth_payload))
print("Отправлен запрос аутентификации WebSocket.")
# --- Этап 2b: Получение ответа аутентификации и Client ID ---
auth_response_str = await asyncio.wait_for(ws.recv(), timeout=10.0) # Ждем ответ 10 сек
auth_response = json.loads(auth_response_str)
print(f"Получен ответ аутентификации: {auth_response}")
client_id = None
if auth_response.get("id") == 1 and auth_response.get("result", {}).get("client"):
client_id = auth_response["result"]["client"]
print(f"Аутентификация WebSocket успешна! Client ID: {client_id}")
else:
# Подробное логирование ошибки аутентификации
error_info = auth_response.get("error")
print(f"Аутентификация WebSocket не удалась! Ошибка: {error_info}")
await ws.close() # Закрываем соединение
continue # Переходим к переподключению
# --- Этап 3: Получение токена подписки (HTTP API) ---
# Канал для донатов: "$alerts:donation_{user_id}"
# Можно подписаться и на другие каналы, например, "$alerts:goal_{user_id}" для целей сбора
channel_name = f"$alerts:donation_{user_id}"
print(f"Запрос токена подписки для канала: {channel_name}")
sub_token_payload = {"client": client_id, "channels": [channel_name]}
sub_response = await _make_api_request('POST', '/centrifuge/subscribe', token=access_token, json=sub_token_payload)
if not sub_response or "channels" not in sub_response:
print("Не удалось получить ответ с токеном подписки или ответ не содержит 'channels'.")
await ws.close()
continue
subscription_token = None
for channel_info in sub_response.get("channels", []):
if channel_info.get("channel") == channel_name and "token" in channel_info:
subscription_token = channel_info["token"]
break
if not subscription_token:
print(f"Токен подписки для канала {channel_name} не найден в ответе API.")
print(f"Ответ API: {sub_response}")
await ws.close()
continue
print(f"Получен токен подписки: {subscription_token[:10]}...")
# --- Этап 4: Подписка на канал (WebSocket) ---
subscribe_payload = {
"id": 2, # Используем новый ID для этого запроса
"method": 1, # 1 = subscribe
"params": {"channel": channel_name, "token": subscription_token}
}
await ws.send(json.dumps(subscribe_payload))
print(f"Отправлен запрос подписки на канал {channel_name}")
# Ожидание подтверждения подписки (не обязательно, но полезно для отладки)
try:
sub_confirm_str = await asyncio.wait_for(ws.recv(), timeout=10.0)
sub_confirm = json.loads(sub_confirm_str)
if sub_confirm.get("id") == 2 and "result" in sub_confirm:
print(f"Подписка на канал {channel_name} подтверждена.")
elif sub_confirm.get("id") == 2 and "error" in sub_confirm:
print(f"Ошибка подписки на канал {channel_name}: {sub_confirm.get('error')}")
await ws.close()
continue
else:
# Это может быть уже сообщение с данными, обработаем его ниже
print(f"Получено сообщение во время ожидания подтверждения подписки: {sub_confirm_str}")
handle_donation_message(sub_confirm_str) # Обрабатываем его
except asyncio.TimeoutError:
print("Не получено подтверждение подписки в течение 10 секунд.")
# Продолжаем слушать, возможно, подписка прошла, но ответ затерялся
# --- Этап 5: Прослушивание сообщений ---
print("Ожидание сообщений о донатах...")
async for message_str in ws:
# print(f"ПОЛУЧЕНО RAW СООБЩЕНИЕ: {message_str}") # Раскомментировать для детальной отладки
# Теперь парсим сообщение (см. следующий шаг)
handle_donation_message(message_str)
except websockets.exceptions.ConnectionClosedOK:
print("Соединение WebSocket закрыто штатно.")
except websockets.exceptions.ConnectionClosedError as e:
print(f"Соединение WebSocket закрыто с ошибкой: Код={e.code}, Причина='{e.reason}'")
except asyncio.TimeoutError:
print("Таймаут при ожидании ответа от WebSocket.")
except httpx.HTTPStatusError as e:
# Эти ошибки должны обрабатываться в _make_api_request, но ловим на всякий случай
print(f"Ошибка HTTP API во время установки соединения: Статус {e.response.status_code}")
except Exception as e:
# Ловим все остальные ошибки (сетевые, JSON и т.д.)
print(f"Произошла ошибка WebSocket: {type(e).__name__}: {e}")
# Добавляем traceback для сложных ошибок
import traceback
traceback.print_exc()
finally:
if ws and not ws.closed:
await ws.close()
print(f"Соединение WebSocket закрыто. Попытка переподключения через {reconnect_delay} секунд...")
await asyncio.sleep(reconnect_delay)
reconnect_delay = min(reconnect_delay * 2, 60) # Увеличиваем задержку перед следующей попыткой
# --- Вспомогательная функция для обработки сообщений ---
def handle_donation_message(message_str: str):
"""Парсит входящее сообщение и обрабатывает донаты."""
try:
message = json.loads(message_str)
# Проверяем, является ли это push-сообщением (нет 'id') и имеет ли ожидаемую структуру
if "id" not in message and "result" in message:
result_data = message.get("result", {})
channel = result_data.get("channel", "")
outer_data = result_data.get("data", {})
# Проверяем, что это сообщение из канала донатов
if channel.startswith("$alerts:donation_") and "data" in outer_data:
donation_data = outer_data.get("data") # Это и есть полезная нагрузка доната
if donation_data and isinstance(donation_data, dict):
# Извлекаем полезные поля (используем .get для безопасности)
donation_id = donation_data.get("id")
# Имя отправителя: сначала username, если нет - name, иначе "Аноним"
sender = donation_data.get("username") or donation_data.get("name") or "Аноним"
amount = donation_data.get("amount")
currency = donation_data.get("currency")
text = donation_data.get("message", "")
created_at_str = donation_data.get("created_at") # "YYYY-MM-DD HH:MM:SS"
# Конвертируем строку времени в объект datetime (опционально)
created_at_dt = None
if created_at_str:
try:
created_at_dt = datetime.strptime(created_at_str, "%Y-%m-%d %H:%M:%S")
except ValueError:
print(f"Не удалось распарсить дату: {created_at_str}")
print("-" * 20)
print(f"🎉 Новый донат! (ID: {donation_id})")
print(f" От: {sender}")
print(f" Сумма: {amount} {currency}")
# Убираем лишние пробелы и переносы строк из сообщения
print(f" Сообщение: {text.strip() if text else '(пусто)'}")
print(f" Время: {created_at_str} ({created_at_dt.strftime('%H:%M:%S') if created_at_dt else 'N/A'})")
print("-" * 20)
# --- TODO: Отправьте эти данные в ваш фронтенд или вызовите действия! ---
# Пример: await websocket_manager.broadcast({"type": "donation", "data": donation_data})
# Или запишите в базу данных, отправьте уведомление и т.д.
else:
# Странная структура данных внутри data
print(f"Получено сообщение в канале донатов, но 'data' имеет неожиданный формат: {outer_data}")
elif channel.startswith("$alerts:donation_"):
# Может быть сообщение о подписке/отписке или другое системное сообщение на канале
print(f"Получено не-донатное push-сообщение на канале донатов: {message}")
else:
# Сообщение из другого канала (если мы на него подписаны)
print(f"Получено push-сообщение из другого канала ({channel}): {message}")
elif "id" in message:
# Это, вероятно, подтверждение одного из наших запросов (id: 1 или 2) или пинг/понг
# Можно добавить логику для обработки подтверждений или ошибок по ID
if "error" in message:
print(f"Получена ошибка в ответ на запрос ID {message['id']}: {message['error']}")
# else:
# print(f"Получено подтверждение/ответ на запрос ID {message['id']}: {message.get('result', message)}")
else:
# Неизвестный формат сообщения
print(f"Получен необрабатываемый формат сообщения: {message_str}")
except json.JSONDecodeError:
print(f"Ошибка декодирования JSON: {message_str}")
except Exception as e:
# Ловим другие возможные ошибки при обработке сообщения
print(f"Ошибка при обработке сообщения: {type(e).__name__}: {e}")
import traceback
traceback.print_exc()
# --- Запуск клиента ---
# if __name__ == "__main__":
# # TODO: Загрузить токены перед запуском
# # load_tokens()
# try:
# asyncio.run(connect_and_listen())
# except KeyboardInterrupt:
# print("Клиент остановлен вручную.")
Уф! Это самая сложная часть. Правильная реализация этой последовательности — ключ к успеху.
Шаг 5: Обработка сообщений о донатах 📨
После подписки DonationAlerts будет отправлять сообщения через WebSocket без поля id
. Фактические данные о донате вложены внутрь.
Формат сообщения:
Исходя из наблюдений, типичное сообщение о донате выглядит так:
{
"result": {
"channel": "$alerts:donation_ВАШ_USER_ID",
"data": {
"seq": 15, // Порядковый номер от Centrifugo
"data": {
// ---> ЭТО и есть сам объект доната <---
"id": 164405432, // Уникальный ID доната! Важно для дедупликации.
"name": "ИмяДонатера", // Имя, введенное донатером (запасной вариант)
"username": "ИмяПользователяDA", // Имя пользователя DA, если авторизован, иначе null
"message": "Ваше сообщение здесь! \r\n Может содержать переносы строк.",
"message_type": "text", // Тип сообщения (может быть 'audio')
"amount": 22.0, // Сумма в валюте доната
"currency": "USD", // Валюта доната
"is_shown": 1, // Был ли показан в стандартных виджетах DA (0 или 1)
"amount_in_user_currency": 1500.0, // Сумма в основной валюте стримера
"recipient_name": "ИмяВашегоСтримера", // Публичное имя получателя
"recipient": { // Информация о получателе (вас)
"user_id": ВАШ_USER_ID, // Ваш ID пользователя в DA
"code": "ВашЛогинСтримера", // Ваш логин/идентификатор канала
"name": "ИмяВашегоСтримера", // Ваше публичное имя
"avatar": "URL_к_аватару" // URL вашего аватара
},
"created_at": "2025-04-12 07:15:49", // Время создания доната на сервере (UTC?)
"updated_at": "2025-04-12 07:15:55", // Время последнего обновления (например, показа)
"shown_at": "2025-04-12 07:15:55", // Время показа (если is_shown=1)
"reason": "Donation", // Причина (может отличаться для других типов событий)
"billing_system": "unitpay", // Платежная система (если доступно)
"billing_system_type": "card" // Тип платежа (если доступно)
// ... потенциально другие поля ...
}
}
}
}
Парсинг на Python:
(См. функцию handle_donation_message
в коде выше. Она уже включает парсинг и извлечение основных полей.)
# def handle_donation_message(message_str):
# ... (код парсинга из предыдущего блока) ...
Не забывайте обрабатывать потенциально отсутствующие поля с помощью .get()
. Поле id
критически важно, если вы хотите предотвратить обработку одного и того же доната несколько раз (DonationAlerts иногда может присылать дубликаты, особенно при переподключениях).
(Опционально) Шаг 6: Отображение донатов 🖥️
Приведенный выше код Python просто выводит донат в консоль. Чтобы показать его в веб-интерфейсе, вам обычно потребуется еще одно WebSocket-соединение, но уже между вашим бэкендом и вашим фронтендом.
- Бэкенд (
websocket_manager.py
/main.py
): Когдаhandle_donation_message
обрабатывает донат, вместоprint
используйтеWebSocketManager
(или аналогичный механизм), чтобы отправить (broadcast
)donation_data
всем подключенным фронтенд-клиентам. - Фронтенд (
index.html
/ JavaScript): Установите WebSocket-соединение с вашим бэкендом (например, по адресу/ws
). Слушайте сообщения. Когда приходит сообщение сtype: "donation"
, используйте JavaScript, чтобы распарситьdata
и добавить его в HTML.
// Упрощенный пример JS для фронтенда
// Подключаемся к WebSocket-эндпоинту вашего бэкенда
const wsUrl = `ws://${window.location.host}/ws`; // Или wss:// для HTTPS
console.log(`Подключение к WebSocket: ${wsUrl}`);
let ws = new WebSocket(wsUrl);
ws.onopen = function(event) {
console.log("WebSocket соединение с бэкендом установлено.");
// Можно отправить сообщение бэкенду при подключении, если нужно
// ws.send(JSON.stringify({type: "hello", client: "web-ui"}));
};
ws.onmessage = function(event) {
try {
const message = JSON.parse(event.data);
console.log("Сообщение от бэкенда:", message); // Для отладки
if (message.type === 'donation' && message.data) {
// Функция для добавления доната на страницу
displayDonation(message.data);
} else {
console.log("Получено сообщение другого типа:", message.type);
}
} catch (e) {
console.error("Ошибка обработки сообщения от бэкенда:", e, event.data);
}
};
ws.onerror = function(event) {
console.error("Ошибка WebSocket:", event);
};
ws.onclose = function(event) {
console.log(`WebSocket соединение закрыто: Код=${event.code}, Причина='${event.reason}', Чисто=${event.wasClean}`);
// Здесь можно реализовать логику переподключения для фронтенда, если необходимо
// setTimeout(reconnectWebSocket, 5000); // Попытка переподключения через 5 секунд
};
function displayDonation(donation) {
const log = document.getElementById('donations-log'); // Убедитесь, что у вас есть в HTML
if (!log) return; // Выход, если элемент не найден
const item = document.createElement('div');
item.classList.add('donation-item'); // Добавим класс для стилизации
// Используем имя пользователя, если есть, иначе имя, иначе "Аноним"
const sender = escapeHtml(donation.username || donation.name || 'Аноним');
const amount = escapeHtml(String(donation.amount)); // Преобразуем в строку перед экранированием
const currency = escapeHtml(donation.currency);
// Экранируем сообщение и заменяем переносы строк на
const messageText = donation.message ? escapeHtml(donation.message).replace(/\r?\n/g, '') : '';
// Формируем HTML для отображения
item.innerHTML = `
${sender}
задонатил ${amount} ${currency}
${messageText ? `${messageText}` : ''}
(${donation.created_at || 'время неизвестно'})
`;
// Добавляем новый донат в начало списка (или в конец, если нужно)
log.prepend(item); // Или log.appendChild(item);
// Опционально: ограничить количество отображаемых донатов
const maxItems = 50;
while (log.children.length > maxItems) {
log.removeChild(log.lastChild);
}
}
// Простая функция для экранирования HTML, чтобы предотвратить XSS
function escapeHtml(unsafe) {
if (unsafe === null || typeof unsafe === 'undefined') return '';
return String(unsafe) // Убедимся, что это строка
.replace(/&/g, "&")
.replace(/, "<")
.replace(/>/g, ">")
.replace(/"/g, """)
.replace(/'/g, "'");
}
// Функция для переподключения (если нужна)
// function reconnectWebSocket() {
// console.log("Попытка переподключения WebSocket...");
// ws = new WebSocket(wsUrl);
// // Переназначаем обработчики событий для нового объекта ws
// ws.onopen = ...
// ws.onmessage = ...
// ws.onerror = ...
// ws.onclose = ...
// }
Заключение ✨
Мы рассмотрели основные шаги для получения уведомлений от DonationAlerts в реальном времени с использованием OAuth 2.0 и WebSockets на Python. Это включает настройку приложения, обработку многоэтапного процесса подключения OAuth и WebSocket (включая эти специфичные токены!) и парсинг входящих сообщений о донатах.
Хотя настройка более сложна, чем простой опрос или старый метод с токеном виджета, она дает вам стандартный, надежный способ прямой интеграции с потоком событий DonationAlerts.