Привет, разработчики! 👋

Вы когда-нибудь хотели мгновенно реагировать на получение доната через DonationAlerts? Может быть, запустить эффект на стриме, обновить дашборд или просто надежно логировать донаты без постоянных запросов к API? Опрос API (polling) работает, но он неэффективен и не обеспечивает реального времени.

DonationAlerts предлагает способ получать эти уведомления мгновенно с помощью WebSockets, но настройка включает в себя прохождение потока OAuth 2.0 и работу с их специфичным протоколом WebSocket (на базе Centrifugo).

В этом руководстве мы пройдемся по шагам:

  1. Регистрация приложения в DonationAlerts.
  2. Реализация потока авторизации OAuth 2.0 Authorization Code с использованием Python (на FastAPI).
  3. Подключение к WebSocket DonationAlerts (Centrifugo).
  4. Аутентификация, подписка и получение сообщений о донатах в реальном времени.
  5. Понимание формата данных о донатах.

Давайте откажемся от опросов и настроим получение донатов в реальном времени!

Предварительные требования

  • Аккаунт DonationAlerts (аккаунт стримера, который вы хотите отслеживать).
  • Установленный Python 3.8+.
  • Базовое знакомство с Python. Знание FastAPI полезно, но не строго обязательно для понимания концепций.
  • pip для установки пакетов.

(Мы будем использовать FastAPI для веб-сервера, обрабатывающего OAuth, но основную логику взаимодействия с API и WebSocket DonationAlerts можно адаптировать для других фреймворков.)

Шаг 1: Регистрация вашего приложения в DonationAlerts 🔑

Прежде чем писать код, нам нужно сообщить DonationAlerts о нашем приложении и получить учетные данные.

  1. Перейдите на страницу приложений DonationAlerts OAuth: https://www.donationalerts.com/application/clients
  2. Войдите в систему, используя свой аккаунт стримера (Twitch, YouTube и т.д.).
  3. На странице "OAuth API Applications" нажмите кнопку "+ СОЗДАТЬ НОВОЕ ПРИЛОЖЕНИЕ".
  4. Заполните форму "Новое приложение":
    • Название приложения: Дайте ему описательное имя, например, "Мой Монитор Реального Времени". (Оно будет показано пользователям).
    • URL перенаправления (Redirect URL): Это КРИТИЧЕСКИ ВАЖНО. Введите точный URL, на который DonationAlerts отправит пользователя после того, как он авторизует ваше приложение. Для локальной разработки с нашим примером используйте: http://localhost:8000/api/auth/donationalerts/callback. При развертывании это должен быть ваш публичный URL обратного вызова.
  5. Нажмите оранжевую кнопку "СОЗДАТЬ".
  6. Найдите созданное приложение в списке. Вам понадобятся две части информации:
    • ID Приложения (App ID): Ваш уникальный идентификатор приложения. Будем называть его APP_ID.
    • API Ключ (API Key): Секрет вашего приложения. Храните его в безопасности и никогда не раскрывайте во фронтенд-коде! Будем называть его API_KEY.

Скопируйте эти два значения (APP_ID и API_KEY). Они скоро нам понадобятся.

Шаг 2: Понимание потока OAuth 2.0 🌊

Нам нужно, чтобы пользователь (стример) предоставил нашему приложению разрешение на доступ к своим данным (в частности, на просмотр профиля и подписку на события донатов), не передавая нам свой пароль. Именно для этого и нужен OAuth 2.0. Мы будем использовать поток "Authorization Code", стандартный для веб-приложений.

Вот суть:

  1. Инициация пользователем: Пользователь нажимает кнопку "Войти через DonationAlerts" в нашем приложении.
  2. Перенаправление на DonationAlerts: Наш бэкенд перенаправляет браузер пользователя на специальный URL DonationAlerts, включая наш APP_ID, запрошенные scopes (права доступа) и redirect_uri.
  3. Авторизация пользователем: Пользователь входит в DonationAlerts (если необходимо) и видит экран с запросом на одобрение разрешений, которые запросило наше приложение (oauth-user-show, oauth-donation-subscribe).
  4. Перенаправление обратно с кодом: Если одобрено, DonationAlerts перенаправляет браузер пользователя обратно на наш redirect_uri, добавляя временный code к URL.
  5. Обмен кода на токен: Наш бэкенд получает этот code, проверяет его, а затем безопасно выполняет запрос сервер-сервер к DonationAlerts, отправляя code, наш APP_ID и наш секретный API_KEY.
  6. Получение токенов: DonationAlerts проверяет все и отправляет обратно access_token (используется для вызовов API) и refresh_token (используется для получения нового access_token, когда старый истечет).

Этот access_token является ключом к взаимодействию с API DonationAlerts от имени пользователя.

