# Copyright(C) 2014 Oleg Plakhotniuk
#
# This file is part of woob.
#
# woob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# woob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with woob. If not, see <http://www.gnu.org/licenses/>.
from io import BytesIO, StringIO
from collections import namedtuple
import logging
import os
import subprocess
from tempfile import mkstemp
# Names exported by a star-import of this module; the other helpers defined
# in this file are only reachable by importing them explicitly.
__all__ = ['decompress_pdf', 'get_pdf_rows']
def decompress_pdf(inpdf: bytes) -> bytes:
    """
    Take PDF file contents as bytes and return a decompressed version
    of the file contents, suitable for text parsing.

    The data goes through two temporary files which are always removed,
    even when the external tool is missing or fails.

    External dependencies:
    MuPDF (https://www.mupdf.com).

    :param inpdf: content of the PDF file
    :return: content of the file produced by ``mutool clean -d``
    :raises OSError: if ``mutool`` cannot be executed or a temporary file
        cannot be read or written
    """
    inh, inname = mkstemp(suffix='.pdf')
    outname = None
    try:
        # A file object guarantees the whole buffer is written (a raw
        # os.write may write only part of it) and closes the descriptor.
        with os.fdopen(inh, 'wb') as infile:
            infile.write(inpdf)

        outh, outname = mkstemp(suffix='.pdf')
        # Only the name is needed: mutool writes the output file itself.
        os.close(outh)

        # The exit status is deliberately not checked (historical behaviour):
        # on failure the content of the untouched output file is returned.
        subprocess.call(['mutool', 'clean', '-d', inname, outname])

        with open(outname, 'rb') as outfile:
            return outfile.read()
    finally:
        for name in (inname, outname):
            if name is None:
                continue
            try:
                os.remove(name)
            except FileNotFoundError:
                # already gone, nothing left to clean up
                pass
# Rectangle described by the coordinates (x0, y0) and (x1, y1).
Rect = namedtuple('Rect', 'x0 y0 x1 y1')
# Same four coordinates as ``Rect`` plus the text attached to that area.
TextRect = namedtuple('TextRect', 'x0 y0 x1 y1 text')
def almost_eq(a, b):
    """Tell whether *a* and *b* are less than 2 units apart."""
    distance = abs(a - b)
    return distance < 2
def lt_to_coords(obj, ltpage):
    """
    Convert a layout object to a ``Rect`` with rounded, top-to-bottom coordinates.

    :param obj: object exposing ``x0``, ``y0``, ``x1`` and ``y1``
    :param ltpage: page object; its ``y1`` is used to flip the vertical axis
    :return: a ``Rect`` whose width and/or height is forced to 0 when the
        rounded sides are almost equal
    """
    # in a pdf, 'y' coords are bottom-to-top: flip them against the page's y1
    flipped_a = ltpage.y1 - obj.y0
    flipped_b = ltpage.y1 - obj.y1

    # in a pdf, coordinates are very often almost equal but not strictly
    # equal, so work on rounded values
    left = round(min(obj.x0, obj.x1))
    top = round(min(flipped_a, flipped_b))
    right = round(max(obj.x0, obj.x1))
    bottom = round(max(flipped_a, flipped_b))

    # in a pdf, straight lines are actually rects, make them as thin as possible
    right = left if almost_eq(right, left) else right
    bottom = top if almost_eq(bottom, top) else bottom

    return Rect(left, top, right, bottom)
def lttext_to_multilines(obj, ltpage):
    """
    Split a text layout object in one ``TextRect`` per line of text.

    The box of ``obj`` (with the vertical axis flipped against ``ltpage.y1``)
    is cut in horizontal slices of equal height, one per line returned by
    ``obj.get_text()`` once its trailing newlines are stripped.

    :param obj: object exposing ``x0``, ``y0``, ``x1``, ``y1`` and ``get_text()``
    :param ltpage: page object; its ``y1`` is used to flip the vertical axis
    :return: generator of ``TextRect``, from the first line to the last
    """
    flipped = (ltpage.y1 - obj.y0, ltpage.y1 - obj.y1)
    left = min(obj.x0, obj.x1)
    top = min(flipped)
    right = max(obj.x0, obj.x1)
    bottom = max(flipped)

    text_lines = obj.get_text().rstrip('\n').split('\n')
    # text lines within 'obj' are probably the same height
    line_height = (bottom - top) / len(text_lines)

    for index, text in enumerate(text_lines):
        yield TextRect(left, top + index * line_height, right, top + index * line_height + line_height, text)
