Source code for pytdbot.client_manager

import asyncio
import logging
from typing import List
from concurrent.futures import ThreadPoolExecutor

import pytdbot
from .tdjson import TdJson

logger = logging.getLogger(__name__)


[docs] class ClientManager: """Client manager for Pytdbot"""
[docs] def __init__( self, clients: List["pytdbot.Client"] = None, lib_path: str = None, verbosity: int = 2, loop: asyncio.AbstractEventLoop = None, ) -> None: """Manage multiple Pytdbot clients Example: .. code-block:: python >>> from pytdbot import ClientManager, Client >>> clients = [Client(...), Client(...), Client(...)] >>> client_manager = ClientManager(clients) >>> await client_manager.start() Parameters: clients (``List[pytdbot.Client]``, *optional*): List of clients to manage lib_path (``str``, *optional*): Path to TDlib library verbosity (``int``, *optional*): Verbosity level of TDlib. Default is ``2`` loop (``asyncio.AbstractEventLoop``, *optional*): Event loop to use """ if clients and not isinstance(clients, (list, pytdbot.Client)): raise TypeError("clients must be a list of pytdbot.Client") self.loop = loop or pytdbot.utils.get_running_loop() self.__tdjson = TdJson(lib_path, verbosity) self.__clients: dict[int, pytdbot.Client] = {} if isinstance(clients, list): self.__pending_clients = clients self.start_clients_on_add = True elif isinstance(clients, pytdbot.Client): self.__pending_clients = [clients] self.start_clients_on_add = False else: self.__pending_clients = None self.start_clients_on_add = False self.__receiver_task = None self.__should_exit = False self.is_running = False
[docs] async def start(self) -> None: """Start the Client Manager""" if self.is_running: return self.__receiver_task = self.loop.create_task(self.__td_receiver_loop()) for client in self.__pending_clients: await self.add_client(client, start_client=self.start_clients_on_add) self.__pending_clients = None
[docs] async def add_client( self, client: "pytdbot.Client", start_client: bool = False ) -> None: """Add a client to the manager Parameters: client (``pytdbot.Client``): Client to add start_client (``bool``, *optional*): Whether to start the client immediately. Default is ``False`` """ if not isinstance(client, pytdbot.Client): raise TypeError("client must be an instance of pytdbot.Client") client_id = self.__tdjson.create_client_id() client.client_id = client_id client.client_manager = self self.__clients[client_id] = client if start_client: await client.start() logger.debug(f"Client {client_id} added")
[docs] async def delete_client(self, client_id: int, close_client: bool = False) -> None: """Remove a client from the manager Parameters: client_id (``int``): ID of client to remove close_client (``bool``, *optional*): Whether to close the client before removing. Default is ``False`` """ client = self.__clients.pop(client_id, None) if not client: raise ValueError(f"Client with ID {client_id} not found") if close_client: await client.stop() logger.debug(f"Client {client_id} deleted")
[docs] def send(self, client_id: int, request: dict) -> None: """Send a request to TDlib Parameters: client_id (``int``): ID of client to send request from request (``dict``): Request to send """ self.__tdjson.send(client_id, request)
async def __td_receiver_loop(self) -> None: with ThreadPoolExecutor( max_workers=1, thread_name_prefix="ClientManager" ) as executor: try: self.is_running = True logger.info("ClientManager started") while not self.__should_exit: update = await self.loop.run_in_executor( executor, self.__tdjson.receive, 100000.0, # Seconds ) if not update or self.__should_exit: continue client = self.__clients.get(update["@client_id"]) if client: self.loop.create_task(client.process_update(update)) else: logger.warning( f"Unknown client ID in update: {update['@client_id']}" ) except Exception: logger.exception("Error in td_receiver") finally: self.is_running = False logger.debug("ClientManager stopped")
[docs] async def close(self, close_all_clients: bool = False) -> bool: """Close the Client Manager Parameters: close_all_clients (``bool``, *optional*): Whether to close all managed clients. Default is ``False`` Returns: ``bool`` """ if self.__should_exit: return True self.__should_exit = True if close_all_clients: for client_id in list(self.__clients.keys()): await self.delete_client(client_id, close_client=True) # Send dummy request to wake up receiver self.send(0, {"@type": "getOption", "name": "version"}) await self.__receiver_task logger.info("ClientManager closed") return True