From cf63ae8480e6b03aca437b658cc10a935129a819 Mon Sep 17 00:00:00 2001 From: Guangning E Date: Fri, 19 Feb 2021 07:02:11 +0800 Subject: [PATCH] [feature][python-client]support python end to end encryption (#9588) * Support python end to end encryption * Add test * Add document for new args * Fixed test by use absolute path --- .../include/pulsar/CryptoKeyReader.h | 4 +- pulsar-client-cpp/lib/CryptoKeyReader.cc | 5 +++ pulsar-client-cpp/python/CMakeLists.txt | 3 +- pulsar-client-cpp/python/pulsar/__init__.py | 39 ++++++++++++++++++- pulsar-client-cpp/python/pulsar_test.py | 22 ++++++++++- pulsar-client-cpp/python/src/config.cc | 17 ++++++++ .../python/src/cryptoKeyReader.cc | 32 +++++++++++++++ pulsar-client-cpp/python/src/pulsar.cc | 2 + pulsar-client-cpp/python/src/utils.h | 7 ++++ 9 files changed, 127 insertions(+), 4 deletions(-) create mode 100644 pulsar-client-cpp/python/src/cryptoKeyReader.cc diff --git a/pulsar-client-cpp/include/pulsar/CryptoKeyReader.h b/pulsar-client-cpp/include/pulsar/CryptoKeyReader.h index 0d81d9429e66e..6b371f0d24a5d 100644 --- a/pulsar-client-cpp/include/pulsar/CryptoKeyReader.h +++ b/pulsar-client-cpp/include/pulsar/CryptoKeyReader.h @@ -62,6 +62,8 @@ class PULSAR_PUBLIC CryptoKeyReader { }; /* namespace pulsar */ +typedef std::shared_ptr CryptoKeyReaderPtr; + class PULSAR_PUBLIC DefaultCryptoKeyReader : public CryptoKeyReader { private: std::string publicKeyPath_; @@ -76,9 +78,9 @@ class PULSAR_PUBLIC DefaultCryptoKeyReader : public CryptoKeyReader { EncryptionKeyInfo& encKeyInfo) const; Result getPrivateKey(const std::string& keyName, std::map& metadata, EncryptionKeyInfo& encKeyInfo) const; + static CryptoKeyReaderPtr create(const std::string& publicKeyPath, const std::string& privateKeyPath); }; /* namespace pulsar */ -typedef std::shared_ptr CryptoKeyReaderPtr; } // namespace pulsar #endif /* CRYPTOKEYREADER_H_ */ diff --git a/pulsar-client-cpp/lib/CryptoKeyReader.cc b/pulsar-client-cpp/lib/CryptoKeyReader.cc index 7a5d9eeee277f..1eb73e8fe9fe9 100644 --- a/pulsar-client-cpp/lib/CryptoKeyReader.cc +++ b/pulsar-client-cpp/lib/CryptoKeyReader.cc @@ -72,4 +72,9 @@ Result DefaultCryptoKeyReader::getPrivateKey(const std::string& keyName, encKeyInfo.setKey(keyContents); return ResultOk; +} + +CryptoKeyReaderPtr DefaultCryptoKeyReader::create(const std::string& publicKeyPath, + const std::string& privateKeyPath) { + return CryptoKeyReaderPtr(new DefaultCryptoKeyReader(publicKeyPath, privateKeyPath)); } \ No newline at end of file diff --git a/pulsar-client-cpp/python/CMakeLists.txt b/pulsar-client-cpp/python/CMakeLists.txt index 2607ab38c16b2..e78d80a8eb06e 100644 --- a/pulsar-client-cpp/python/CMakeLists.txt +++ b/pulsar-client-cpp/python/CMakeLists.txt @@ -28,7 +28,8 @@ ADD_LIBRARY(_pulsar SHARED src/pulsar.cc src/message.cc src/authentication.cc src/reader.cc - src/schema.cc) + src/schema.cc + src/cryptoKeyReader.cc) SET(CMAKE_SHARED_LIBRARY_PREFIX ) SET(CMAKE_SHARED_LIBRARY_SUFFIX .so) diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py index cfae7e0f1eac9..b47c87d32e6e2 100644 --- a/pulsar-client-cpp/python/pulsar/__init__.py +++ b/pulsar-client-cpp/python/pulsar/__init__.py @@ -452,6 +452,8 @@ def create_producer(self, topic, message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution, properties=None, batching_type=BatchingType.Default, + encryption_key=None, + crypto_key_reader=None ): """ Create a new producer on a given topic. @@ -519,6 +521,11 @@ def create_producer(self, topic, (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3) batched into single batch message: [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)] + * encryption_key: + The key used for symmetric encryption, configured on the producer side + * crypto_key_reader: + Symmetric encryption class implementation, configuring public key encryption messages for the producer + and private key decryption messages for the consumer """ _check_type(str, topic, 'topic') _check_type_or_none(str, producer_name, 'producer_name') @@ -535,6 +542,8 @@ def create_producer(self, topic, _check_type(int, batching_max_publish_delay_ms, 'batching_max_publish_delay_ms') _check_type_or_none(dict, properties, 'properties') _check_type(BatchingType, batching_type, 'batching_type') + _check_type_or_none(str, encryption_key, 'encryption_key') + _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader') conf = _pulsar.ProducerConfiguration() conf.send_timeout_millis(send_timeout_millis) @@ -557,6 +566,10 @@ def create_producer(self, topic, conf.property(k, v) conf.schema(schema.schema_info()) + if encryption_key: + conf.encryption_key(encryption_key) + if crypto_key_reader: + conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader) p = Producer() p._producer = self._client.create_producer(topic, conf) @@ -576,7 +589,8 @@ def subscribe(self, topic, subscription_name, is_read_compacted=False, properties=None, pattern_auto_discovery_period=60, - initial_position=InitialPosition.Latest + initial_position=InitialPosition.Latest, + crypto_key_reader=None ): """ Subscribe to the given topic and subscription combination. @@ -649,6 +663,9 @@ def my_listener(consumer, message): Set the initial position of a consumer when subscribing to the topic. It could be either: `InitialPosition.Earliest` or `InitialPosition.Latest`. Default: `Latest`. + * crypto_key_reader: + Symmetric encryption class implementation, configuring public key encryption messages for the producer + and private key decryption messages for the consumer """ _check_type(str, subscription_name, 'subscription_name') _check_type(ConsumerType, consumer_type, 'consumer_type') @@ -664,6 +681,7 @@ def my_listener(consumer, message): _check_type(bool, is_read_compacted, 'is_read_compacted') _check_type_or_none(dict, properties, 'properties') _check_type(InitialPosition, initial_position, 'initial_position') + _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader') conf = _pulsar.ConsumerConfiguration() conf.consumer_type(consumer_type) @@ -686,6 +704,9 @@ def my_listener(consumer, message): conf.schema(schema.schema_info()) + if crypto_key_reader: + conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader) + c = Consumer() if isinstance(topic, str): # Single topic @@ -1224,6 +1245,22 @@ def close(self): self._reader.close() self._client._consumers.remove(self) +class CryptoKeyReader: + """ + Default crypto key reader implementation + """ + def __init__(self, public_key_path, private_key_path): + """ + Create crypto key reader. + + **Args** + + * `public_key_path`: Path to the public key + * `private_key_path`: Path to private key + """ + _check_type(str, public_key_path, 'public_key_path') + _check_type(str, private_key_path, 'private_key_path') + self.cryptoKeyReader = _pulsar.CryptoKeyReader(public_key_path, private_key_path) def _check_type(var_type, var, name): if not isinstance(var, var_type): diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py index f05683235b5c3..e7d05f37cd6d6 100755 --- a/pulsar-client-cpp/python/pulsar_test.py +++ b/pulsar-client-cpp/python/pulsar_test.py @@ -26,7 +26,8 @@ from datetime import timedelta from pulsar import Client, MessageId, \ CompressionType, ConsumerType, PartitionsRoutingMode, \ - AuthenticationTLS, Authentication, AuthenticationToken, InitialPosition + AuthenticationTLS, Authentication, AuthenticationToken, InitialPosition, \ + CryptoKeyReader from _pulsar import ProducerConfiguration, ConsumerConfiguration @@ -357,6 +358,25 @@ def test_tls_auth2(self): client.close() + def test_encryption(self): + publicKeyPath = "/pulsar//pulsar-broker/src/test/resources/certificate/public-key.client-rsa.pem" + privateKeyPath = "/pulsar/pulsar-broker/src/test/resources/certificate/private-key.client-rsa.pem" + crypto_key_reader = CryptoKeyReader(publicKeyPath, privateKeyPath) + client = Client(self.serviceUrl) + topic = 'my-python-test-end-to-end-encryption' + consumer = client.subscribe(topic=topic, + subscription_name='my-subscription', + crypto_key_reader=crypto_key_reader) + producer = client.create_producer(topic=topic, + encryption_key="client-rsa.pem", + crypto_key_reader=crypto_key_reader) + producer.send('hello') + msg = consumer.receive(TM) + self.assertTrue(msg) + self.assertEqual(msg.value(), 'hello') + consumer.unsubscribe() + client.close() + def test_tls_auth3(self): certs_dir = '/pulsar/pulsar-broker/src/test/resources/authentication/tls/' if not os.path.exists(certs_dir): diff --git a/pulsar-client-cpp/python/src/config.cc b/pulsar-client-cpp/python/src/config.cc index 9aadf92a1f16e..188aaf546d176 100644 --- a/pulsar-client-cpp/python/src/config.cc +++ b/pulsar-client-cpp/python/src/config.cc @@ -74,6 +74,20 @@ static ClientConfiguration& ClientConfiguration_setAuthentication(ClientConfigur return conf; } +static ConsumerConfiguration& ConsumerConfiguration_setCryptoKeyReader(ConsumerConfiguration& conf, + py::object cryptoKeyReader) { + CryptoKeyReaderWrapper cryptoKeyReaderWrapper = py::extract(cryptoKeyReader); + conf.setCryptoKeyReader(cryptoKeyReaderWrapper.cryptoKeyReader); + return conf; +} + +static ProducerConfiguration& ProducerConfiguration_setCryptoKeyReader(ProducerConfiguration& conf, + py::object cryptoKeyReader) { + CryptoKeyReaderWrapper cryptoKeyReaderWrapper = py::extract(cryptoKeyReader); + conf.setCryptoKeyReader(cryptoKeyReaderWrapper.cryptoKeyReader); + return conf; +} + void export_config() { using namespace boost::python; @@ -128,6 +142,8 @@ void export_config() { .def("property", &ProducerConfiguration::setProperty, return_self<>()) .def("batching_type", &ProducerConfiguration::setBatchingType, return_self<>()) .def("batching_type", &ProducerConfiguration::getBatchingType) + .def("encryption_key", &ProducerConfiguration::addEncryptionKey, return_self<>()) + .def("crypto_key_reader", &ProducerConfiguration_setCryptoKeyReader, return_self<>()) ; class_("ConsumerConfiguration") @@ -155,6 +171,7 @@ void export_config() { .def("property", &ConsumerConfiguration::setProperty, return_self<>()) .def("subscription_initial_position", &ConsumerConfiguration::getSubscriptionInitialPosition) .def("subscription_initial_position", &ConsumerConfiguration::setSubscriptionInitialPosition) + .def("crypto_key_reader", &ConsumerConfiguration_setCryptoKeyReader, return_self<>()) ; class_("ReaderConfiguration") diff --git a/pulsar-client-cpp/python/src/cryptoKeyReader.cc b/pulsar-client-cpp/python/src/cryptoKeyReader.cc new file mode 100644 index 0000000000000..ccefe6f18b970 --- /dev/null +++ b/pulsar-client-cpp/python/src/cryptoKeyReader.cc @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include "utils.h" + +CryptoKeyReaderWrapper::CryptoKeyReaderWrapper() {} + +CryptoKeyReaderWrapper::CryptoKeyReaderWrapper(const std::string& publicKeyPath, + const std::string& privateKeyPath) { + this->cryptoKeyReader = DefaultCryptoKeyReader::create(publicKeyPath, privateKeyPath); +} + +void export_cryptoKeyReader() { + using namespace boost::python; + + class_("CryptoKeyReader", init()); +} \ No newline at end of file diff --git a/pulsar-client-cpp/python/src/pulsar.cc b/pulsar-client-cpp/python/src/pulsar.cc index b26a25208b16c..f80c9a4014f2b 100644 --- a/pulsar-client-cpp/python/src/pulsar.cc +++ b/pulsar-client-cpp/python/src/pulsar.cc @@ -27,6 +27,7 @@ void export_config(); void export_enums(); void export_authentication(); void export_schema(); +void export_cryptoKeyReader(); static void translateException(const PulsarException& ex) { @@ -53,4 +54,5 @@ BOOST_PYTHON_MODULE(_pulsar) export_enums(); export_authentication(); export_schema(); + export_cryptoKeyReader(); } diff --git a/pulsar-client-cpp/python/src/utils.h b/pulsar-client-cpp/python/src/utils.h index 8471d03fad356..457d1f85382b0 100644 --- a/pulsar-client-cpp/python/src/utils.h +++ b/pulsar-client-cpp/python/src/utils.h @@ -43,3 +43,10 @@ struct AuthenticationWrapper { AuthenticationWrapper(); AuthenticationWrapper(const std::string& dynamicLibPath, const std::string& authParamsString); }; + +struct CryptoKeyReaderWrapper { + CryptoKeyReaderPtr cryptoKeyReader; + + CryptoKeyReaderWrapper(); + CryptoKeyReaderWrapper(const std::string& publicKeyPath, const std::string& privateKeyPath); +};