Шаг 3: Реализация OAuth на Python (Пример с FastAPI) 🐍

Давайте настроим простое приложение FastAPI для обработки этого потока.

(Полный рабочий пример, включая хранение токенов, управление WebSocket и интерактивный помощник по настройке, смотрите в коде репозитория donationalerts-oauth-websocket-example.)

Конфигурация (config.py / .env)

Лучшей практикой является хранение учетных данных и настроек вне кода. Мы можем использовать файл .env и вспомогательный модуль config.py (как показано в полном примере) для их загрузки. Ваш .env будет выглядеть примерно так:

# .env
APP_ID="ВАШ_APP_ID_ЗДЕСЬ"
API_KEY="ВАШ_API_KEY_ЗДЕСЬ"
REDIRECT_URI="http://localhost:8000/api/auth/donationalerts/callback"
SESSION_SECRET_KEY="СГЕНЕРИРУЙТЕ_НАДЕЖНЫЙ_СЛУЧАЙНЫЙ_КЛЮЧ_ЗДЕСЬ" # Для безопасности сессии

# Обычно фиксированные значения
DA_SCOPES="oauth-user-show oauth-donation-subscribe"
DA_AUTHORIZATION_URL="https://www.donationalerts.com/oauth/authorize"
DA_TOKEN_URL="https://www.donationalerts.com/oauth/token"
DA_API_BASE_URL="https://www.donationalerts.com/api/v1"
# ... другие URL, если необходимо

Эндпоинты FastAPI (main.py)

Нам нужны два основных эндпоинта: один для начала входа и один для обработки обратного вызова (callback).

# main.py (Упрощенные фрагменты)
import httpx
import secrets
import os # Для загрузки из .env
from fastapi import FastAPI, Request, Depends
from fastapi.responses import RedirectResponse
from starlette.middleware.sessions import SessionMiddleware # Для хранения state
from pydantic_settings import BaseSettings # Для загрузки конфига

# --- Загрузка конфигурации ---
class Settings(BaseSettings):
    APP_ID: str
    API_KEY: str
    REDIRECT_URI: str
    SESSION_SECRET_KEY: str
    DA_SCOPES: str = "oauth-user-show oauth-donation-subscribe"
    DA_AUTHORIZATION_URL: str = "https://www.donationalerts.com/oauth/authorize"
    DA_TOKEN_URL: str = "https://www.donationalerts.com/oauth/token"
    DA_API_BASE_URL: str = "https://www.donationalerts.com/api/v1"
    DA_CENTRIFUGO_URL: str = "wss://centrifugo.donationalerts.com/connection/websocket"

    class Config:
        env_file = '.env' # Указываем файл .env

settings = Settings() # Загружаем настройки при старте

app = FastAPI()

# ВАЖНО: SessionMiddleware необходим для временного хранения состояния OAuth
app.add_middleware(SessionMiddleware, secret_key=settings.SESSION_SECRET_KEY)

# --- Эндпоинт входа ---
@app.get("/api/auth/donationalerts/login")
async def login_donationalerts(request: Request):
    state = secrets.token_urlsafe(16)
    request.session['oauth_state'] = state # Сохраняем state для предотвращения CSRF

    params = {
        "client_id": settings.APP_ID,
        "redirect_uri": settings.REDIRECT_URI,
        "response_type": "code",
        "scope": settings.DA_SCOPES,
        "state": state,
    }
    # Используем httpx для построения URL с параметрами
    auth_request = httpx.Request('GET', settings.DA_AUTHORIZATION_URL, params=params)
    authorization_url = str(auth_request.url)

    print(f"Перенаправление на: {authorization_url}") # Для отладки
    return RedirectResponse(authorization_url)

