Source code for woob.browser.nss

# Copyright(C) 2016      Vincent A
#
# This file is part of woob.
#
# woob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# woob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with woob. If not, see <http://www.gnu.org/licenses/>.

"""
Module to use NSS instead of OpenSSL in urllib3/requests.

As it was used by Firefox, it offered the same security level, to be sure woob
can access to a website the same way than a real browser.

.. deprecated:: 3.4
   NSS is not maintained by Mozilla anymore, so do not use it anymore.

   If there is an issue to access a website, you can override use the
   :class:`woob.browser.adapters.LowSecHTTPAdapter` with the
   :attr:`woob.browser.browsers.Browser.HTTP_ADAPTER_CLASS` attribute to be
   explicitely more permissive.
"""

# create db:
#   mkdir pki
#   certutil -N -d pki

# import certificate:
#   find -L /etc/ssl/certs -name "*.pem" | while read f; do certutil -A -d pki -i $f -n $f -t TCu,Cu,Tu; done

from functools import wraps
from io import RawIOBase, BufferedRWPair
import hashlib
import os
import re
import socket
import ssl as basessl
import subprocess
from tempfile import NamedTemporaryFile
from threading import Lock
import warnings

try:
    import nss.ssl
    import nss.error
    import nss.nss
except ImportError:
    raise ImportError('Please install python3-nss')
from requests.packages.urllib3.util.ssl_ import ssl_wrap_socket as old_ssl_wrap_socket
import requests  # for AIA
from woob.tools.log import getLogger


warnings.warn('Use of NSS is deprecated, it will be removed in woob 4.0', DeprecationWarning, stacklevel=2)


__all__ = ['init_nss', 'inject_in_urllib3', 'certificate_db_filename']


CTX = None
INIT_PID = None
INIT_ARGS = None
LOGGER = getLogger(__name__)


def nss_version():
    version_str = nss.nss.nss_get_version()
    version_str = re.match(r'\d+\.\d+', version_str).group(0) # can be "3.21.3 Extended ECC"
    return tuple(int(x) for x in version_str.split('.'))
    # see https://developer.mozilla.org/en-US/docs/Mozilla/Projects/NSS/NSS_3.35_release_notes


