Skip to content

Commit

Permalink
Ensure algorithm query param is passed for CSFLE (#1889)
Browse files (browse the repository at this point in the history)
* Add missing algorithm query param

* Add test
  • Loading branch information
rayokota authored Jan 3, 2025
1 parent 4f25c8c commit 3e3f17a
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def _field_transform(self, ctx: RuleContext, field_ctx: FieldContext, field_valu
"name": field_ctx.name,
"typeName": field_ctx.type_name(),
"tags": [celtypes.StringType(tag) for tag in field_ctx.tags],
"message": msg_to_cel(field_value),
"message": msg_to_cel(field_ctx.containing_message),
}
return self._executor.execute(ctx, field_value, args)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,7 @@ def get_dek(
if dek is not None:
return dek

query = {'deleted': deleted}
query = {'algorithm': algorithm, 'deleted': deleted}
response = self._rest_client.get('/dek-registry/v1/keks/{}/deks/{}/versions/{}'
.format(urllib.parse.quote(kek_name),
urllib.parse.quote(subject, safe=''),
Expand Down
68 changes: 68 additions & 0 deletions tests/schema_registry/test_avro_serdes.py
Original file line number Diff line number Diff line change
Expand Up @@ -929,6 +929,74 @@ def test_avro_encryption():
assert obj == obj2


def test_avro_encryption_deterministic():
    """Round-trip an Avro record through an ENCRYPT rule using an AES256_SIV DEK.

    A schema whose ``stringField`` and ``bytesField`` carry the ``PII`` tag is
    registered together with a WRITEREAD ``ENCRYPT`` rule that sets
    ``encrypt.dek.algorithm`` to ``AES256_SIV``. The record is serialized, the
    test checks that ``stringField`` in the input dict no longer holds its
    plaintext, the tagged fields are put back to their plaintext values, and
    the deserialized result is compared against the restored record.
    """
    executor = FieldEncryptionExecutor.register_with_clock(FakeClock())

    client = SchemaRegistryClient.new_client({'url': _BASE_URL})
    serializer_conf = {'auto.register.schemas': False, 'use.latest.version': True}
    rule_conf = {'secret': 'mysecret'}

    # Only the two PII-tagged fields are targeted by the rule's tag list below.
    fields = [
        {'name': 'intField', 'type': 'int'},
        {'name': 'doubleField', 'type': 'double'},
        {'name': 'stringField', 'type': 'string', 'confluent:tags': ['PII']},
        {'name': 'booleanField', 'type': 'boolean'},
        {'name': 'bytesField', 'type': 'bytes', 'confluent:tags': ['PII']},
    ]
    schema = {'type': 'record', 'name': 'test', 'fields': fields}

    encrypt_params = RuleParams({
        "encrypt.kek.name": "kek1",
        "encrypt.kms.type": "local-kms",
        "encrypt.kms.key.id": "mykey",
        "encrypt.dek.algorithm": "AES256_SIV"
    })
    rule = Rule(
        "test-encrypt",
        "",
        RuleKind.TRANSFORM,
        RuleMode.WRITEREAD,
        "ENCRYPT",
        ["PII"],
        encrypt_params,
        None,
        None,
        "ERROR,NONE",
        False
    )
    registered_schema = Schema(
        json.dumps(schema),
        "AVRO",
        [],
        None,
        RuleSet(None, [rule])
    )
    client.register_schema(_SUBJECT, registered_schema)

    record = {
        'intField': 123,
        'doubleField': 45.67,
        'stringField': 'hi',
        'booleanField': True,
        'bytesField': b'foobar',
    }
    serializer = AvroSerializer(client, schema_str=None, conf=serializer_conf, rule_conf=rule_conf)
    # Keep the executor's client as it is after the serializer is built so the
    # same one can be put back once the deserializer has been constructed.
    saved_dek_client = executor.client
    ctx = SerializationContext(_TOPIC, MessageField.VALUE)
    payload = serializer(record, ctx)

    # reset encrypted fields
    assert record['stringField'] != 'hi'
    record.update(stringField='hi', bytesField=b'foobar')

    deserializer = AvroDeserializer(client, rule_conf=rule_conf)
    executor.client = saved_dek_client
    decoded = deserializer(payload, ctx)
    assert record == decoded


def test_avro_encryption_cel():
executor = FieldEncryptionExecutor.register_with_clock(FakeClock())

Expand Down

0 comments on commit 3e3f17a

Please sign in to comment.