Source code for woob.capabilities.base

# Copyright(C) 2010-2013 Christophe Benz, Romain Bignon, Julien Hebert
# This file is part of woob.
# woob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# woob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with woob. If not, see <https://www.gnu.org/licenses/>.

from __future__ import annotations

from typing import Dict, Iterable, Type, Any, Tuple, TypeVar, overload

from collections import OrderedDict, deque
import warnings
import re
from decimal import Decimal
from copy import deepcopy, copy

from import to_unicode

# Public names exported by ``from <this module> import *``.
__all__ = [
    'UserError', 'FieldNotFound', 'NotAvailable', 'FetchError', 'NotLoaded',
    'Capability', 'Field', 'IntField', 'DecimalField', 'FloatField',
    'StringField', 'BytesField', 'BoolField', 'Enum', 'EnumField', 'empty',
    'BaseObject', 'find_object', 'find_object_any_match', 'strict_find_object',
]

class EnumMeta(type):
    """
    Metaclass giving a class dict-like, ordered access to its members.

    Every class attribute that is not callable and whose name does not
    start with ``__`` is recorded, in declaration order, in
    ``cls.__members__``. Classes using this metaclass cannot be
    instantiated.
    """

    @classmethod
    def __prepare__(mcs, name, bases, **kwargs):
        # Python calls ``metaclass.__prepare__(name, bases)`` on the metaclass
        # itself, so this has to be a classmethod.
        # An ordered namespace guarantees declaration order for the members
        # (plain dicts only guarantee it from python 3.6/3.7 onwards).
        return OrderedDict()

    def __init__(cls, name, bases, attrs, *args, **kwargs):
        super().__init__(name, bases, attrs, *args, **kwargs)
        members = [
            (key, value) for key, value in attrs.items()
            if not callable(value) and not key.startswith('__')
        ]
        cls.__members__ = OrderedDict(members)

    def __setattr__(cls, name, value):
        super().__setattr__(name, value)
        # Keep ``__members__`` in sync with members added after creation.
        if not callable(value) and not name.startswith('__'):
            cls.__members__[name] = value

    def __call__(cls, *args, **kwargs):
        raise ValueError("Enum type can't be instanciated")

    # The four accessors below are read as plain attributes elsewhere in this
    # module (``cls._values``, ``enum._types``, ``self.enum._values``), hence
    # they are properties.

    @property
    def _items(cls):
        """View of ``(name, value)`` pairs, in declaration order."""
        return cls.__members__.items()

    @property
    def _keys(cls):
        """View of member names, in declaration order."""
        return cls.__members__.keys()

    @property
    def _values(cls):
        """View of member values, in declaration order."""
        return cls.__members__.values()

    @property
    def _types(cls):
        """Set of the python types of all member values."""
        return set(map(type, cls._values))

    def __iter__(cls):
        return iter(cls.__members__.values())

    def __len__(cls):
        return len(cls.__members__)

    def __contains__(cls, value):
        # Membership is tested against member *values*, not names.
        return value in cls.__members__.values()

    def __getitem__(cls, k):
        # Lookup is by member *name*.
        return cls.__members__[k]

class Enum(metaclass=EnumMeta):
    """
    Base class for enumerations.

    Subclasses declare their members as plain class attributes; the
    :class:`EnumMeta` metaclass collects them and forbids instantiation.
    """
def empty(value: Any) -> bool:
    """
    Checks if a value is empty (None, NotLoaded or NotAvailable).

    Any instance of :class:`EmptyType` counts as empty.

    :rtype: :class:`bool`
    """
    return value is None or isinstance(value, EmptyType)
T = TypeVar('T')


@overload
def find_object(
    mylist: Iterable[T], error: None = None, **kwargs
) -> T | None: ...


@overload
def find_object(
    mylist: Iterable[T], error: Type[Exception], **kwargs
) -> T: ...