class ApproxFloat(float):
    """
    Float whose comparisons are fuzzy.

    Fuzzy floats smooth comparisons because lines are actually rects
    and seemingly-contiguous lines are actually not contiguous.
    Two values are equal when ``almost_eq`` says so; the ordering operators
    are built on top of that tolerant equality.
    """

    def __eq__(self, other):
        return almost_eq(self, other)

    def __ne__(self, other):
        return not (self == other)

    def __lt__(self, other):
        # strictly below, and not within the tolerance
        if self - other < 0:
            return self != other
        return False

    def __le__(self, other):
        # below, or within the tolerance
        if self - other <= 0:
            return True
        return self == other

    def __gt__(self, other):
        return not (self <= other)

    def __ge__(self, other):
        return not (self < other)
# Orientation codes returned by ``angle``.
ANGLE_VERTICAL, ANGLE_HORIZONTAL, ANGLE_OTHER = 0, 1, 2


def angle(r):
    """
    Classify the orientation of *r* from its ``x0``, ``y0``, ``x1``, ``y1``.

    :return: ``ANGLE_VERTICAL`` when both x are equal (checked first),
        ``ANGLE_HORIZONTAL`` when both y are equal, else ``ANGLE_OTHER``
    """
    if r.x0 == r.x1:
        return ANGLE_VERTICAL
    if r.y0 == r.y1:
        return ANGLE_HORIZONTAL
    return ANGLE_OTHER
class ApproxVecDict(dict):
    """
    Dict keyed by ``(x, y)`` whose lookups tolerate an offset of 1 on each axis.

    Since coords are never strictly equal, lookups search coords around the
    requested ones. Stores vectors and points.

    Only ``[]`` read access and ``get`` are approximate; the methods not
    overridden here are the plain ``dict`` ones.
    """

    # Offsets tried on each axis; 0 comes first so an exact match wins.
    _OFFSETS = (0, -1, 1)

    def __getitem__(self, coords):
        """
        Return the value stored at *coords* or at a neighbouring point.

        :raises KeyError: (carrying *coords*) if none of the 9 candidate
            points is a key
        """
        x, y = coords
        exact_lookup = super().__getitem__
        for dx in self._OFFSETS:
            for dy in self._OFFSETS:
                try:
                    return exact_lookup((x + dx, y + dy))
                except KeyError:
                    pass
        raise KeyError(coords)

    def get(self, k, v=None):
        """Approximate counterpart of ``dict.get``: return *v* if nothing matches *k*."""
        try:
            return self[k]
        except KeyError:
            return v
class ApproxRectDict(dict):
    """
    Like ``ApproxVecDict``, but stores rects keyed by ``(x0, y0, x1, y1)``.

    Only ``[]`` read access is approximate, and only for vertical
    (``x0 == x1``) or horizontal (``y0 == y1``) keys; any other rect is
    looked up exactly.
    """

    # Offsets tried on each coordinate; 0 comes first so an exact match wins.
    _OFFSETS = (0, -1, 1)

    def __getitem__(self, coords):
        """
        Return the value stored for *coords* or for a neighbouring line.

        For a vertical line the shared x and each end's y may move by 1; for
        a horizontal line the shared y and each end's x may move by 1.

        :raises KeyError: (carrying the key) if no candidate is present
        """
        x0, y0, x1, y1 = coords
        exact_lookup = super().__getitem__

        vertical = x0 == x1
        if not vertical and y0 != y1:
            # neither vertical nor horizontal: no tolerance at all
            return exact_lookup((x0, y0, x1, y1))

        for i in self._OFFSETS:
            for j in self._OFFSETS:
                for k in self._OFFSETS:
                    if vertical:
                        candidate = (x0 + i, y0 + j, x0 + i, y1 + k)
                    else:
                        candidate = (x0 + i, y0 + j, x1 + k, y0 + j)
                    try:
                        return exact_lookup(candidate)
                    except KeyError:
                        pass
        raise KeyError(coords)