# --- Эндпоинт обратного вызова (Callback) ---
@app.get("/api/auth/donationalerts/callback")
async def auth_donationalerts_callback(request: Request, code: str = None, state: str = None, error: str = None):
    if error:
        # Обработка ошибки авторизации от DonationAlerts
        print(f"Ошибка авторизации от DonationAlerts: {error}")
        # В реальном приложении показать пользователю сообщение об ошибке
        return {"error": error, "description": "Пользователь отклонил авторизацию или произошла ошибка."}

    # --- Проверка безопасности: Сверка State ---
    stored_state = request.session.pop('oauth_state', None)
    if not state or state != stored_state:
        # Несовпадение state, возможная CSRF-атака!
        print("Ошибка: Неверный state параметр.")
        return {"error": "invalid_state", "description": "Несовпадение параметра state. Возможна CSRF атака."}

    if not code:
        print("Ошибка: Отсутствует код авторизации.")
        return {"error": "missing_code", "description": "В параметрах ответа отсутствует код авторизации."}

    # --- Обмен кода на токены ---
    token_data = await exchange_code_for_token(code) # См. функцию ниже

    if not token_data or "access_token" not in token_data:
        print("Ошибка: Не удалось обменять код на токен.")
        return {"error": "token_exchange_failed", "description": "Не удалось получить токен доступа от DonationAlerts."}

    # --- Успех! Безопасно сохраняем токены ---
    # В реальном приложении: сохраните token_data['access_token'], token_data['refresh_token']
    # и вычислите время истечения токена (token_data['expires_in']) в базе данных, связанной с пользователем.
    # Для этого примера мы могли бы использовать простое файловое хранилище (например, token_storage.py).
    # save_token(token_data) # Как в token_storage.py из полного примера

    print("Авторизация OAuth прошла успешно, токены получены!")
    print(f"Access Token: {token_data['access_token'][:10]}...") # Не логируйте полный токен
    print(f"Refresh Token: {token_data.get('refresh_token', 'N/A')[:10]}...")
    print(f"Expires in: {token_data.get('expires_in')} seconds")

    # TODO: Сохранить токены (например, в файл или базу данных)
    # store_tokens(token_data['access_token'], token_data['refresh_token'], token_data['expires_in'])

    # Перенаправляем пользователя обратно на главную страницу вашего приложения
    # status_code=303 (See Other) рекомендуется после POST или для перенаправления после завершения действия
    return RedirectResponse(url="/?status=success", status_code=303)

# --- Вспомогательная функция: Обмен кода на токен ---
async def exchange_code_for_token(code: str) -> dict | None:
    data = {
        "grant_type": "authorization_code",
        "client_id": settings.APP_ID,
        "client_secret": settings.API_KEY, # Используем API Key как client_secret
        "code": code,
        "redirect_uri": settings.REDIRECT_URI,
    }
    async with httpx.AsyncClient() as client:
        try:
            print(f"Отправка запроса на обмен токена на {settings.DA_TOKEN_URL}")
            response = await client.post(settings.DA_TOKEN_URL, data=data)
            response.raise_for_status() # Проверяем наличие HTTP ошибок (4xx, 5xx)
            token_info = response.json()
            print("Обмен токена прошел успешно!")
            return token_info
        except httpx.HTTPStatusError as e:
            # Логируем ошибку, если DonationAlerts вернул статус ошибки
            print(f"Ошибка при обмене токена: Статус {e.response.status_code}")
            try:
                # Попытка прочитать тело ответа для получения деталей ошибки
                error_details = e.response.json()
                print(f"Тело ответа ошибки: {error_details}")
            except Exception:
                print(f"Тело ответа ошибки (не JSON): {e.response.text}")
            return None
        except httpx.RequestError as e:
            # Логируем ошибки сети или соединения
            print(f"Ошибка сети при запросе обмена токена: {e}")
            return None
        except Exception as e:
            # Логируем другие непредвиденные ошибки
            print(f"Произошла непредвиденная ошибка при обмене токена: {e}")
            return None

Этот код обрабатывает основной поток OAuth. Не забудьте безопасно хранить полученные access_token и refresh_token.

Шаг 4: Подключение к WebSocket (Centrifugo) 🕸️

Теперь, когда у нас есть access_token, мы можем подключиться к WebSocket для получения событий в реальном времени. Здесь становится немного сложнее, так как задействованы три разных типа токенов:

  1. access_token OAuth: Тот, который мы только что получили. Необходим для вызовов API.
  2. socket_connection_token: Временный токен, предназначенный специально для аутентификации самого WebSocket-соединения. Мы получаем его из эндпоинта API /api/v1/user/oauth, используя наш access_token.
  3. subscription_token: Еще один временный токен, необходимый для подписки на определенный приватный канал (например, канал донатов) после аутентификации WebSocket. Мы получаем его из эндпоинта /api/v1/centrifuge/subscribe, используя наш access_token и client_id, предоставленный WebSocket при аутентификации.

Последовательность подключения:

Вот последовательность, реализованная в donationalerts_client.py нашего примера:

# donationalerts_client.py (Упрощенные фрагменты)
import websockets
import json
import asyncio
import httpx
import time
from datetime import datetime, timedelta

# Предполагается, что 'settings' загружены (как в main.py),
# и у нас есть валидный 'access_token' (полученный и сохраненный ранее).
# Также предполагается наличие функции `_make_api_request` для запросов к API
# и функции для управления/обновления токенов (get_valid_access_token, refresh_tokens).

# --- Глобальные переменные для управления состоянием ---
current_access_token: str | None = None
current_refresh_token: str | None = None
token_expires_at: datetime | None = None

# TODO: Реализовать функции загрузки/сохранения токенов (например, в файл)
# def load_tokens(): ...
# def save_tokens(access, refresh, expires_in): ...