def find_object(
    mylist, error=None, **kwargs
):
    """
    Very simple tool to return the first object whose attributes match all
    the parameters given in kwargs.

    :param mylist: objects to search, in order
    :param error: exception class raised (without arguments) when nothing
                  matches; when None, None is returned instead
    :param kwargs: attribute names and the values they must be equal to
    :return: the first matching object, or None
    """
    for candidate in mylist:
        mismatch = any(
            getattr(candidate, key) != value for key, value in kwargs.items()
        )
        if not mismatch:
            return candidate

    if error is not None:
        raise error()
    return None
@overload
def find_object_any_match(
    objects: Iterable[T],
    key_value_pairs: Iterable[Tuple[str, Any]],
    error: None = None,
    with_priority: bool = True,
    ignore_empty: bool = True
) -> T | None: ...


@overload
def find_object_any_match(
    objects: Iterable[T],
    key_value_pairs: Iterable[Tuple[str, Any]],
    error: Type[Exception],
    with_priority: bool = True,
    ignore_empty: bool = True
) -> T: ...


def find_object_any_match(
    objects: Iterable[T],
    key_value_pairs: Iterable[Tuple[str, Any]],
    error: Type[Exception] | None = None,
    with_priority: bool = True,
    ignore_empty: bool = True
) -> T | None:
    """
    Tool method that returns an object that match any parameter given in
    key_value_pairs.

    Compared with find_object, this method does not need all parameters to be
    matching. If no priority is set, the method will return the first
    successful match. With several key-values set, this may not be the object
    instance with the most key matches.

    :param objects: The list or iterator of objects that can match
    :type objects: list
    :param key_value_pairs: The key-values that the object should have to
                            match. This is given as a list of pairs, where
                            each pair represent a key-value, the first item
                            being the key, and the second is the value.
    :type key_value_pairs: list
    :param error: The error to raise in case no object matches.
    :type error: Exception or None
    :param with_priority: If True, the key-value pairs order decides the
                          priority of the matching. With priority enabled, an
                          object matching with a more important key will be
                          matched over another object matching with a less
                          important keys. This means that all objects will be
                          iterated in order to assure this priority, unless
                          the most preferred key matches, in which case the
                          iteration stops early. This feature is enabled by
                          default. See the example sections for more details.
    :type with_priority: bool
    :param ignore_empty: If True, empty key-values (None, EmptyType) will not
                         be matched. True by default.
    :type ignore_empty: bool
    :return: The matching object if any

    Examples:

    >>> class Point:
    ...     def __init__(self, x, y):
    ...         self.x = x
    ...         self.y = y
    ...     def __repr__(self):
    ...         return f'Point({self.x},{self.y})'
    >>> points = (Point(0,0), Point(1, 2), Point(None, 3))
    >>> find_object_any_match(points, (('x', 0), ('y', 1)))  # Will match on x with first point
    Point(0,0)
    >>> find_object_any_match(points, (('x', 2), ('y', 2)))  # Will match on y with second point
    Point(1,2)
    >>> find_object_any_match(points, (('x', 1), ('y', 0)))  # Will match on x with second point because priority
    Point(1,2)
    >>> find_object_any_match(points, (('x', 1), ('y', 0)), with_priority=False)  # Will return first successful match
    Point(0,0)
    >>> find_object_any_match(points, (('x', None),)) is None  # empty condition (None) is ignored by default
    True
    >>> find_object_any_match(points, (('x', None),), ignore_empty=False)  # will match last point
    Point(None,3)
    >>> find_object_any_match(points, (('x', -1),), error=ValueError)  # Will raise ValueError
    Traceback (most recent call last):
        ...
    ValueError
    """
    # The pairs are walked once per object, so a one-shot iterator must be
    # materialised or it would be exhausted after the first object.
    pairs = tuple(key_value_pairs)

    # Will be used if with_priority is true
    found = None
    found_index = None

    for o in objects:
        for index, (key, value) in enumerate(pairs):
            # There is no point trying to match as it will be not priorized
            if found_index is not None and found_index <= index:
                break

            o_value = getattr(o, key)
            if ignore_empty and (empty(value) or empty(o_value)):
                continue
            if o_value != value:
                continue

            # First key is a special case with priority, we can return early
            # if we find a match on the first key
            # Also we only keep more priorized matches
            if not with_priority or index == 0:
                return o

            found = o
            found_index = index

    # ``found_index`` (not the truthiness of ``found``) tells whether a match
    # was recorded: a matched object may itself evaluate to False.
    if found_index is None and error is not None:
        raise error()
    return found