def uniq_lines(lines):
    """
    Drop the lines that approximately duplicate an earlier one.

    Each line is looked up in an ``ApproxRectDict``; it is recorded only when
    that tolerant lookup fails, so the first of several near-identical lines
    is the one kept.

    :param lines: iterable of 4-item coordinate sequences
    :return: list of ``Rect``, in order of first appearance
    """
    seen = ApproxRectDict()
    for candidate in map(tuple, lines):
        try:
            seen[candidate]
        except KeyError:
            seen[candidate] = None
    return [Rect(*key) for key in seen]
def build_rows(lines):
    """
    Assemble boxes out of horizontal and vertical lines and group them in rows.

    :param lines: iterable of objects exposing ``x0``, ``y0``, ``x1``, ``y1``
        (lines for which ``angle`` returns ``ANGLE_OTHER`` are ignored)
    :return: list of rows sorted by the ``y0`` of their first box; each row is
        a list of ``Rect`` sorted by ``x0``
    :raises AssertionError: if more than one closing line matches a box, or
        the closing lines do not meet
    """
    points = ApproxVecDict()
    # for each top-left point, build tuple with lines going down and lines going right
    for line in lines:
        a = angle(line)
        if a not in (ANGLE_HORIZONTAL, ANGLE_VERTICAL):
            continue
        coord = (line.x0, line.y0)
        plines = points.get(coord)
        if plines is None:
            # two lists, indexed by ANGLE_VERTICAL and ANGLE_HORIZONTAL
            plines = points[coord] = tuple([] for _ in range(2))
        plines[a].append(line)

    boxes = ApproxVecDict()
    for plines in points.values():
        # a box corner needs at least one line in each direction
        if not (plines[ANGLE_HORIZONTAL] and plines[ANGLE_VERTICAL]):
            continue
        plines[ANGLE_HORIZONTAL].sort(key=lambda l: (l.y0, l.x1))
        plines[ANGLE_VERTICAL].sort(key=lambda l: (l.x0, l.y1))
        for hline in plines[ANGLE_HORIZONTAL]:
            # vertical lines starting where the top side ends
            try:
                vparallels = points[hline.x1, hline.y0][ANGLE_VERTICAL]
            except KeyError:
                continue
            if not vparallels:
                continue
            for vline in plines[ANGLE_VERTICAL]:
                # horizontal lines starting where the left side ends
                try:
                    hparallels = points[vline.x0, vline.y1][ANGLE_HORIZONTAL]
                except KeyError:
                    continue
                if not hparallels:
                    continue
                # keep the bottom side(s) ending at the same x as the top side
                hparallels = [hpar for hpar in hparallels if almost_eq(hpar.x1, hline.x1)]
                if not hparallels:
                    continue
                # keep the right side(s) ending at the same y as the left side
                # NOTE(review): this rebinds `vparallels`, so the next `vline`
                # of the same `hline` starts from the already filtered list --
                # confirm this is intended
                vparallels = [vpar for vpar in vparallels if almost_eq(vpar.y1, vline.y1)]
                if not vparallels:
                    continue
                assert len(hparallels) == 1 and len(vparallels) == 1
                assert almost_eq(hparallels[0].y0, vparallels[0].y1)
                assert almost_eq(vparallels[0].x0, hparallels[0].x1)
                box = Rect(hline.x0, hline.y0, hline.x1, vline.y1)
                # boxes sharing the same vertical extent form a row
                # NOTE(review): ApproxVecDict does not override `setdefault`,
                # so this grouping presumably matches keys exactly -- confirm
                boxes.setdefault((vline.y0, vline.y1), []).append(box)

    rows = list(boxes.values())
    new_rows = []
    for row in rows:
        row.sort(key=lambda box: box.x0)
        if row:
            # drop a box when it starts at the same x0 as the box before it
            row = [row[0]] + [c for n, c in enumerate(row[1:], 1) if row[n-1].x0 != c.x0]
            new_rows.append(row)
    rows = new_rows
    rows.sort(key=lambda row: row[0].y0)
    return rows
def find_in_table(rows, rect):
    """
    Locate the box of *rows* that contains *rect*, using fuzzy comparisons.

    The search stops at the first row whose first box starts below the bottom
    of *rect* (presumably relying on rows being sorted from top to bottom).

    :param rows: list of rows, each a non-empty list of boxes
    :param rect: object exposing ``x0``, ``y0``, ``x1`` and ``y1``
    :return: ``(column index, row index)`` of the first matching box,
        or None if there is none
    """
    for row_index, row in enumerate(rows):
        first_box = row[0]
        row_top = ApproxFloat(first_box.y0)

        if row_top > rect.y1:
            break
        # the vertical extent of the row must cover the rect
        if not (row_top <= rect.y0):
            continue
        if not (ApproxFloat(first_box.y1) >= rect.y1):
            continue

        for column_index, box in enumerate(row):
            covers_left = ApproxFloat(box.x0) <= rect.x0
            if covers_left and ApproxFloat(box.x1) >= rect.x1:
                return column_index, row_index
    return None
