Skip to content

Commit

Permalink
MAINT: Prepare for adding AES support via #1816 (#1917)
Browse files Browse the repository at this point in the history
* `CryptFilter.encrypt_object` implemented
* `AlgV5.generate_values` now crops the user_password / owner_password to 127 bytes
* The `EncryptAlgorithm` Enum was added. Each member holds the parameters V (version), R (revision), and Length (key length in bits)
* `Encryption.encrypt_object` was added
* `Encryption.write_entry` was added
* The static method `Encryption.make` was added

This PR exists only to make the remaining changes easier to merge and to avoid merge conflicts between them and #1816. Full credit goes to exiledkingcc.

The PR is marked as "MAINT" as it doesn't add a new feature that an end-user could use.

Co-authored-by: exiledkingcc <exiledkingcc@gmail.com>
  • Loading branch information
MartinThoma and exiledkingcc authored Jun 25, 2023
1 parent bd904ea commit 34a9abf
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 7 deletions.
146 changes: 143 additions & 3 deletions pypdf/_encryption.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import hashlib
import secrets
import struct
from enum import IntEnum
from enum import Enum, IntEnum
from typing import Any, Dict, Optional, Tuple, Union, cast

from ._utils import logger_warning
Expand All @@ -36,6 +36,8 @@
ArrayObject,
ByteStringObject,
DictionaryObject,
NameObject,
NumberObject,
PdfObject,
StreamObject,
TextStringObject,
Expand Down Expand Up @@ -175,8 +177,28 @@ def __init__(
self.efCrypt = efCrypt

def encrypt_object(self, obj: PdfObject) -> PdfObject:
    """
    Return an encrypted counterpart of ``obj``.

    * ``ByteStringObject`` / ``TextStringObject``: the raw or encoded bytes
      are encrypted with ``self.strCrypt`` and returned as a new
      ``ByteStringObject``.
    * ``StreamObject``: a new stream is created, the entries of the
      original are copied over unchanged and ``_data`` is encrypted with
      ``self.stmCrypt``.
    * ``DictionaryObject`` / ``ArrayObject``: a new container is built whose
      members are encrypted recursively.
    * Anything else is returned as-is.

    The input object is never modified in place.

    Args:
        obj: The PDF object to encrypt.

    Returns:
        The encrypted object, or ``obj`` itself when its type needs no
        encryption.
    """
    # The former ``return NotImplemented`` stub is gone: left in front of the
    # implementation it would have made every branch below unreachable.
    if isinstance(obj, ByteStringObject):
        return ByteStringObject(self.strCrypt.encrypt(obj.original_bytes))

    # Each branch returns, so the freshly created ByteStringObject is never
    # examined a second time by the following tests.
    if isinstance(obj, TextStringObject):
        return ByteStringObject(self.strCrypt.encrypt(obj.get_encoded_bytes()))

    # Keep the stream test ahead of the dictionary test, as in the original
    # branch order (a stream is dict-like: its entries are copied with
    # ``update``), so streams are never handled by the dictionary branch.
    if isinstance(obj, StreamObject):
        encrypted_stream = StreamObject()
        encrypted_stream.update(obj)
        encrypted_stream._data = self.stmCrypt.encrypt(obj._data)
        return encrypted_stream

    if isinstance(obj, DictionaryObject):
        encrypted_dict = DictionaryObject()
        for key, value in obj.items():
            encrypted_dict[key] = self.encrypt_object(value)
        return encrypted_dict

    if isinstance(obj, ArrayObject):
        encrypted_array = ArrayObject()
        for item in obj:
            encrypted_array.append(self.encrypt_object(item))
        return encrypted_array

    return obj

def decrypt_object(self, obj: PdfObject) -> PdfObject:
if isinstance(obj, (ByteStringObject, TextStringObject)):
Expand Down Expand Up @@ -737,6 +759,8 @@ def generate_values(
p: int,
metadata_encrypted: bool,
) -> Dict[Any, Any]:
user_password = user_password[:127]
owner_password = owner_password[:127]
u_value, ue_value = AlgV5.compute_U_value(R, user_password, key)
o_value, oe_value = AlgV5.compute_O_value(R, owner_password, key, u_value)
perms = AlgV5.compute_Perms_value(key, p, metadata_encrypted)
Expand Down Expand Up @@ -874,6 +898,15 @@ class PasswordType(IntEnum):
OWNER_PASSWORD = 2


class EncryptAlgorithm(tuple, Enum):  # noqa: SLOT001
    """
    Supported encryption algorithms.

    Every member is itself a 3-tuple ``(V, R, Length)``: the version, the
    revision and the key length in bits. Members can therefore be unpacked
    directly, e.g. ``V, R, Length = EncryptAlgorithm.AES_128``.
    """

    #             V  R  Length
    RC4_40 = (1, 2, 40)
    RC4_128 = (2, 3, 128)
    AES_128 = (4, 4, 128)
    AES_256_R5 = (5, 5, 256)
    AES_256 = (5, 6, 256)


class EncryptionValues:
O: bytes # noqa
U: bytes
Expand Down Expand Up @@ -940,6 +973,14 @@ def __init__(
def is_decrypted(self) -> bool:
    """Return True unless the stored password type is ``NOT_DECRYPTED``."""
    still_locked = self._password_type == PasswordType.NOT_DECRYPTED
    return not still_locked

def encrypt_object(self, obj: PdfObject, idnum: int, generation: int) -> PdfObject:
    """
    Encrypt ``obj`` with the crypt filter built for the given object.

    Args:
        obj: The PDF object to encrypt.
        idnum: Object number, passed on to ``_make_crypt_filter``.
        generation: Generation number, passed on to ``_make_crypt_filter``.

    Returns:
        The encrypted object, or ``obj`` unchanged when
        ``_is_encryption_object`` reports that it needs no encryption.
    """
    if self._is_encryption_object(obj):
        crypt_filter = self._make_crypt_filter(idnum, generation)
        return crypt_filter.encrypt_object(obj)

    # Nothing to encrypt: return early so no crypt filter (and hence no key
    # calculation) is needed.
    return obj

def decrypt_object(self, obj: PdfObject, idnum: int, generation: int) -> PdfObject:
# skip calculate key
if not self._is_encryption_object(obj):
Expand Down Expand Up @@ -1103,6 +1144,77 @@ def verify_v5(self, password: bytes) -> Tuple[bytes, PasswordType]:
logger_warning("ignore '/Perms' verify failed", __name__)
return key, rc

def write_entry(
    self, user_password: str, owner_password: Optional[str]
) -> DictionaryObject:
    """
    Compute the password-dependent values and build the encryption dictionary.

    For ``V <= 4`` the values are derived by ``compute_values_v4``. For
    higher versions a random key of ``Length // 8`` bytes is generated and
    ``AlgV5.generate_values`` supplies ``/O``, ``/U``, ``/OE``, ``/UE`` and
    ``/Perms``. As a side effect ``self._key`` and ``self.values`` are
    updated.

    Args:
        user_password: The user password.
        owner_password: The owner password; when it is ``None`` or empty
            the user password is used instead.

    Returns:
        A dictionary with ``/V``, ``/R``, ``/Length``, ``/P``, ``/Filter``,
        ``/O`` and ``/U``; additionally ``/CF``, ``/StmF`` and ``/StrF``
        when ``V >= 4``, and ``/OE``, ``/UE`` and ``/Perms`` when ``V >= 5``.
    """
    user_pwd = self._encode_password(user_password)
    owner_pwd = None
    if owner_password:
        owner_pwd = self._encode_password(owner_password)
    if owner_pwd is None:
        # No usable owner password: fall back to the user password.
        owner_pwd = user_pwd

    if self.V > 4:
        self._key = secrets.token_bytes(self.Length // 8)
        generated = AlgV5.generate_values(
            self.R, user_pwd, owner_pwd, self._key, self.P, self.EncryptMetadata
        )
        self.values.O = generated["/O"]
        self.values.U = generated["/U"]
        self.values.OE = generated["/OE"]
        self.values.UE = generated["/UE"]
        self.values.Perms = generated["/Perms"]
    else:
        self.compute_values_v4(user_pwd, owner_pwd)

    entry = DictionaryObject()
    # Entries are inserted in a fixed order; /EncryptMetadata is ignored.
    for name, value in (
        ("/V", NumberObject(self.V)),
        ("/R", NumberObject(self.R)),
        ("/Length", NumberObject(self.Length)),
        ("/P", NumberObject(self.P)),
        ("/Filter", NameObject("/Standard")),
        ("/O", ByteStringObject(self.values.O)),
        ("/U", ByteStringObject(self.values.U)),
    ):
        entry[NameObject(name)] = value

    if self.V >= 4:
        # TODO: allow different method
        std_filter = DictionaryObject()
        std_filter[NameObject("/AuthEvent")] = NameObject("/DocOpen")
        std_filter[NameObject("/CFM")] = NameObject(self.StmF)
        std_filter[NameObject("/Length")] = NumberObject(self.Length // 8)

        crypt_filters = DictionaryObject()
        crypt_filters[NameObject("/StdCF")] = std_filter

        entry[NameObject("/CF")] = crypt_filters
        entry[NameObject("/StmF")] = NameObject("/StdCF")
        entry[NameObject("/StrF")] = NameObject("/StdCF")
        # /EFF is ignored and therefore not written.

    if self.V >= 5:
        for name, raw in (
            ("/OE", self.values.OE),
            ("/UE", self.values.UE),
            ("/Perms", self.values.Perms),
        ):
            entry[NameObject(name)] = ByteStringObject(raw)

    return entry

def compute_values_v4(self, user_password: bytes, owner_password: bytes) -> None:
    """
    Derive the key and the ``O`` / ``U`` values using the ``AlgV4`` routines.

    The results are stored in ``self._key``, ``self.values.O`` and
    ``self.values.U``; nothing is stored until every value has been computed.

    Args:
        user_password: The encoded user password.
        owner_password: The encoded owner password.
    """
    # The O value is produced from the user password with a key derived
    # from the owner password.
    owner_entry = AlgV4.compute_O_value(
        AlgV4.compute_O_value_key(owner_password, self.R, self.Length),
        user_password,
        self.R,
    )

    # The key calculation takes the O value computed above as an input.
    file_key = AlgV4.compute_key(
        user_password,
        self.R,
        self.Length,
        owner_entry,
        self.P,
        self.id1_entry,
        self.EncryptMetadata,
    )
    user_entry = AlgV4.compute_U_value(file_key, self.R, self.id1_entry)

    self._key = file_key
    self.values.O = owner_entry
    self.values.U = user_entry

@staticmethod
def read(encryption_entry: DictionaryObject, first_id_entry: bytes) -> "Encryption":
filter = encryption_entry.get("/Filter")
Expand Down Expand Up @@ -1166,3 +1278,31 @@ def read(encryption_entry: DictionaryObject, first_id_entry: bytes) -> "Encrypti
EFF=EFF,
entry=encryption_entry, # can be deleted?
)

@staticmethod
def make(
    alg: EncryptAlgorithm, permissions: int, first_id_entry: bytes
) -> "Encryption":
    """
    Create an ``Encryption`` instance for the requested algorithm.

    The string, stream and embedded-file filters all use the same method:
    ``/AESV2`` for ``AES_128``, ``/AESV3`` for both AES-256 variants and
    ``/V2`` for everything else. ``EncryptMetadata`` is always ``True`` and
    no precomputed values are passed.

    Args:
        alg: The algorithm, unpacked into ``V``, ``R`` and ``Length``.
        permissions: The value used for ``P``.
        first_id_entry: Passed through to ``Encryption`` unchanged.

    Returns:
        The newly constructed ``Encryption`` object.
    """
    version, revision, key_length = cast(tuple, alg)

    if alg == EncryptAlgorithm.AES_128:
        crypt_method = "/AESV2"
    elif alg in (EncryptAlgorithm.AES_256_R5, EncryptAlgorithm.AES_256):
        crypt_method = "/AESV3"
    else:
        crypt_method = "/V2"

    return Encryption(
        V=version,
        R=revision,
        Length=key_length,
        P=permissions,
        EncryptMetadata=True,
        first_id_entry=first_id_entry,
        values=None,
        StrF=crypt_method,
        StmF=crypt_method,
        EFF=crypt_method,
        entry=DictionaryObject(),  # Dummy entry for the moment; will get removed
    )
10 changes: 7 additions & 3 deletions pypdf/generic/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -557,16 +557,20 @@ def get_original_bytes(self) -> bytes:
else:
raise Exception("no information about original bytes")

def write_to_stream(
self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
) -> None:
def get_encoded_bytes(self) -> bytes:
    """
    Return the bytes used to represent this string.

    PDFDocEncoding is attempted first because the result is nicer to look
    at in the PDF file, at the cost of a performance hit for trying. If the
    string cannot be encoded that way, the UTF-16BE byte order mark followed
    by the UTF-16BE encoding is returned instead.
    """
    try:
        return encode_pdfdocencoding(self)
    except UnicodeEncodeError:
        return codecs.BOM_UTF16_BE + self.encode("utf-16be")

def write_to_stream(
self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
) -> None:
bytearr = self.get_encoded_bytes()
if encryption_key:
from .._security import RC4_encrypt

Expand Down
2 changes: 1 addition & 1 deletion tests/test_encryption.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
# qpdf --encrypt "asdfzxcv" "" 40 -- unencrypted.pdf r2-user-password.pdf
("r2-user-password.pdf", False),
# created by:
# qpdf --encrypt "" "asdfzxcv" 40 -- unencrypted.pdf r2-user-password.pdf
# qpdf --encrypt "" "asdfzxcv" 40 -- unencrypted.pdf r2-owner-password.pdf
("r2-owner-password.pdf", False),
# created by:
# qpdf --encrypt "asdfzxcv" "" 128 -- unencrypted.pdf r3-user-password.pdf
Expand Down

0 comments on commit 34a9abf

Please sign in to comment.