def strict_find_object(mylist: Iterable[BaseObject], error=None, **kwargs) -> BaseObject | None:
    """
    Tools to return an object with the matching parameters in kwargs.
    Parameters with empty value are skipped.

    When every parameter is empty (or none is given) nothing can match:
    ``error`` is raised if set, otherwise None is returned.

    :param mylist: objects to search
    :param error: exception class raised (without arguments) when nothing
                  matches
    :param kwargs: attribute names and the values they must be equal to
    """
    criteria = {key: value for key, value in kwargs.items() if not empty(value)}
    if criteria:
        return find_object(mylist, error=error, **criteria)

    if error is not None:
        raise error()
    return None
class UserError(Exception):
    """
    Exception containing an error message for user.
    """
class FieldNotFound(Exception):
    """
    A field isn't found.

    :param obj: object
    :type obj: :class:`BaseObject`
    :param field: name of the field not found
    :type field: :class:`str`
    """

    def __init__(self, obj: BaseObject, field: str):
        super().__init__('Field "%s" not found for object %s' % (field, obj))
class ConversionWarning(UserWarning):
    """
    A field's type was changed when setting it.
    Ideally, the module should use the right type before setting it.
    """


class AttributeCreationWarning(UserWarning):
    """
    A non-field attribute has been created with a name not
    prefixed with a _.
    """


class EmptyType:
    """
    Parent class for NotAvailableType, NotLoadedType and FetchErrorType.

    Instances are falsy and are returned unchanged by ``copy`` and
    ``deepcopy``, so identity checks against the module-level constants
    keep working on copied objects.
    """

    def __str__(self):
        return repr(self)

    def __copy__(self):
        return self

    def __deepcopy__(self, memo):
        return self

    def __nonzero__(self):
        return False

    # python 3 name of the truth-value hook
    __bool__ = __nonzero__


class NotAvailableType(EmptyType):
    """
    NotAvailable is a constant to use on non available fields.
    """

    def __repr__(self):
        return 'NotAvailable'

    def __str__(self):
        return 'Not available'


NotAvailable = NotAvailableType()


class NotLoadedType(EmptyType):
    """
    NotLoaded is a constant to use on not loaded fields.

    When an object based on :class:`BaseObject` is filled, all fields
    with this value are requested.
    """

    def __repr__(self):
        return 'NotLoaded'

    def __str__(self):
        return 'Not loaded'


NotLoaded = NotLoadedType()


class FetchErrorType(EmptyType):
    """
    FetchError is a constant to use when parsing a non-mandatory field
    raises an exception.
    """

    def __repr__(self):
        return 'FetchError'

    def __str__(self):
        return 'Not mandatory'


FetchError = FetchErrorType()
class Capability:
    """
    This is the base class for all capabilities.

    A capability may define abstract methods (which raise
    :class:`NotImplementedError`) with an explicit docstring to tell backends
    how to implement them.

    Also, it may define some *objects*, using :class:`BaseObject`.
    """
