import asyncio
import signal
from importlib import import_module, reload as reload_module
from json import dumps
from logging import DEBUG, getLogger
from os.path import join as join_path
from pathlib import Path
from platform import python_implementation, python_version
from threading import current_thread, main_thread
from typing import Callable, Dict, Type, Union
import aio_pika
from deepdiff import DeepDiff
import pytdbot
from . import types
from .client_manager import ClientManager
from .exception import AuthorizationError, StopHandlers
from .filters import Filter
from .handlers import Decorators, Handler
from .methods import Methods
from .types import LogStream, Plugins
from .utils import (
create_extra_id,
dict_to_obj,
get_bot_id_from_token,
get_running_loop,
json_dumps,
json_loads,
obj_to_dict,
)
[docs]
class Client(Decorators, Methods):
    r"""Pytdbot, a TDLib client

    Parameters:
        token (``str``, *optional*):
            Bot token

        api_id (``int``, *optional*):
            Identifier for Telegram API access, which can be obtained at https://my.telegram.org

        api_hash (``str``, *optional*):
            Identifier hash for Telegram API access, which can be obtained at https://my.telegram.org

        rabbitmq_url (``str``, *optional*):
            URL for RabbitMQ server connection

        instance_id (``str``, *optional*):
            Instance ID for RabbitMQ connections and queues. Default is ``None`` (random)

        lib_path (``str``, *optional*):
            Path to TDLib library. Default is ``None`` (auto-detect)

        plugins (:class:`~pytdbot.types.Plugins`, *optional*):
            Plugins to load

        default_parse_mode (``str``, *optional*):
            The default ``parse_mode`` for methods: :meth:`~pytdbot.Client.sendTextMessage`, :meth:`~pytdbot.Client.sendPhoto`, :meth:`~pytdbot.Client.sendAudio`, :meth:`~pytdbot.Client.sendVideo`, :meth:`~pytdbot.Client.sendDocument`, :meth:`~pytdbot.Client.sendAnimation`, :meth:`~pytdbot.Client.sendVoice`, :meth:`~pytdbot.Client.sendCopy`, :meth:`~pytdbot.Client.editTextMessage`; Default is ``None`` (Don\'t parse)
            Supported values: ``markdown``, ``markdownv2``, ``html``

        system_language_code (``str``, *optional*):
            System language code. Default is ``en``

        device_model (``str``, *optional*):
            Device model. Default is ``None`` (auto-detect)

        files_directory (``str``, *optional*):
            Directory for storing files and database

        database_encryption_key (``str`` | ``bytes``):
            Encryption key for database encryption

        use_test_dc (``bool``, *optional*):
            If set to true, the Telegram test environment will be used instead of the production environment. Default is ``False``

        use_file_database (``bool``, *optional*):
            If set to true, information about downloaded and uploaded files will be saved between application restarts. Default is ``True``

        use_chat_info_database (``bool``, *optional*):
            If set to true, the library will maintain a cache of users, basic groups, supergroups, channels and secret chats. Implies ``use_file_database``. Default is ``True``

        use_message_database (``bool``, *optional*):
            If set to true, the library will maintain a cache of chats and messages. Implies use_chat_info_database. Default is ``True``

        loop (:py:class:`asyncio.AbstractEventLoop`, *optional*):
            Event loop. Default is ``None`` (auto-detect)

        options (``dict``, *optional*):
            Pass key-value dictionary to set TDLib options. Check the list of available options at https://core.telegram.org/tdlib/options

        workers (``int``, *optional*):
            Number of workers to handle updates. Default is ``5``. If set to ``None``, updates will be immediately handled instead of being queued, which can impact performance.

        queue_size (``int``, *optional*):
            Used only by the TDLib Server (RabbitMQ) updates iterator: while the local updates queue holds more than this many items, incoming updates are rejected and requeued instead of being queued locally. Default is ``1000``

        default_handlers_timeout (``float``, *optional*):
            Default timeout for handlers. If set, each handler will be awaited with this timeout (ignored if ``timeout`` is set when registering handler). Default is ``None`` (no timeout)

        no_updates (``bool``, *optional*):
            Whether the client should handle updates or not. Applicable only when using [TDLib Server](https://github.com/pytdbot/tdlib-server). Default is ``False``

        load_messages_before_reply (``bool``, *optional*):
            Stored on the client as ``load_messages_before_reply``; it is not read anywhere in this module (presumably consumed by the reply helpers -- TODO confirm). Default is ``False``

        td_verbosity (``int``, *optional*):
            Verbosity level of TDLib. Default is ``2``

        td_log (:class:`~pytdbot.types.LogStream`, *optional*):
            Log stream. Default is ``None`` (Log to ``stdout``)

        user_bot (``bool``, *optional*):
            Pass ``True`` if this is a user-bot. Default is ``False``

        server_ack (``bool``, *optional*):
            Applies to TDLib Server only: the updates queue is consumed with ``no_ack=not server_ack``, i.e. updates are acknowledged by this client when ``True``. Default is ``True``
    """
