Skip to content

Commit

Permalink
ENH: Add AES support for encrypting PDF files (#1918)
Browse files Browse the repository at this point in the history
* PdfWriter.encrypt: Add 'algorithm' parameter
* PdfWriter: Add _encryption property
* PdfWriter: Add _encrypt_entry property

This change was made in #1816 to avoid merge conflicts /
get it merged soon.

Full credit for the work goes to exiledkingcc
who did all of the work in #1816

Co-authored-by: exiledkingcc <exiledkingcc@gmail.com>
  • Loading branch information
MartinThoma and exiledkingcc authored Jul 2, 2023
1 parent 44790c4 commit 71be38b
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 70 deletions.
4 changes: 2 additions & 2 deletions pypdf/_encryption.py
Original file line number Diff line number Diff line change
Expand Up @@ -1273,10 +1273,10 @@ def read(encryption_entry: DictionaryObject, first_id_entry: bytes) -> "Encrypti
EncryptMetadata=EncryptMetadata,
first_id_entry=first_id_entry,
values=values,
StmF=StmF,
StrF=StrF,
StmF=StmF,
EFF=EFF,
entry=encryption_entry, # can be deleted?
entry=encryption_entry, # Dummy entry for the moment; will get removed
)

@staticmethod
Expand Down
79 changes: 32 additions & 47 deletions pypdf/_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,8 @@
import enum
import hashlib
import re
import struct
import uuid
import warnings
from hashlib import md5
from io import BytesIO, FileIO, IOBase
from pathlib import Path
from types import TracebackType
Expand All @@ -56,11 +54,10 @@
cast,
)