class Field:
    """
    Field of a :class:`BaseObject` class.

    :param doc: docstring of the field
    :type doc: :class:`str`
    :param args: list of types accepted (types, or type names as strings)
    :param default: default value of this field. If not specified,
                    :class:`NotLoaded` is used.
    :param mandatory: stored as-is in ``self.mandatory`` (default: True)
    :raises TypeError: if one of ``args`` is neither a type nor a string
    """

    # Class-wide counter; each instance records the value current at its
    # creation so that fields can later be sorted in declaration order.
    _creation_counter = 0

    def __init__(self, doc, *args, **kwargs):
        self.types = ()
        self.value = kwargs.get('default', NotLoaded)
        self.doc = doc
        self.mandatory = kwargs.get('mandatory', True)

        for arg in args:
            if isinstance(arg, (type, str)):
                self.types += (arg,)
            else:
                raise TypeError('Arguments must be types or strings of type name')

        self._creation_counter = Field._creation_counter
        Field._creation_counter += 1

    def convert(self, value):
        """
        Convert value to the wanted one.

        The base implementation returns ``value`` unchanged.
        """
        return value
class IntField(Field):
    """
    A field which accepts only :class:`int` types.
    """

    def __init__(self, doc, **kwargs):
        super().__init__(doc, int, **kwargs)

    def convert(self, value):
        """Convert ``value`` with :class:`int`."""
        return int(value)
class BoolField(Field):
    """
    A field which accepts only :class:`bool` type.
    """

    def __init__(self, doc, **kwargs):
        super().__init__(doc, bool, **kwargs)

    def convert(self, value):
        """Convert ``value`` with :class:`bool` (i.e. its truth value)."""
        return bool(value)
class DecimalField(Field):
    """
    A field which accepts only :class:`decimal.Decimal` type.
    """

    def __init__(self, doc, **kwargs):
        super().__init__(doc, Decimal, **kwargs)

    def convert(self, value):
        """Return ``value`` as a :class:`decimal.Decimal`."""
        # Hand back an existing Decimal untouched so the caller gets the very
        # same object and no conversion is reported.
        if isinstance(value, Decimal):
            return value
        return Decimal(value)
class FloatField(Field):
    """
    A field which accepts only :class:`float` type.
    """

    def __init__(self, doc, **kwargs):
        super().__init__(doc, float, **kwargs)

    def convert(self, value):
        """Convert ``value`` with :class:`float`."""
        return float(value)
class StringField(Field):
    """
    A field which accepts only :class:`str` strings.
    """

    def __init__(self, doc, **kwargs):
        super().__init__(doc, str, **kwargs)

    def convert(self, value):
        """Convert ``value`` to text through ``to_unicode``."""
        return to_unicode(value)
class BytesField(Field):
    """
    A field which accepts only :class:`bytes` strings.
    """

    def __init__(self, doc, **kwargs):
        super().__init__(doc, bytes, **kwargs)

    def convert(self, value):
        """Return ``value`` as :class:`bytes`; text is encoded as UTF-8."""
        if isinstance(value, str):
            value = value.encode('utf-8')
        return bytes(value)
class EnumField(Field):
    """
    A field which accepts only the values of a given :class:`Enum` subclass.

    :param doc: docstring of the field
    :param enum: the :class:`Enum` subclass listing the allowed values
    :raises TypeError: if ``enum`` is not a subclass of :class:`Enum`
    """

    def __init__(self, doc, enum, **kwargs):
        if not issubclass(enum, Enum):
            raise TypeError('invalid enum type: %r' % enum)
        # The accepted python types are the types of the enum's values.
        super().__init__(doc, *enum._types, **kwargs)
        self.enum = enum

    def convert(self, value):
        """
        Return ``value`` unchanged if it is one of the enum's values.

        :raises ValueError: if ``value`` does not belong to the enum
        """
        if value not in self.enum._values:
            raise ValueError('value %r does not belong to enum %s' % (value, self.enum))
        return value