[docs]
def __init__(
    self,
    token: str = None,
    api_id: int = None,
    api_hash: str = None,
    rabbitmq_url: str = None,
    instance_id: str = None,
    lib_path: str = None,
    plugins: Plugins = None,
    default_parse_mode: str = None,
    system_language_code: str = "en",
    device_model: str = None,
    files_directory: str = None,
    database_encryption_key: Union[str, bytes] = None,
    use_test_dc: bool = False,
    use_file_database: bool = True,
    use_chat_info_database: bool = True,
    use_message_database: bool = True,
    loop: asyncio.AbstractEventLoop = None,
    options: dict = None,
    workers: int = 5,
    queue_size: int = 1000,
    default_handlers_timeout: float = None,
    no_updates: bool = False,
    load_messages_before_reply: bool = False,
    td_verbosity: int = 2,
    td_log: LogStream = None,
    user_bot: bool = False,
    server_ack: bool = True,
) -> None:
    # Credentials and connection secrets are kept in name-mangled attributes.
    self.__api_id = api_id
    self.__api_hash = api_hash
    self.__rabbitmq_url = rabbitmq_url
    if isinstance(instance_id, str):
        self._rabbitmq_instance_id = instance_id
    else:
        # No usable instance id given: fall back to a generated one.
        self._rabbitmq_instance_id = create_extra_id(4)
    self.__token = token
    self.__database_encryption_key = database_encryption_key

    # Plain configuration, stored as passed in.
    self.files_directory = files_directory
    self.lib_path = lib_path
    self.plugins = plugins

    # Only the three supported parse modes are accepted; anything else is None.
    parse_mode_is_valid = isinstance(
        default_parse_mode, str
    ) and default_parse_mode.lower() in {"markdown", "markdownv2", "html"}
    self.default_parse_mode = default_parse_mode if parse_mode_is_valid else None

    self.system_language_code = system_language_code
    self.device_model = device_model
    self.use_test_dc = use_test_dc
    self.use_file_database = use_file_database
    self.use_chat_info_database = use_chat_info_database
    self.use_message_database = use_message_database
    self.td_options = options
    self.workers = workers
    self.queue_size = queue_size
    self.default_handlers_timeout = default_handlers_timeout
    self.no_updates = no_updates
    self.load_messages_before_reply = load_messages_before_reply
    self.queue = asyncio.Queue()
    self.user_bot = user_bot
    self.server_ack = server_ack

    # The bot id is derived from the token when a string token was supplied.
    if isinstance(self.__token, str):
        self.my_id = get_bot_id_from_token(self.__token)
    else:
        self.my_id = None

    # Runtime state.
    self.client_id = None
    self.client_manager = None
    self.logger = getLogger(f"{__name__}:{self.my_id or 0}")
    self.td_verbosity = td_verbosity
    self.td_log = td_log
    self.connection_state: str = None
    self.is_running = None
    self.me: types.User = None
    self.is_authenticated = False
    self.is_reloading_plugins = False
    self.is_rabbitmq = bool(rabbitmq_url)
    self.options = {}
    self.allow_outgoing_message_types: tuple = (types.MessagePaymentRefunded,)
    self.get_message_methods = {
        "getmessage",
        "getmessagelocally",
        "getrepliedmessage",
        "getcallbackquerymessage",
    }  # TODO: improve this

    # Validate the arguments once everything they depend on is stored.
    self._check_init_args()

    # Handler registry, pending request futures and worker bookkeeping.
    self._handlers = {"initializer": [], "finalizer": []}
    self._current_handlers = {}
    self._results: Dict[str, asyncio.Future] = {}
    self._workers_tasks = None
    self.__rabbitmq_iterator_task = None
    self.__authorization_state = None
    self.__cache = {"is_coro_filter": {}}

    # Updates handled internally by the client, keyed by their "@type".
    self.__local_handlers = {
        "updateAuthorizationState": self.__handle_authorization_state,
        "updateMessageSendSucceeded": self.__handle_update_message_succeeded,
        "updateMessageSendFailed": self.__handle_update_message_failed,
        "updateConnectionState": self.__handle_connection_state,
        "updateOption": self.__handle_update_option,
        "updateUser": self.__handle_update_user,
    }
    self.__is_queue_worker = False
    self.__is_closing = False

    # RabbitMQ
    self.__rqueues = None
    self.__rconnection = None
    self.__rchannel = None

    if isinstance(loop, asyncio.AbstractEventLoop):
        self.loop = loop
    else:
        self.loop = get_running_loop()

    if plugins is not None:
        self._load_plugins()

    self.logger.info(f"Pytdbot v{pytdbot.VERSION}")
async def __aenter__(self):
    """Start the client when entering ``async with`` and return the client itself."""
    started_client = self
    await started_client.start()
    return started_client
