Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix L3-iGrant/api#689: Update to presentation submission for holder a… #35

Merged
merged 1 commit into from
Nov 28, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
282 changes: 274 additions & 8 deletions sdjwt/pex.py
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ def match_credentials_for_sd_jwt(
credential_subject, key_mapping = (
decode_credential_sd_to_credential_subject_with_key_mapping(
disclosure_mapping=disclosure_mapping,
credential_subject=credential_decoded
credential_subject=credential_decoded,
)
)
credential = credential_decoded
Expand Down Expand Up @@ -802,23 +802,29 @@ def validate_vp_token(
"limit_disclosure", None
)
if "vc+sd-jwt" in credential_format:
matches = match_credentials(
is_valid, err = validate_vc_token_for_sd_jwt(
json.dumps(input_descriptor),
credentials=[json.dumps(vc_claims)],
credential_token=vc_token,
limit_disclosure=True if limit_disclosure else False,
)
if is_valid:
is_verified = True
else:
return False
else:
matches = match_credentials(
json.dumps(input_descriptor),
credentials=[json.dumps(vc_claims["vc"])],
)
if not matches or not matches[0]:
return False
else:
is_verified = True
if not matches or not matches[0]:
return False
else:
is_verified = True
if not is_verified:
return False
return True


def decode_credential_sd_to_credential_claims(
disclosure_mapping: dict, credential_subject: dict
) -> dict:
Expand All @@ -830,7 +836,9 @@ def replace_sd_with_credential_subject_attributes(sds: list, disclosure: dict):
for sd in sds:
disclosure_base64 = disclosure.get(sd)
if disclosure_base64:
key, value = decode_disclosure_base64(disclosure_base64=disclosure_base64)
key, value = decode_disclosure_base64(
disclosure_base64=disclosure_base64
)
credential_attribute[key] = value
return credential_attribute

Expand Down Expand Up @@ -866,3 +874,261 @@ def iterate_mapping(obj, path):

iterate_mapping(credential_subject, [])
return credential_subject["credentialSubject"]


def remove_and_add_matching_disclosures_to_token(
token: str, disclosure_mapping: dict
) -> str:
token_without_disclosures = token.split("~")[0]
disclosures = []
for sd, disclosure in disclosure_mapping.items():
disclosures.append(disclosure)
if len(disclosures) > 0:
token_with_disclosures = token_without_disclosures + "~" + "~".join(disclosures)
else:
token_with_disclosures = token_without_disclosures

return token_with_disclosures


def apply_json_path_for_sd_jwt(input_json_string, path, disclosure_mapping):
is_matched = False
try:
# Parse input JSON string
parsed_input = json.loads(input_json_string)
except json.JSONDecodeError as e:
return None, e, None

try:
# Parse JSON path string
jsonpath_expr = parse(path)
except Exception as e:
return None, e, None

disclosures = {}

for match in jsonpath_expr.find(parsed_input):
matches = match.value

if isinstance(matches, dict):
child_sd = matches.get("_sd")
if child_sd:
for sd in child_sd:
disclosures[sd] = disclosure_mapping.get(sd)
return True, disclosures, None, matches

path_parts = path.split(".")
length_of_path_parts = len(path_parts)
poped_parts = []
matches = None
while len(path_parts) >= (length_of_path_parts - 1) and len(path_parts) > 1:
poped_part = path_parts.pop() # Remove the last segment
parent_path = ".".join(path_parts)
parent_expr = parse(parent_path)
for match in parent_expr.find(parsed_input):
if isinstance(match.value, dict):
sds = match.value.get("_sd")
if sds:
for sd in sds:
disclosure_base64 = disclosure_mapping.get(sd)

key, value = decode_disclosure_base64(
disclosure_base64=disclosure_base64
)
if key == poped_part:
if len(poped_parts) > 0:
disclosure_value = None
if isinstance(value, dict):
for pp in poped_parts:
disclosure_value = value.get(pp)
if disclosure_value:
break
if disclosure_value:
is_matched = True
matches = disclosure_value
disclosures[sd] = disclosure_mapping.get(sd)
else:
is_matched = True
matches = value
disclosures[sd] = disclosure_mapping.get(sd)
poped_parts.append(poped_part)
return is_matched, disclosures, None, matches


def match_credentials_for_ietf_sd_jwt(
input_descriptor_json,
credentials,
limit_disclosure: bool,
):
# Deserialise input descriptor json string
try:
descriptor = json.loads(input_descriptor_json)
except json.JSONDecodeError as e:
return [], e

# To store the matched credentials
matched_credentials = []

# Iterate through each credential
for item in credentials:
for credential_id, credential_token in item.items():
disclosure_mapping = get_all_disclosures_with_sd_from_token(
token=credential_token
)
_, credential_decoded = decode_header_and_claims_in_jwt(credential_token)
credential = json.dumps(credential_decoded)
credential_matched = True
matched_disclosure_mapping = {}
# Iterate through fields specified in the constraints
for field_index, field in enumerate(descriptor["constraints"]["fields"]):

for path_index, path in enumerate(field["path"]):
is_matched, disclosures, err, matches = apply_json_path_for_sd_jwt(
credential, path, disclosure_mapping
)
if is_matched:
if "filter" in field:
try:
filter_bytes = json.dumps(field["filter"])
except (TypeError, ValueError) as e:
# Continue to next path, since filter has failed to serialise
continue

# Validate the matched JSON against the field's filter

if validate_json_schema(matches, filter_bytes) is not None:
credential_matched = False
break
matched_disclosure_mapping.update(disclosures)

if not is_matched:
credential_matched = False
break
if credential_matched:
if limit_disclosure:
token = remove_and_add_matching_disclosures_to_token(
token=credential_token,
disclosure_mapping=matched_disclosure_mapping,
)
matched_credential = {credential_id: token}
matched_credentials.append(matched_credential)
else:
matched_credential = {credential_id: credential_token}
matched_credentials.append(matched_credential)

return matched_credentials


def apply_json_path_for_validating_sd_jwt(input_json_string, path, disclosure_mapping):
is_matched = False
try:
# Parse input JSON string
parsed_input = json.loads(input_json_string)
except json.JSONDecodeError as e:
return None, e, None

try:
# Parse JSON path string
jsonpath_expr = parse(path)
except Exception as e:
return None, e, None

for match in jsonpath_expr.find(parsed_input):
matches = match.value

if isinstance(matches, dict):
child_sd = matches.get("_sd")
if child_sd:
for sd in child_sd:
disclosure = disclosure_mapping.get(sd)
if not disclosure:
return False, None, matches
return True, None, matches

path_parts = path.split(".")
length_of_path_parts = len(path_parts)
poped_parts = []
matches = None
while len(path_parts) >= (length_of_path_parts - 1) and len(path_parts) > 1:
poped_part = path_parts.pop() # Remove the last segment
parent_path = ".".join(path_parts)
parent_expr = parse(parent_path)
for match in parent_expr.find(parsed_input):
if isinstance(match.value, dict):
sds = match.value.get("_sd")
if sds:
for sd in sds:
disclosure_base64 = disclosure_mapping.get(sd)
if disclosure_base64:
key, value = decode_disclosure_base64(
disclosure_base64=disclosure_base64
)
if key == poped_part:
if len(poped_parts) > 0:
disclosure_value = None
if isinstance(value, dict):
for pp in poped_parts:
disclosure_value = value.get(pp)
if disclosure_value:
break
if disclosure_value:
is_matched = True
matches = disclosure_value
else:
is_matched = True
matches = value
poped_parts.append(poped_part)
return is_matched, None, matches


def validate_vc_token_for_sd_jwt(
input_descriptor_json,
credential_token: str,
limit_disclosure: bool,
):
# Deserialise input descriptor json string
try:
descriptor = json.loads(input_descriptor_json)
except json.JSONDecodeError as e:
return False, e

disclosure_mapping = get_all_disclosures_with_sd_from_token(token=credential_token)
_, credential_decoded = decode_header_and_claims_in_jwt(credential_token)
credential = json.dumps(credential_decoded)
credential_matched = True
# Iterate through fields specified in the constraints
for field_index, field in enumerate(descriptor["constraints"]["fields"]):

for path_index, path in enumerate(field["path"]):
is_matched, err, matches = apply_json_path_for_validating_sd_jwt(
credential, path, disclosure_mapping
)
if is_matched:
if "filter" in field:
try:
filter_bytes = json.dumps(field["filter"])
except (TypeError, ValueError) as e:
# Continue to next path, since filter has failed to serialise
continue

# Validate the matched JSON against the field's filter

if validate_json_schema(matches, filter_bytes) is not None:
credential_matched = False
break

if not is_matched:
credential_matched = False
break
if credential_matched:
if limit_disclosure:
return True, None
else:
sds = find_all_sd_values(credential_decoded)
for sd in sds:
disclosure_base64 = disclosure_mapping.get(sd)
if not disclosure_base64:
return False, None
return True, None
else:
return False, None