class _BaseObjectMeta(type):
    """
    Metaclass collecting the :class:`Field` attributes of a class.

    Field attributes are removed from the class namespace and stored, in
    declaration order, in the class-level ``_fields`` mapping (merged with a
    deep copy of the inherited one). A ``:var name: ...`` line per field is
    appended to the class docstring.
    """

    def __new__(cls, name, bases, attrs):
        # Pull the Field instances out of the namespace (iterating over a
        # copy, since the namespace is modified while scanning it).
        fields = [
            (field_name, attrs.pop(field_name))
            for field_name, obj in list(attrs.items())
            if isinstance(obj, Field)
        ]
        # Restore declaration order.
        fields.sort(key=lambda item: item[1]._creation_counter)

        new_class = super().__new__(cls, name, bases, attrs)
        if new_class._fields is None:
            new_class._fields = OrderedDict()
        else:
            # Never share Field instances with the parent class.
            new_class._fields = deepcopy(new_class._fields)
        new_class._fields.update(fields)

        if new_class.__doc__ is None:
            new_class.__doc__ = ''
        for field_name, field in new_class._fields.items():
            type_names = ', '.join(
                ':class:`%s`' % v.__name__ if isinstance(v, type) else v
                for v in field.types
            )
            doc = '(%s) %s' % (type_names, field.doc)
            if field.value is not NotLoaded:
                doc += ' (default: %s)' % field.value
            new_class.__doc__ += '\n:var %s: %s' % (field_name, doc)
        return new_class