from ._encryption import Encryption
from ._encryption import EncryptAlgorithm, Encryption
from ._page import PageObject, _VirtualList
from ._page_labels import nums_clear_range, nums_insert, nums_next
from ._reader import PdfReader
from ._security import _alg33, _alg34, _alg35
from ._utils import (
StrByteType,
StreamType,
Expand All @@ -84,13 +81,11 @@
UserAccessPermissions,
)
from .constants import Core as CO
from .constants import EncryptionDictAttributes as ED
from .constants import (
FieldDictionaryAttributes as FA,
)
from .constants import PageAttributes as PG
from .constants import PagesAttributes as PA
from .constants import StreamAttributes as SA
from .constants import TrailerKeys as TK
from .generic import (
PAGE_FIT,
Expand Down Expand Up @@ -209,6 +204,9 @@ def __init__(
self.fileobj = fileobj
self.with_as_usage = False

self._encryption: Optional[Encryption] = None
self._encrypt_entry: Optional[DictionaryObject] = None

def __enter__(self) -> "PdfWriter":
"""Store that writer is initialized by 'with'."""
self.with_as_usage = True
Expand Down Expand Up @@ -1160,6 +1158,8 @@ def encrypt(
permissions_flag: UserAccessPermissions = ALL_DOCUMENT_PERMISSIONS,
user_pwd: Optional[str] = None, # deprecated
owner_pwd: Optional[str] = None, # deprecated
*,
algorithm: Optional[str] = None,
) -> None:
"""
Encrypt this PDF file with the PDF Standard encryption handler.
Expand All @@ -1180,13 +1180,10 @@ def encrypt(
Bit position 3 is for printing, 4 is for modifying content,
5 and 6 control annotations, 9 for form fields,
10 for extraction of text and graphics.
algorithm: encrypt algorithm. Values maybe one of "RC4-40", "RC4-128",
"AES-128", "AES-256-R5", "AES-256". If it's valid,
`use_128bit` will be ignored.
"""
warnings.warn(
"pypdf only implements RC4 encryption so far. "
"The RC4 algorithm is insecure. Either use a library that supports "
"AES for encryption or put the PDF in an encrypted container, "
"for example an encrypted ZIP file."
)
if user_pwd is not None:
if user_password is not None:
raise ValueError(
Expand Down Expand Up @@ -1224,33 +1221,27 @@ def encrypt(

if owner_password is None:
owner_password = user_password
if use_128bit:
V = 2
rev = 3
keylen = int(128 / 8)
if algorithm is not None:
try:
alg = getattr(EncryptAlgorithm, algorithm.replace("-", "_"))
except AttributeError:
raise ValueError(f"algorithm '{algorithm}' NOT supported")
else:
V = 1
rev = 2
keylen = int(40 / 8)
P = permissions_flag
O = ByteStringObject(_alg33(owner_password, user_password, rev, keylen)) # type: ignore[arg-type] # noqa
alg = EncryptAlgorithm.RC4_128
if not use_128bit:
alg = EncryptAlgorithm.RC4_40
self.generate_file_identifiers()
if rev == 2:
U, key = _alg34(user_password, O, P, self._ID[0])
self._encryption = Encryption.make(alg, permissions_flag, self._ID[0])
# in case call `encrypt` again
entry = self._encryption.write_entry(user_password, owner_password)
if self._encrypt_entry:
# replace old encrypt_entry
assert self._encrypt_entry.indirect_reference is not None
entry.indirect_reference = self._encrypt_entry.indirect_reference
self._objects[entry.indirect_reference.idnum - 1] = entry
else:
assert rev == 3
U, key = _alg35(user_password, rev, keylen, O, P, self._ID[0], False) # type: ignore[arg-type]
encrypt = DictionaryObject()
encrypt[NameObject(SA.FILTER)] = NameObject("/Standard")
encrypt[NameObject("/V")] = NumberObject(V)
if V == 2:
encrypt[NameObject(SA.LENGTH)] = NumberObject(keylen * 8)
encrypt[NameObject(ED.R)] = NumberObject(rev)
encrypt[NameObject(ED.O)] = ByteStringObject(O)
encrypt[NameObject(ED.U)] = ByteStringObject(U)
encrypt[NameObject(ED.P)] = NumberObject(P)
self._encrypt = self._add_object(encrypt)
self._encrypt_key = key
self._add_object(entry)
self._encrypt_entry = entry

def write_stream(self, stream: StreamType) -> None:
if hasattr(stream, "mode") and "b" not in stream.mode:
Expand Down Expand Up @@ -1311,15 +1302,9 @@ def _write_pdf_structure(self, stream: StreamType) -> List[int]:
idnum = i + 1
object_positions.append(stream.tell())
stream.write(b_(str(idnum)) + b" 0 obj\n")
key = None
if hasattr(self, "_encrypt") and idnum != self._encrypt.idnum:
pack1 = struct.pack("<i", i + 1)[:3]
pack2 = struct.pack("<i", 0)[:2]
key = self._encrypt_key + pack1 + pack2
assert len(key) == (len(self._encrypt_key) + 5)
md5_hash = md5(key).digest()
key = md5_hash[: min(16, len(self._encrypt_key) + 5)]
obj.write_to_stream(stream, key)
if self._encryption and obj != self._encrypt_entry:
obj = self._encryption.encrypt_object(obj, idnum, 0)
obj.write_to_stream(stream)
stream.write(b"\nendobj\n")
return object_positions

Expand Down Expand Up @@ -1351,8 +1336,8 @@ def _write_trailer(self, stream: StreamType, xref_location: int) -> None:
)
if hasattr(self, "_ID"):
trailer[NameObject(TK.ID)] = self._ID
if hasattr(self, "_encrypt"):
trailer[NameObject(TK.ENCRYPT)] = self._encrypt
if self._encrypt_entry:
trailer[NameObject(TK.ENCRYPT)] = self._encrypt_entry.indirect_reference
trailer.write_to_stream(stream)
stream.write(b_(f"\nstartxref\n{xref_location}\n%%EOF\n")) # eof

Expand Down
107 changes: 105 additions & 2 deletions tests/test_encryption.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
"""Test the pypdf.encryption module."""
import secrets
from pathlib import Path

import pytest

import pypdf
from pypdf import PasswordType, PdfReader
from pypdf import PasswordType, PdfReader, PdfWriter
from pypdf._encryption import AlgV5, CryptRC4
from pypdf.errors import DependencyError, PdfReadError

Expand Down Expand Up @@ -215,7 +216,7 @@ def test_alg_v5_generate_values():
return
key = b"0123456789123451"
values = AlgV5.generate_values(
R=4,
R=5,
user_password=b"foo",
owner_password=b"bar",
key=key,
Expand All @@ -229,3 +230,105 @@ def test_alg_v5_generate_values():
"/OE": values["/OE"],
"/Perms": values["/Perms"],
}


@pytest.mark.parametrize(
("alg", "requires_pycryptodome"),
[
("RC4-40", False),
("RC4-128", False),
("AES-128", True),
("AES-256-R5", True),
("AES-256", True),
("ABCD", False),
],
)
def test_pdf_encrypt(pdf_file_path, alg, requires_pycryptodome):
user_password = secrets.token_urlsafe(10)
owner_password = secrets.token_urlsafe(10)

reader = PdfReader(RESOURCE_ROOT / "encryption" / "unencrypted.pdf")
page = reader.pages[0]
text0 = page.extract_text()

writer = PdfWriter()
writer.add_page(page)

# test with invalid algorithm name
if alg == "ABCD":
with pytest.raises(ValueError) as exc:
writer.encrypt(
user_password=user_password,
owner_password=owner_password,
algorithm=alg,
)
assert exc.value.args[0] == "algorithm 'ABCD' NOT supported"
return

if requires_pycryptodome and not HAS_PYCRYPTODOME:
with pytest.raises(DependencyError) as exc:
writer.encrypt(
user_password=user_password,
owner_password=owner_password,
algorithm=alg,
)
with open(pdf_file_path, "wb") as output_stream:
writer.write(output_stream)
assert exc.value.args[0] == "PyCryptodome is required for AES algorithm"
return

writer.encrypt(
user_password=user_password, owner_password=owner_password, algorithm=alg
)
with open(pdf_file_path, "wb") as output_stream:
writer.write(output_stream)

reader = PdfReader(pdf_file_path)
assert reader.is_encrypted
assert reader.decrypt(owner_password) == PasswordType.OWNER_PASSWORD
assert reader.decrypt(user_password) == PasswordType.USER_PASSWORD

page = reader.pages[0]
text1 = page.extract_text()
assert text0 == text1


@pytest.mark.parametrize(
"count",
[1, 2, 3, 4, 5, 10],
)
def test_pdf_encrypt_multiple(pdf_file_path, count):
user_password = secrets.token_urlsafe(10)
owner_password = secrets.token_urlsafe(10)

reader = PdfReader(RESOURCE_ROOT / "encryption" / "unencrypted.pdf")
page = reader.pages[0]
text0 = page.extract_text()

writer = PdfWriter()
writer.add_page(page)

if count == 1:
owner_password = None

for _i in range(count):
writer.encrypt(
user_password=user_password,
owner_password=owner_password,
algorithm="RC4-128",
)
with open(pdf_file_path, "wb") as output_stream:
writer.write(output_stream)

reader = PdfReader(pdf_file_path)
assert reader.is_encrypted
if owner_password is None:
# NOTICE: owner_password will set to user_password if it's None
assert reader.decrypt(user_password) == PasswordType.OWNER_PASSWORD
else:
assert reader.decrypt(owner_password) == PasswordType.OWNER_PASSWORD
assert reader.decrypt(user_password) == PasswordType.USER_PASSWORD

page = reader.pages[0]
text1 = page.extract_text()
assert text0 == text1
7 changes: 3 additions & 4 deletions tests/test_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,9 @@ def test_basic_features(tmp_path):

# encrypt your new PDF and add a password
password = "secret"
with pytest.warns(UserWarning, match="pypdf only implements RC4 encryption"):
writer.encrypt(password)
# doing it twice should not change anything
writer.encrypt(password)
writer.encrypt(password)
# doing it twice should not change anything
writer.encrypt(password)

# finally, write "output" to pypdf-output.pdf
write_path = tmp_path / "pypdf-output.pdf"
Expand Down
29 changes: 14 additions & 15 deletions tests/test_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,26 +491,25 @@ def test_encrypt(use_128bit, user_password, owner_password, pdf_file_path):

writer.add_page(page)

with pytest.warns(UserWarning, match="pypdf only implements RC4 encryption"):
with pytest.raises(ValueError, match="owner_pwd of encrypt is deprecated."):
writer.encrypt(
owner_pwd=user_password,
owner_password=owner_password,
user_password=user_password,
use_128bit=use_128bit,
)
with pytest.raises(ValueError, match="'user_pwd' argument is deprecated"):
writer.encrypt(
owner_password=owner_password,
user_password=user_password,
user_pwd=user_password,
use_128bit=use_128bit,
)
with pytest.raises(ValueError, match="owner_pwd of encrypt is deprecated."):
writer.encrypt(
owner_pwd=user_password,
owner_password=owner_password,
user_password=user_password,
use_128bit=use_128bit,
)
with pytest.raises(ValueError, match="'user_pwd' argument is deprecated"):
writer.encrypt(
owner_password=owner_password,
user_password=user_password,
user_pwd=user_password,
use_128bit=use_128bit,
)
writer.encrypt(
user_password=user_password,
owner_password=owner_password,
use_128bit=use_128bit,
)

# write "output" to pypdf-output.pdf
with open(pdf_file_path, "wb") as output_stream:
Expand Down

0 comments on commit 71be38b

Please sign in to comment.