Skip to content

Commit

Permalink
sigstore: extract LogEntry conversions to their own functions
Browse files Browse the repository at this point in the history
  • Loading branch information
facutuesca committed Apr 29, 2024
1 parent 56adce4 commit 125caea
Showing 1 changed file with 83 additions and 71 deletions.
154 changes: 83 additions & 71 deletions sigstore/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
from sigstore_protobuf_specs.dev.sigstore.common import v1 as common_v1
from sigstore_protobuf_specs.dev.sigstore.rekor import v1 as rekor_v1
from sigstore_protobuf_specs.dev.sigstore.rekor.v1 import (
InclusionPromise,
InclusionProof,
)

Expand Down Expand Up @@ -187,6 +186,77 @@ def _from_response(cls, dict_: dict[str, Any]) -> LogEntry:
inclusion_promise=entry["verification"]["signedEntryTimestamp"],
)

@classmethod
def _from_dict_rekor(cls, dict_: dict[str, Any]) -> LogEntry:
"""
Create a new `LogEntry` from the given Rekor TransparencyLogEntry.
"""
tlog_entry = rekor_v1.TransparencyLogEntry()
tlog_entry.from_dict(dict_)

inclusion_proof: InclusionProof | None = tlog_entry.inclusion_proof
# This check is required by us as the client, not the
# protobuf-specs themselves.
if inclusion_proof is None or inclusion_proof.checkpoint.envelope is None:
raise InvalidBundle("entry must contain inclusion proof")

parsed_inclusion_proof = LogInclusionProof(
checkpoint=inclusion_proof.checkpoint.envelope,
hashes=[h.hex() for h in inclusion_proof.hashes],
log_index=inclusion_proof.log_index,
root_hash=inclusion_proof.root_hash.hex(),
tree_size=inclusion_proof.tree_size,
)

return LogEntry(
uuid=None,
body=B64Str(base64.b64encode(tlog_entry.canonicalized_body).decode()),
integrated_time=tlog_entry.integrated_time,
log_id=tlog_entry.log_id.key_id.hex(),
log_index=tlog_entry.log_index,
inclusion_proof=parsed_inclusion_proof,
inclusion_promise=B64Str(
base64.b64encode(
tlog_entry.inclusion_promise.signed_entry_timestamp
).decode()
),
)

def _to_dict_rekor(self, is_message_signature: bool) -> dict[str, Any]:
inclusion_promise: rekor_v1.InclusionPromise | None = None
if self.inclusion_promise:
inclusion_promise = rekor_v1.InclusionPromise(
signed_entry_timestamp=base64.b64decode(self.inclusion_promise)
)

inclusion_proof = rekor_v1.InclusionProof(
log_index=self.inclusion_proof.log_index,
root_hash=bytes.fromhex(self.inclusion_proof.root_hash),
tree_size=self.inclusion_proof.tree_size,
hashes=[bytes.fromhex(hash_) for hash_ in self.inclusion_proof.hashes],
checkpoint=rekor_v1.Checkpoint(envelope=self.inclusion_proof.checkpoint),
)

tlog_entry = rekor_v1.TransparencyLogEntry(
log_index=self.log_index,
log_id=common_v1.LogId(key_id=bytes.fromhex(self.log_id)),
integrated_time=self.integrated_time,
inclusion_promise=inclusion_promise,
inclusion_proof=inclusion_proof,
canonicalized_body=base64.b64decode(self.body),
)

# Fill in the appropriate kind
if is_message_signature:
tlog_entry.kind_version = rekor_v1.KindVersion(
kind="hashedrekord", version="0.0.1"
)
else:
tlog_entry.kind_version = rekor_v1.KindVersion(kind="dsse", version="0.0.1")

tlog_entry_dict: dict[str, Any] = tlog_entry.to_dict()
return tlog_entry_dict