async def get_valid_access_token() -> str | None:
    """Возвращает действительный access_token, при необходимости обновляя его."""
    global current_access_token, token_expires_at

    if not current_access_token or not token_expires_at:
        # Попытка загрузить токены, если их нет в памяти
        # loaded = load_tokens() # Реализация зависит от способа хранения
        # if not loaded:
        print("Нет сохраненных токенов.")
        return None

    # Проверяем, не истек ли токен (с небольшим запасом, например, 5 минут)
    if datetime.now() >= (token_expires_at - timedelta(minutes=5)):
        print("Access token истек или скоро истечет, пытаемся обновить...")
        refreshed = await refresh_tokens()
        if not refreshed:
            print("Не удалось обновить токен.")
            return None
        # Обновляем глобальные переменные после успешного обновления
        # (предполагается, что refresh_tokens обновляет их)
        print("Токен успешно обновлен.")

    return current_access_token

async def refresh_tokens() -> bool:
    """Обновляет access_token, используя refresh_token."""
    global current_access_token, current_refresh_token, token_expires_at, settings

    if not current_refresh_token:
        print("Нет refresh token для обновления.")
        return False

    data = {
        "grant_type": "refresh_token",
        "refresh_token": current_refresh_token,
        "client_id": settings.APP_ID,
        "client_secret": settings.API_KEY,
    }
    async with httpx.AsyncClient() as client:
        try:
            print(f"Запрос на обновление токена на {settings.DA_TOKEN_URL}")
            response = await client.post(settings.DA_TOKEN_URL, data=data)
            response.raise_for_status()
            new_token_data = response.json()

            # --- Обновляем глобальные переменные и сохраняем ---
            current_access_token = new_token_data['access_token']
            # Некоторые API могут не возвращать refresh_token при обновлении
            current_refresh_token = new_token_data.get('refresh_token', current_refresh_token)
            expires_in = new_token_data['expires_in']
            token_expires_at = datetime.now() + timedelta(seconds=expires_in)

            # save_tokens(current_access_token, current_refresh_token, expires_in) # Сохраняем обновленные
            print(f"Токен обновлен. Новый Access Token: {current_access_token[:10]}...")
            return True
        except httpx.HTTPStatusError as e:
            print(f"Ошибка при обновлении токена: Статус {e.response.status_code}")
            try:
                print(f"Тело ошибки: {e.response.json()}")
            except Exception:
                print(f"Тело ошибки (не JSON): {e.response.text}")
            # Если ошибка 400/401 с refresh_token, возможно, он отозван, требуется повторная авторизация
            if e.response.status_code in [400, 401]:
                 print("Refresh token недействителен. Требуется повторная авторизация пользователя.")
                 # Сбросить токены, чтобы запросить новую авторизацию
                 current_access_token = None
                 current_refresh_token = None
                 token_expires_at = None
                 # remove_saved_tokens()
            return False
        except Exception as e:
            print(f"Непредвиденная ошибка при обновлении токена: {e}")
            return False


async def _make_api_request(method: str, endpoint: str, token: str | None = None, **kwargs) -> dict | None:
    """Вспомогательная функция для выполнения запросов к API DonationAlerts."""
    global settings
    access_token = token or await get_valid_access_token()
    if not access_token:
        print("Нет действительного access_token для выполнения запроса.")
        return None

    headers = {"Authorization": f"Bearer {access_token}"}
    url = f"{settings.DA_API_BASE_URL}{endpoint}"
    async with httpx.AsyncClient() as client:
        try:
            print(f"Выполнение {method} запроса к {url}")
            response = await client.request(method, url, headers=headers, **kwargs)
            # Ошибки 4xx/5xx вызовут исключение
            response.raise_for_status()
            # Успешные ответы без тела (204 No Content)
            if response.status_code == 204:
                 return {} # Возвращаем пустой dict для согласованности
            return response.json()
        except httpx.HTTPStatusError as e:
            print(f"Ошибка API запроса {method} {endpoint}: Статус {e.response.status_code}")
            try:
                print(f"Тело ошибки: {e.response.json()}")
            except Exception:
                print(f"Тело ошибки (не JSON): {e.response.text}")
            # Если ошибка 401 Unauthorized, возможно токен протух несмотря на проверку
            if e.response.status_code == 401:
                 print("Получена ошибка 401 Unauthorized. Возможно, токен доступа недействителен.")
                 # Можно попробовать обновить токен и повторить запрос, но это усложнит логику.
                 # Пока просто возвращаем None.
            return None
        except httpx.RequestError as e:
            print(f"Ошибка сети при запросе к {url}: {e}")
            return None
        except Exception as e:
            print(f"Непредвиденная ошибка при запросе к API: {e}")
            return None


