Source code for woob.tools.misc

# Copyright(C) 2010-2014 Romain Bignon
#
# This file is part of woob.
#
# woob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# woob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with woob. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

from time import time, sleep
from typing import Optional, Union
import itertools
import locale
import os
import re
import sys
import traceback
import types
import unicodedata
import unidecode

__all__ = [
    'NO_DEFAULT',
    'NoDefaultType',
    'classproperty',
    'clean_text',
    'find_exe',
    'get_backtrace',
    'get_bytes_size',
    'iter_fields',
    'limit',
    'polling_loop',
    'to_unicode',
]

NEWLINES_RE = re.compile(r'\s+', flags=re.UNICODE)


[docs]class NoDefaultType: """Type for the NO_DEFAULT constant.""" __slots__ = () def __new__(mcls, *args, **kwargs): try: return mcls.__unique_value__ except AttributeError: value = super().__new__(mcls, *args, **kwargs) mcls.__unique_value__ = value return value def __repr__(self): return 'NO_DEFAULT'
NO_DEFAULT = NoDefaultType()
[docs]def get_backtrace(empty="Empty backtrace."): """ Try to get backtrace as string. Returns "Error while trying to get backtrace" on failure. """ try: info = sys.exc_info() if info != (None, None, None): trace = traceback.format_exception(*info) return "".join(trace) except: return "Error while trying to get backtrace" return empty
[docs]def get_bytes_size(size, unit_name): r"""Converts a unit and a number into a number of bytes. >>> get_bytes_size(2, 'KB') 2048.0 """ unit_data = { 'bytes': 1, 'KB': 1024, 'KiB': 1024, 'MB': 1024 * 1024, 'MiB': 1024 * 1024, 'GB': 1024 * 1024 * 1024, 'GiB': 1024 * 1024 * 1024, 'TB': 1024 * 1024 * 1024 * 1024, 'TiB': 1024 * 1024 * 1024 * 1024, } return float(size * unit_data.get(unit_name, 1))
[docs]def iter_fields(obj): for attribute_name in dir(obj): if attribute_name.startswith('_'): continue attribute = getattr(obj, attribute_name) if not isinstance(attribute, types.MethodType): yield attribute_name, attribute
[docs]def to_unicode(text): r""" >>> to_unicode('ascii') == u'ascii' True >>> to_unicode(u'utf\xe9'.encode('UTF-8')) == u'utf\xe9' True >>> to_unicode(u'unicode') == u'unicode' True """ if isinstance(text, str): return text if isinstance(text, memoryview): text = text.tobytes() if not isinstance(text, bytes): return str(text) try: return text.decode('utf-8') except UnicodeError: pass try: return text.decode('iso-8859-15') except UnicodeError: pass return text.decode('windows-1252', 'replace')
def guess_encoding(stdio): try: encoding = stdio.encoding or locale.getpreferredencoding() except AttributeError: encoding = None # ASCII or ANSI is most likely a user mistake if not encoding or encoding.lower() == 'ascii' or encoding.lower().startswith('ansi'): encoding = 'UTF-8' return encoding
[docs]def limit(iterator, lim): """Iterate on the lim first elements of iterator.""" count = 0 iterator = iter(iterator) while count < lim: yield next(iterator) count += 1
def ratelimit(group, delay): """ Simple rate limiting. Waits if the last call of lastlimit with this group name was less than delay seconds ago. The rate limiting is global, shared between any instance of the application and any call to this function sharing the same group name. The same group name should not be used with different delays. This function is intended to be called just before the code that should be rate-limited. This function is not thread-safe. For reasonably non-critical rate limiting (like accessing a website), it should be sufficient nevertheless. @param group [string] rate limiting group name, alphanumeric @param delay [int] delay in seconds between each call """ from tempfile import gettempdir path = os.path.join(gettempdir(), 'woob_ratelimit.%s' % group) while True: try: offset = time() - os.stat(path).st_mtime except OSError: with open(path, 'w'): pass offset = 0 if delay < offset: break sleep(delay - offset) os.utime(path, None)
[docs]def find_exe(basename): """ Find the path to an executable by its base name (such as 'gpg'). The executable can be overriden using an environment variable in the form `NAME_EXECUTABLE` where NAME is the specified base name in upper case. If the environment variable is not provided, the PATH will be searched both without and with a ".exe" suffix for Windows compatibility. If the executable can not be found, None is returned. """ env_exe = os.getenv('%s_EXECUTABLE' % basename.upper()) if env_exe and os.path.exists(env_exe) and os.access(env_exe, os.X_OK): return env_exe paths = os.getenv('PATH', os.defpath).split(os.pathsep) for path in paths: for ex in (basename, basename + '.exe'): fpath = os.path.join(path, ex) if os.path.exists(fpath) and os.access(fpath, os.X_OK): return fpath
[docs]def clean_text( text: str, *, remove_newlines: bool = True, normalize: Optional[Union[str, bool]] = 'NFC', transliterate: bool = False, ) -> str: """Clean a given text. This function is used by the :py:class:`woob.browser.filters.standard.CleanText` browser filter. :param text: The text to clean. :param remove_newlines: Whether to transform newlines into spaces or not. If this parameter is set to false, newlines will be normalized as UNIX-style newlines (U+000A). :param normalize: The Unicode normalization to apply, if relevant. :param transliterate: Whether to transliterate Unicode characters into ASCII characters, when possible. :return: The cleaned text. """ # For compatibility with legacy code that sets ``normalize`` to a boolean, # we bind boolean values for the option to actual, usable values. if normalize is False: normalize = None elif normalize is True: normalize = 'NFC' if remove_newlines: text = NEWLINES_RE.sub(' ', text) else: # normalize newlines and clean what is inside text = '\n'.join(clean_text(line) for line in text.splitlines()) text = text.strip() if normalize is not None: text = unicodedata.normalize(normalize, text) if transliterate: text = unidecode.unidecode(text) return text
[docs]def polling_loop(*, count=None, timeout=None, delay=5): """ Delay iterator for polling loops. The count or timeout must be specified; both can be simultaneously. The default delay is five seconds. :param count: Maximum number of iterations this loop can produce. Can be None for unlimited retries. :type count: int :param timeout: Maximum number of seconds to try the loop for. :type timeout: float :param delay: Delay in seconds between each iteration. :type delay: float """ if count is None and timeout is None: raise ValueError( 'polling_loop should be given at least a count or a timeout', ) counter = itertools.count() if count is not None: counter = range(count) if timeout is not None: timeout = time() + timeout for idx in counter: if idx > 0: if timeout is not None and time() + delay >= timeout: break sleep(delay) yield idx
[docs]class classproperty: """ Use it as a decorator to define a class property. >>> class C: ... @classproperty ... def VERSION(self): ... return '3.3' ... >>> C.VERSION '3.3' >>> C().VERSION '3.3' """ def __init__(self, f): self.f = f def __get__(self, obj, owner): return self.f(owner)