class BaseObject(metaclass=_BaseObjectMeta):
    """
    This is the base class for a capability object.

    A capability interface may specify to return several kind of objects, to
    formalise retrieved information from websites.

    As python is a flexible language where variables are not typed, we use a
    system to force backends to set wanted values on all fields. To do that,
    we use the :class:`Field` class and all derived ones.

    For example::

        class Transfer(BaseObject):
            " Transfer from an account to a recipient. "

            amount = DecimalField('Amount to transfer')
            date = Field('Date of transfer', str, date, datetime)
            origin = Field('Origin of transfer', int, str)
            recipient = Field('Recipient', int, str)

    The docstring is mandatory.
    """

    id: str | None = None
    backend: str | None = None
    _fields: Dict[str, Field] = {}

    # XXX remove it?
    url = StringField('url')

    def __init__(
        self,
        id: str = '',
        url: str | NotLoadedType | NotAvailableType = NotLoaded,
        backend=None
    ):
        self.id = id or ''
        self.backend = backend
        # Each instance owns its Field objects, so setting a value never
        # leaks to the class or to other instances.
        self._fields = deepcopy(self._fields)
        self.__setattr__('url', url)

    @property
    def fullid(self) -> str:
        """
        Full ID of the object, in form '**ID@backend**'.
        """
        return '%s@%s' % (self.id, self.backend)

    def __iscomplete__(self) -> bool:
        """
        Return True if the object is completed.

        It is useful when the object is a field of an other object which is
        going to be filled.

        The default behavior is to iter on fields (with iter_fields) and if
        a field is NotLoaded, return False.
        """
        for key, value in self.iter_fields():
            if value is NotLoaded:
                return False
        return True

    def copy(self) -> BaseObject:
        """
        Return a copy of the object with its own :class:`Field` instances.

        Fields are shallow-copied: the field *values* are shared with the
        original object.
        """
        obj = copy(self)
        obj._fields = copy(self._fields)
        for k in obj._fields:
            obj._fields[k] = copy(obj._fields[k])
        return obj

    def __deepcopy__(self, memo) -> BaseObject:
        # deepcopy() of an object behaves like copy(): see the copy method.
        return self.copy()

    def set_empty_fields(self, value: Any, excepts=()):
        """
        Set the same value on all empty fields.

        :param value: value to set on all empty fields
        :param excepts: if specified, do not change fields listed
        """
        for key, old_value in self.iter_fields():
            if empty(old_value) and key not in excepts:
                setattr(self, key, value)

    def iter_fields(self) -> Iterable[Tuple[str, Any]]:
        """
        Iterate on the fields keys and values.

        Can be overloaded to iterate on other things.

        :rtype: iter[(key, value)]
        """
        # 'id' is a plain attribute, not a Field, but is reported first.
        if hasattr(self, 'id') and self.id is not None:
            yield 'id', self.id
        for name, field in self._fields.items():
            yield name, field.value

    def __eq__(self, obj) -> bool:
        # Two objects are equal when they come from the same backend and have
        # the same id. Defining __eq__ without __hash__ makes instances
        # unhashable.
        if isinstance(obj, BaseObject):
            return self.backend == obj.backend and self.id == obj.id
        else:
            return False

    def __getattr__(self, name: str) -> Any:
        # Only called when normal lookup fails: fields live in ``_fields``,
        # not in the instance ``__dict__``.
        if self._fields is not None and name in self._fields:
            return self._fields[name].value
        else:
            raise AttributeError("'%s' object has no attribute '%s'" % (
                self.__class__.__name__, name))

    def __setattr__(self, name: str, value: Any):
        try:
            attr = (self._fields or {})[name]
        except KeyError:
            # Not a field: plain attribute, warn if it looks like a typo.
            if name not in dir(self) and not name.startswith('_'):
                warnings.warn('Creating a non-field attribute %s. Please prefix it with _' % name,
                              AttributeCreationWarning, stacklevel=2)
            object.__setattr__(self, name, value)
        else:
            if not empty(value):
                try:
                    # Try to convert value to the wanted one.
                    nvalue = attr.convert(value)
                except (TypeError, ValueError, ArithmeticError):
                    # error during conversion, it will probably not
                    # match the wanted following types, so we'll
                    # raise ValueError.
                    pass
                else:
                    # If the value was converted
                    if nvalue is not value:
                        warnings.warn('Value %s was converted from %s to %s' %
                                      (name, type(value), type(nvalue)),
                                      ConversionWarning, stacklevel=2)
                    value = nvalue

            actual_types = _resolve_types(attr.types)

            # Empty values (None, NotLoaded, ...) are accepted on any field.
            if not isinstance(value, actual_types) and not empty(value):
                raise ValueError(
                    'Value for "%s" needs to be of type %r, not %r' % (
                        name, actual_types, type(value)))
            attr.value = value

    def __delattr__(self, name: str):
        try:
            self._fields.pop(name)
        except KeyError:
            object.__delattr__(self, name)

    def to_dict(self) -> Dict[str, Any]:
        """
        Return the fields as an ordered mapping.

        When a backend is set, the 'id' entry holds :attr:`fullid`
        instead of the bare id.
        """
        def iter_decorate(d):
            for key, value in d:
                if key == 'id' and self.backend is not None:
                    value = self.fullid
                yield key, value

        fields_iterator = self.iter_fields()
        return OrderedDict(iter_decorate(fields_iterator))

    def __getstate__(self) -> Dict[str, Any]:
        # Field values first, then every instance attribute but ``_fields``
        # (which may override entries such as 'id').
        d = self.to_dict()
        d.update((k, v) for k, v in self.__dict__.items() if k != '_fields')
        return d

    @classmethod
    def from_dict(cls, values: Dict[str, Any], backend: str | None = None):
        """
        Build an object by setting every item of ``values`` as an attribute.
        """
        # NOTE(review): ``backend`` is accepted but not used here - confirm
        # whether it should be applied to the new object.
        self = cls()

        for attr in values:
            setattr(self, attr, values[attr])

        return self

    def __setstate__(self, state: Dict[str, Any]):
        self._fields = deepcopy(self._fields)  # because yaml does not call __init__
        for k in state:
            setattr(self, k, state[k])

    def __dir__(self):
        # Advertise the fields too, as they are reachable as attributes.
        return list(super().__dir__()) + list(self._fields.keys())