async def connect_and_listen():
    """Основная функция подключения к WebSocket и прослушивания сообщений."""
    global settings
    reconnect_delay = 5 # Начальная задержка перед переподключением (в секундах)

    while True: # Цикл для автоматического переподключения
        access_token = await get_valid_access_token()
        if not access_token:
            print("Не удалось получить действительный токен доступа. Невозможно подключиться к WebSocket.")
            print(f"Повторная попытка через {reconnect_delay} секунд...")
            await asyncio.sleep(reconnect_delay)
            reconnect_delay = min(reconnect_delay * 2, 60) # Удваиваем задержку, максимум 60с
            continue

        ws: websockets.WebSocketClientProtocol | None = None # Для finally блока
        try:
            # --- Этап 1: Получение User ID и Socket Token (HTTP API) ---
            print("Получение информации о пользователе и токена для WebSocket...")
            user_info_response = await _make_api_request('GET', '/user/oauth', token=access_token)

            if not user_info_response or 'data' not in user_info_response:
                 print("Не удалось получить информацию о пользователе или ответ API не содержит 'data'.")
                 # Не увеличиваем задержку, т.к. это может быть временная проблема API
                 await asyncio.sleep(reconnect_delay)
                 continue # Переходим к следующей итерации цикла переподключения

            user_id = user_info_response['data'].get('id')
            socket_token = user_info_response['data'].get('socket_connection_token')

            if not user_id or not socket_token:
                print("В ответе API отсутствует user_id или socket_connection_token.")
                await asyncio.sleep(reconnect_delay)
                continue

            print(f"Получен User ID: {user_id}, Socket Token: {socket_token[:10]}...") # Не логируем полный токен

            # --- Этап 2: Подключение и Аутентификация WebSocket ---
            websocket_url = settings.DA_CENTRIFUGO_URL
            print(f"Подключение к WebSocket: {websocket_url}")
            # Устанавливаем таймаут на пинги, чтобы обнаруживать "мертвые" соединения
            ws = await websockets.connect(websocket_url, ping_interval=30, ping_timeout=10)
            print("Успешно подключено к WebSocket.")
            reconnect_delay = 5 # Сбрасываем задержку при успешном подключении

            # --- Этап 2a: Отправка Socket Token ---
            # ВАЖНО: Формат специфичен! {"params": {"token": ...}, "id": 1}
            auth_payload = {"params": {"token": socket_token}, "id": 1}
            await ws.send(json.dumps(auth_payload))
            print("Отправлен запрос аутентификации WebSocket.")

            # --- Этап 2b: Получение ответа аутентификации и Client ID ---
            auth_response_str = await asyncio.wait_for(ws.recv(), timeout=10.0) # Ждем ответ 10 сек
            auth_response = json.loads(auth_response_str)
            print(f"Получен ответ аутентификации: {auth_response}")

            client_id = None
            if auth_response.get("id") == 1 and auth_response.get("result", {}).get("client"):
                client_id = auth_response["result"]["client"]
                print(f"Аутентификация WebSocket успешна! Client ID: {client_id}")
            else:
                # Подробное логирование ошибки аутентификации
                error_info = auth_response.get("error")
                print(f"Аутентификация WebSocket не удалась! Ошибка: {error_info}")
                await ws.close() # Закрываем соединение
                continue # Переходим к переподключению

            # --- Этап 3: Получение токена подписки (HTTP API) ---
            # Канал для донатов: "$alerts:donation_{user_id}"
            # Можно подписаться и на другие каналы, например, "$alerts:goal_{user_id}" для целей сбора
            channel_name = f"$alerts:donation_{user_id}"
            print(f"Запрос токена подписки для канала: {channel_name}")
            sub_token_payload = {"client": client_id, "channels": [channel_name]}

            sub_response = await _make_api_request('POST', '/centrifuge/subscribe', token=access_token, json=sub_token_payload)

            if not sub_response or "channels" not in sub_response:
                 print("Не удалось получить ответ с токеном подписки или ответ не содержит 'channels'.")
                 await ws.close()
                 continue

            subscription_token = None
            for channel_info in sub_response.get("channels", []):
                if channel_info.get("channel") == channel_name and "token" in channel_info:
                    subscription_token = channel_info["token"]
                    break

            if not subscription_token:
                 print(f"Токен подписки для канала {channel_name} не найден в ответе API.")
                 print(f"Ответ API: {sub_response}")
                 await ws.close()
                 continue

            print(f"Получен токен подписки: {subscription_token[:10]}...")

            # --- Этап 4: Подписка на канал (WebSocket) ---
            subscribe_payload = {
                "id": 2, # Используем новый ID для этого запроса
                "method": 1, # 1 = subscribe
                "params": {"channel": channel_name, "token": subscription_token}
            }
            await ws.send(json.dumps(subscribe_payload))
            print(f"Отправлен запрос подписки на канал {channel_name}")

            # Ожидание подтверждения подписки (не обязательно, но полезно для отладки)
            try:
                sub_confirm_str = await asyncio.wait_for(ws.recv(), timeout=10.0)
                sub_confirm = json.loads(sub_confirm_str)
                if sub_confirm.get("id") == 2 and "result" in sub_confirm:
                    print(f"Подписка на канал {channel_name} подтверждена.")
                elif sub_confirm.get("id") == 2 and "error" in sub_confirm:
                    print(f"Ошибка подписки на канал {channel_name}: {sub_confirm.get('error')}")
                    await ws.close()
                    continue
                else:
                    # Это может быть уже сообщение с данными, обработаем его ниже
                    print(f"Получено сообщение во время ожидания подтверждения подписки: {sub_confirm_str}")
                    handle_donation_message(sub_confirm_str) # Обрабатываем его
            except asyncio.TimeoutError:
                print("Не получено подтверждение подписки в течение 10 секунд.")
                # Продолжаем слушать, возможно, подписка прошла, но ответ затерялся

            # --- Этап 5: Прослушивание сообщений ---
            print("Ожидание сообщений о донатах...")
            async for message_str in ws:
                # print(f"ПОЛУЧЕНО RAW СООБЩЕНИЕ: {message_str}") # Раскомментировать для детальной отладки
                # Теперь парсим сообщение (см. следующий шаг)
                handle_donation_message(message_str)

        except websockets.exceptions.ConnectionClosedOK:
            print("Соединение WebSocket закрыто штатно.")
        except websockets.exceptions.ConnectionClosedError as e:
            print(f"Соединение WebSocket закрыто с ошибкой: Код={e.code}, Причина='{e.reason}'")
        except asyncio.TimeoutError:
            print("Таймаут при ожидании ответа от WebSocket.")
        except httpx.HTTPStatusError as e:
            # Эти ошибки должны обрабатываться в _make_api_request, но ловим на всякий случай
            print(f"Ошибка HTTP API во время установки соединения: Статус {e.response.status_code}")
        except Exception as e:
            # Ловим все остальные ошибки (сетевые, JSON и т.д.)
            print(f"Произошла ошибка WebSocket: {type(e).__name__}: {e}")
            # Добавляем traceback для сложных ошибок
            import traceback
            traceback.print_exc()

        finally:
            if ws and not ws.closed:
                await ws.close()
            print(f"Соединение WebSocket закрыто. Попытка переподключения через {reconnect_delay} секунд...")
            await asyncio.sleep(reconnect_delay)
            reconnect_delay = min(reconnect_delay * 2, 60) # Увеличиваем задержку перед следующей попыткой

