# Copyright(C) 2018 Vincent Ardisson
#
# This file is part of woob.
#
# woob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# woob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with woob. If not, see <http://www.gnu.org/licenses/>.
import codecs
import hashlib
import logging
import os
import time
from collections import OrderedDict
from contextlib import contextmanager
from copy import deepcopy
from glob import glob
from tempfile import NamedTemporaryFile
from urllib.parse import parse_qsl, urlencode, urljoin, urlparse, urlunparse
try:
from selenium import webdriver
except ImportError:
raise ImportError("Please install python3-selenium")
from selenium.common.exceptions import NoSuchElementException, NoSuchFrameException, TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
from selenium.webdriver.common.proxy import Proxy, ProxyType
from selenium.webdriver.firefox.firefox_profile import FirefoxProfile
from selenium.webdriver.remote.command import Command
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from woob.tools.log import getLogger
from .pages import HTMLPage as BaseHTMLPage
from .url import URL
# Names exported by ``from <this module> import *``.
# WithinFrame and StablePageCondition are defined below but not listed here.
__all__ = (
    "SeleniumBrowser",
    "SeleniumPage",
    "HTMLPage",
    "CustomCondition",
    "AnyCondition",
    "AllCondition",
    "NotCondition",
    "IsHereCondition",
    "VisibleXPath",
    "ClickableXPath",
    "ClickableLinkText",
    "HasTextCondition",
    "WrapException",
    "xpath_locator",
    "link_locator",
    "ElementWrapper",
)
class CustomCondition:
    """Abstract condition class

    In Selenium, waiting is done on callable objects named "conditions".
    Basically, a condition is a function predicate returning True if some condition is met.

    The builtin selenium conditions are in :mod:`selenium.webdriver.support.expected_conditions`.

    This class exists to differentiate normal methods from condition objects when
    calling :meth:`SeleniumPage.is_here`.

    See https://seleniumhq.github.io/selenium/docs/api/py/webdriver_support/selenium.webdriver.support.expected_conditions.html

    When using `selenium.webdriver.support.expected_conditions`, it's better to
    wrap them using :class:`WrapException`.
    """

    def __call__(self, driver):
        """Evaluate the condition against `driver`.

        Subclasses must override this; the base implementation always raises.

        :param driver: the Selenium driver the condition is checked on
        :raises NotImplementedError: always, in this abstract base class
        """
        raise NotImplementedError()
class WrapException(CustomCondition):
    """Adapter turning a "missing element" error into a plain False.

    Selenium's builtin `expected_conditions` return a truthy value when they are
    met, but may raise instead of returning False when they are not.
    This wrapper calls the wrapped condition and converts
    `NoSuchElementException` into a False result; any other exception
    propagates unchanged.
    """

    def __init__(self, condition):
        # The wrapped callable; it is invoked with the driver on every check.
        self.condition = condition

    def __call__(self, driver):
        try:
            outcome = self.condition(driver)
        except NoSuchElementException:
            outcome = False
        return outcome
class AnyCondition(CustomCondition):
    """Condition satisfied as soon as one of its sub-conditions is satisfied.

    Every sub-condition is wrapped in :class:`WrapException`, so a
    `NoSuchElementException` raised by one of them counts as "not met".
    Evaluation stops at the first sub-condition that is met.
    """

    def __init__(self, *conditions):
        self.conditions = tuple(map(WrapException, conditions))

    def __call__(self, driver):
        for wrapped in self.conditions:
            if wrapped(driver):
                return True
        return False
class AllCondition(CustomCondition):
    """Condition satisfied only when every sub-condition is satisfied.

    Every sub-condition is wrapped in :class:`WrapException`, so a
    `NoSuchElementException` raised by one of them counts as "not met".
    Evaluation stops at the first sub-condition that is not met.
    """

    def __init__(self, *conditions):
        self.conditions = tuple(map(WrapException, conditions))

    def __call__(self, driver):
        for wrapped in self.conditions:
            if not wrapped(driver):
                return False
        return True
class NotCondition(CustomCondition):
    """Condition that is met exactly when the wrapped condition is not.

    The wrapped condition goes through :class:`WrapException`, so a
    `NoSuchElementException` makes this condition evaluate to True.
    """

    def __init__(self, condition):
        self.condition = WrapException(condition)

    def __call__(self, driver):
        inner_result = self.condition(driver)
        return not inner_result
