diff --git a/pycardano/certificate.py b/pycardano/certificate.py index 67bac613..7b8ef42a 100644 --- a/pycardano/certificate.py +++ b/pycardano/certificate.py @@ -25,6 +25,12 @@ "StakeRegistrationAndDelegationAndVoteDelegation", "DRep", "DRepKind", + "AuthCommitteeHotCertificate", + "ResignCommitteeColdCertificate", + "Anchor", + "DrepCredential", + "UnregDrepCertificate", + "UpdateDrepCertificate", ] from pycardano.pool_params import PoolParams @@ -32,6 +38,17 @@ unit_interval = Tuple[int, int] +@dataclass(repr=False) +class Anchor(ArrayCBORSerializable): + url: str + data_hash: bytes + + @classmethod + @limit_primitive_type(list) + def from_primitive(cls: Type[Anchor], values: Union[list, tuple]) -> Anchor: + return cls(url=values[0], data_hash=values[1]) + + @dataclass(repr=False) class StakeCredential(ArrayCBORSerializable): _CODE: Optional[int] = field(init=False, default=None) @@ -403,6 +420,142 @@ def to_primitive(self): return [self.kind.value] +@dataclass(repr=False) +class AuthCommitteeHotCertificate(ArrayCBORSerializable): + _CODE: int = field(init=False, default=14) + + committee_cold_credential: StakeCredential + committee_hot_credential: StakeCredential + + def __post_init__(self): + self._CODE = 14 + + @classmethod + @limit_primitive_type(list) + def from_primitive( + cls: Type[AuthCommitteeHotCertificate], values: Union[list, tuple] + ) -> AuthCommitteeHotCertificate: + if values[0] == 14: + return cls( + committee_cold_credential=StakeCredential.from_primitive(values[1]), + committee_hot_credential=StakeCredential.from_primitive(values[2]), + ) + else: + raise DeserializeException( + f"Invalid AuthCommitteeHotCertificate type {values[0]}" + ) + + +@dataclass(repr=False) +class ResignCommitteeColdCertificate(ArrayCBORSerializable): + _CODE: int = field(init=False, default=15) + + committee_cold_credential: StakeCredential + anchor: Optional[Anchor] + + def __post_init__(self): + self._CODE = 15 + + @classmethod + @limit_primitive_type(list) + def from_primitive( + cls: Type[ResignCommitteeColdCertificate], values: Union[list, tuple] + ) -> ResignCommitteeColdCertificate: + if values[0] == 15: + return cls( + committee_cold_credential=StakeCredential.from_primitive(values[1]), + anchor=( + Anchor.from_primitive(values[2]) if values[2] is not None else None + ), + ) + else: + raise DeserializeException( + f"Invalid ResignCommitteeColdCertificate type {values[0]}" + ) + + +@dataclass(repr=False) +class DrepCredential(ArrayCBORSerializable): + """DRep credential is identical to StakeCredential in structure.""" + + _CODE: int = field(init=False, default=16) + + drep_credential: StakeCredential + coin: int + anchor: Optional[Anchor] = field(default=None) + + def __post_init__(self): + self._CODE = 16 + + @classmethod + @limit_primitive_type(list) + def from_primitive( + cls: Type[DrepCredential], values: Union[list, tuple] + ) -> DrepCredential: + if values[0] == 16: + return cls( + drep_credential=StakeCredential.from_primitive(values[1]), + coin=values[2], + anchor=( + Anchor.from_primitive(values[3]) if values[3] is not None else None + ), + ) + else: + raise DeserializeException(f"Invalid DrepCredential type {values[0]}") + + +@dataclass(repr=False) +class UnregDrepCertificate(ArrayCBORSerializable): + _CODE: int = field(init=False, default=17) + + drep_credential: DrepCredential + coin: int + + def __post_init__(self): + self._CODE = 17 + + @classmethod + @limit_primitive_type(list) + def from_primitive( + cls: Type[UnregDrepCertificate], values: Union[list, tuple] + ) -> UnregDrepCertificate: + if values[0] == 17: + return cls( + drep_credential=DrepCredential.from_primitive(values[1]), + coin=values[2], + ) + else: + raise DeserializeException(f"Invalid UnregDrepCertificate type {values[0]}") + + +@dataclass(repr=False) +class UpdateDrepCertificate(ArrayCBORSerializable): + _CODE: int = field(init=False, default=18) + + drep_credential: DrepCredential + anchor: Optional[Anchor] + + def __post_init__(self): + self._CODE = 18 + + @classmethod + @limit_primitive_type(list) + def from_primitive( + cls: Type[UpdateDrepCertificate], values: Union[list, tuple] + ) -> UpdateDrepCertificate: + if values[0] == 18: + return cls( + drep_credential=DrepCredential.from_primitive(values[1]), + anchor=( + Anchor.from_primitive(values[2]) if values[2] is not None else None + ), + ) + else: + raise DeserializeException( + f"Invalid UpdateDrepCertificate type {values[0]}" + ) + + Certificate = Union[ StakeRegistration, StakeDeregistration, @@ -416,4 +569,8 @@ def to_primitive(self): StakeRegistrationAndDelegation, StakeRegistrationAndVoteDelegation, StakeRegistrationAndDelegationAndVoteDelegation, + AuthCommitteeHotCertificate, + ResignCommitteeColdCertificate, + UnregDrepCertificate, + UpdateDrepCertificate, ] diff --git a/test/pycardano/test_certificate.py b/test/pycardano/test_certificate.py index 58159d42..6c74aa4f 100644 --- a/test/pycardano/test_certificate.py +++ b/test/pycardano/test_certificate.py @@ -1,16 +1,30 @@ import os +import pytest + from pycardano import StakeSigningKey, TransactionBody from pycardano.address import Address from pycardano.certificate import ( + Anchor, + AuthCommitteeHotCertificate, + DrepCredential, PoolRegistration, PoolRetirement, + ResignCommitteeColdCertificate, StakeCredential, StakeDelegation, StakeDeregistration, StakeRegistration, + UnregDrepCertificate, + UpdateDrepCertificate, +) +from pycardano.exception import DeserializeException, InvalidArgumentException +from pycardano.hash import ( # plutus_script_hash, + POOL_KEY_HASH_SIZE, + SCRIPT_HASH_SIZE, + PoolKeyHash, + ScriptHash, ) -from pycardano.hash import POOL_KEY_HASH_SIZE, SCRIPT_HASH_SIZE, PoolKeyHash, ScriptHash TEST_ADDR = Address.from_primitive( "stake_test1upyz3gk6mw5he20apnwfn96cn9rscgvmmsxc9r86dh0k66gswf59n" @@ -149,3 +163,239 @@ def test_staking_certificate_serdes(): after_serdes = TransactionBody.from_cbor(transaction_body.to_cbor()) assert after_serdes == transaction_body + + +def test_anchor(): + url = "https://example.com" + data_hash = bytes.fromhex("1234567890" * 6 + "12") # 32 bytes + anchor = Anchor(url=url, data_hash=data_hash) + + anchor_cbor_hex = anchor.to_cbor_hex() + + assert ( + anchor_cbor_hex + == "827368747470733a2f2f6578616d706c652e636f6d581f12345678901234567890123456789012345678901234567890123456789012" + ) + + assert Anchor.from_cbor(anchor_cbor_hex) == anchor + + +def test_drep_credential(): + stake_credential = StakeCredential(TEST_ADDR.staking_part) + drep_credential = DrepCredential(stake_credential, coin=0) + drep_credential_cbor_hex = drep_credential.to_cbor_hex() + assert ( + drep_credential_cbor_hex + == "84108200581c4828a2dadba97ca9fd0cdc99975899470c219bdc0d828cfa6ddf6d6900f6" + ) + assert DrepCredential.from_cbor(drep_credential_cbor_hex) == drep_credential + + +def test_unreg_drep_certificate(): + stake_credential = StakeCredential(TEST_ADDR.staking_part) + drep_credential = DrepCredential(stake_credential, coin=0) + coin = 1000000 + unreg_drep_cert = UnregDrepCertificate(drep_credential=drep_credential, coin=coin) + + unreg_drep_cert_cbor_hex = unreg_drep_cert.to_cbor_hex() + + assert ( + unreg_drep_cert_cbor_hex + == "831184108200581c4828a2dadba97ca9fd0cdc99975899470c219bdc0d828cfa6ddf6d6900f61a000f4240" + ) + + assert UnregDrepCertificate.from_cbor(unreg_drep_cert_cbor_hex) == unreg_drep_cert + + +def test_update_drep_certificate_with_anchor(): + stake_credential = StakeCredential(TEST_ADDR.staking_part) + drep_credential = DrepCredential(stake_credential, coin=0) + url = "https://pycardano.com" + data_hash = bytes.fromhex("1234567890" * 6 + "12") # 32 bytes + anchor = Anchor(url=url, data_hash=data_hash) + update_drep_cert = UpdateDrepCertificate( + drep_credential=drep_credential, anchor=anchor + ) + + update_drep_cert_cbor_hex = update_drep_cert.to_cbor_hex() + + assert ( + update_drep_cert_cbor_hex + == "831284108200581c4828a2dadba97ca9fd0cdc99975899470c219bdc0d828cfa6ddf6d6900f6827568747470733a2f2f707963617264616e6f2e636f6d581f12345678901234567890123456789012345678901234567890123456789012" + ) + + assert ( + UpdateDrepCertificate.from_cbor(update_drep_cert_cbor_hex) == update_drep_cert + ) + + +def test_update_drep_certificate_without_anchor(): + stake_credential = StakeCredential(TEST_ADDR.staking_part) + drep_credential = DrepCredential(stake_credential, coin=0) + update_drep_cert = UpdateDrepCertificate( + drep_credential=drep_credential, anchor=None + ) + update_drep_cert_cbor_hex = update_drep_cert.to_cbor_hex() + + assert ( + update_drep_cert_cbor_hex + == "831284108200581c4828a2dadba97ca9fd0cdc99975899470c219bdc0d828cfa6ddf6d6900f6f6" + ) + + assert ( + UpdateDrepCertificate.from_cbor(update_drep_cert_cbor_hex) == update_drep_cert + ) + + +def test_auth_committee_hot_certificate(): + committee_cold_credential = StakeCredential(TEST_ADDR.staking_part) + committee_hot_credential = StakeCredential(ScriptHash(b"1" * SCRIPT_HASH_SIZE)) + auth_committee_hot_cert = AuthCommitteeHotCertificate( + committee_cold_credential=committee_cold_credential, + committee_hot_credential=committee_hot_credential, + ) + + auth_committee_hot_cert_cbor_hex = auth_committee_hot_cert.to_cbor_hex() + + assert ( + auth_committee_hot_cert_cbor_hex + == "830e8200581c4828a2dadba97ca9fd0cdc99975899470c219bdc0d828cfa6ddf6d698201581c31313131313131313131313131313131313131313131313131313131" + ) + + assert ( + AuthCommitteeHotCertificate.from_cbor(auth_committee_hot_cert_cbor_hex) + == auth_committee_hot_cert + ) + + +def test_resign_committee_cold_certificate_with_anchor(): + committee_cold_credential = StakeCredential(TEST_ADDR.staking_part) + url = "https://pycardano.com" + data_hash = bytes.fromhex("1234567890" * 6 + "12") + anchor = Anchor(url=url, data_hash=data_hash) + resign_committee_cold_cert = ResignCommitteeColdCertificate( + committee_cold_credential=committee_cold_credential, + anchor=anchor, + ) + + resign_committee_cold_cert_cbor_hex = resign_committee_cold_cert.to_cbor_hex() + + assert ( + resign_committee_cold_cert_cbor_hex + == "830f8200581c4828a2dadba97ca9fd0cdc99975899470c219bdc0d828cfa6ddf6d69827568747470733a2f2f707963617264616e6f2e636f6d581f12345678901234567890123456789012345678901234567890123456789012" + ) + + assert ( + ResignCommitteeColdCertificate.from_cbor(resign_committee_cold_cert_cbor_hex) + == resign_committee_cold_cert + ) + + +def test_resign_committee_cold_certificate_without_anchor(): + committee_cold_credential = StakeCredential(TEST_ADDR.staking_part) + resign_committee_cold_cert = ResignCommitteeColdCertificate( + committee_cold_credential=committee_cold_credential, + anchor=None, + ) + resign_committee_cold_cert_cbor_hex = resign_committee_cold_cert.to_cbor_hex() + + assert ( + resign_committee_cold_cert_cbor_hex + == "830f8200581c4828a2dadba97ca9fd0cdc99975899470c219bdc0d828cfa6ddf6d69f6" + ) + + assert ( + ResignCommitteeColdCertificate.from_cbor(resign_committee_cold_cert_cbor_hex) + == resign_committee_cold_cert + ) + + +def test_invalid_certificate_types(): + with pytest.raises(DeserializeException) as excinfo: + StakeRegistration.from_primitive([1, [0, b"1" * 28]]) + assert "Invalid StakeRegistration type 1" in str(excinfo.value) + + with pytest.raises(DeserializeException) as excinfo: + StakeDeregistration.from_primitive([0, [0, b"1" * 28]]) + assert "Invalid StakeDeregistration type 0" in str(excinfo.value) + + with pytest.raises(DeserializeException) as excinfo: + StakeDelegation.from_primitive([1, [0, b"1" * 28], b"1" * 28]) + assert "Invalid StakeDelegation type 1" in str(excinfo.value) + + with pytest.raises(DeserializeException) as excinfo: + PoolRegistration.from_primitive([4, b"1" * 28]) + assert "Invalid PoolRegistration type 4" in str(excinfo.value) + + with pytest.raises(DeserializeException) as excinfo: + PoolRetirement.from_primitive([3, b"1" * 28, 100]) + assert "Invalid PoolRetirement type 3" in str(excinfo.value) + + staking_key = StakeSigningKey.generate() + committee_cold_credential = StakeCredential( + staking_key.to_verification_key().hash() + ) + committee_hot_credential = StakeCredential(ScriptHash(b"1" * SCRIPT_HASH_SIZE)) + + with pytest.raises(DeserializeException) as excinfo: + AuthCommitteeHotCertificate.from_primitive( + [15, committee_cold_credential, committee_hot_credential] + ) + assert "Invalid AuthCommitteeHotCertificate type 15" in str(excinfo.value) + + with pytest.raises(DeserializeException) as excinfo: + ResignCommitteeColdCertificate.from_primitive( + [14, committee_cold_credential, None] + ) + assert "Invalid ResignCommitteeColdCertificate type 14" in str(excinfo.value) + + stake_credential = StakeCredential(TEST_ADDR.staking_part) + drep_credential = DrepCredential(stake_credential, coin=0) + + with pytest.raises(DeserializeException) as excinfo: + UnregDrepCertificate.from_primitive([18, drep_credential, 1000000]) + assert "Invalid UnregDrepCertificate type 18" in str(excinfo.value) + + with pytest.raises(DeserializeException) as excinfo: + UpdateDrepCertificate.from_primitive([17, drep_credential, None]) + assert "Invalid UpdateDrepCertificate type 17" in str(excinfo.value) + + +def test_certificate_in_transaction(): + staking_key = StakeSigningKey.generate() + committee_cold_credential = StakeCredential( + staking_key.to_verification_key().hash() + ) + committee_hot_credential = StakeCredential(ScriptHash(b"1" * SCRIPT_HASH_SIZE)) + + auth_committee_hot_cert = AuthCommitteeHotCertificate( + committee_cold_credential=committee_cold_credential, + committee_hot_credential=committee_hot_credential, + ) + + url = "https://example.com" + data_hash = bytes.fromhex("1234567890" * 6 + "12") # 32 bytes + anchor = Anchor(url=url, data_hash=data_hash) + resign_committee_cold_cert = ResignCommitteeColdCertificate( + committee_cold_credential=committee_cold_credential, + anchor=anchor, + ) + + # Create transaction with certificates + transaction_body = TransactionBody( + certificates=[ + auth_committee_hot_cert, + resign_committee_cold_cert, + ] + ) + + # Test serialization/deserialization + after_serdes = TransactionBody.from_cbor(transaction_body.to_cbor()) + assert after_serdes == transaction_body + + # Verify certificates in deserialized transaction + assert len(after_serdes.certificates) == 2 + assert isinstance(after_serdes.certificates[0], AuthCommitteeHotCertificate) + assert isinstance(after_serdes.certificates[1], ResignCommitteeColdCertificate) + assert after_serdes.certificates[0] == auth_committee_hot_cert + assert after_serdes.certificates[1] == resign_committee_cold_cert