[docs]def certificate_db_filename(): version = nss_version() if version < (3, 35): return 'cert8.db' return 'cert9.db'
def path_for_version(path): # despite what certutil(1) and the NSS 3.35 releases notes say # some nss builds >=3.35 will use either sql or dbm by default # also, some builds <3.35 will fail to enforce sql format when using "sql:" if nss_version() < (3, 35): return path return 'sql:%s' % path def cert_to_dict(cert): # see https://docs.python.org/2/library/ssl.html#ssl.SSLSocket.getpeercert # and https://github.com/kennethreitz/requests/blob/master/requests/packages/urllib3/contrib/pyopenssl.py mappings = { nss.nss.certDNSName: 'DNS', nss.nss.certIPAddress: 'IP Address', # TODO support more types } altnames = [] try: ext = cert.get_extension(nss.nss.SEC_OID_X509_SUBJECT_ALT_NAME) except KeyError: pass else: for entry in nss.nss.x509_alt_name(ext.value, nss.nss.AsObject): key = mappings[entry.type_enum] altnames.append((key, entry.name)) ret = { 'subject': [ [('commonName', cert.subject.common_name)], [('localityName', cert.subject.locality_name)], [('organizationName', cert.subject.org_name)], [('organizationalUnitName', cert.subject.org_unit_name)], [('emailAddress', cert.subject.email_address)], ], 'subjectAltName': altnames, 'issuer': [ [('countryName', cert.issuer.country_name)], [('organizationName', cert.issuer.org_name)], [('organizationalUnitName', cert.issuer.org_unit_name)], [('commonName', cert.issuer.common_name)], ], # TODO serialNumber, notBefore, notAfter 'version': cert.version, } return ret ERROR_MAP = { nss.error.PR_CONNECT_TIMEOUT_ERROR: (socket.timeout,), nss.error.PR_IO_TIMEOUT_ERROR: (socket.timeout,), nss.error.PR_CONNECT_RESET_ERROR: (socket.error,), } def wrap_callable(func): @wraps(func) def wrapper(*args, **kwargs): return exc_wrap(func, *args, **kwargs) return wrapper def exc_wrap(func, *args, **kwargs): try: return func(*args, **kwargs) except nss.error.NSPRError as e: if e.error_desc.startswith('(SEC_ERROR_') or e.error_desc.startswith('(SSL_ERROR_'): raise basessl.SSLError(0, e.error_message or e.error_desc, e) for k in ERROR_MAP: if k == e.error_code: raise ERROR_MAP[k][0] raise class NSSFile(RawIOBase): def __init__(self, obj): self.obj = obj self.open = True def close(self): super(NSSFile, self).close() if self.open: self.obj.close() self.open = False def read(self, amount): return self.obj.recv(amount) def readinto(self, buf): amount = len(buf) chunk = self.obj.recv(amount) # TODO handle timeout by returning None? buf[:len(chunk)] = chunk return len(chunk) def write(self, buf): self.obj.send(buf) return len(buf) def readable(self): return self.open writable = readable class Wrapper: def __init__(self, obj): self.__obj = obj self.__timeout = nss.io.PR_INTERVAL_NO_TIMEOUT def settimeout(self, t): if t is None: self.__timeout = nss.io.PR_INTERVAL_NO_TIMEOUT else: self.__timeout = nss.io.milliseconds_to_interval(int(t * 1000)) def __getattr__(self, attr): ret = getattr(self.__obj, attr) if callable(ret): ret = wrap_callable(ret) return ret def getpeercert(self, binary_form=False): # TODO return none or exception in case no cert yet? cert = self.__obj.get_peer_certificate() if binary_form: return cert.der_data else: return cert_to_dict(cert) def makefile(self, *args, **kwargs): made = self.__obj.makefile(*args, **kwargs) # NSS.io.Socket returns the same object, but increments its internal ref counter # close() decreases the counter, and closes if there are no more refs # see python NSS source assert made is self.__obj rw_wrapper = NSSFile(self) return BufferedRWPair(rw_wrapper, rw_wrapper) def recv(self, amount): return exc_wrap(self.__obj.recv, amount, self.__timeout) def send(self, s): return exc_wrap(self.__obj.send, s, self.__timeout) def auth_cert_pinning(sock, check_sig, is_server, path): cert = sock.get_peer_certificate() expected = nss.nss.Certificate(nss.nss.read_der_from_file(path, True)) return (expected.signed_data.data == cert.signed_data.data) AIA_CACHE = {} AIA_LOCK = Lock() def auth_cert_basic(sock, check_sig, is_server): cert = sock.get_certificate() db = nss.nss.get_default_certdb() # simple case: full cert chain try: valid = cert.verify_hostname(sock.get_hostname()) except nss.error.NSPRError: return False if not valid: return False required = nss.nss.certificateUsageSSLServer try: usages = cert.verify_now(db, check_sig, required) & required except nss.error.NSPRError: return False return bool(usages) def auth_cert_aia_only(sock, check_sig, is_server): cert = sock.get_certificate() db = nss.nss.get_default_certdb() # harder case: the server presents an incomplete cert chain and only has the leaf cert # the parent is indicated in the TLS extension called "AIA" for ext in cert.extensions: if ext.name == 'Authority Information Access': aia_text = ext.format() aia_text = re.sub(r'\s+', ' ', aia_text) break else: return False # yes, the parent TLS cert is behind an HTTP URL parent_url = re.search(r'Method: PKIX CA issuers access method Location: URI: (http:\S+)', aia_text).group(1) with AIA_LOCK: parent_der = AIA_CACHE.get(parent_url) if parent_der is None: parent_der = requests.get(parent_url, timeout=30).content with AIA_LOCK: AIA_CACHE[parent_url] = parent_der # verify parent cert is a CA in our db parent = nss.nss.Certificate(parent_der, perm=False) required = nss.nss.certificateUsageAnyCA try: usages = parent.verify_now(db, check_sig, required) & required except nss.error.NSPRError: return False if not usages: return False # verify leaf certificate try: valid = cert.verify_hostname(sock.get_hostname()) except nss.error.NSPRError: return False if not valid: return False required = nss.nss.certificateUsageSSLServer try: usages = cert.verify_now(db, check_sig, required) & required except nss.error.NSPRError: return False return bool(usages) def auth_cert_with_aia(sock, check_sig, is_server): assert not is_server cert = sock.get_certificate() if len(cert.get_cert_chain()) > 1: return auth_cert_basic(sock, check_sig, is_server) else: return auth_cert_aia_only(sock, check_sig, is_server) DEFAULT_CA_CERTIFICATES = ( '/etc/ssl/certs/ca-certificates.crt', '/etc/pki/tls/certs/ca-bundle.crt', ) try: import certifi except ImportError: pass else: DEFAULT_CA_CERTIFICATES = DEFAULT_CA_CERTIFICATES + (certifi.where(),) def ssl_wrap_socket(sock, *args, **kwargs): if kwargs.get('certfile'): LOGGER.debug('a client certificate is used, falling back to OpenSSL') # TODO implement NSS client certificate support return old_ssl_wrap_socket(sock, *args, **kwargs) reinit_if_needed() # TODO handle more options? hostname = kwargs.get('server_hostname') ossl_ctx = kwargs.get('ssl_context') # the python Socket and the NSS SSLSocket are agnostic of each other's state # so the Socket could close the fd, then a file could be opened, # obtaining the same file descriptor, then NSS would use the file, thinking # it's a network file descriptor... dup the fd to make it independant fileno = sock.fileno() if hasattr(sock, 'detach'): # socket.detach only exists in py3. sock.detach() else: fileno = os.dup(fileno) nsssock = nss.ssl.SSLSocket.import_tcp_socket(fileno) wrapper = Wrapper(nsssock) nsssock.set_certificate_db(nss.nss.get_default_certdb()) if hostname: nsssock.set_hostname(hostname) if ossl_ctx and not ossl_ctx.verify_mode: nsssock.set_auth_certificate_callback(lambda *args: True) elif kwargs.get('ca_certs') and kwargs['ca_certs'] not in DEFAULT_CA_CERTIFICATES: nsssock.set_auth_certificate_callback(auth_cert_pinning, kwargs['ca_certs']) else: nsssock.set_auth_certificate_callback(auth_cert_with_aia) nsssock.reset_handshake(False) # marks handshake as not-done try: wrapper.send(b'') # performs handshake except nss.error.NSPRError as e: if e.error_code == nss.error.PR_END_OF_FILE_ERROR: # the corresponding openssl error isn't exactly socket.timeout() # but rather something SyscallError. # i don't know how to generate it exactly and the end result is # similar so let's use this. raise socket.timeout() # see below why closing wrapper.close() raise except: # If there is an exception during the handshake, correctly close the # duplicated/detached socket as it isn't known by the caller. wrapper.close() raise return wrapper
[docs]def inject_in_urllib3(): import urllib3.util.ssl_ import urllib3.connection # on some distros, requests comes with its own urllib3 version import requests.packages.urllib3.util.ssl_ import requests.packages.urllib3.connection for pkg in (urllib3, requests.packages.urllib3): pkg.util.ssl_.ssl_wrap_socket = ssl_wrap_socket pkg.util.ssl_wrap_socket = ssl_wrap_socket pkg.connection.ssl_wrap_socket = ssl_wrap_socket
[docs]def init_nss(path, rw=False): global CTX, INIT_PID, INIT_ARGS if CTX is not None and INIT_PID == os.getpid(): return INIT_ARGS = (path, rw) if rw: flags = 0 else: flags = nss.nss.NSS_INIT_READONLY path = path_for_version(path) INIT_PID = os.getpid() CTX = nss.nss.nss_init_context(path, flags=flags)
def add_nss_cert(dbpath, certpath, nickname): # Even if you use a different nickname, NSS will not add a cert that is # already in db, without signaling it. subprocess.check_call(['certutil', '-A', '-d', dbpath, '-i', certpath, '-n', nickname, '-t', 'TC,C,T']) def del_nss_cert(dbpath, nickname): subprocess.check_call(['certutil', '-D', '-d', dbpath, '-n', nickname]) def create_cert_db(path): # continue to provide this function for braindead customers who believe a development version # is an api-stable version update_cert_db(path) def iter_db_certs(path): # TODO check existing db output = subprocess.check_output(['certutil', '-L', '-d', path]).decode('utf-8').rstrip() for line in output.split('\n'): if line.startswith(' ') or ',' not in line: # NSS prints a useless header. # The lines we want have format: # {cert nickname}{space based alignment}{nss trust flags} # NSS trust flags contain "," so we're guaranteed these are the desired lines continue yield line.split()[0] def create_empty_db(path): path = path_for_version(path) try: subprocess.check_call(['certutil', '-N', '--empty-password', '-d', path]) except OSError: raise ImportError('Please install libnss3-tools') def update_cert_db(dbpath): """Imports certificates from system dir into NSS database.""" # Tries to keep unchanged certificates. # Each certificate has a "nickname" in NSS db, which is defined by us, not # by cert content. # Previously, we used the certificate file path as nickname. # To be able to track those which changed and those removed, we use the # hash of the PEM data as nickname. This is useful for ensuring unicity as # system certificate dirs contain duplicate certificates. realdbpath = dbpath dbpath = path_for_version(dbpath) if not os.path.exists(os.path.join(realdbpath, certificate_db_filename())): create_empty_db(realdbpath) db_certs = set(iter_db_certs(dbpath)) obsolete_certs = set(db_certs) # Do a first pass by removing certs that use file nicknames. # NSS won't add duplicate certs, so existing file nicknames will prevent # hash nicknames from being inserted into the db. # Then, the purging loop at function end will remove file nicknames, which # will make the db effectively empty, because we couldn't insert any hash # nicknames... # If we clean them before, we aren't blocked from adding hash nicknames. for nick in db_certs: if '/' in nick: del_nss_cert(dbpath, nick) obsolete_certs.discard(nick) cert_dir = '/etc/ssl/certs' for cert_file in os.listdir(cert_dir): cert_file = os.path.join(cert_dir, cert_file) if os.path.isdir(cert_file) or '.' not in cert_file: continue with open(cert_file) as fd: content = fd.read() separators = [ '-----END CERTIFICATE-----', '-----END TRUSTED CERTIFICATE-----', ] for sep in separators: if sep in content: separator = sep break else: continue nb_certs = content.count(separator) try: if nb_certs == 1: nick = hashlib.sha1(content.strip().encode('utf-8')).hexdigest() if nick in db_certs: # no need to import it obsolete_certs.discard(nick) # don't remove it at the end continue add_nss_cert(dbpath, cert_file, nick) db_certs.add(nick) elif nb_certs > 1: # nss can't import bundles, split them for subcert in content.split(separator)[:-1]: subcert += separator nick = hashlib.sha1(subcert.strip().encode('utf-8')).hexdigest() if nick in db_certs: obsolete_certs.discard(nick) continue with NamedTemporaryFile('w') as fd: fd.write(subcert) fd.flush() add_nss_cert(dbpath, fd.name, nick) db_certs.add(nick) except subprocess.CalledProcessError: LOGGER.warning('Unable to handle ca file {}'.format(cert_file)) for nick in obsolete_certs: # Those certs were imported in a previous session, but they don't seem # to be on the system anymore. del_nss_cert(dbpath, nick) def reinit_if_needed(): # if we forked since NSS was initialized, we might get an exception # (SEC_ERROR_PKCS11_DEVICE_ERROR) A PKCS #11 module returned CKR_DEVICE_ERROR, indicating that a problem has occurred with the token or slot. # so we should reinit NSS if INIT_PID and INIT_PID != os.getpid(): LOGGER.debug('nss inited in %s but now in %s', INIT_PID, os.getpid()) assert INIT_ARGS init_nss(*INIT_ARGS)