# --- Вспомогательная функция для обработки сообщений ---
def handle_donation_message(message_str: str):
    """Парсит входящее сообщение и обрабатывает донаты."""
    try:
        message = json.loads(message_str)

        # Проверяем, является ли это push-сообщением (нет 'id') и имеет ли ожидаемую структуру
        if "id" not in message and "result" in message:
            result_data = message.get("result", {})
            channel = result_data.get("channel", "")
            outer_data = result_data.get("data", {})

            # Проверяем, что это сообщение из канала донатов
            if channel.startswith("$alerts:donation_") and "data" in outer_data:
                donation_data = outer_data.get("data") # Это и есть полезная нагрузка доната

                if donation_data and isinstance(donation_data, dict):
                    # Извлекаем полезные поля (используем .get для безопасности)
                    donation_id = donation_data.get("id")
                    # Имя отправителя: сначала username, если нет - name, иначе "Аноним"
                    sender = donation_data.get("username") or donation_data.get("name") or "Аноним"
                    amount = donation_data.get("amount")
                    currency = donation_data.get("currency")
                    text = donation_data.get("message", "")
                    created_at_str = donation_data.get("created_at") # "YYYY-MM-DD HH:MM:SS"

                    # Конвертируем строку времени в объект datetime (опционально)
                    created_at_dt = None
                    if created_at_str:
                        try:
                            created_at_dt = datetime.strptime(created_at_str, "%Y-%m-%d %H:%M:%S")
                        except ValueError:
                            print(f"Не удалось распарсить дату: {created_at_str}")

                    print("-" * 20)
                    print(f"🎉 Новый донат! (ID: {donation_id})")
                    print(f"   От: {sender}")
                    print(f"   Сумма: {amount} {currency}")
                    # Убираем лишние пробелы и переносы строк из сообщения
                    print(f"   Сообщение: {text.strip() if text else '(пусто)'}")
                    print(f"   Время: {created_at_str} ({created_at_dt.strftime('%H:%M:%S') if created_at_dt else 'N/A'})")
                    print("-" * 20)

                    # --- TODO: Отправьте эти данные в ваш фронтенд или вызовите действия! ---
                    # Пример: await websocket_manager.broadcast({"type": "donation", "data": donation_data})
                    # Или запишите в базу данных, отправьте уведомление и т.д.

                else:
                    # Странная структура данных внутри data
                    print(f"Получено сообщение в канале донатов, но 'data' имеет неожиданный формат: {outer_data}")

            elif channel.startswith("$alerts:donation_"):
                 # Может быть сообщение о подписке/отписке или другое системное сообщение на канале
                 print(f"Получено не-донатное push-сообщение на канале донатов: {message}")
            else:
                 # Сообщение из другого канала (если мы на него подписаны)
                 print(f"Получено push-сообщение из другого канала ({channel}): {message}")

        elif "id" in message:
             # Это, вероятно, подтверждение одного из наших запросов (id: 1 или 2) или пинг/понг
             # Можно добавить логику для обработки подтверждений или ошибок по ID
             if "error" in message:
                  print(f"Получена ошибка в ответ на запрос ID {message['id']}: {message['error']}")
             # else:
             #      print(f"Получено подтверждение/ответ на запрос ID {message['id']}: {message.get('result', message)}")
        else:
             # Неизвестный формат сообщения
             print(f"Получен необрабатываемый формат сообщения: {message_str}")

    except json.JSONDecodeError:
        print(f"Ошибка декодирования JSON: {message_str}")
    except Exception as e:
        # Ловим другие возможные ошибки при обработке сообщения
        print(f"Ошибка при обработке сообщения: {type(e).__name__}: {e}")
        import traceback
        traceback.print_exc()