class IsHereCondition(CustomCondition):
    """Condition that is true if the page of a given URL object "is here".

    Meant to be handed to `SeleniumBrowser.wait_until`.
    It mustn't be used inside a `SeleniumPage.is_here` definition.
    """

    def __init__(self, urlobj):
        assert isinstance(urlobj, URL)
        self.urlobj = urlobj

    def __call__(self, driver):
        # The driver argument is not used: the URL object performs the check itself.
        target = self.urlobj
        return target.is_here()
class WithinFrame(CustomCondition):
    """Evaluate a condition from inside a frame.

    In Selenium, frames are separated from each other and from the main page.
    This wrapper switches to the frame, runs the wrapped condition there and
    always switches back to the default content afterwards.
    A frame that cannot be found makes the condition False.
    """

    def __init__(self, selector, condition):
        self.selector = selector
        self.condition = condition

    def __call__(self, driver):
        try:
            driver.switch_to.frame(self.selector)
        except NoSuchFrameException:
            return False
        else:
            try:
                return self.condition(driver)
            finally:
                # Leave the frame whether the condition returned or raised.
                driver.switch_to.default_content()
class StablePageCondition(CustomCondition):
    """Condition met once the page source stopped changing for `waiting` seconds.

    For each root element id, the time and md5 digest of the page source are
    remembered; the condition becomes true when the digest has been identical
    for at least `waiting` seconds.

    Warning: this condition will not work if a site has a carousel or something
    like this that constantly changes the DOM.
    """

    # Records older than purge_times * waiting seconds are forgotten.
    purge_times = 10

    def __init__(self, waiting=3):
        self.elements = OrderedDict()
        self.waiting = waiting

    def _purge(self):
        """Drop the records that are older than the retention period."""
        now = time.time()
        max_age = self.purge_times * self.waiting
        expired = [key for key in list(self.elements) if now - self.elements[key][0] > max_age]
        for key in expired:
            del self.elements[key]

    def __call__(self, driver):
        self._purge()

        digest = hashlib.new("md5", driver.page_source.encode("utf-8")).hexdigest()  # nosec
        now = time.time()
        page_id = driver.find_element_by_xpath("/*").id

        record = self.elements.get(page_id)
        if record is None or record[1] != digest:
            # First sighting, or the content changed: restart the clock.
            self.elements[page_id] = (now, digest)
            return False

        first_seen = record[0]
        if now - first_seen < self.waiting:
            return False
        return True
def VisibleXPath(xpath):
    """Build a `visibility_of_element_located` condition for `xpath`, wrapped in `WrapException`."""
    locator = xpath_locator(xpath)
    condition = EC.visibility_of_element_located(locator)
    return WrapException(condition)
def ClickableXPath(xpath):
    """Build an `element_to_be_clickable` condition for `xpath`, wrapped in `WrapException`."""
    locator = xpath_locator(xpath)
    condition = EC.element_to_be_clickable(locator)
    return WrapException(condition)
def ClickableLinkText(text, partial=False):
    """Build an `element_to_be_clickable` condition for a link text, wrapped in `WrapException`.

    :param text: link text to look for
    :param partial: passed to :func:`link_locator` to select partial matching
    """
    locator = link_locator(text, partial)
    condition = EC.element_to_be_clickable(locator)
    return WrapException(condition)
def HasTextCondition(xpath):
    """Condition to ensure some xpath is visible and contains non-empty text.

    The given expression is parenthesised and filtered with a predicate
    requiring a non-blank text node, then handed to :func:`VisibleXPath`.
    """
    with_text = '(%s)[normalize-space(text())!=""]' % xpath
    return VisibleXPath(with_text)
def xpath_locator(xpath):
    """Create an XPath locator from a string.

    Most Selenium functions don't accept XPaths directly but "locators",
    i.e. a (strategy, value) pair. Locators can be XPath, CSS selectors, etc.

    :param xpath: the XPath expression
    :return: the tuple ``(By.XPATH, xpath)``
    """
    strategy = By.XPATH
    return (strategy, xpath)