def arrange_texts_in_rows(rows, trects):
    """
    Dispatch the text of each text rect into the cell that contains it.

    :param rows: list of rows of boxes, as accepted by ``find_in_table``
    :param trects: iterable of objects exposing coordinates and ``text``
    :return: a table shaped like *rows* where each cell is the list of texts
        found in the matching box; texts matching no box are dropped
    """
    cells = []
    for row in rows:
        cells.append([[] for _box in row])

    for trect in trects:
        location = find_in_table(rows, trect)
        if location:
            column, line = location
            cells[line][column].append(trect.text)
    return cells
# Custom level just below DEBUG; get_pdf_rows saves debug images when the
# module logger is enabled for it.
DEBUGFILES = logging.DEBUG - 1
LOGGER = logging.getLogger(__name__)
def get_pdf_rows(data, miner_layout=True):
    """
    Take PDF file content as bytes and yield table row data for each page.

    For each page in the PDF, the function yields a list of rows.
    Each row is a list of cells. Each cell is a list of strings present in the cell.
    Note that the rows may belong to different tables.

    There are no logic tables in PDF format, so this parses PDF drawing instructions
    and tries to find rectangles and arrange them in rows, then arrange text in
    the rectangles.

    Nothing is yielded when pdfminer raises a syntax error while opening the
    document. When the module logger is enabled for the ``DEBUGFILES`` level,
    PNG images of each intermediate step are saved in a new temporary
    directory (this requires PIL).

    External dependencies:
    PDFMiner (https://github.com/euske/pdfminer).

    :param data: PDF file content
    :type data: bytes
    :param miner_layout: give pdfminer default ``LAParams``; when false, no
        ``LAParams`` are given and text rects are sorted by (y0, x0) instead
    :raises ImportError: if pdfminer is not installed
    """
    try:
        from pdfminer.pdfparser import PDFParser, PDFSyntaxError
    except ImportError:
        raise ImportError('Please install python3-pdfminer')

    try:
        from pdfminer.pdfdocument import PDFDocument
        from pdfminer.pdfpage import PDFPage
        newapi = True
    except ImportError:
        from pdfminer.pdfparser import PDFDocument
        newapi = False
    from pdfminer.converter import PDFPageAggregator
    from pdfminer.pdfinterp import PDFResourceManager, PDFPageInterpreter
    from pdfminer.layout import LAParams, LTRect, LTTextBox, LTTextLine, LTLine, LTChar, LTCurve

    parser = PDFParser(BytesIO(data))
    try:
        if newapi:
            doc = PDFDocument(parser)
        else:
            doc = PDFDocument()
            parser.set_document(doc)
            doc.set_parser(parser)
    except PDFSyntaxError:
        return

    rsrcmgr = PDFResourceManager()
    if miner_layout:
        device = PDFPageAggregator(rsrcmgr, laparams=LAParams())
    else:
        device = PDFPageAggregator(rsrcmgr)

    # The device must be closed even if the consumer stops iterating early or
    # a page fails to be processed.
    try:
        interpreter = PDFPageInterpreter(rsrcmgr, device)
        if newapi:
            pages = PDFPage.get_pages(BytesIO(data), check_extractable=True)
        else:
            doc.initialize()
            pages = doc.get_pages()

        # Decided once, so the debug helpers below always exist when used.
        dump_images = LOGGER.isEnabledFor(DEBUGFILES)
        if dump_images:
            import tempfile
            import PIL.Image as Image
            import PIL.ImageDraw as ImageDraw
            import random

            path = tempfile.mkdtemp(prefix='pdf')

            def new_canvas(page):
                # white image of the size given by the page mediabox
                size = (int(page.mediabox[2]), int(page.mediabox[3]))
                img = Image.new('RGB', size, (255, 255, 255))
                return img, ImageDraw.Draw(img)

            def random_color():
                return (random.randint(127, 255), random.randint(127, 255), random.randint(127, 255))

            def save_canvas(img, step, npage):
                fpath = '%s/%s-%03d.png' % (path, step, npage)
                img.save(fpath)
                LOGGER.log(DEBUGFILES, 'saved %r', fpath)

        for npage, page in enumerate(pages):
            LOGGER.debug('processing page %s', npage)
            interpreter.process_page(page)
            page_layout = device.get_result()

            texts = [
                trect
                for obj in page_layout._objs
                if isinstance(obj, (LTTextBox, LTTextLine, LTChar))
                for trect in lttext_to_multilines(obj, page_layout)
            ]
            LOGGER.debug('found %d text objects', len(texts))
            if dump_images:
                img, draw = new_canvas(page)
                for t in texts:
                    color = random_color()
                    draw.rectangle((t.x0, t.y0, t.x1, t.y1), outline=color)
                    draw.text((t.x0, t.y0), t.text.encode('utf-8'), color)
                save_canvas(img, '1text', npage)

            if not miner_layout:
                texts.sort(key=lambda t: (t.y0, t.x0))

            # TODO filter ltcurves that are not lines?
            # TODO convert rects to 4 lines?
            lines = [
                lt_to_coords(obj, page_layout)
                for obj in page_layout._objs
                if isinstance(obj, (LTRect, LTLine, LTCurve))
            ]
            LOGGER.debug('found %d lines', len(lines))
            if dump_images:
                img, draw = new_canvas(page)
                for line in lines:
                    draw.rectangle((line.x0, line.y0, line.x1, line.y1), outline=random_color())
                save_canvas(img, '2lines', npage)

            lines = list(uniq_lines(lines))
            LOGGER.debug('found %d unique lines', len(lines))

            rows = build_rows(lines)
            LOGGER.debug('built %d rows (%d boxes)', len(rows), sum(len(row) for row in rows))
            if dump_images:
                img, draw = new_canvas(page)
                for r in rows:
                    for b in r:
                        draw.rectangle((b.x0 + 1, b.y0 + 1, b.x1 - 1, b.y1 - 1), outline=random_color())
                save_canvas(img, '3rows', npage)

            textrows = arrange_texts_in_rows(rows, texts)
            LOGGER.debug('assigned %d strings', sum(sum(len(c) for c in r) for r in textrows))
            if dump_images:
                img, draw = new_canvas(page)
                for row, trow in zip(rows, textrows):
                    for b, tlines in zip(row, trow):
                        color = random_color()
                        draw.rectangle((b.x0 + 1, b.y0 + 1, b.x1 - 1, b.y1 - 1), outline=color)
                        draw.text((b.x0 + 1, b.y0 + 1), '\n'.join(tlines).encode('utf-8'), color)
                save_canvas(img, '4cells', npage)

            yield textrows
    finally:
        device.close()