def encode_canonical(self) -> bytes:
"""
Returns a canonicalized JSON (RFC 8785) representation of the transparency log entry.
Expand Down Expand Up @@ -366,55 +436,22 @@ def _verify_bundle(self) -> None:
# The inclusion promise is NOT required; if present, the client
# SHOULD verify it.
#
# Beneath all of this, we require that the inclusion proof be present.
inclusion_promise: InclusionPromise | None = tlog_entry.inclusion_promise
inclusion_proof: InclusionProof | None = tlog_entry.inclusion_proof
# Before all of this, we require that the inclusion proof be present
# (when constructing the LogEntry).
log_entry = LogEntry._from_dict_rekor(tlog_entry.to_dict())

if media_type == BundleType.BUNDLE_0_1:
if not inclusion_promise:
if not log_entry.inclusion_promise:
raise InvalidBundle("bundle must contain an inclusion promise")
if inclusion_proof and not inclusion_proof.checkpoint.envelope:
if not log_entry.inclusion_proof.checkpoint:
_logger.debug(
"0.1 bundle contains inclusion proof without checkpoint; ignoring"
)
else:
if not inclusion_proof:
raise InvalidBundle("bundle must contain an inclusion proof")
if not inclusion_proof.checkpoint.envelope:
if not log_entry.inclusion_proof.checkpoint:
raise InvalidBundle("expected checkpoint in inclusion proof")

parsed_inclusion_proof: InclusionProof | None = None
if (
inclusion_proof is not None
and inclusion_proof.checkpoint.envelope is not None
):
parsed_inclusion_proof = LogInclusionProof(
checkpoint=inclusion_proof.checkpoint.envelope,
hashes=[h.hex() for h in inclusion_proof.hashes],
log_index=inclusion_proof.log_index,
root_hash=inclusion_proof.root_hash.hex(),
tree_size=inclusion_proof.tree_size,
)

# Sanity: the only way we can hit this is with a v1 bundle without
# an inclusion proof. Putting this check here rather than above makes
# it clear that this check is required by us as the client, not the
# protobuf-specs themselves.
if parsed_inclusion_proof is None:
raise InvalidBundle("bundle must contain inclusion proof")

self._log_entry = LogEntry(
uuid=None,
body=B64Str(base64.b64encode(tlog_entry.canonicalized_body).decode()),
integrated_time=tlog_entry.integrated_time,
log_id=tlog_entry.log_id.key_id.hex(),
log_index=tlog_entry.log_index,
inclusion_proof=parsed_inclusion_proof,
inclusion_promise=B64Str(
base64.b64encode(
tlog_entry.inclusion_promise.signed_entry_timestamp
).decode()
),
)
self._log_entry = log_entry

@property
def signing_certificate(self) -> Certificate:
Expand Down Expand Up @@ -476,30 +513,6 @@ def _from_parts(
"""
@private
"""
inclusion_promise: rekor_v1.InclusionPromise | None = None
if log_entry.inclusion_promise:
inclusion_promise = rekor_v1.InclusionPromise(
signed_entry_timestamp=base64.b64decode(log_entry.inclusion_promise)
)

inclusion_proof = rekor_v1.InclusionProof(
log_index=log_entry.inclusion_proof.log_index,
root_hash=bytes.fromhex(log_entry.inclusion_proof.root_hash),
tree_size=log_entry.inclusion_proof.tree_size,
hashes=[bytes.fromhex(hash_) for hash_ in log_entry.inclusion_proof.hashes],
checkpoint=rekor_v1.Checkpoint(
envelope=log_entry.inclusion_proof.checkpoint
),
)

tlog_entry = rekor_v1.TransparencyLogEntry(
log_index=log_entry.log_index,
log_id=common_v1.LogId(key_id=bytes.fromhex(log_entry.log_id)),
integrated_time=log_entry.integrated_time,
inclusion_promise=inclusion_promise,
inclusion_proof=inclusion_proof,
canonicalized_body=base64.b64decode(log_entry.body),
)

inner = _Bundle(
media_type=BundleType.BUNDLE_0_3.value,
Expand All @@ -508,16 +521,15 @@ def _from_parts(
),
)

is_message_signature = isinstance(content, common_v1.MessageSignature)
# Fill in the appropriate variants.
if isinstance(content, common_v1.MessageSignature):
if is_message_signature:
inner.message_signature = content
tlog_entry.kind_version = rekor_v1.KindVersion(
kind="hashedrekord", version="0.0.1"
)
else:
inner.dsse_envelope = content._inner
tlog_entry.kind_version = rekor_v1.KindVersion(kind="dsse", version="0.0.1")

tlog_entry = rekor_v1.TransparencyLogEntry()
tlog_entry.from_dict(log_entry._to_dict_rekor(is_message_signature))
inner.verification_material.tlog_entries = [tlog_entry]

return cls(inner)

0 comments on commit 125caea

Please sign in to comment.