def link_locator(text, partial=False):
    """Create a link text locator from a string.

    Most Selenium functions don't accept a search text directly but "locators",
    i.e. a (strategy, value) pair.

    Warning: if searched text is not directly in <a> but in one of its children,
    some webdrivers might not find the link.

    :param text: link text to look for
    :param partial: if true, use the partial link text strategy instead of the
        exact one
    :return: the tuple ``(strategy, text)``
    """
    strategy = By.PARTIAL_LINK_TEXT if partial else By.LINK_TEXT
    return (strategy, text)
class ElementWrapper:
    """Wrapper making a Selenium element resemble an lxml element.

    Some differences:

    - only a subset of lxml's Element class is available
    - cannot access XPath "text()", only Elements

    Attributes that are not defined here are looked up on the wrapped element.

    See https://seleniumhq.github.io/selenium/docs/api/py/webdriver_remote/selenium.webdriver.remote.webelement.html
    """

    def __init__(self, wrapped):
        self.wrapped = wrapped

    def xpath(self, xpath):
        """Return the list of wrapped elements matching `xpath`.

        Since it uses `find_elements_by_xpath`, it does not raise
        `NoSuchElementException` or `TimeoutException`.
        """
        found = self.wrapped.find_elements_by_xpath(xpath)
        return list(map(ElementWrapper, found))

    def text_content(self):
        """Return the `text` of the wrapped element."""
        return self.wrapped.text

    @property
    def text(self):
        # Selenium can only fetch text recursively.
        # Could be implemented by injecting JS though.
        raise NotImplementedError()

    def itertext(self):
        """Return a one-item list holding the `text` of the wrapped element."""
        content = self.wrapped.text
        return [content]

    def __getattr__(self, attr):
        # Only reached for names not found on the wrapper itself.
        return getattr(self.wrapped, attr)

    @property
    class attrib:
        # Used as a property getter: accessing ``element.attrib`` instantiates
        # this class with the ElementWrapper, giving a small read-only mapping
        # over the element's attributes.
        def __init__(self, el):
            self.el = el

        def __getitem__(self, k):
            value = self.el.get_attribute(k)
            if value is not None:
                return value
            raise KeyError("Attribute %r was not found" % k)

        def get(self, k, default=None):
            value = self.el.get_attribute(k)
            return default if value is None else value
class SeleniumPage:
    """Page to use in a SeleniumBrowser

    Differences with regular woob Pages:

    - cannot access raw HTML text
    """

    logged = False

    def __init__(self, browser):
        super().__init__()
        self.params = {}
        self.browser = browser
        self.driver = browser.driver
        # Logger named after the lowercased page class, child of the browser logger.
        logger_name = self.__class__.__name__.lower()
        self.logger = getLogger(logger_name, browser.logger)

    @property
    def doc(self):
        """Root element of the current document, as an :class:`ElementWrapper`."""
        root = self.browser.driver.find_element_by_xpath("/*")
        return ElementWrapper(root)

    def is_here(self):
        """Method to determine if the browser is on this page and the page is ready.

        Use XPath and page content to determine if we are on this page.
        Make sure the page is "ready" for the usage we want. For example, if there's
        a splash screen in front the page, preventing click, it should return False.

        `is_here` can be a method or a :class:`CustomCondition` instance.

        The default implementation always returns True.
        """
        return True

    # TODO get_form
class HTMLPage(BaseHTMLPage):
    """Regular HTML page built from the DOM currently rendered by Selenium.

    The page is fed with a :class:`FakeResponse` built from the driver's
    current URL and page source, so the usual HTML page tooling can be used on
    a snapshot of the browser content.
    """

    ENCODING = "utf-8"

    def __init__(self, browser):
        """Snapshot the browser content and initialise the HTML page with it.

        :param browser: browser exposing `url` and `driver`
        """
        # Read the source once, from the driver: the browser object itself has
        # no `page_source` attribute, and a single read guarantees that `text`
        # and `content` describe the same DOM state.
        source = browser.driver.page_source
        fake = FakeResponse(
            url=browser.url,
            text=source,
            content=source.encode("utf-8"),
            encoding="utf-8",
        )
        super().__init__(browser, fake, encoding="utf-8")
        self.driver = browser.driver
