Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hash Signing Updates #2

Merged
merged 20 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

Secure your Software Supply Chain and your Content Authenticity with immutable data trails. This GitHub Action uses DataTrails implementation of the IETF Supply Chain, Integrity and Trust ([SCITT](https://scitt.io)) APIs.

**NOTE:**:
This SCITT GitHub Action is in Preview, pending adoption of the [SCITT Reference APIs (SCRAPI)](https://datatracker.ietf.org/doc/draft-ietf-scitt-scrapi/).
To use a production supported implementation, please contact [DataTrails](https://www.datatrails.ai/contactus/) for more info.

## Getting Started

To create immutable data trails, an account with a `Client_ID` and `Secret` are required.
Expand Down
43 changes: 26 additions & 17 deletions action.yml
Original file line number Diff line number Diff line change
@@ -1,34 +1,41 @@
name: 'DataTrails SCITT API'
description: 'Register, Get Receipts and Query Feeds from the DataTrails SCITT API'
inputs:
content-type:
description: 'The payload content type (iana mediaType) to be registered on the SCITT Service (eg: application/spdx+json, application/vnd.cyclonedx+json, Scan Result, Attestation)'
required: true
datatrails-client_id:
description: 'The CLIENT_ID used to access the DataTrails SCITT APIs'
required: true
datatrails-secret:
description: 'The SECRET used to access the DataTrails SCITT APIs'
required: true
subject:
description: 'Unique ID for the collection of statements about an artifact'
issuer:
description: 'The name of the issuer, set to CTW_Claims:iss'
required: true
payload:
payload-file:
description: 'The payload file to be registered on the SCITT Service (eg: SBOM, Scan Result, Attestation)'
required: true
content-type:
description: 'The payload content type (iana mediaType) to be registered on the SCITT Service (eg: application/spdx+json, application/vnd.cyclonedx+json, Scan Result, Attestation)'
required: true
signed-statement-file:
description: 'File representing the signed SCITT Statement that will be registered on SCITT.'
payload-location:
description: 'Optional location the content of the payload may be stored.'
required: false
default: 'signed-statement.cbor'
receipt-file:
description: 'The file to save the cbor receipt'
description: 'The filename to save the cbor receipt'
required: false
default: 'receipt.cbor'
signed-statement-file:
description: 'File representing the signed SCITT Statement that will be registered on SCITT.'
required: false
default: 'signed-statement.cbor'
signing-key-file:
description: 'The .pem file used to sign the statement'
required: true
issuer:
description: 'The name of the issuer, set to CTW_Claims:iss'
skip-receipt:
description: 'To skip receipt retrieval, set to 1'
required: false
default: '0'
subject:
description: 'Unique ID for the collection of statements about an artifact'
required: true
outputs:
token: # id of output
Expand All @@ -37,12 +44,14 @@ runs:
using: 'docker'
image: 'Dockerfile'
args:
- ${{ inputs.content-type }}
- ${{ inputs.datatrails-client_id }}
- ${{ inputs.datatrails-secret }}
- ${{ inputs.subject }}
- ${{ inputs.payload }}
- ${{ inputs.content-type }}
- ${{ inputs.signed-statement-file }}
- ${{ inputs.issuer }}
- ${{ inputs.payload-file }}
- ${{ inputs.payload-location}}
- ${{ inputs.receipt-file }}
- ${{ inputs.signed-statement-file }}
- ${{ inputs.signing-key-file }}
- ${{ inputs.issuer }}
- ${{ inputs.skip-receipt }}
- ${{ inputs.subject }}
67 changes: 40 additions & 27 deletions scitt-scripts/check_operation_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import os
import argparse
import logging
import sys

from time import sleep as time_sleep

Expand All @@ -10,7 +12,7 @@

# all timeouts and durations are in seconds
REQUEST_TIMEOUT = 30
POLL_TIMEOUT = 360
POLL_TIMEOUT = 60
POLL_INTERVAL = 10


Expand Down Expand Up @@ -39,10 +41,7 @@ def get_operation_status(operation_id: str, headers: dict) -> dict:

while True:
response = requests.get(url, timeout=30, headers=headers)
# print("***response:", flush=True)
# print(response, flush=True)
# print(response.json, flush=True)
# print("***response:", flush=True)

if response.status_code == 200:
break
elif response.status_code == 400:
Expand All @@ -53,27 +52,35 @@ def get_operation_status(operation_id: str, headers: dict) -> dict:
return response.json()


def poll_operation_status(operation_id: str, headers: dict) -> str:
def poll_operation_status(
operation_id: str, headers: dict, logger: logging.Logger
) -> str:
"""
polls for the operation status to be 'succeeded'.
"""

poll_attempts: int = int(POLL_TIMEOUT / POLL_INTERVAL)

for _ in range(poll_attempts):
operation_status = get_operation_status(operation_id, headers)
# print("***operation_status:", flush=True)
# print(operation_status, flush=True)
# print("***operation_status:", flush=True)

# pylint: disable=fixme
# TODO: ensure get_operation_status handles error cases from the rest request
if "status" in operation_status and operation_status["status"] == "succeeded":
return operation_status["entryID"]
logger.info("starting to poll for operation status 'succeeded'")

for _ in range(poll_attempts):
try:
operation_status = get_operation_status(operation_id, headers)

# pylint: disable=fixme
# TODO: ensure get_operation_status handles error cases from the rest request
if (
"status" in operation_status
and operation_status["status"] == "succeeded"
):
return operation_status["entryID"]

except requests.HTTPError as e:
logger.debug("failed getting operation status, error: %s", e)

time_sleep(POLL_INTERVAL)

raise TimeoutError("signed statement not registered within polling duration.")
raise TimeoutError("signed statement not registered within polling duration")


def main():
Expand Down Expand Up @@ -108,21 +115,27 @@ def main():
default=default_token_file_name,
)

# log level
parser.add_argument(
"--log-level",
type=str,
help="log level. for any individual poll errors use DEBUG, defaults to WARNING",
default="WARNING",
)

args = parser.parse_args()

# print("args.token_file_name:", flush=True)
# print(args.token_file_name, flush=True)
logger = logging.getLogger("check operation status")
logging.basicConfig(level=logging.getLevelName(args.log_level))

headers = get_token_from_file(args.token_file_name)
# print("headers:", flush=True)
# print(headers, flush=True)

# print("operation_id:", flush=True)
# print(args.operation_id, flush=True)

entry_id = poll_operation_status(args.operation_id, headers)
print(entry_id, flush=True)

try:
entry_id = poll_operation_status(args.operation_id, headers, logger)
print(entry_id)
except TimeoutError as e:
print(e, file=sys.stderr)
sys.exit(1)

if __name__ == "__main__":
main()
85 changes: 46 additions & 39 deletions scitt-scripts/create_hashed_signed_statement.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,15 @@

# Signed Hash envelope header labels from:
# https://github.com/OR13/draft-steele-cose-hash-envelope/blob/main/draft-steele-cose-hash-envelope.md
HEADER_LABEL_PAYLOAD_HASH_ALGORITHM = 998
HEADER_LABEL_LOCATION = 999
# pre-adoption/private use parameters
# https://www.iana.org/assignments/cose/cose.xhtml#header-parameters
HEADER_LABEL_PAYLOAD_HASH_ALGORITHM = -6800
HEADER_LABEL_LOCATION = -6801

# CBOR Object Signing and Encryption (COSE) "typ" (type) Header Parameter
# https://datatracker.ietf.org/doc/rfc9596/
HEADER_LABEL_TYPE = 16
COSE_TYPE="application/hashed+cose"

def open_signing_key(key_file: str) -> SigningKey:
"""
Expand All @@ -61,12 +67,12 @@ def open_payload(payload_file: str) -> str:


def create_hashed_signed_statement(
signing_key: SigningKey,
content_type: str,
issuer: str,
payload: str,
payload_location: str,
signing_key: SigningKey,
subject: str,
issuer: str,
content_type: str,
location: str,
) -> bytes:
"""
creates a hashed signed statement, given the signing_key, payload, subject and issuer
Expand All @@ -87,6 +93,7 @@ def create_hashed_signed_statement(
# create a protected header where
# the verification key is attached to the cwt claims
protected_header = {
HEADER_LABEL_TYPE: COSE_TYPE,
Algorithm: Es256,
KID: b"testkey",
ContentType: content_type,
Expand All @@ -103,7 +110,7 @@ def create_hashed_signed_statement(
},
},
HEADER_LABEL_PAYLOAD_HASH_ALGORITHM: -16, # for sha256
HEADER_LABEL_LOCATION: location,
HEADER_LABEL_LOCATION: payload_location,
}

# now create a sha256 hash of the payload
Expand Down Expand Up @@ -139,71 +146,71 @@ def main():

parser = argparse.ArgumentParser(description="Create a signed statement.")

# signing key file
# content-type
parser.add_argument(
"--signing-key-file",
"--content-type",
type=str,
help="filepath to the stored ecdsa P-256 signing key, in pem format.",
default="scitt-signing-key.pem",
help="The iana.org media type for the payload",
default="application/json",
)

# payload-file (a reference to the file that will become the payload of the SCITT Statement)
# issuer
parser.add_argument(
"--payload-file",
"--issuer",
type=str,
help="filepath to the content that will be hashed into the payload of the SCITT Statement.",
default="scitt-payload.json",
help="issuer who owns the signing key.",
)

# content-type
# output file
parser.add_argument(
"--content-type",
"--output-file",
type=str,
help="The iana.org media type for the payload",
default="application/json",
help="name of the output file to store the signed statement.",
default="signed-statement.cbor",
)

# subject
# payload-file (a reference to the file that will become the payload of the SCITT Statement)
parser.add_argument(
"--subject",
"--payload-file",
type=str,
help="subject to correlate statements made about an artifact.",
help="filepath to the content that will be hashed into the payload of the SCITT Statement.",
default="scitt-payload.json",
)

# issuer
# payload-location
parser.add_argument(
"--issuer",
"--payload-location",
type=str,
help="issuer who owns the signing key.",
help="location hint for the original statement that was hashed.",
)

# location hint
# signing key file
parser.add_argument(
"--location-hint",
"--signing-key-file",
type=str,
help="location hint for the original statement that was hashed.",
help="filepath to the stored ecdsa P-256 signing key, in pem format.",
default="scitt-signing-key.pem",
)

# output file
# subject
parser.add_argument(
"--output-file",
"--subject",
type=str,
help="name of the output file to store the signed statement.",
default="signed-statement.cbor",
help="subject to correlate statements made about an artifact.",
)

args = parser.parse_args()

signing_key = open_signing_key(args.signing_key_file)
payload = open_payload(args.payload_file)
payload_contents = open_payload(args.payload_file)

signed_statement = create_hashed_signed_statement(
signing_key,
payload,
args.subject,
args.issuer,
args.content_type,
args.location_hint,
content_type=args.content_type,
issuer=args.issuer,
payload=payload_contents,
payload_location=args.payload_location,
signing_key=signing_key,
subject=args.subject
)

with open(args.output_file, "wb") as output_file:
Expand Down
Loading