1
0
mirror of https://github.com/eternnoir/pyTelegramBotAPI.git synced 2023-08-10 21:12:57 +03:00

Merge pull request #1652 from coder2020official/master

Added InputFile
This commit is contained in:
Badiboy 2022-08-12 17:35:20 +03:00 committed by GitHub
commit d42c8e2961
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 82 additions and 16 deletions

View File

@ -87,6 +87,15 @@ def _make_request(token, method_name, method='get', params=None, files=None):
logger.debug("Request: method={0} url={1} params={2} files={3}".format(method, request_url, params, files).replace(token, token.split(':')[0] + ":{TOKEN}")) logger.debug("Request: method={0} url={1} params={2} files={3}".format(method, request_url, params, files).replace(token, token.split(':')[0] + ":{TOKEN}"))
read_timeout = READ_TIMEOUT read_timeout = READ_TIMEOUT
connect_timeout = CONNECT_TIMEOUT connect_timeout = CONNECT_TIMEOUT
if files:
files_copy = dict(files)
# process types.InputFile
for key, value in files_copy.items():
if isinstance(value, types.InputFile):
files[key] = value.file
if files and format_header_param: if files and format_header_param:
fields.format_header_param = _no_encode(format_header_param) fields.format_header_param = _no_encode(format_header_param)
if params: if params:

View File

@ -57,7 +57,7 @@ class SessionManager:
session_manager = SessionManager() session_manager = SessionManager()
async def _process_request(token, url, method='get', params=None, files=None, request_timeout=None): async def _process_request(token, url, method='get', params=None, files=None, request_timeout=None):
params = prepare_data(params, files) params = _prepare_data(params, files)
if request_timeout is None: if request_timeout is None:
request_timeout = REQUEST_TIMEOUT request_timeout = REQUEST_TIMEOUT
timeout = aiohttp.ClientTimeout(total=request_timeout) timeout = aiohttp.ClientTimeout(total=request_timeout)
@ -83,24 +83,17 @@ async def _process_request(token, url, method='get', params=None, files=None, re
if not got_result: if not got_result:
raise RequestTimeout("Request timeout. Request: method={0} url={1} params={2} files={3} request_timeout={4}".format(method, url, params, files, request_timeout, current_try)) raise RequestTimeout("Request timeout. Request: method={0} url={1} params={2} files={3} request_timeout={4}".format(method, url, params, files, request_timeout, current_try))
def _prepare_file(obj):
def prepare_file(obj):
""" """
returns os.path.basename for a given file Prepares file for upload.
:param obj:
:return:
""" """
name = getattr(obj, 'name', None) name = getattr(obj, 'name', None)
if name and isinstance(name, str) and name[0] != '<' and name[-1] != '>': if name and isinstance(name, str) and name[0] != '<' and name[-1] != '>':
return os.path.basename(name) return os.path.basename(name)
def _prepare_data(params=None, files=None):
def prepare_data(params=None, files=None):
""" """
prepare data for request. Adds the parameters and files to the request.
:param params: :param params:
:param files: :param files:
@ -111,18 +104,20 @@ def prepare_data(params=None, files=None):
if params: if params:
for key, value in params.items(): for key, value in params.items():
data.add_field(key, str(value)) data.add_field(key, str(value))
if files: if files:
for key, f in files.items(): for key, f in files.items():
if isinstance(f, tuple): if isinstance(f, tuple):
if len(f) == 2: if len(f) == 2:
filename, fileobj = f file_name, file = f
else: else:
raise ValueError('Tuple must have exactly 2 elements: filename, fileobj') raise ValueError('Tuple must have exactly 2 elements: filename, fileobj')
elif isinstance(f, types.InputFile):
file_name = f.file_name
file = f.file
else: else:
filename, fileobj = prepare_file(f) or key, f file_name, file = _prepare_file(f) or key, f
data.add_field(key, fileobj, filename=filename) data.add_field(key, file, filename=file_name)
return data return data

View File

@ -1,6 +1,9 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from io import IOBase
import logging import logging
import os
from pathlib import Path
from typing import Dict, List, Optional, Union from typing import Dict, List, Optional, Union
from abc import ABC from abc import ABC
@ -6601,3 +6604,62 @@ class ChatAdministratorRights(JsonDeserializable, JsonSerializable, Dictionaryab
class InputFile:
"""
A class to send files through Telegram Bot API.
You need to pass a file, which should be an instance of :class:`io.IOBase` or
:class:`pathlib.Path`, or :obj:`str`.
If you pass an :obj:`str` as a file, it will be opened and closed by the class.
:param file: A file to send.
:type file: :class:`io.IOBase` or :class:`pathlib.Path` or :obj:`str`
.. code-block:: python3
:caption: Example on sending a file using this class
from telebot.types import InputFile
# Sending a file from disk
bot.send_document(
chat_id,
InputFile('/path/to/file/file.txt')
)
# Sending a file from an io.IOBase object
with open('/path/to/file/file.txt', 'rb') as f:
bot.send_document(
chat_id,
InputFile(f)
)
# Sending a file using pathlib.Path:
bot.send_document(
chat_id,
InputFile(pathlib.Path('/path/to/file/file.txt'))
)
"""
def __init__(self, file) -> None:
self._file, self.file_name = self._resolve_file(file)
def _resolve_file(self, file):
if isinstance(file, str):
_file = open(file, 'rb')
return _file, os.path.basename(_file.name)
elif isinstance(file, IOBase):
return file, os.path.basename(file.name)
elif isinstance(file, Path):
_file = open(file, 'rb')
return _file, os.path.basename(_file.name)
else:
raise TypeError("File must be a string or a file-like object(pathlib.Path, io.IOBase).")
@property
def file(self):
"""
File object.
"""
return self._file