async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Stop the client when leaving ``async with``.

    Stopping is best effort: an error raised by :meth:`stop` is never
    propagated out of the context manager, but it is recorded at debug level
    instead of being dropped without a trace. Nothing is returned, so an
    exception raised inside the ``async with`` body is not suppressed.
    """
    try:
        await self.stop()
    except Exception:
        self.logger.debug(
            "Ignoring error raised while stopping the client", exc_info=True
        )
@property
def authorization_state(self) -> str:
    r"""The authorization state type last stored by the client (``None`` until one was received)"""
    current_state = self.__authorization_state
    return current_state
[docs]
async def getServerStats(
    self,
) -> Union["pytdbot.types.ServerStats", "pytdbot.types.Error"]:
    """Returns TDLib Server stats

    Only available when the client is connected through TDLib Server.
    """
    self._check_rabbitmq()
    stats_request = {"@type": "getServerStats"}
    return await self.invoke(stats_request)
[docs]
async def scheduleEvent(
    self, name: str, payload: str, send_at: int
) -> Union["pytdbot.types.ScheduledEvent", "pytdbot.types.Error"]:
    """Schedule an event

    Only available when the client is connected through TDLib Server.

    Parameters:
        name (:class:`str`):
            Event name

        payload (:class:`str`):
            The event payload to be scheduled

        send_at (:class:`int`):
            Unix timestamp when the event should be sent

    Raises:
        ValueError: If an argument has the wrong type
    """
    self._check_rabbitmq()

    # (value, accepted types, error message) -- checked in declaration order.
    argument_checks = (
        (name, str, "name must be str"),
        (payload, str, "payload must be str"),
        (send_at, (int, float), "send_at must be int"),
    )
    for value, accepted, problem in argument_checks:
        if not isinstance(value, accepted):
            raise ValueError(problem)

    event_request = {
        "@type": "scheduleEvent",
        "name": name,
        "payload": payload,
        "send_at": send_at,
    }
    return await self.invoke(event_request)
[docs]
async def cancelScheduledEvent(
    self, event_id: str
) -> Union["pytdbot.types.Ok", "pytdbot.types.Error"]:
    """Cancel a scheduled event

    Only available when the client is connected through TDLib Server.

    Parameters:
        event_id (:class:`str`):
            Event ID to cancel

    Raises:
        ValueError: If ``event_id`` is not a ``str``
    """
    self._check_rabbitmq()

    if isinstance(event_id, str):
        cancel_request = {"@type": "cancelScheduledEvent", "event_id": event_id}
        return await self.invoke(cancel_request)

    raise ValueError("event_id must be str")
[docs]
async def start(self) -> None:
    r"""Start pytdbot client

    Does nothing when the client is already running. Otherwise connects to
    TDLib Server (RabbitMQ) or starts a local ``ClientManager``, optionally
    sets the TDLib log stream, spawns the update workers and finally sends a
    ``getOption("version")`` request.

    Raises:
        ValueError: If TDLib Server is used and ``workers`` is not an ``int``
    """
    if self.is_running:
        return

    self.logger.info("Starting pytdbot client...")

    if self.is_rabbitmq:
        await self.__start_rabbitmq()
    elif not self.client_manager:
        self.client_manager = ClientManager(
            self, self.lib_path, self.td_verbosity, loop=self.loop
        )
        await self.client_manager.start()

    self.is_running = True

    # The log stream is only configured for a local TDLib instance.
    if isinstance(self.td_log, LogStream) and not self.is_rabbitmq:
        log_stream_request = {
            "@type": "setLogStream",
            "log_stream": obj_to_dict(self.td_log),
        }
        await self.__send(log_stream_request)

    if isinstance(self.workers, int):
        # TDLib Server messages and local TDLib updates use different workers.
        if self.is_rabbitmq:
            make_worker = self.__rabbitmq_worker
        else:
            make_worker = self._queue_update_worker
        spawned = []
        for _ in range(self.workers):
            spawned.append(self.loop.create_task(make_worker()))
        self._workers_tasks = spawned
        self.__is_queue_worker = True
        self.logger.info(f"Started with {self.workers} workers")
    elif self.is_rabbitmq:
        raise ValueError("workers must be an int when using TDLib Server")
    else:
        self.__is_queue_worker = False
        self.logger.info("Started with unlimited updates processes")

    self.loop.create_task(
        self.getOption("version")
    )  # Ping TDLib to start processing updates
[docs]
def add_handler(
    self,
    update_type: Union[Type["pytdbot.types.Update"], str],
    func: Callable,
    filters: pytdbot.filters.Filter = None,
    position: int = None,
    inner_object: bool = False,
    timeout: float = None,
    is_from_plugin: bool = False,
) -> None:
    r"""Add an update handler

    Parameters:
        update_type (``str`` || :class:`~pytdbot.types.Update`):
            An update type

        func (``Callable``):
            A callable function

        filters (:class:`~pytdbot.filters.Filter`, *optional*):
            message filter

        position (``int``, *optional*):
            The function position in handlers list. Default is ``None`` (append)

        inner_object (``bool``, *optional*):
            Whether to pass an inner object of update or not; for example ``UpdateNewMessage.message``. Default is ``False``

        timeout (``float``, *optional*):
            Max execution time for the handler before it timeout. If ``None``, ``Client.default_handlers_timeout`` is preferred. Default is ``None``

        is_from_plugin (``bool``, *optional*):
            Whether this handler is from a loaded plugin (this can help reloading plugin during runtime; for development only). Default is ``False``

    Raises:
        TypeError
    """
    # Normalise an Update subclass to its string type name.
    if not isinstance(update_type, str):
        if not issubclass(update_type, types.Update):
            raise TypeError(
                "update_type must be str or subclass of pytdbot.types.Update"
            )
        update_type = update_type.getType()

    if not isinstance(func, Callable):
        raise TypeError("func must be callable")

    if not (filters is None or isinstance(filters, Filter)):
        raise TypeError("filters must be instance of pytdbot.filters.Filter")

    new_handler = Handler(
        func=func,
        update_type=update_type,
        filter=filters,
        position=position,
        inner_object=inner_object,
        timeout=timeout,
        is_from_plugin=is_from_plugin,
    )

    registered = self._handlers.setdefault(update_type, [])
    if isinstance(position, int):
        registered.insert(position, new_handler)
    else:
        registered.append(new_handler)

    # Rebuild the sorted snapshot used when dispatching updates.
    self._update_handlers()
[docs]
def reload_plugins(self):
    """Reload all plugins, non-plugin handlers are not ``reloaded``

    Every handler flagged ``is_from_plugin`` is removed, then the plugins are
    imported again with module reloading enabled. A call made while a reload
    is already in progress returns immediately.

    The ``is_reloading_plugins`` flag is always cleared when the reload ends,
    even if removing handlers or loading plugins raises; otherwise one failed
    reload would turn every later call into a no-op.

    .. note::
        This is for ``development purposes only`` and should not be used
        in production environments
    """
    if self.is_reloading_plugins:
        return

    self.is_reloading_plugins = True
    try:
        for handlers in self._handlers.values():
            # Iterate over a copy: remove_handler mutates the list in place.
            for handler in handlers.copy():
                if handler.is_from_plugin:
                    self.remove_handler(handler.func)

        self._load_plugins(reload_plugins=True)
    finally:
        self.is_reloading_plugins = False
[docs]
def remove_handler(self, func: Callable) -> bool:
    r"""Remove an update handler

    Every registered handler whose function equals ``func`` is dropped, for
    all update types.

    Parameters:
        func (``Callable``):
            A callable function

    Raises:
        TypeError

    Returns:
        :py:class:`bool`: True if handler was removed, False otherwise
    """
    if not isinstance(func, Callable):
        raise TypeError("func must be callable")

    removed_any = False
    for registered in self._handlers.values():
        survivors = [entry for entry in registered if not (entry.func == func)]
        if len(survivors) != len(registered):
            # Assign through a slice so the list object itself is kept.
            registered[:] = survivors
            removed_any = True

    if removed_any:
        self._update_handlers()

    return removed_any
[docs]
async def invoke(
    self,
    request: dict,
) -> types.TlObject:
    r"""Invoke a new TDLib request

    When TDLib answers with a 400 error ``Message not found`` or
    ``Chat not found``, the message / chat is loaded once and the request is
    sent again.

    Example:
        .. code-block:: python

            from pytdbot import Client

            async with Client(...) as client:
                res = await client.invoke({"@type": "getOption", "name": "version"})
                if not isinstance(res, types.Error):
                    print(res)

    Parameters:
        request (``dict``):
            The request to be sent

    Returns:
        :class:`~pytdbot.types.Result`

    Raises:
        ValueError: If TDLib could not parse the request
    """
    request = obj_to_dict(request)
    request["@extra"] = {"id": create_extra_id()}
    request_method = request["@type"].lower()

    # Only serialise the request when DEBUG output is really enabled. The
    # former ``level >= DEBUG`` comparison was also true for INFO, WARNING,
    # etc., so every request was dumped even though nothing was logged.
    if self.logger.isEnabledFor(DEBUG):
        self.logger.debug(f"Sending: {dumps(request, indent=4)}")

    # A request that is itself a "get chat/message" call must not trigger
    # another load attempt for the same object.
    is_chat_attempted_load = request_method == "getchat"
    is_message_attempted_load = request_method in self.get_message_methods

    while True:
        future = self._create_request_future(request)
        await self.__send(request)
        result = await future

        if not isinstance(result, types.Error):
            break

        error_code = result.code
        error_message = result.message

        if error_message.startswith(
            "Failed to parse JSON object as TDLib request:"
        ):
            raise ValueError(error_message)

        if error_code != 400:
            break

        chat_id = request.get("chat_id")
        message_id = request.get("message_id")

        if not is_message_attempted_load and (
            error_message == "Message not found" and (chat_id and message_id)
        ):
            is_message_attempted_load = True
            self.logger.debug(f"Attempt to load message {message_id} in {chat_id}")
            message = await self.getMessage(chat_id=chat_id, message_id=message_id)
            # NOTE(review): this is a truthiness test, unlike the explicit
            # Error check used for the chat below -- confirm it is intended.
            if message:
                self.logger.debug(f"Message {message_id} in {chat_id} is loaded")
                continue
            else:
                self.logger.debug(
                    f"Failed to load message {message_id} in {chat_id}"
                )

        if not is_chat_attempted_load and (
            error_message == "Chat not found" and chat_id
        ):
            is_chat_attempted_load = True
            self.logger.debug(f"Attempt to load chat {chat_id}")
            chat = await self.getChat(chat_id)
            if not isinstance(chat, types.Error):
                self.logger.debug(f"Chat {chat_id} is loaded")
                reply_to_message_id = (request.get("reply_to") or {}).get(
                    "message_id", 0
                )
                # if the request is a reply to another message
                # load the replied message to avoid "Message not found"
                if reply_to_message_id > 0:
                    await self.getMessage(chat_id, reply_to_message_id)
                continue
            else:
                self.logger.error(f"Couldn't load chat {chat_id}")

        break

    return result
[docs]
async def call_method(self, method: str, **kwargs) -> types.TlObject:
    r"""Call a method. with keyword arguments (``kwargs``) support

    Example:
        .. code-block:: python

            from pytdbot import Client

            async with Client(...) as client:
                res = await client.call_method("getOption", name="version")
                if not isinstance(res, types.Error):
                    print(res)

    Parameters:
        method (``str``):
            TDLib method name

    Returns:
        Any :class:`~pytdbot.types.TlObject`
    """
    # The method name always wins over a "@type" passed through kwargs.
    request = {**kwargs, "@type": method}
    return await self.invoke(request)
[docs]
def run(self) -> None:
    r"""Start the client and block until the client is stopped

    Signal handlers are registered first, then the event loop runs
    :meth:`start` followed by :meth:`idle`.

    Example:
        .. code-block:: python

            from pytdbot import Client

            client = Client(...)

            @client.on_updateNewMessage()
            async def new_message(c,update):
                await update.reply_text('Hello!')

            client.run()
    """
    self._register_signal_handlers()
    for phase in (self.start, self.idle):
        self.loop.run_until_complete(phase())
[docs]
async def idle(self):
    r"""Idle and wait until the client is stopped.

    ``is_running`` is polled once per second.
    """
    while True:
        if not self.is_running:
            return
        await asyncio.sleep(1)
[docs]
async def stop(self) -> bool:
    r"""Stop the client

    Asks TDLib to close (unless it is already closing or closed), waits until
    the authorization state becomes ``authorizationStateClosed``, then closes
    the RabbitMQ channel and connection when TDLib Server is used, cancels the
    background tasks and closes the client manager if applicable.

    Raises:
        `RuntimeError`:
            If the instance is already stopped

    Returns:
        :py:class:`bool`: ``True`` on success
    """
    closing_state = "authorizationStateClosing"
    closed_state = "authorizationStateClosed"

    already_stopped = (
        self.is_running is False and self.authorization_state == closed_state
    )
    if already_stopped:
        raise RuntimeError("Instance is not running")

    self.logger.info("Waiting for TDLib to close...")
    self.__is_closing = True

    if self.authorization_state not in {closing_state, closed_state}:
        await self.close()

    # The state is updated by the authorization state handler.
    while self.authorization_state != closed_state:
        await asyncio.sleep(0.1)

    if self.is_rabbitmq:
        await self.__rchannel.close()
        await self.__rconnection.close()

    self.__stop_client()

    manager = self.client_manager
    if manager and not manager.start_clients_on_add:
        await manager.close()

    self.logger.info("Instance closed")
    return True
def _create_request_future(
    self, request: dict, result_id: str = None, handle_result: bool = True
) -> asyncio.Future:
    """Create the future that will receive the result of ``request``.

    The request is attached to the future as ``future.request``. Unless
    ``handle_result`` is false, the future is registered in ``self._results``
    under ``result_id`` or, when that is ``None``, under the request's
    ``@extra`` id.
    """
    pending = asyncio.Future()
    pending.request = request

    if not handle_result:
        return pending

    key = request["@extra"]["id"] if result_id is None else result_id
    self._results[key] = pending
    return pending
async def __send(self, request: dict) -> None:
    """Send a raw request, through RabbitMQ or the local client manager."""
    if not self.is_rabbitmq:
        self.client_manager.send(self.client_id, request)
        return

    # Published on the default exchange to the requests queue; the reply is
    # expected on this instance's responses queue.
    exchange = self.__rchannel.default_exchange
    outgoing = aio_pika.Message(
        json_dumps(request, encode=True),
        reply_to=self.__rqueues["responses"].name,
    )
    await exchange.publish(
        outgoing,
        routing_key=self.__rqueues["requests"].name,
    )
def _check_rabbitmq(self):
    """Ensure the client is connected through TDLib Server (RabbitMQ).

    Raises:
        AssertionError: If the client is not using TDLib Server. The error is
            raised explicitly instead of through an ``assert`` statement, so
            the check still runs when Python is started with ``-O``.
    """
    if not self.is_rabbitmq:
        raise AssertionError("This method is only available for TDLib Server")
def _check_init_args(self):
    """Validate the constructor arguments.

    Nothing is checked for user-bots. The TDLib-specific arguments are only
    checked when the client is not using TDLib Server.

    Raises:
        TypeError: If a TDLib argument has the wrong type
        ValueError: If the bot token is invalid or ``workers`` is below 1
    """
    if self.user_bot:
        return

    if not self.is_rabbitmq:
        # (value, accepted types, error message) -- checked in this order.
        type_rules = (
            (self.__api_id, int, "api_id must be an int"),
            (self.__api_hash, str, "api_hash must be a str"),
            (
                self.__database_encryption_key,
                (str, bytes),
                "database_encryption_key must be str or bytes",
            ),
            (self.files_directory, str, "files_directory must be a str"),
            (self.td_verbosity, int, "td_verbosity must be an int"),
        )
        for value, accepted, problem in type_rules:
            if not isinstance(value, accepted):
                raise TypeError(problem)

    # A token was given but no bot id could be derived from it.
    if self.__token and not self.my_id:
        raise ValueError("Invalid bot token")

    workers = self.workers
    if isinstance(workers, int) and workers < 1:
        raise ValueError("workers must be greater than 0")
def _update_handlers(self):
    """Rebuild ``_current_handlers`` from ``_handlers``.

    Each update type maps to a tuple of its handlers sorted by ``position``;
    handlers without a position come last.
    """

    def by_position(entry):
        # Positioned handlers first (False sorts before True), then by value.
        return (entry.position is None, entry.position)

    snapshot = {}
    for update_type, registered in self._handlers.items():
        snapshot[update_type] = tuple(sorted(registered, key=by_position))
    self._current_handlers = snapshot
def _load_plugins(self, reload_plugins: bool = False):
    """Import every plugin module and register the handlers it defines.

    All ``*.py`` files below ``self.plugins.folder`` are considered, narrowed
    down by ``self.plugins.include`` or, if that is empty, by
    ``self.plugins.exclude`` (both hold dotted module paths). A module that
    fails to import is logged and skipped. Only handlers wrapping coroutine
    functions are registered.

    Parameters:
        reload_plugins (``bool``, *optional*):
            Reload each module after importing it. Default is ``False``
    """

    def dotted_name(path):
        # "plugins/sub/mod.py" -> "plugins.sub.mod"
        return ".".join(path.parent.parts + (path.stem,))

    count = 0
    handlers = 0
    plugin_paths = sorted(Path(self.plugins.folder).rglob("*.py"))

    if self.plugins.include:
        plugin_paths = [
            path
            for path in plugin_paths
            if dotted_name(path) in self.plugins.include
        ]
    elif self.plugins.exclude:
        plugin_paths = [
            path
            for path in plugin_paths
            if dotted_name(path) not in self.plugins.exclude
        ]

    for path in plugin_paths:
        module_path = dotted_name(path)

        try:
            module = import_module(module_path)
            if reload_plugins:
                reload_module(module)
        except Exception:
            self.logger.exception(f"Failed to import plugin {module_path}")
            continue

        plugin_handlers_count = 0

        # Collect each handler once. A module can expose the same decorated
        # object under several names; the previous list comprehension tested
        # membership against the still-empty result list, so it never
        # filtered such duplicates.
        handlers_to_load = []
        for obj in vars(module).values():
            if not hasattr(obj, "_handler"):
                continue
            candidate = obj._handler
            if isinstance(candidate, Handler) and candidate not in handlers_to_load:
                handlers_to_load.append(candidate)

        for handler in handlers_to_load:
            if asyncio.iscoroutinefunction(handler.func):
                self.add_handler(
                    update_type=handler.update_type,
                    func=handler.func,
                    filters=handler.filter,
                    position=handler.position,
                    inner_object=handler.inner_object,
                    timeout=handler.timeout,
                    is_from_plugin=True,
                )
                handlers += 1
                plugin_handlers_count += 1

                self.logger.debug(
                    f"Handler {handler.func} added from {module_path}"
                )
            else:
                self.logger.warning(
                    f"Handler {handler.func} is not an async function from module {module_path}"
                )

        count += 1

        self.logger.debug(
            f"Plugin {module_path} is fully imported with {plugin_handlers_count} handlers"
        )

    self.logger.info(f"From {count} plugins got {handlers} handlers")
[docs]
def is_coro_filter(self, func: Callable) -> bool:
    """Tell whether ``func`` is a coroutine function.

    The answer is cached per function on first use.
    """
    known = self.__cache["is_coro_filter"]
    try:
        return known[func]
    except KeyError:
        verdict = asyncio.iscoroutinefunction(func)
        known[func] = verdict
        return verdict
[docs]
async def process_update(self, update):
    """Route a raw TDLib JSON object (``dict``) to its destination.

    Objects carrying ``@extra`` are request results: the matching pending
    future receives the converted object, and errors of option requests are
    logged. Everything else is an update: it is converted, passed to the
    matching internal handler (as a separate task) and then either queued
    for the workers or handled right away.

    Parameters:
        update (``dict``):
            The decoded TDLib object
    """
    if not update:
        self.logger.warning("Received None update")
        return

    # Only serialise the update when DEBUG output is really enabled. The
    # former ``level >= DEBUG`` comparison was also true for INFO, WARNING,
    # etc., so every update was dumped even though nothing was logged.
    if self.logger.isEnabledFor(DEBUG):
        self.logger.debug(f"Received: {dumps(update, indent=4)}")

    if "@extra" in update:
        if result := self._results.pop(update["@extra"]["id"], None):
            obj = dict_to_obj(update, self)
            # The waiter may have been cancelled meanwhile; setting a result
            # on a finished future would raise InvalidStateError.
            if not result.done():
                result.set_result(obj)
        elif update["@type"] == "error" and "option" in update["@extra"]:
            self.logger.error(f"{update['@extra']['option']}: {update['message']}")
    else:
        update_handler = self.__local_handlers.get(update["@type"])
        update = dict_to_obj(update, self)

        if update_handler:
            self.loop.create_task(update_handler(update))

        if not self.is_rabbitmq and self.__is_queue_worker:
            self.queue.put_nowait(update)
        else:
            await self._handle_update(update)
[docs]
def get_inner_object(self, update: types.TlObject):
    """Return ``update.message`` for ``UpdateNewMessage``, otherwise the update itself."""
    is_new_message = isinstance(update, types.UpdateNewMessage)
    return update.message if is_new_message else update
async def __run_initializers(self, update):
    """Run every ``initializer`` handler for ``update``.

    A handler whose filter rejects the update is skipped. ``StopHandlers``
    propagates to the caller; a timeout is logged as a warning and any other
    exception is logged without stopping the remaining initializers.
    """
    payload = self.get_inner_object(update)

    for initializer in self._current_handlers["initializer"]:
        try:
            handler_value = payload if initializer.inner_object else update

            if initializer.filter is not None:
                check = initializer.filter.func
                is_async_check = self.is_coro_filter(check)
                accepted = check(self, handler_value)
                if is_async_check:
                    accepted = await accepted
                if not accepted:
                    continue

            has_timeout = not (
                self.default_handlers_timeout is None
                and initializer.timeout is None
            )
            if has_timeout:
                # The handler's own timeout wins over the client default.
                timeout = initializer.timeout or self.default_handlers_timeout
                try:
                    await asyncio.wait_for(
                        initializer(self, handler_value),
                        timeout=timeout,
                    )
                except asyncio.TimeoutError:
                    self.logger.warning(
                        f"Initializer {initializer} timed out after {timeout} seconds"
                    )
            else:
                await initializer(self, handler_value)
        except StopHandlers:
            raise
        except Exception:
            self.logger.exception(f"Initializer {initializer} failed")
async def __run_handlers(self, update):
    """Run the handlers registered for this update's type.

    A handler whose filter rejects the update is skipped. ``StopHandlers``
    propagates to the caller; a timeout is logged as a warning and any other
    exception is logged without stopping the remaining handlers.
    """
    payload = self.get_inner_object(update)

    for handler in self._current_handlers[update.getType()]:
        try:
            handler_value = payload if handler.inner_object else update

            if handler.filter is not None:
                check = handler.filter.func
                is_async_check = self.is_coro_filter(check)
                accepted = check(self, handler_value)
                if is_async_check:
                    accepted = await accepted
                if not accepted:
                    continue

            has_timeout = not (
                self.default_handlers_timeout is None and handler.timeout is None
            )
            if has_timeout:
                # The handler's own timeout wins over the client default.
                timeout = handler.timeout or self.default_handlers_timeout
                try:
                    await asyncio.wait_for(
                        handler(self, handler_value), timeout=timeout
                    )
                except asyncio.TimeoutError:
                    self.logger.warning(
                        f"Handler {handler} timed out after {timeout} seconds"
                    )
            else:
                await handler(self, handler_value)
        except StopHandlers:
            raise
        except Exception:
            self.logger.exception(f"Exception in {handler}")
async def __run_finalizers(self, update):
    """Run every ``finalizer`` handler for ``update``.

    A handler whose filter rejects the update is skipped. ``StopHandlers``
    propagates to the caller; a timeout is logged as a warning and any other
    exception is logged without stopping the remaining finalizers.
    """
    payload = self.get_inner_object(update)

    for finalizer in self._current_handlers["finalizer"]:
        try:
            handler_value = payload if finalizer.inner_object else update

            if finalizer.filter is not None:
                check = finalizer.filter.func
                is_async_check = self.is_coro_filter(check)
                accepted = check(self, handler_value)
                if is_async_check:
                    accepted = await accepted
                if not accepted:
                    continue

            has_timeout = not (
                self.default_handlers_timeout is None and finalizer.timeout is None
            )
            if has_timeout:
                try:
                    # The handler's own timeout wins over the client default.
                    timeout = finalizer.timeout or self.default_handlers_timeout
                    await asyncio.wait_for(
                        finalizer(self, handler_value),
                        timeout=timeout,
                    )
                except asyncio.TimeoutError:
                    self.logger.warning(
                        f"Finalizer {finalizer} timed out after {timeout} seconds"
                    )
            else:
                await finalizer(self, handler_value)
        except StopHandlers:
            raise
        except Exception:
            self.logger.exception(f"Finalizer {finalizer} failed")
async def _handle_update(self, update):
    """Dispatch ``update`` through initializers, handlers and finalizers.

    Updates without registered handlers are ignored. For regular bots,
    outgoing new-message updates are dropped unless their content type is
    listed in ``allow_outgoing_message_types``. Finalizers always run, even
    when a handler raised ``StopHandlers``.
    """
    if update.getType() not in self._current_handlers:
        return

    is_ignored_outgoing = (
        not self.user_bot
        and isinstance(update, types.UpdateNewMessage)
        and not isinstance(
            update.message.content, self.allow_outgoing_message_types
        )
        and update.message.is_outgoing
    )
    if is_ignored_outgoing:
        return

    try:
        await self.__run_initializers(update)
        await self.__run_handlers(update)
    except StopHandlers:
        # Raised by a handler to skip the remaining ones.
        pass
    finally:
        await self.__run_finalizers(update)
async def _queue_update_worker(self):
    """Worker loop: take updates from the local queue and handle them.

    Runs while ``is_running`` is true; an exception raised while handling an
    update is logged and the loop goes on.
    """
    self.is_running = True
    while self.is_running:
        try:
            next_update = await self.queue.get()
            await self._handle_update(next_update)
        except Exception:
            self.logger.exception("Got worker exception")
[docs]
async def set_td_parameters(self):
    r"""Make a call to :meth:`~pytdbot.Client.setTdlibParameters` with the current client init parameters

    Does nothing when the client is connected through TDLib Server. A ``str``
    database encryption key is encoded to UTF-8 bytes first. The
    ``device_model`` given to the constructor is used when set; otherwise it
    is auto-detected as ``"<python implementation> <python version>"``
    (previously the constructor argument was ignored).

    Raises:
        `AuthorizationError`
    """
    if self.is_rabbitmq:
        return

    if isinstance(self.__database_encryption_key, str):
        self.__database_encryption_key = self.__database_encryption_key.encode(
            "utf-8"
        )

    # Honour the user supplied device model, fall back to auto-detection.
    device_model = (
        self.device_model or f"{python_implementation()} {python_version()}"
    )

    res = await self.setTdlibParameters(
        use_test_dc=self.use_test_dc,
        api_id=self.__api_id,
        api_hash=self.__api_hash,
        system_language_code=self.system_language_code,
        device_model=device_model,
        use_file_database=self.use_file_database,
        use_chat_info_database=self.use_chat_info_database,
        use_message_database=self.use_message_database,
        use_secret_chats=False,
        system_version=None,
        files_directory=self.files_directory,
        database_encryption_key=self.__database_encryption_key,
        database_directory=join_path(self.files_directory, "database"),
        application_version=f"Pytdbot {pytdbot.__version__}",
    )

    if isinstance(res, types.Error):
        # TDLib rejected the parameters: shut down and report why.
        await self.stop()
        raise AuthorizationError(res.message)
async def _set_options(self):
    """Send a ``setOption`` request for every entry of ``self.td_options``.

    Nothing is sent when ``td_options`` is not a ``dict``. Only values whose
    exact type is ``str``, ``int`` or ``bool`` are supported.

    Raises:
        ValueError: If an option value has an unsupported type
    """
    if not isinstance(self.td_options, dict):
        return

    # Exact Python type -> TDLib option value type.
    value_wrappers = {
        str: "optionValueString",
        int: "optionValueInteger",
        bool: "optionValueBoolean",
    }

    for name, value in self.td_options.items():
        value_type = type(value)
        wrapper = value_wrappers.get(value_type)
        if wrapper is None:
            raise ValueError(f"Option {name} has unsupported type {value_type}")

        option_request = {
            "@type": "setOption",
            "name": name,
            "value": {"@type": wrapper, "value": value},
            "@extra": {"option": name, "value": value, "id": ""},
        }
        await self.__send(option_request)

        self.logger.debug(f"Option {name} sent with value {value}")
async def __handle_authorization_state(
    self, update: types.UpdateAuthorizationState
):
    """React to an authorization state change.

    * ``WaitTdlibParameters``: send the TDLib options and parameters.
    * ``WaitPhoneNumber``: print the welcome banner and log in with the bot
      token when one is available.
    * ``Ready``: mark the client as authenticated and fetch the current user.
    * ``Closing``: remember that the client is closing.
    * ``Closed`` while no close was requested: stop the client.
    """
    self.__authorization_state = update.authorization_state.getType()

    self.logger.info(
        f"Authorization state changed to {self.authorization_state.removeprefix('authorizationState')}"
    )

    if self.authorization_state == "authorizationStateWaitTdlibParameters":
        await self._set_options()
        await self.set_td_parameters()
    elif self.authorization_state == "authorizationStateWaitPhoneNumber":
        self._print_welcome()
        await self.__handle_authorization_state_wait_phone_number()
    elif self.authorization_state == "authorizationStateReady":
        self.is_authenticated = True

        me = await self.getMe()
        if isinstance(me, types.Error):
            # Keep ``self.me`` untouched: an Error has no user fields, and
            # reading them for the "Logged in" line used to raise
            # AttributeError right after this error was logged.
            self.logger.error(f"Get me error: {me.message}")
        else:
            self.me = me
            self.logger.info(
                f"Logged in as {self.me.first_name} "
                f"{str(self.me.id) if not self.me.usernames else '@' + self.me.usernames.editable_username}"
            )

    if self.authorization_state == "authorizationStateClosing":
        self.__is_closing = True
    elif (
        self.authorization_state == "authorizationStateClosed"
        and self.__is_closing is False
    ):
        # TDLib closed on its own; run the regular shutdown sequence.
        await self.stop()
async def __handle_connection_state(self, update: types.UpdateConnectionState):
    """Store and log the new connection state type."""
    new_state = update.state.getType()
    self.connection_state: str = new_state

    self.logger.info(
        f"Connection state changed to {self.connection_state.removeprefix('connectionState')}"
    )
async def __handle_update_message_succeeded(
    self, update: types.UpdateMessageSendSucceeded
):
    """Resolve the future registered as ``"<chat_id>:<old_message_id>"`` with the sent message."""
    pending_key = f"{update.message.chat_id}:{update.old_message_id}"
    waiter = self._results.pop(pending_key, None)

    if waiter:
        waiter.set_result(update.message)
async def __handle_update_message_failed(
    self, update: types.UpdateMessageSendFailed
):
    """Resolve the future registered as ``"<chat_id>:<old_message_id>"`` with the send error."""
    pending_key = f"{update.message.chat_id}:{update.old_message_id}"
    waiter = self._results.pop(pending_key, None)

    if waiter:
        waiter.set_result(update.error)
async def __handle_update_option(self, update: types.UpdateOption):
    """Mirror a TDLib option change into ``self.options``.

    Boolean and integer values are converted to ``bool`` / ``int``, an empty
    value is stored as ``None`` and anything else is stored as received. The
    ``my_id`` option also updates ``self.my_id`` (as a string). Changes are
    logged once the client is authenticated.
    """
    option_name = update.name
    option_value = update.value

    if isinstance(option_value, types.OptionValueBoolean):
        converted = bool(option_value.value)
    elif isinstance(option_value, types.OptionValueEmpty):
        converted = None
    elif isinstance(option_value, types.OptionValueInteger):
        converted = int(option_value.value)
    else:
        converted = option_value.value
    self.options[option_name] = converted

    if option_name == "my_id":
        self.my_id = str(option_value.value)

    if self.is_authenticated:
        self.logger.info(
            f"Option {update.name} changed to {self.options[update.name]}"
        )
async def __get_updates_queue(self, retries=10, delay=2):
    """Return the ``<my_id>_updates`` queue, retrying while it does not exist.

    Parameters:
        retries (``int``, *optional*):
            Number of attempts. Default is ``10``

        delay (``int``, *optional*):
            Seconds to wait after a failed attempt. Default is ``2``

    Raises:
        `AuthorizationError`: If the queue was not found after all attempts
    """
    for attempt in range(retries):
        try:
            updates_queue = await self.__rchannel.get_queue(self.my_id + "_updates")
        except aio_pika.exceptions.ChannelNotFoundEntity:
            # A missing queue is treated as "TDLib Server is not up yet".
            self.logger.warning(
                f"Attempt {attempt + 1}: TDLib Server is not running. Retrying in {delay} seconds..."
            )
            await asyncio.sleep(delay)
        else:
            return updates_queue

    self.logger.error(
        f"Could not connect to TDLib Server after {retries} attempts."
    )
    raise AuthorizationError(
        f"Could not connect to TDLib Server after {delay * retries} seconds timeout"
    )
async def __start_rabbitmq(self):
    """Connect to TDLib Server through RabbitMQ and set up the queues.

    Opens a robust connection and a channel, looks up the shared updates and
    requests queues, declares the exclusive per-instance ``notify`` and
    ``res`` queues, starts consuming responses, sends the TDLib options,
    replays the updates returned by ``getCurrentState`` and, unless
    ``no_updates`` is set, starts the updates iterator task.
    """
    self.__rconnection = await aio_pika.connect_robust(
        self.__rabbitmq_url,
        client_properties={
            "connection_name": f"Pytdbot instance {self._rabbitmq_instance_id}"
        },
    )
    self.__rchannel = await self.__rconnection.channel()

    self.logger.info("Connected to TDLib server via RabbitMQ")

    updates_queue = await self.__get_updates_queue()

    # Per-instance queue bound to the "broadcast" exchange.
    notify_queue = await self.__rchannel.declare_queue(
        f"{self.my_id}_notify_{self._rabbitmq_instance_id}", exclusive=True
    )
    await notify_queue.bind(await self.__rchannel.get_exchange("broadcast"))

    # Per-instance queue used as ``reply_to`` for the requests that are sent.
    responses_queue = await self.__rchannel.declare_queue(
        f"{self.my_id}_res_{self._rabbitmq_instance_id}", exclusive=True
    )

    self.__rqueues = {
        "updates": updates_queue,
        "requests": await self.__rchannel.get_queue(f"{self.my_id}_requests"),
        "notify": notify_queue,
        "responses": responses_queue,
    }

    self.is_running = True

    # Responses must be consumed before the first request below is awaited.
    await self.__rqueues["responses"].consume(self.__on_update, no_ack=True)

    await self._set_options()

    res = await self.getCurrentState()
    for update in res.updates:
        # when using obj_to_dict the key "@client_id" won't exists
        # since it's not part of the object
        await self.process_update(obj_to_dict(update))

    if not self.no_updates:
        self.__rabbitmq_iterator_task = self.loop.create_task(
            self.__rabbitmq_iterator()
        )

    await self.__rqueues["notify"].consume(self.__on_update, no_ack=True)
async def __rabbitmq_iterator(self):
    """Move messages from the RabbitMQ updates queue into the local queue.

    With ``server_ack`` enabled, a message arriving while the local queue
    holds more than ``queue_size`` items is rejected and requeued on the
    broker. With ``server_ack`` disabled the queue is consumed in no-ack
    mode, where messages cannot be rejected, so they are always queued
    locally (previously ``nack`` was called in that mode as well).
    """
    acks_enabled = self.server_ack

    async with self.__rqueues["updates"].iterator(
        no_ack=not acks_enabled
    ) as iterator:
        async for message in iterator:
            if acks_enabled and self.queue.qsize() > self.queue_size:
                # Back-pressure: hand the message back to the broker.
                await message.nack(requeue=True)
                continue

            self.queue.put_nowait(message)
async def __rabbitmq_worker(self):
    """Worker loop for TDLib Server: decode queued messages and process them.

    While the client is closing, everything except authorization state
    updates is handed back to the broker. Messages are acknowledged after
    processing, including when processing failed (the error is logged).

    Two defects of the previous version are fixed:

    * the closing check compared the decoded JSON ``dict`` against the
      ``UpdateAuthorizationState`` class, which never matched, so the
      authorization state updates were requeued as well;
    * ``ack`` / ``nack`` were called even when the queue is consumed in
      no-ack mode (``server_ack=False``).
    """
    acks_enabled = self.server_ack

    while self.is_running:
        message: aio_pika.IncomingMessage = await self.queue.get()

        try:
            update = json_loads(message.body)

            is_authorization_update = (
                isinstance(update, dict)
                and update.get("@type") == "updateAuthorizationState"
            )
            if self.__is_closing and not is_authorization_update:
                if acks_enabled:
                    await message.nack(requeue=True)
                continue

            await self.process_update(update)
        except Exception:
            self.logger.exception("Error processing message")

        if acks_enabled:
            await message.ack()  # ack after processing
async def __handle_rabbitmq_message(self, message: aio_pika.IncomingMessage):
    """Decode the JSON body of a RabbitMQ message and process it."""
    decoded = json_loads(message.body)
    await self.process_update(decoded)
async def __on_update(self, update):
    """Consumer callback: process the incoming message in its own task."""
    pending_work = self.__handle_rabbitmq_message(update)
    self.loop.create_task(pending_work)
async def __handle_update_user(self, update: types.UpdateUser):
    """Refresh ``self.me`` when the update is about the logged-in user.

    The differences between the old and the new user are logged first; a
    failure while computing them is logged and does not prevent the refresh.
    """
    concerns_me = (
        self.is_authenticated and self.me and update.user.id == self.me.id
    )
    if not concerns_me:
        return

    self.logger.info(
        f"Updating {self.me.first_name} "
        f"({str(self.me.id) if not self.me.usernames else '@' + self.me.usernames.editable_username}) info"
    )

    try:
        deepdiff(self, obj_to_dict(self.me), obj_to_dict(update.user))
    except Exception:
        self.logger.exception("deepdiff failed")

    self.me = update.user
async def __handle_authorization_state_wait_phone_number(self):
    """Log in with the bot token while TDLib waits for a phone number.

    Nothing happens when TDLib Server is used, when the authorization state
    is no longer ``authorizationStateWaitPhoneNumber`` or when there is no
    token.

    Raises:
        `AuthorizationError`: If TDLib rejects the token (the client is
            stopped first)
    """
    should_log_in = (
        not self.is_rabbitmq
        and self.authorization_state == "authorizationStateWaitPhoneNumber"
        and self.__token
    )
    if not should_log_in:
        return

    res = await self.checkAuthenticationBotToken(self.__token)

    if isinstance(res, types.Error):
        await self.stop()
        raise AuthorizationError(res.message)
def __stop_client(self) -> None:
    """Mark the client as stopped and cancel its background tasks."""
    self.is_authenticated = False
    self.is_running = False

    iterator_task = self.__rabbitmq_iterator_task
    if iterator_task:
        iterator_task.cancel()

    if not self.__is_queue_worker:
        return
    for running_worker in self._workers_tasks:
        running_worker.cancel()
def _register_signal_handlers(self):
    """Make SIGINT, SIGTERM, SIGABRT and SIGSEGV stop the client.

    Handlers already registered on the loop for these signals are removed
    first; the new ones are only installed from the main thread. Event loops
    without signal support (Windows) raise ``NotImplementedError``, which is
    ignored for both steps. Previously only the registration was guarded, so
    the removal crashed ``run()`` on such loops.
    """

    def _handle_signal():
        self.loop.create_task(self.stop())

    handled_signals = (
        signal.SIGINT,
        signal.SIGTERM,
        signal.SIGABRT,
        signal.SIGSEGV,
    )

    try:
        for sig in handled_signals:
            self.loop.remove_signal_handler(sig)

        if current_thread() is main_thread():
            for sig in handled_signals:
                self.loop.add_signal_handler(sig, _handle_signal)
    except NotImplementedError:  # Windows doesn't support loop signal handlers
        pass
def _print_welcome(self):
    """Print the Pytdbot version, copyright and license notice to stdout."""
    banner = f"Welcome to Pytdbot (v{pytdbot.__version__}). {pytdbot.__copyright__}"
    print(banner)

    license_notice = f"Pytdbot is free software and comes with ABSOLUTELY NO WARRANTY. Licensed under the terms of {pytdbot.__license__}.\n\n"
    print(license_notice)
[docs]
def deepdiff(self, d1, d2):
    """Log the differences between two dict-like states through ``self.logger``.

    ``d1`` is converted with ``obj_to_dict`` first. When either side is not a
    ``dict`` the plain equality of both is returned; otherwise every added or
    changed key and every removed key is logged and ``None`` is returned.
    """
    d1 = obj_to_dict(d1)

    if not (isinstance(d1, dict) and isinstance(d2, dict)):
        return d1 == d2

    changed_kinds = ("dictionary_item_added", "values_changed")
    report = DeepDiff(d1, d2, ignore_order=True, view="tree")

    for change_kind, changes in report.items():
        for change in changes:
            # Dotted path of the affected key, e.g. "usernames.editable_username".
            path_parts = change.path(output_format="list")
            key = ".".join(map(str, path_parts))

            if change_kind in changed_kinds:
                self.logger.info(f"{key} changed to {change.t2}")
            elif change_kind == "dictionary_item_removed":
                self.logger.info(f"{key} removed")