# --- Запуск клиента ---
# if __name__ == "__main__":
#     # TODO: Загрузить токены перед запуском
#     # load_tokens()
#     try:
#         asyncio.run(connect_and_listen())
#     except KeyboardInterrupt:
#         print("Клиент остановлен вручную.")

Уф! Это самая сложная часть. Правильная реализация этой последовательности — ключ к успеху.

Шаг 5: Обработка сообщений о донатах 📨

После подписки DonationAlerts будет отправлять сообщения через WebSocket без поля id. Фактические данные о донате вложены внутрь.

Формат сообщения:

Исходя из наблюдений, типичное сообщение о донате выглядит так:

{
  "result": {
    "channel": "$alerts:donation_ВАШ_USER_ID",
    "data": {
      "seq": 15, // Порядковый номер от Centrifugo
      "data": {
        // ---> ЭТО и есть сам объект доната <---
        "id": 164405432, // Уникальный ID доната! Важно для дедупликации.
        "name": "ИмяДонатера", // Имя, введенное донатером (запасной вариант)
        "username": "ИмяПользователяDA", // Имя пользователя DA, если авторизован, иначе null
        "message": "Ваше сообщение здесь! \r\n Может содержать переносы строк.",
        "message_type": "text", // Тип сообщения (может быть 'audio')
        "amount": 22.0, // Сумма в валюте доната
        "currency": "USD", // Валюта доната
        "is_shown": 1, // Был ли показан в стандартных виджетах DA (0 или 1)
        "amount_in_user_currency": 1500.0, // Сумма в основной валюте стримера
        "recipient_name": "ИмяВашегоСтримера", // Публичное имя получателя
        "recipient": { // Информация о получателе (вас)
            "user_id": ВАШ_USER_ID, // Ваш ID пользователя в DA
            "code": "ВашЛогинСтримера", // Ваш логин/идентификатор канала
            "name": "ИмяВашегоСтримера", // Ваше публичное имя
            "avatar": "URL_к_аватару" // URL вашего аватара
         },
        "created_at": "2025-04-12 07:15:49", // Время создания доната на сервере (UTC?)
        "updated_at": "2025-04-12 07:15:55", // Время последнего обновления (например, показа)
        "shown_at": "2025-04-12 07:15:55", // Время показа (если is_shown=1)
        "reason": "Donation", // Причина (может отличаться для других типов событий)
        "billing_system": "unitpay", // Платежная система (если доступно)
        "billing_system_type": "card" // Тип платежа (если доступно)
        // ... потенциально другие поля ...
      }
    }
  }
}

Парсинг на Python:

(См. функцию handle_donation_message в коде выше. Она уже включает парсинг и извлечение основных полей.)

# def handle_donation_message(message_str):
#     ... (код парсинга из предыдущего блока) ...

Не забывайте обрабатывать потенциально отсутствующие поля с помощью .get(). Поле id критически важно, если вы хотите предотвратить обработку одного и того же доната несколько раз (DonationAlerts иногда может присылать дубликаты, особенно при переподключениях).

(Опционально) Шаг 6: Отображение донатов 🖥️