# Maps a webdriver class to the options class that
# SeleniumBrowser._build_options instantiates for it.
OPTIONS_CLASSES = {
    webdriver.Firefox: webdriver.FirefoxOptions,
    webdriver.Chrome: webdriver.ChromeOptions,
}
# Maps a webdriver class to the base capabilities dict that
# SeleniumBrowser._build_capabilities copies for it.
CAPA_CLASSES = {
    webdriver.Firefox: DesiredCapabilities.FIREFOX,
    webdriver.Chrome: DesiredCapabilities.CHROME,
}
class DirFirefoxProfile(FirefoxProfile):
    """Firefox profile whose working folder can be forced to a given directory."""

    def __init__(self, custom_dir):
        # Stored before calling the parent constructor; presumably the parent
        # calls _create_tempfolder() during its own init — TODO confirm.
        self._woob_dir = custom_dir
        super().__init__()

    def _create_tempfolder(self):
        # Use the custom directory when one was given, else defer to the parent.
        if self._woob_dir:
            return self._woob_dir
        return super()._create_tempfolder()
class FakeResponse:
    """Bare attribute holder standing in for a response object.

    Every keyword argument given to the constructor becomes an instance
    attribute; `page` defaults to None when not supplied.
    """

    page = None

    def __init__(self, **kwargs):
        for name in kwargs:
            setattr(self, name, kwargs[name])
class SeleniumBrowserSetupError(Exception):
    """
    Raised when the browser attributes are not valid
    and the driver cannot be set up.

    In this module it is raised by `SeleniumBrowser._setup_remote_driver`
    when `DRIVER` is neither Firefox nor Chrome.
    """