# Export part #
def html_to_pdf(browser, url=None, data=None, extra_options=None):
    """
    Convert html to PDF.

    Exactly one of *url* and *data* must be given.

    :param browser: browser instance; when it has ``session.cookies``, the
        cookies with a non-empty value are passed to pdfkit
    :param url: link to the html resource
    :param data: HTML content
    :param extra_options: pdfkit options, merged over the generated ones
    :return: the document converted in PDF
    :rtype: bytes
    :raises ImportError: if pdfkit is not installed
    """
    try:
        import pdfkit  # https://pypi.python.org/pypi/pdfkit
    except ImportError:
        raise ImportError('Please install python3-pdfkit')

    assert bool(url) != bool(data), 'Please give only url or data parameter'

    if url:
        render = pdfkit.from_url
    else:
        render = pdfkit.from_string

    options = {}
    try:
        jar = browser.session.cookies
    except AttributeError:
        # no session on this browser: convert without cookies
        pass
    else:
        # cookies of browser
        options['cookie'] = [(name, value) for name, value in jar.items() if value]

    if extra_options:
        options.update(extra_options)

    source = url or data
    return render(source, False, options=options)
class BlinkPdfError(Exception):
    """Error raised by ``blinkpdf`` when the command exits with a non-zero status."""
def blinkpdf(browser, url, extra_options=None, filter_cookie=None, start_xvfb=True, timeout=120):
    """
    Render *url* to PDF with the external ``blinkpdf`` command.

    The non-empty cookies and all headers of ``browser.session`` are passed
    to the command.

    :param browser: browser instance exposing ``session.cookies`` and ``session.headers``
    :param url: address of the page to convert
    :param extra_options: dict; only the first item of its ``'run-script'``
        entry is used, as value of ``--run-script``
    :param filter_cookie: optional predicate; when given, only the cookies
        for which it returns a true value are forwarded
    :param start_xvfb: run the command through ``xvfb-run``
    :param timeout: seconds to wait for the command before killing it
    :return: standard output of the command (the PDF)
    :rtype: bytes
    :raises NotImplementedError: if a required executable is not in the PATH
    :raises subprocess.TimeoutExpired: if the command does not finish in time
    :raises BlinkPdfError: if the command exits with a non-zero status
    """
    import shutil

    # - xvfb is required for blinkpdf 1.0, but not for 1.1
    # - xvfb is not necessary for QtWebEngine 5.14, but it is for 5.11, which is the version
    #   available on the ppa for debian/buster stable
    required = ['blinkpdf']
    if start_xvfb:
        required.append('xvfb-run')
    missing = [name for name in required if shutil.which(name) is None]
    if missing:
        raise NotImplementedError('executable(s) not found in PATH: %s' % ', '.join(missing))

    args = []
    for c in browser.session.cookies:
        if c.value and (not filter_cookie or filter_cookie(c)):
            args += ['--cookie', '%s=%s' % (c.name, c.value)]

    for key, value in browser.session.headers.items():
        args += ['--header', '%s=%s' % (key, value)]

    if extra_options and 'run-script' in extra_options:
        args += ['--run-script', extra_options['run-script'][0]]

    # - : don't write it on disk, simply return value
    args += [url, '-']

    if start_xvfb:
        # put a very small resolution to reduce used memory, because we don't really need it,
        # it doesn't influence pdf size
        # -screen 0 width*height*bit depth
        prepend = ['xvfb-run', '-a', '-s', '-screen 0 2x2x8', 'blinkpdf']
    else:
        prepend = ['blinkpdf']

    cmd = prepend + args
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    try:
        # Will raise a TimeoutExpired after timeout seconds
        stdout, stderr = proc.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        # A timeout doesn't kill the child process
        proc.kill()
        # Log the error output after the end of the process. 20 seconds should
        # be enough for the process to terminate cleanly
        _, stderr = proc.communicate(timeout=20)
        # errors='replace': undecodable output must not hide the timeout
        LOGGER.error(
            'The blinkpdf process took too long to complete. Error output: %s',
            stderr.decode('utf-8', 'replace'),
        )
        raise

    if proc.returncode != 0:
        # report the real status instead of a hard-coded one
        raise BlinkPdfError(
            'command returned non-zero exit status %d: %s' % (proc.returncode, stderr.decode('utf-8', 'replace'))
        )

    return stdout