Приведенный выше код Python просто выводит донат в консоль. Чтобы показать его в веб-интерфейсе, вам обычно потребуется еще одно WebSocket-соединение, но уже между вашим бэкендом и вашим фронтендом.

  1. Бэкенд (websocket_manager.py / main.py): Когда handle_donation_message обрабатывает донат, вместо print используйте WebSocketManager (или аналогичный механизм), чтобы отправить (broadcast) donation_data всем подключенным фронтенд-клиентам.
  2. Фронтенд (index.html / JavaScript): Установите WebSocket-соединение с вашим бэкендом (например, по адресу /ws). Слушайте сообщения. Когда приходит сообщение с type: "donation", используйте JavaScript, чтобы распарсить data и добавить его в HTML.
// Упрощенный пример JS для фронтенда
// Подключаемся к WebSocket-эндпоинту вашего бэкенда
const wsUrl = `ws://${window.location.host}/ws`; // Или wss:// для HTTPS
console.log(`Подключение к WebSocket: ${wsUrl}`);
let ws = new WebSocket(wsUrl);

ws.onopen = function(event) {
    console.log("WebSocket соединение с бэкендом установлено.");
    // Можно отправить сообщение бэкенду при подключении, если нужно
    // ws.send(JSON.stringify({type: "hello", client: "web-ui"}));
};

ws.onmessage = function(event) {
    try {
        const message = JSON.parse(event.data);
        console.log("Сообщение от бэкенда:", message); // Для отладки

        if (message.type === 'donation' && message.data) {
            // Функция для добавления доната на страницу
            displayDonation(message.data); 
        } else {
            console.log("Получено сообщение другого типа:", message.type);
        }
    } catch (e) {
        console.error("Ошибка обработки сообщения от бэкенда:", e, event.data);
    }
};

ws.onerror = function(event) {
    console.error("Ошибка WebSocket:", event);
};

ws.onclose = function(event) {
    console.log(`WebSocket соединение закрыто: Код=${event.code}, Причина='${event.reason}', Чисто=${event.wasClean}`);
    // Здесь можно реализовать логику переподключения для фронтенда, если необходимо
    // setTimeout(reconnectWebSocket, 5000); // Попытка переподключения через 5 секунд
};

function displayDonation(donation) {
    const log = document.getElementById('donations-log'); // Убедитесь, что у вас есть  в HTML
    if (!log) return; // Выход, если элемент не найден

    const item = document.createElement('div');
    item.classList.add('donation-item'); // Добавим класс для стилизации

    // Используем имя пользователя, если есть, иначе имя, иначе "Аноним"
    const sender = escapeHtml(donation.username || donation.name || 'Аноним');
    const amount = escapeHtml(String(donation.amount)); // Преобразуем в строку перед экранированием
    const currency = escapeHtml(donation.currency);
    // Экранируем сообщение и заменяем переносы строк на 
    const messageText = donation.message ? escapeHtml(donation.message).replace(/\r?\n/g, '') : '';

    // Формируем HTML для отображения
    item.innerHTML = `
        ${sender}
        задонатил ${amount} ${currency}
        ${messageText ? `${messageText}` : ''}
        (${donation.created_at || 'время неизвестно'})
    `;

    // Добавляем новый донат в начало списка (или в конец, если нужно)
    log.prepend(item); // Или log.appendChild(item);

    // Опционально: ограничить количество отображаемых донатов
    const maxItems = 50;
    while (log.children.length > maxItems) {
        log.removeChild(log.lastChild);
    }
}

// Простая функция для экранирования HTML, чтобы предотвратить XSS
function escapeHtml(unsafe) {
    if (unsafe === null || typeof unsafe === 'undefined') return '';
    return String(unsafe) // Убедимся, что это строка
         .replace(/&/g, "&")
         .replace(/, "<")
         .replace(/>/g, ">")
         .replace(/"/g, """)
         .replace(/'/g, "'");
}

// Функция для переподключения (если нужна)
// function reconnectWebSocket() {
//     console.log("Попытка переподключения WebSocket...");
//     ws = new WebSocket(wsUrl);
//     // Переназначаем обработчики событий для нового объекта ws
//     ws.onopen = ...
//     ws.onmessage = ...
//     ws.onerror = ...
//     ws.onclose = ...
// }

Заключение ✨

Мы рассмотрели основные шаги для получения уведомлений от DonationAlerts в реальном времени с использованием OAuth 2.0 и WebSockets на Python. Это включает настройку приложения, обработку многоэтапного процесса подключения OAuth и WebSocket (включая эти специфичные токены!) и парсинг входящих сообщений о донатах.

Хотя настройка более сложна, чем простой опрос или старый метод с токеном виджета, она дает вам стандартный, надежный способ прямой интеграции с потоком событий DonationAlerts.