class SeleniumBrowser:
    """Browser similar to PagesBrowser, but using Selenium.

    URLs instances can be used. The need_login decorator can be used too.

    Differences:

    - since JS code can be run anytime, the current `url` and `page` can change anytime
    - it's not possible to use :meth:`open()`, only :meth:`location()` can be used
    - many options are not implemented yet (like proxies) or cannot be implemented at all
    """

    DRIVER = webdriver.Firefox
    """Selenium driver class"""

    HEADLESS = True
    """Run without any display"""

    DEFAULT_WAIT = 10
    """Default wait time for `wait_*` methods"""

    WINDOW_SIZE = None
    """Rendering window size

    It can be useful for responsive websites which show or hide elements depending
    on the viewport size.
    """

    BASEURL = None

    MAX_SAVED_RESPONSES = 1 << 30  # limit to 1GiB

    def __init__(
        self,
        logger=None,
        proxy=None,
        responses_dirname=None,
        weboob=None,
        proxy_headers=None,
        preferences=None,
        remote_driver_url=None,
        woob=None,
    ):
        """Create the browser and start the Selenium driver.

        :param logger: parent logger of the browser logger
        :param proxy: dict of proxy URLs; the "http" and "https" keys are used
        :param responses_dirname: directory where pages, screenshots and the
            driver log are saved; nothing is saved when empty
        :param weboob: legacy alias of `woob`, used only when `woob` is not given
        :param proxy_headers: accepted but not used by this class
        :param preferences: browser preferences given to the driver options
        :param remote_driver_url: base URL of a remote driver; when set, a
            remote driver is used instead of a local one
        :param woob: stored as `self.woob`
        """
        super().__init__()
        self.responses_dirname = responses_dirname
        self.responses_count = 0
        self.woob = woob or weboob
        self.logger = getLogger("browser", logger)
        self.proxy = proxy or {}
        self.remote_driver_url = remote_driver_url

        # We set the default value of selenium logger to ERROR to avoid
        # spamming logs with useless information.
        # Also, the data we send to the browser using selenium (with send_keys)
        # can be displayed clearly in the log, if the log level is
        # set to DEBUG.
        logging.getLogger("selenium").setLevel(logging.ERROR)

        self.implicit_timeout = 0
        self.last_page_hash = None

        # Defined before the setup so that deinit() stays usable even when the
        # driver could not be created.
        self.driver = None
        self._setup_driver(preferences)

        # Give this instance its own copy of every URL declared on the class,
        # bound to this browser, and keep them in declaration order.
        self._urls = []
        cls = type(self)
        for attr in dir(cls):
            val = getattr(cls, attr)
            if isinstance(val, URL):
                val = deepcopy(val)
                val.browser = self
                setattr(self, attr, val)
                self._urls.append(val)
        self._urls.sort(key=lambda u: u._creation_counter)

    def _build_options(self, preferences):
        """Instantiate the options object of `DRIVER` and apply `preferences` to it."""
        options = OPTIONS_CLASSES[self.DRIVER]()
        if preferences:
            if isinstance(options, webdriver.FirefoxOptions):
                for key, value in preferences.items():
                    options.set_preference(key, value)
            elif isinstance(options, webdriver.ChromeOptions):
                options.add_experimental_option("prefs", preferences)
        return options

    def _build_capabilities(self):
        """Return a copy of the base capabilities of `DRIVER`, with certificate policy set."""
        caps = CAPA_CLASSES[self.DRIVER].copy()
        # Insecure certificates are accepted only when verification has been
        # explicitly disabled with ``VERIFY = False``; the previous code
        # accepted them when VERIFY was truthy, which was inverted.
        # NOTE(review): assumes VERIFY means "check TLS certificates", as the
        # name suggests — confirm against the other woob browsers.
        caps["acceptInsecureCerts"] = getattr(self, "VERIFY", True) is False
        return caps

    def get_proxy_url(self, url):
        """Return the proxy URL in the form expected by `DRIVER`.

        For Firefox the ``scheme://`` prefix is removed; other drivers get the
        URL unchanged.
        """
        if self.DRIVER is webdriver.Firefox:
            proxy_url = urlparse(url)
            return proxy_url.geturl().replace("%s://" % proxy_url.scheme, "")
        return url

    def _build_proxy(self):
        """Build the Selenium `Proxy` object from `self.proxy`.

        The proxy is MANUAL when an "http" or "https" entry exists, DIRECT otherwise.
        """
        proxy = Proxy()
        if "http" in self.proxy:
            proxy.proxy_type = ProxyType.MANUAL
            proxy.http_proxy = self.get_proxy_url(self.proxy["http"])
        if "https" in self.proxy:
            proxy.proxy_type = ProxyType.MANUAL
            proxy.ssl_proxy = self.get_proxy_url(self.proxy["https"])

        if proxy.proxy_type != ProxyType.MANUAL:
            proxy.proxy_type = ProxyType.DIRECT

        return proxy

    def _setup_driver(self, preferences):
        """Create `self.driver` from the class configuration.

        :param preferences: browser preferences, see :meth:`_build_options`
        :raises NotImplementedError: for a local driver other than Firefox or Chrome
        """
        proxy = self._build_proxy()
        capa = self._build_capabilities()
        proxy.add_to_capabilities(capa)

        options = self._build_options(preferences)
        # TODO some browsers don't need headless
        # TODO handle different proxy setting?
        try:
            # New Selenium versions
            options.headless = self.HEADLESS
        except AttributeError:
            # Keep compatibility with old Selenium versions
            options.set_headless(self.HEADLESS)

        driver_kwargs = {}
        if self.responses_dirname:
            if not os.path.isdir(self.responses_dirname):
                os.makedirs(self.responses_dirname)
            driver_kwargs["service_log_path"] = os.path.join(self.responses_dirname, "selenium.log")
        else:
            # Only the path is needed: close the handle right away instead of
            # leaving an open file object behind. The file itself is kept.
            with NamedTemporaryFile(prefix="woob_selenium_", suffix=".log", delete=False) as log_file:
                driver_kwargs["service_log_path"] = log_file.name

        if self.remote_driver_url:
            self._setup_remote_driver(options=options, capabilities=capa, proxy=proxy)
        elif self.DRIVER is webdriver.Firefox:
            if self.responses_dirname and not os.path.isdir(self.responses_dirname):
                os.makedirs(self.responses_dirname)

            options.profile = DirFirefoxProfile(self.responses_dirname)
            if self.responses_dirname:
                capa["profile"] = self.responses_dirname
            self.driver = self.DRIVER(options=options, capabilities=capa, **driver_kwargs)
        elif self.DRIVER is webdriver.Chrome:
            if self.HEADLESS:
                # Prevent random renderer timeout
                options.add_argument("--disable-gpu")
            self.driver = self.DRIVER(options=options, desired_capabilities=capa, **driver_kwargs)
        else:
            raise NotImplementedError()

        if self.WINDOW_SIZE:
            self.driver.set_window_size(*self.WINDOW_SIZE)

    def _setup_remote_driver(self, options, capabilities, proxy):
        """Create `self.driver` as a remote driver reached at `remote_driver_url`.

        :raises SeleniumBrowserSetupError: if `DRIVER` is neither Firefox nor Chrome
        """
        if self.DRIVER is webdriver.Firefox:
            capabilities["browserName"] = "firefox"
        elif self.DRIVER is webdriver.Chrome:
            capabilities["browserName"] = "chrome"
            options.add_argument("start-maximized")  # must be start maximized to avoid diffs using headless
        else:
            raise SeleniumBrowserSetupError("Remote driver supports only Firefox and Chrome.")

        self.driver = webdriver.Remote(
            command_executor="%s/wd/hub" % self.remote_driver_url,
            desired_capabilities=capabilities,
            options=options,
            proxy=proxy,
        )

    ### Browser
    def deinit(self):
        """Quit the driver, if there is one."""
        if self.driver:
            self.driver.quit()

    @property
    def url(self):
        """URL currently displayed by the driver."""
        return self.driver.current_url

    @property
    def page(self):
        """Page object matching the current URL and content, or None.

        The declared URLs are tried in order; the first one matching the
        current URL whose page "is here" is returned, after saving the response
        (if it changed) and calling the page's `on_load` hook if it has one.
        """

        def do_on_load(page):
            if hasattr(page, "on_load"):
                page.on_load()

        for val in self._urls:
            if not val.match(self.url):
                continue

            page = val.klass(self)
            # No implicit wait here: a missing element must fail immediately.
            with self.implicit_wait(0):
                try:
                    # `is_here` is either a condition object, called with the
                    # driver, or a regular method.
                    if isinstance(page.is_here, CustomCondition):
                        here = page.is_here(self.driver)
                    else:
                        here = page.is_here()

                    if here:
                        self.logger.debug("Handle %s with %s", self.url, type(page).__name__)
                        self.save_response_if_changed()
                        do_on_load(page)
                        return page
                except NoSuchElementException:
                    pass

        self.logger.debug("Unable to handle %s", self.url)
        return None

    def open(self, *args, **kwargs):
        """
        Raises :class:`NotImplementedError`.
        """
        # TODO maybe implement with a new window?
        raise NotImplementedError()

    def location(self, url, data=None, headers=None, params=None, method=None, json=None, timeout=None):
        """
        Change current url of the browser.

        Warning: unlike other requests-based woob browsers, this function does not block
        until the page is loaded, it's completely asynchronous.
        To use the new page content, it's necessary to wait, either implicitly (e.g. with
        context manager :meth:`implicit_wait`) or explicitly (e.g. using method
        :meth:`wait_until`)

        :param url: URL to load
        :param params: extra query parameters (dict or list of 2-tuples),
            appended to the ones already present in `url`
        :param data: unsupported, must be None
        :param headers: unsupported, must be empty
        :param method: unsupported, must be None
        :param json: unsupported, must be None
        :param timeout: unsupported, must be None
        :return: a :class:`FakeResponse` whose `page` is the current page
        :raises TypeError: if `params` cannot be converted to a dict
        """
        assert method is None
        assert data is None
        assert json is None
        assert timeout is None
        assert not headers

        params = params or {}

        # if it's a list of 2-tuples, it will cast into a dict
        # otherwise, it will raise a TypeError
        try:
            params = dict(params)
        except TypeError:
            raise TypeError("'params' keyword argument must be a dict, a list of tuples or None.")

        params = params.items()

        url_parsed = urlparse(url)
        original_params = parse_qsl(url_parsed.query)
        original_params.extend(params)
        query = urlencode(original_params)

        # _replace() returns a new tuple: the result must be kept, otherwise
        # the merged query string is silently dropped.
        url_parsed = url_parsed._replace(query=query)
        url = urlunparse(url_parsed)

        self.logger.debug("opening %r", url)
        self.driver.get(url)

        # Give the URL up to one second to change from what it is right after
        # get(); not changing is not an error.
        try:
            WebDriverWait(self.driver, 1).until(EC.url_changes(self.url))
        except TimeoutException:
            pass
        return FakeResponse(page=self.page)

    def export_session(self):
        """Return the current URL and cookies as a dict.

        Each cookie is a copy of the driver's cookie in which the "expiry" key
        is renamed "expirationDate" (None when absent).
        """
        cookies = [cookie.copy() for cookie in self.driver.get_cookies()]
        for cookie in cookies:
            cookie["expirationDate"] = cookie.pop("expiry", None)

        ret = {
            "url": self.url,
            "cookies": cookies,
        }
        return ret

    def save_response_if_changed(self):
        """Save the page source unless it is identical to the last one seen here."""
        digest = hashlib.new("md5", self.driver.page_source.encode("utf-8")).hexdigest()  # nosec
        if self.last_page_hash != digest:
            self.save_response()

        self.last_page_hash = digest

    def save_response(self):
        """Write the page source to a numbered file in `responses_dirname`.

        Nothing is done when `responses_dirname` is not set, or when the files
        directly in that directory already total `MAX_SAVED_RESPONSES` bytes
        or more.
        """
        if self.responses_dirname:
            if not os.path.isdir(self.responses_dirname):
                os.makedirs(self.responses_dirname)

            total = sum(os.path.getsize(f) for f in glob("%s/*" % self.responses_dirname))
            if self.MAX_SAVED_RESPONSES is not None and total >= self.MAX_SAVED_RESPONSES:
                self.logger.info("quota reached, not saving responses")
                return

            self.responses_count += 1
            path = "%s/%02d.html" % (self.responses_dirname, self.responses_count)
            with codecs.open(path, "w", encoding="utf-8") as fd:
                fd.write(self.driver.page_source)
            self.logger.info("Response saved to %s", path)

    def absurl(self, uri, base=None):
        """Join `uri` with `base`, which defaults to the current URL."""
        # FIXME this is copy-pasta from DomainBrowser
        if not base:
            base = self.url
        if base is None or base is True:
            base = self.BASEURL
        return urljoin(base, uri)

    ### a few selenium wrappers
    def wait_xpath(self, xpath, timeout=None):
        """Wait until an element matching `xpath` is present."""
        self.wait_until(EC.presence_of_element_located(xpath_locator(xpath)), timeout)

    def wait_xpath_visible(self, xpath, timeout=None):
        """Wait until an element matching `xpath` is visible."""
        self.wait_until(EC.visibility_of_element_located(xpath_locator(xpath)), timeout)

    def wait_xpath_invisible(self, xpath, timeout=None):
        """Wait until the element matching `xpath` is invisible."""
        self.wait_until(EC.invisibility_of_element_located(xpath_locator(xpath)), timeout)

    def wait_xpath_clickable(self, xpath, timeout=None):
        """Wait until an element matching `xpath` is clickable."""
        self.wait_until(EC.element_to_be_clickable(xpath_locator(xpath)), timeout)

    def wait_until_is_here(self, urlobj, timeout=None):
        """Wait until the page of URL object `urlobj` "is here"."""
        self.wait_until(IsHereCondition(urlobj), timeout)

    def wait_until(self, condition, timeout=None):
        """Wait until some condition object is met

        Wraps WebDriverWait.
        See https://seleniumhq.github.io/selenium/docs/api/py/webdriver_support/selenium.webdriver.support.wait.html

        See :class:`CustomCondition`.

        On failure, a screenshot (when `responses_dirname` is set) and the page
        source are saved before the exception is re-raised.

        :param timeout: wait time in seconds (else DEFAULT_WAIT if None)
        :raises NoSuchElementException: re-raised from the wait
        :raises TimeoutException: re-raised from the wait
        """
        if timeout is None:
            timeout = self.DEFAULT_WAIT

        try:
            WebDriverWait(self.driver, timeout).until(condition)
        except (NoSuchElementException, TimeoutException):
            if self.responses_dirname:
                self.driver.get_screenshot_as_file("%s/%02d.png" % (self.responses_dirname, self.responses_count))
            self.save_response()
            raise

    def implicitly_wait(self, timeout):
        """Set implicit wait time

        When querying anything in DOM in Selenium, like evaluating XPath, if not found,
        Selenium will wait in a blocking manner until it is found or until the
        implicit wait timeouts.
        By default, it is 0, so if an XPath is not found, it fails immediately.

        :param timeout: new implicit wait time in seconds
        """
        self.implicit_timeout = timeout
        self.driver.implicitly_wait(timeout)

    @contextmanager
    def implicit_wait(self, timeout):
        """Context manager to change implicit wait time and restore it

        Blocks can be nested: leaving an inner block restores the wait time of
        the enclosing block.

        Example::

            with browser.implicit_wait(10):
                # Within this block, the implicit wait will be set to 10 seconds
                # and be restored at the end of block.
                # If the link is not found immediately, it will be periodically
                # retried until found (for max 10 seconds).
                el = self.find_element_link_text("Show list")
                el.click()
        """
        old = self.implicit_timeout
        try:
            # Go through implicitly_wait() so that `implicit_timeout` tracks
            # the value in effect; a nested block (e.g. the one in `page`)
            # relies on it to know what to restore.
            self.implicitly_wait(timeout)
            yield
        finally:
            self.implicitly_wait(old)

    @contextmanager
    def in_frame(self, selector):
        """Context manager to execute a block inside a frame and restore main page after.

        In selenium, to operate on a frame's content, one needs to switch to the frame before
        and return to main page after.

        :param selector: selector to match the frame

        Example::

            with self.in_frame(xpath_locator('//frame[@id="foo"]')):
                el = self.find_element_by_xpath('//a[@id="bar"]')
                el.click()
        """
        self.driver.switch_to.frame(selector)
        try:
            yield
        finally:
            self.driver.switch_to.default_content()

    def get_storage(self):
        """Get localStorage content for current domain.

        As for cookies, this method only manipulates data for current domain.
        It's not possible to get all localStorage content. To get localStorage
        for multiple domains, the browser must change the url to each domain
        and call get_storage each time after.
        To do so, it's wise to choose a neutral URL (like an image file or JS file)
        to avoid the target page itself changing the cookies.

        :return: dict mapping each storage key to its value
        """
        keys_response = self.driver.execute(Command.GET_LOCAL_STORAGE_KEYS)

        ret = {}
        for k in keys_response["value"]:
            item_response = self.driver.execute(Command.GET_LOCAL_STORAGE_ITEM, {"key": k})
            ret[k] = item_response["value"]
        return ret

    def update_storage(self, d):
        """Update local storage content for current domain.

        It has the same restrictions as `get_storage`.

        :param d: dict of the items to set
        """
        for k, v in d.items():
            self.driver.execute(Command.SET_LOCAL_STORAGE_ITEM, {"key": k, "value": v})

    def clear_storage(self):
        """Clear local storage."""
        self.driver.execute(Command.CLEAR_LOCAL_STORAGE)