def extract_text(data):
    """
    Extract all text from a PDF.

    :param data: PDF file content
    :type data: bytes
    :return: the text written by pdfminer's ``TextConverter`` for all pages,
        or None if pdfminer raises a syntax error while opening the document
    :rtype: str or None
    :raises ImportError: if pdfminer is not installed
    """
    try:
        try:
            from pdfminer.pdfdocument import PDFDocument
            from pdfminer.pdfpage import PDFPage
            newapi = True
        except ImportError:
            from pdfminer.pdfparser import PDFDocument
            newapi = False
        from pdfminer.pdfparser import PDFParser, PDFSyntaxError
        from pdfminer.converter import TextConverter
        from pdfminer.pdfinterp import PDFResourceManager, PDFPageInterpreter
    except ImportError:
        raise ImportError('Please install python3-pdfminer to parse PDF')

    parser = PDFParser(BytesIO(data))
    try:
        if newapi:
            doc = PDFDocument(parser)
        else:
            doc = PDFDocument()
            parser.set_document(doc)
            doc.set_parser(parser)
    except PDFSyntaxError:
        return None

    rsrcmgr = PDFResourceManager()
    out = StringIO()
    device = TextConverter(rsrcmgr, out)
    # Close the device whatever happens, as get_pdf_rows does with its own.
    try:
        interpreter = PDFPageInterpreter(rsrcmgr, device)
        if newapi:
            pages = PDFPage.create_pages(doc)
        else:
            doc.initialize()
            pages = doc.get_pages()

        for page in pages:
            interpreter.process_page(page)

        # the value is read before the device gets closed by `finally`
        return out.getvalue()
    finally:
        device.close()