Source code for woob.browser.har

# Copyright(C) 2012-2022 woob project
#
# This file is part of woob.
#
# woob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# woob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with woob. If not, see <http://www.gnu.org/licenses/>.


import base64
import io
import os
from datetime import datetime
from threading import Lock
from urllib.parse import parse_qsl, urlparse

from woob import __version__ as woob_version
from woob.tools.json import json
from woob.tools.log import getLogger


__all__ = ["HARManager"]


class HARManager:
    """Record HTTP exchanges as entries of a single HAR 1.2 bundle file.

    The bundle lives at ``<responses_dirname>/bundle.har``. The first saved
    entry writes the whole JSON document; each later entry is appended in
    place by overwriting the closing ``]}}`` of the file, so the content
    already on disk is not rewritten. File writes are serialized by
    ``responses_lock``.
    """

    # Closing characters of the compact JSON document: "entries" is the last
    # key of "log", which is the only key of the bundle.
    _BUNDLE_SUFFIX = "]}}"
    # Compact separators keep the suffix above free of whitespace.
    _JSON_SEPARATORS = (",", ":")
    _FORM_URLENCODED = "application/x-www-form-urlencoded"

    def __init__(self, responses_dirname, logger):
        """Prepare a manager writing ``bundle.har`` into *responses_dirname*.

        :param responses_dirname: directory in which the bundle file is stored
        :param logger: logger handed to ``getLogger("har", logger)``
        """
        self.har_path = os.path.join(responses_dirname, "bundle.har")
        self.responses_lock = Lock()
        self.logger = getLogger("har", logger)
        # Built lazily, when the first entry is created.
        self.bundle = None

    def _build_har_bundle(self, started_datetime):
        """Create the empty in-memory bundle (``self.bundle``).

        :param started_datetime: ISO formatted date used for the fake page
        """
        self.bundle = {
            "log": {
                "version": "1.2",
                "creator": {
                    "name": "woob",
                    "version": woob_version,
                },
                "browser": {
                    "name": "woob",
                    "version": woob_version,
                },
                # there are no pages, but we need that to please firefox
                "pages": [
                    {
                        "id": "fake_page",
                        "pageTimings": {},
                        # and chromium wants some of it too
                        "startedDateTime": started_datetime,
                    }
                ],
                # don't put additional data after this list, to have a fixed-size suffix after it
                # so we can add more entries without rewriting the whole file.
                "entries": [],
            },
        }

    @staticmethod
    def _format_http_version(raw_version):
        """Format a numeric protocol version (e.g. ``11``) as ``"HTTP/1.1"``."""
        return "HTTP/%.1f" % (raw_version / 10.0)

    @staticmethod
    def _build_har_request(request, http_version):
        """Build the HAR ``request`` object for *request*.

        :param request: prepared request; its ``method``, ``url``, ``headers``,
                        ``_cookies`` and ``body`` attributes are read
        :param http_version: value stored as ``httpVersion``
        :return: a JSON-serializable dict; ``postData`` is only present when
                 the request has a body
        """
        request_entry = {
            "method": request.method,
            "url": request.url,
            "httpVersion": http_version,
            "headers": [
                {
                    "name": k,
                    "value": v,
                }
                for k, v in request.headers.items()
            ],
            "queryString": [
                {
                    "name": key,
                    "value": value,
                }
                for key, value in parse_qsl(
                    urlparse(request.url).query,
                    keep_blank_values=True,
                )
            ],
            "cookies": [
                {
                    "name": k,
                    "value": v,
                }
                for k, v in request._cookies.items()
            ],
            # for chromium
            "bodySize": -1,
            "headersSize": -1,
        }

        body = request.body
        if body is None:
            return request_entry

        content_type = request.headers.get("Content-Type", "")
        post_data = {
            "mimeType": content_type,
            "params": [],
        }

        if isinstance(body, str):
            post_data["text"] = body
        else:
            # HAR format has no proper way to encode posted binary data!
            post_data["text"] = body.decode("latin-1")
            # add a non-standard key to indicate how should "text" be decoded.
            post_data["x-binary"] = True

        # Only compare the media type: the header may carry parameters such
        # as "; charset=UTF-8".
        media_type = content_type.split(";", 1)[0].strip().lower()
        if media_type == HARManager._FORM_URLENCODED:
            # Parse the decoded text rather than the raw body: parsing bytes
            # yields bytes names/values, which cannot be dumped as JSON.
            # Blank values are kept, like for "queryString" above.
            post_data["params"] = [
                {
                    "name": key,
                    "value": value,
                }
                for key, value in parse_qsl(
                    post_data["text"],
                    keep_blank_values=True,
                )
            ]

        request_entry["postData"] = post_data
        return request_entry

    @staticmethod
    def _build_har_response(response):
        """Build the HAR ``response`` object for *response*.

        The body is always stored base64-encoded.

        :param response: response whose ``status_code``, ``reason``, ``raw``,
                         ``headers``, ``content`` and ``cookies`` are read
        :return: a JSON-serializable dict
        """
        content = response.content
        response_entry = {
            "status": response.status_code,
            "statusText": response.reason,
            "httpVersion": HARManager._format_http_version(response.raw.version),
            "headers": [
                {
                    "name": k,
                    "value": v,
                }
                for k, v in response.headers.items()
            ],
            "content": {
                "mimeType": response.headers.get("Content-Type", ""),
                "size": len(content),
                # systematically use base64 to avoid more content alteration
                # than there already is...
                "encoding": "base64",
                "text": base64.b64encode(content).decode("ascii"),
            },
            "cookies": [
                {
                    "name": k,
                    "value": v,
                }
                for k, v in response.cookies.items()
            ],
            "redirectURL": response.headers.get("location", ""),
            # for chromium
            "bodySize": -1,
            "headersSize": -1,
        }

        return response_entry

    @staticmethod
    def _build_empty_har_response(*args):
        """Build a placeholder HAR ``response`` object; arguments are ignored."""
        # called when we get a timeout
        return {
            "status": 0,
            "statusText": "",
            "httpVersion": "",
            "headers": [],
            "content": {},
            "cookies": [],
            "redirectURL": "",
            # for chromium
            "bodySize": -1,
            "headersSize": -1,
        }

    def _build_har_entry(self, slug, request, response=None, time=""):
        """Build one HAR entry, creating the bundle if it does not exist yet.

        :param slug: identifier stored under the ``$anchor`` key
        :param request: request to describe
        :param response: matching response, or None if there is none
        :param time: value stored as ``time`` when *response* is None;
                     otherwise it is replaced by the elapsed milliseconds
        :return: the entry dict (not yet added to the bundle)
        """
        # check if response is not None and not if response
        # because a response with a status_code >= 400 is falsy
        if response is not None:
            elapsed = response.elapsed
            started_datetime = (datetime.now() - elapsed).isoformat()
            time = int(elapsed.total_seconds() * 1000)
            http_version = self._format_http_version(response.raw.version)
            build_response = self._build_har_response
        else:
            started_datetime = datetime.now().isoformat()
            http_version = ""
            build_response = self._build_empty_har_response

        if not self.bundle:
            self._build_har_bundle(started_datetime)

        har_entry = {
            "$anchor": slug,
            "startedDateTime": started_datetime,
            "pageref": "fake_page",
            "time": time,
            "request": self._build_har_request(request, http_version),
            "response": build_response(response),
            "timings": {
                # please chromium
                "send": -1,
                "wait": -1,
                "receive": -1,
            },
            "cache": {},
        }

        return har_entry

    def _save_har_entry(self, har_entry):
        """Add *har_entry* to the bundle and persist it to ``har_path``.

        If the file does not exist, the whole bundle is written. Otherwise
        the entry is spliced in before the closing ``]}}``; if the file does
        not end with that pattern, a warning is logged and the file is left
        untouched.

        :raises TypeError: if the entry is not JSON-serializable; in that
                           case neither the bundle nor the file is modified
        """
        # Serialize before touching anything: a failure while streaming JSON
        # into the file would leave a truncated, unparsable bundle behind.
        serialized_entry = json.dumps(har_entry, separators=self._JSON_SEPARATORS)
        self.bundle["log"]["entries"].append(har_entry)

        if not os.path.isfile(self.har_path):
            document = json.dumps(self.bundle, separators=self._JSON_SEPARATORS)
            with open(self.har_path, "w", encoding="utf-8") as fd:
                fd.write(document)
            return

        # hack to avoid rewriting the whole file: entries are last in the JSON file
        # we need to seek at the right place and write the new entry.
        # this will unfortunately overwrite closings.
        suffix = self._BUNDLE_SUFFIX
        with open(self.har_path, "r+", encoding="utf-8") as fd:
            # can't seek with a negative value...
            fd.seek(0, io.SEEK_END)
            after_entry_pos = fd.tell() - len(suffix)

            # A file shorter than the suffix (e.g. empty) cannot match either.
            matches = False
            if after_entry_pos >= 0:
                fd.seek(after_entry_pos)
                matches = fd.read(len(suffix)) == suffix

            if not matches:
                self.logger.warning("HAR file does not end with the expected pattern")
                return

            fd.seek(after_entry_pos)
            # the leading comma is fine: there should have been at least one entry
            fd.write("," + serialized_entry + suffix)

    def save_response(self, slug, response):
        """Record *response* and the request that produced it.

        :param slug: identifier stored with the entry
        :param response: response to record; ``response.request`` is used as
                         the request
        """
        request = response.request
        har_entry = self._build_har_entry(slug, request, response=response)

        with self.responses_lock:
            self._save_har_entry(har_entry)

    def save_request_only(self, slug, request, time):
        """Record *request* with an empty placeholder response.

        :param slug: identifier stored with the entry
        :param request: request to record
        :param time: value stored as the entry ``time``
        """
        har_entry = self._build_har_entry(slug, request, time=time)

        with self.responses_lock:
            self._save_har_entry(har_entry)