class SubSeleniumMixin:
    """Mixin to have a Selenium browser for performing login.

    The login is delegated to a temporary browser of class `SELENIUM_BROWSER`;
    its cookies are then copied into this browser's session.
    """

    SELENIUM_BROWSER = None
    """Class of Selenium browser to use for the login"""

    __states__ = ("selenium_state",)

    selenium_state = None

    def create_selenium_browser(self):
        """Instantiate `SELENIUM_BROWSER` with this browser's config, logger and proxies.

        Its responses go to a "selenium" sub-directory of `responses_dirname`
        when the latter is set.
        """
        responses_dir = self.responses_dirname
        if responses_dir:
            responses_dir = responses_dir + "/selenium"

        return self.SELENIUM_BROWSER(
            self.config,
            logger=self.logger,
            responses_dirname=responses_dir,
            proxy=self.PROXIES,
        )

    def do_login(self):
        """Log in with a temporary Selenium browser and import its session.

        The sub-browser state is restored before and dumped after the login
        when the sub-browser supports it; the sub-browser is always
        deinitialised, even if the login fails.
        """
        selenium = self.create_selenium_browser()
        try:
            if self.selenium_state and hasattr(selenium, "load_state"):
                selenium.load_state(self.selenium_state)
            selenium.do_login()
            self.load_selenium_session(selenium)
        finally:
            try:
                if hasattr(selenium, "dump_state"):
                    self.selenium_state = selenium.dump_state()
            finally:
                selenium.deinit()

    def load_selenium_session(self, selenium):
        """Copy the cookies exported by `selenium` into `self.session`.

        If this browser has a `locate_browser` method, it is called with the
        exported session.
        """
        exported = selenium.export_session()
        jar = self.session.cookies
        for cookie in exported["cookies"]:
            jar.set(cookie["name"], cookie["value"], domain=cookie["domain"])

        if hasattr(self, "locate_browser"):
            self.locate_browser(exported)