def _resolve_types(types): actual_types = () for v in types: if isinstance(v, str): # the following is a (almost) copy/paste from # q = deque([object]) while q: t = q.popleft() if t.__name__ == v: actual_types += (t,) else: try: # keep looking! q.extend(t.__subclasses__()) except TypeError: # type.__subclasses__ needs an argument for # whatever reason. if t is type: continue else: raise else: actual_types += (v,) return actual_types class Currency: CURRENCIES = OrderedDict([ ('EUR', ('€', 'EURO', 'EUROS')), ('CHF', ('CHF',)), ('USD', ('$', '$US')), ('GBP', ('£',)), ('LBP', ('ل.ل',)), ('AED', ('AED',)), ('XOF', ('XOF',)), ('RUB', ('руб',)), ('SGD', ('SGD',)), ('BRL', ('R$',)), ('MXN', ('$',)), ('JPY', ('¥',)), ('TRY', ('₺', 'TRY')), ('RON', ('lei',)), ('COP', ('$',)), ('NOK', ('kr',)), ('CNY', ('¥',)), ('RSD', ('din',)), ('ZAR', ('rand',)), ('MYR', ('RM',)), ('HUF', ('Ft',)), ('HKD', ('HK$',)), ('TWD', ('NT$',)), ('QAR', ('QR',)), ('MAD', ('MAD',)), ('ARS', ('ARS',)), ('AUD', ('AUD',)), ('CAD', ('CAD',)), ('NZD', ('NZD',)), ('BHD', ('BHD',)), ('SEK', ('SEK',)), ('DKK', ('DKK',)), ('LUF', ('LUF',)), ('KZT', ('KZT',)), ('PLN', ('PLN',)), ('ILS', ('ILS',)), ('THB', ('THB',)), ('INR', ('₹', 'INR')), ('PEN', ('S/',)), ('IDR', ('Rp',)), ('KWD', ('KD',)), ('KRW', ('₩',)), ('CZK', ('Kč',)), ('EGP', ('E£',)), ('ISK', ('Íkr', 'kr')), ('XPF', ('XPF',)), ('SAR', ('SAR',)), ('BGN', ('лв',)), ('HRK', ('HRK', 'kn')), ]) EXTRACTOR = re.compile(r'[()\d\s,\.\-]', re.UNICODE) @classmethod def get_currency(cls: Type[Currency], text: str) -> str | None: """ >>> Currency.get_currency(u'42') None >>> Currency.get_currency(u'42 €') u'EUR' >>> Currency.get_currency(u'$42') u'USD' >>> Currency.get_currency(u'42.000,00€') u'EUR' >>> Currency.get_currency(u'$42 USD') u'USD' >>> Currency.get_currency(u'%42 USD') u'USD' >>> Currency.get_currency(u'US1D') None """ curtexts = cls.EXTRACTOR.sub(' ', text.upper()).split() for currency, symbols in cls.CURRENCIES.items(): for curtext in curtexts: if 
curtext == currency: return currency for symbol in symbols: if curtext == symbol: return currency return None @classmethod def currency2txt(cls, currency): _currency = cls.CURRENCIES.get(currency, ('',)) return _currency[0]
def capability_to_string(capability_klass: Type[Capability]) -> str:
    """
    Return the lower-cased name of a capability class without its ``Cap``
    prefix (e.g. ``CapBank`` gives ``'bank'``).

    :param capability_klass: capability class, whose name starts with ``Cap``
    :raises AssertionError: if the class name does not start with ``Cap``
                            followed by at least one word character
    """
    m = re.match(r'^Cap(\w+)', capability_klass.__name__)
    assert m is not None

    return m.group(1).lower()
class DeprecatedFieldWarning(UserWarning):
    """
    Warning category for deprecated fields.

    Nothing in this module emits it; presumably it is raised by code using
    fields that are kept only for compatibility.
    """