Source code for woob.browser.har

# Copyright(C) 2012-2022 woob project
#
# This file is part of woob.
#
# woob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# woob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with woob. If not, see <http://www.gnu.org/licenses/>.


import base64
import io
import os
from datetime import datetime
from threading import Lock
from urllib.parse import urlparse, parse_qsl

from woob.tools.json import json
from woob.tools.log import getLogger
from woob import __version__ as woob_version

__all__ = ['HARManager']


class HARManager:
    """Record HTTP exchanges as entries of a single HAR 1.2 file.

    Entries are kept in ``self.bundle`` and persisted to ``bundle.har`` in
    the responses directory. After the first dump, new entries are appended
    in place instead of rewriting the whole file.
    """

    def __init__(self, responses_dirname, logger):
        """Prepare the manager; nothing is written until an entry is saved.

        :param responses_dirname: directory in which ``bundle.har`` is stored
        :param logger: parent logger passed to ``getLogger('har', logger)``
        """
        self.har_path = os.path.join(responses_dirname, 'bundle.har')
        self.responses_lock = Lock()
        self.logger = getLogger('har', logger)
        self.bundle = None

    def _build_har_bundle(self, started_datetime):
        """Create the in-memory HAR skeleton with an empty entries list.

        :param started_datetime: value stored as the fake page's
                                 ``startedDateTime``
        """
        self.bundle = {
            'log': {
                'version': '1.2',
                'creator': {
                    'name': 'woob',
                    'version': woob_version,
                },
                'browser': {
                    'name': 'woob',
                    'version': woob_version,
                },
                # there are no pages, but we need that to please firefox
                'pages': [{
                    'id': 'fake_page',
                    'pageTimings': {},
                    # and chromium wants some of it too
                    'startedDateTime': started_datetime,
                }],
                # don't put additional data after this list, to have a fixed-size suffix after it
                # so we can add more entries without rewriting the whole file.
                'entries': [],
            },
        }

    @staticmethod
    def _build_har_request(request, http_version):
        """Build the HAR ``request`` object for a prepared request.

        :param request: object exposing ``method``, ``url``, ``headers``,
                        ``_cookies`` and ``body``
        :param http_version: string stored as ``httpVersion``
        :return: JSON-serialisable dict describing the request
        """
        content_type = request.headers.get('Content-Type', '')

        request_entry = {
            'method': request.method,
            'url': request.url,
            'httpVersion': http_version,
            'headers': [
                {
                    'name': k,
                    'value': v,
                } for k, v in request.headers.items()
            ],
            'queryString': [
                {
                    'name': key,
                    'value': value,
                } for key, value in parse_qsl(
                    urlparse(request.url).query,
                    keep_blank_values=True,
                )
            ],
            'cookies': [
                {
                    'name': k,
                    'value': v,
                } for k, v in request._cookies.items()
            ],
            # for chromium
            'bodySize': -1,
            'headersSize': -1,
        }

        body = request.body
        if body is None:
            return request_entry

        post_data = {
            'mimeType': content_type,
            'params': [],
        }
        if isinstance(body, str):
            post_data['text'] = body
            form_text = body
        else:
            # HAR format has no proper way to encode posted binary data!
            post_data['text'] = body.decode('latin-1')
            # add a non-standard key to indicate how should "text" be decoded.
            post_data['x-binary'] = True
            # Parse a text form of the body: parse_qsl() on bytes yields bytes
            # pairs, which cannot be serialised to JSON.
            form_text = body.decode('utf-8', 'replace')

        # Compare the media type only, so that a parameter such as
        # "; charset=UTF-8" does not prevent the form fields from being listed.
        media_type = (content_type or '').split(';', 1)[0].strip().lower()
        if media_type == 'application/x-www-form-urlencoded':
            post_data['params'] = [
                {
                    'name': key,
                    'value': value,
                } for key, value in parse_qsl(form_text)
            ]

        request_entry['postData'] = post_data
        return request_entry

    @staticmethod
    def _build_har_response(response):
        """Build the HAR ``response`` object for a received response.

        The body is always stored base64-encoded.

        :param response: object exposing ``status_code``, ``reason``,
                         ``raw.version``, ``headers``, ``content`` and
                         ``cookies``
        :return: JSON-serialisable dict describing the response
        """
        response_entry = {
            'status': response.status_code,
            'statusText': response.reason,
            'httpVersion': 'HTTP/%.1f' % (response.raw.version / 10.),
            'headers': [
                {
                    'name': k,
                    'value': v,
                } for k, v in response.headers.items()
            ],
            'content': {
                'mimeType': response.headers.get('Content-Type', ''),
                'size': len(response.content),
                # systematically use base64 to avoid more content alteration
                # than there already is...
                'encoding': "base64",
                'text': base64.b64encode(response.content).decode('ascii'),
            },
            'cookies': [
                {
                    'name': k,
                    'value': v,
                } for k, v in response.cookies.items()
            ],
            'redirectURL': response.headers.get('location', ''),
            # for chromium
            'bodySize': -1,
            'headersSize': -1,
        }
        return response_entry

    @staticmethod
    def _build_empty_har_response(*args):
        """Return a placeholder HAR ``response`` object; arguments are ignored."""
        # called when we get a timeout
        return {
            'status': 0,
            'statusText': '',
            'httpVersion': '',
            'headers': [],
            'content': {},
            'cookies': [],
            'redirectURL': '',
            # for chromium
            'bodySize': -1,
            'headersSize': -1,
        }

    def _build_har_entry(self, slug, request, response=None, time=''):
        """Build a complete HAR entry, creating the bundle on first use.

        :param slug: identifier stored under the ``$anchor`` key
        :param request: request to describe
        :param response: matching response, or None when there is none
        :param time: value stored as ``time`` when ``response`` is None;
                     otherwise replaced by the elapsed time in milliseconds
        :return: JSON-serialisable HAR entry dict
        """
        # check if response is not None and not if response
        # because a response with a status_code >= 400 is falsy
        if response is not None:
            started_datetime = (datetime.now() - response.elapsed).isoformat()
            time = int(response.elapsed.total_seconds() * 1000)
            http_version = 'HTTP/%.1f' % (response.raw.version / 10.)
            build_response = self._build_har_response
        else:
            started_datetime = datetime.now().isoformat()
            build_response = self._build_empty_har_response
            http_version = ''

        # NOTE(review): save_response() and save_request_only() call this
        # method before taking responses_lock, so this lazy initialisation is
        # not protected by the lock.
        if not self.bundle:
            self._build_har_bundle(started_datetime)

        har_entry = {
            '$anchor': slug,
            'startedDateTime': started_datetime,
            'pageref': 'fake_page',
            'time': time,
            'request': self._build_har_request(request, http_version),
            'response': build_response(response),
            'timings': {
                # please chromium
                'send': -1,
                'wait': -1,
                'receive': -1,
            },
            'cache': {},
        }

        return har_entry

    def _save_har_entry(self, har_entry):
        """Add an entry to the bundle and persist it to the HAR file.

        When the file does not exist, the whole bundle is dumped. Otherwise
        the entry is spliced in just before the closing ``]}}`` of the
        existing file. If the file does not end with that suffix, a warning
        is logged and the file is left untouched.

        :param har_entry: entry dict as returned by ``_build_har_entry``
        """
        self.bundle['log']['entries'].append(har_entry)

        if not os.path.isfile(self.har_path):
            with open(self.har_path, 'w', encoding='utf-8') as fd:
                json.dump(self.bundle, fd, separators=(',', ':'))
            return

        # hack to avoid rewriting the whole file: entries are last in the JSON file
        # we need to seek at the right place and write the new entry.
        # this will unfortunately overwrite closings.
        suffix = b']}}'
        serialized = json.dumps(har_entry, separators=(',', ':')).encode('utf-8')

        # Binary mode: seeking to a computed offset is only well-defined on
        # binary files.
        with open(self.har_path, 'rb+') as fd:
            end_pos = fd.seek(0, io.SEEK_END)
            after_entry_pos = end_pos - len(suffix)

            # One extra byte before the suffix is needed to tell whether the
            # entries list is empty; a shorter file cannot be a valid bundle.
            if after_entry_pos < 1:
                self.logger.warning('HAR file does not end with the expected pattern')
                return

            fd.seek(after_entry_pos - 1)
            tail = fd.read(len(suffix) + 1)
            if tail[1:] != suffix:
                self.logger.warning('HAR file does not end with the expected pattern')
                return

            fd.seek(after_entry_pos)
            # No separator is needed when the existing list is empty.
            if tail[:1] != b'[':
                fd.write(b',')
            fd.write(serialized)
            fd.write(suffix)

    def save_response(self, slug, response):
        """Record a response together with the request that produced it.

        :param slug: identifier stored under the entry's ``$anchor`` key
        :param response: response whose ``request`` attribute is the request
        """
        request = response.request
        har_entry = self._build_har_entry(slug, request, response=response)

        with self.responses_lock:
            self._save_har_entry(har_entry)

    def save_request_only(self, slug, request, time):
        """Record a request for which no response is available.

        :param slug: identifier stored under the entry's ``$anchor`` key
        :param request: request to describe
        :param time: value stored as the entry's ``time``
        """
        har_entry = self._build_har_entry(slug, request, time=time)

        with self.responses_lock:
            self._save_har_entry(har_entry)