diff --git a/README.md b/README.md index d151dc0..dcca35b 100644 --- a/README.md +++ b/README.md @@ -29,51 +29,65 @@ Visit [the official website](https://dubbo.apache.org/) for more information. - **Serialization**: Customizable(protobuf, json...) -## Getting started -Before you begin, ensure that you have **`python 3.11+`**. Then, install Dubbo-Python in your project using the following steps: +## Installation + +Before you start, make sure you have **`python 3.11+`** installed. -```shell -git clone https://github.com/apache/dubbo-python.git -cd dubbo-python && pip install . -``` +1. Install from source + + ```sh + git clone https://github.com/apache/dubbo-python.git + cd dubbo-python && pip install . + ``` + + +## Getting started -Get started with Dubbo-Python in just 5 minutes by following our [Quick Start Guide](https://github.com/apache/dubbo-python/tree/main/samples). +Get up and running with Dubbo-Python in just 5 minutes by following our [Quick Start Guide](https://github.com/apache/dubbo-python/tree/main/samples). -It's as simple as the following code snippet. With just a few lines of code, you can launch a fully functional point-to-point RPC service : +It's as simple as the code snippet below. With just a few lines of code, you can launch a fully functional point-to-point RPC service: -1. Build and start the Server +1. Build and start the server ```python import dubbo from dubbo.configs import ServiceConfig - from dubbo.proxy.handlers import RpcServiceHandler, RpcMethodHandler + from dubbo.proxy.handlers import RpcMethodHandler, RpcServiceHandler - def handle_unary(request): - s = request.decode("utf-8") - print(f"Received request: {s}") - return (s + " world").encode("utf-8") + class UnaryServiceServicer: + def say_hello(self, message: bytes) -> bytes: + print(f"Received message from client: {message}") + return b"Hello from server" - if __name__ == "__main__": + def build_service_handler(): # build a method handler - method_handler = RpcMethodHandler.unary(handle_unary) + method_handler = RpcMethodHandler.unary( + method=UnaryServiceServicer().say_hello, method_name="unary" + ) # build a service handler service_handler = RpcServiceHandler( service_name="org.apache.dubbo.samples.HelloWorld", - method_handlers={"unary": method_handler}, + method_handlers=[method_handler], ) + return service_handler - service_config = ServiceConfig(service_handler) + if __name__ == "__main__": + # build service config + service_handler = build_service_handler() + service_config = ServiceConfig( + service_handler=service_handler, host="127.0.0.1", port=50051 + ) # start the server server = dubbo.Server(service_config).start() input("Press Enter to stop the server...\n") ``` -2. Build and start the Client +1. Build and start the Client ```python import dubbo @@ -81,24 +95,25 @@ It's as simple as the following code snippet. With just a few lines of code, you class UnaryServiceStub: - def __init__(self, client: dubbo.Client): self.unary = client.unary(method_name="unary") - def unary(self, request): - return self.unary(request) + def say_hello(self, message: bytes) -> bytes: + return self.unary(message) if __name__ == "__main__": + # Create a client reference_config = ReferenceConfig.from_url( "tri://127.0.0.1:50051/org.apache.dubbo.samples.HelloWorld" ) dubbo_client = dubbo.Client(reference_config) - unary_service_stub = UnaryServiceStub(dubbo_client) - result = unary_service_stub.unary("hello".encode("utf-8")) - print(result.decode("utf-8")) + # Call the remote method + result = unary_service_stub.say_hello(b"Hello from client") + print(result) + ``` diff --git a/dubbo/classes.py b/dubbo/classes.py index b27c7b9..0a396c8 100644 --- a/dubbo/classes.py +++ b/dubbo/classes.py @@ -13,10 +13,44 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import abc import threading +from typing import Callable, Optional, Tuple, Any, Union + +from dubbo.types import DeserializingFunction, RpcType, RpcTypes, SerializingFunction + +__all__ = [ + "EOF", + "SingletonBase", + "MethodDescriptor", + "ReadStream", + "WriteStream", + "ReadWriteStream", +] + + +class _EOF: + """ + EOF is a class representing the end flag. + """ + + _repr_str = "" + + def __bool__(self): + return False + + def __len__(self): + return 0 -__all__ = ["SingletonBase"] + def __repr__(self) -> str: + return self._repr_str + + def __str__(self) -> str: + return self._repr_str + + +# The EOF object -> global constant +EOF = _EOF() class SingletonBase: @@ -39,3 +73,180 @@ def __new__(cls, *args, **kwargs): if cls._instance is None: cls._instance = super(SingletonBase, cls).__new__(cls) return cls._instance + + +class MethodDescriptor: + """ + MethodDescriptor is a descriptor for a method. + It contains the method name, the method, and the method's serialization and deserialization methods. + """ + + __slots__ = [ + "_callable_method", + "_method_name", + "_rpc_type", + "_arg_serialization", + "_return_serialization", + ] + + def __init__( + self, + method_name: str, + arg_serialization: Tuple[ + Optional[SerializingFunction], Optional[DeserializingFunction] + ], + return_serialization: Tuple[ + Optional[SerializingFunction], Optional[DeserializingFunction] + ], + rpc_type: Union[RpcType, RpcTypes, str] = RpcTypes.UNARY.value, + callable_method: Optional[Callable] = None, + ): + """ + Initialize the method model. + + :param method_name: + The name of the method. + :type method_name: str + + :param arg_serialization: + A tuple containing serialization and deserialization methods for the function's arguments. + :type arg_serialization: Optional[Tuple[SerializingFunction, DeserializingFunction]] + + :param return_serialization: + A tuple containing serialization and deserialization methods for the function's return values. + :type return_serialization: Optional[Tuple[SerializingFunction, DeserializingFunction]] + + :param rpc_type: + The RPC type. default is RpcTypes.UNARY. + :type rpc_type: RpcType + + :param callable_method: + The main callable method to be executed. + :type callable_method: Optional[Callable] + """ + self._method_name = method_name + self._arg_serialization = arg_serialization + self._return_serialization = return_serialization + self._callable_method = callable_method + + if isinstance(rpc_type, str): + rpc_type = RpcTypes.from_name(rpc_type) + elif isinstance(rpc_type, RpcTypes): + rpc_type = rpc_type.value + elif not isinstance(rpc_type, RpcType): + raise TypeError( + f"rpc_type must be of type RpcType, RpcTypes, or str, not {type(rpc_type)}" + ) + self._rpc_type = rpc_type + + def get_method(self) -> Callable: + """ + Get the callable method. + :return: The callable method. + :rtype: Callable + """ + return self._callable_method + + def get_method_name(self) -> str: + """ + Get the method name. + :return: The method name. + :rtype: str + """ + return self._method_name + + def get_rpc_type(self) -> RpcType: + """ + Get the RPC type. + :return: The RPC type. + :rtype: RpcType + """ + return self._rpc_type + + def get_arg_serializer(self) -> Optional[SerializingFunction]: + """ + Get the argument serializer. + :return: The argument serializer. If not set, return None. + :rtype: Optional[SerializingFunction] + """ + return self._arg_serialization[0] if self._arg_serialization else None + + def get_arg_deserializer(self) -> Optional[DeserializingFunction]: + """ + Get the argument deserializer. + :return: The argument deserializer. If not set, return None. + :rtype: Optional[DeserializingFunction] + """ + return self._arg_serialization[1] if self._arg_serialization else None + + def get_return_serializer(self) -> Optional[SerializingFunction]: + """ + Get the return value serializer. + :return: The return value serializer. If not set, return None. + :rtype: Optional[SerializingFunction] + """ + return self._return_serialization[0] if self._return_serialization else None + + def get_return_deserializer(self) -> Optional[DeserializingFunction]: + """ + Get the return value deserializer. + :return: The return value deserializer. If not set, return None. + :rtype: Optional[DeserializingFunction] + """ + return self._return_serialization[1] if self._return_serialization else None + + +class ReadStream(abc.ABC): + """ + ReadStream is an abstract class for reading streams. + """ + + @abc.abstractmethod + def read(self, *args, **kwargs) -> Any: + """ + Read the stream. + :param args: The arguments to pass to the read method. + :param kwargs: The keyword arguments to pass to the read method. + :return: The read value. + """ + raise NotImplementedError() + + +class WriteStream(abc.ABC): + """ + WriteStream is an abstract class for writing streams. + """ + + @abc.abstractmethod + def can_write_more(self) -> bool: + """ + Check if the stream can write more data. + :return: True if the stream can write more data, False otherwise. + :rtype: bool + """ + raise NotImplementedError() + + @abc.abstractmethod + def write(self, *args, **kwargs) -> None: + """ + Write to the stream. + :param args: The arguments to pass to the write method. + :param kwargs: The keyword arguments to pass to the write method. + """ + raise NotImplementedError() + + @abc.abstractmethod + def done_writing(self, **kwargs) -> None: + """ + Done writing to the stream. + :param kwargs: The keyword arguments to pass to the done + """ + raise NotImplementedError() + + +class ReadWriteStream(ReadStream, WriteStream, abc.ABC): + """ + ReadWriteStream is an abstract class for reading and writing streams. + """ + + pass diff --git a/dubbo/client.py b/dubbo/client.py index 4fa7770..d02e9ea 100644 --- a/dubbo/client.py +++ b/dubbo/client.py @@ -17,27 +17,23 @@ from typing import Optional from dubbo.bootstrap import Dubbo +from dubbo.classes import MethodDescriptor from dubbo.configs import ReferenceConfig from dubbo.constants import common_constants from dubbo.extension import extensionLoader from dubbo.protocol import Invoker, Protocol -from dubbo.proxy import RpcCallable -from dubbo.proxy.callables import MultipleRpcCallable +from dubbo.proxy import RpcCallable, RpcCallableFactory +from dubbo.proxy.callables import DefaultRpcCallableFactory from dubbo.registry.protocol import RegistryProtocol from dubbo.types import ( - BiStreamCallType, - CallType, - ClientStreamCallType, DeserializingFunction, SerializingFunction, - ServerStreamCallType, - UnaryCallType, + RpcTypes, ) +from dubbo.url import URL __all__ = ["Client"] -from dubbo.url import URL - class Client: def __init__(self, reference: ReferenceConfig, dubbo: Optional[Dubbo] = None): @@ -51,6 +47,8 @@ def __init__(self, reference: ReferenceConfig, dubbo: Optional[Dubbo] = None): self._protocol: Optional[Protocol] = None self._invoker: Optional[Invoker] = None + self._callable_factory: RpcCallableFactory = DefaultRpcCallableFactory() + # initialize the invoker self._initialize() @@ -97,10 +95,12 @@ def unary( response_deserializer: Optional[DeserializingFunction] = None, ) -> RpcCallable: return self._callable( - UnaryCallType, - method_name, - request_serializer, - response_deserializer, + MethodDescriptor( + method_name=method_name, + arg_serialization=(request_serializer, None), + return_serialization=(None, response_deserializer), + rpc_type=RpcTypes.UNARY.value, + ) ) def client_stream( @@ -110,10 +110,12 @@ def client_stream( response_deserializer: Optional[DeserializingFunction] = None, ) -> RpcCallable: return self._callable( - ClientStreamCallType, - method_name, - request_serializer, - response_deserializer, + MethodDescriptor( + method_name=method_name, + arg_serialization=(request_serializer, None), + return_serialization=(None, response_deserializer), + rpc_type=RpcTypes.CLIENT_STREAM.value, + ) ) def server_stream( @@ -123,42 +125,34 @@ def server_stream( response_deserializer: Optional[DeserializingFunction] = None, ) -> RpcCallable: return self._callable( - ServerStreamCallType, - method_name, - request_serializer, - response_deserializer, + MethodDescriptor( + method_name=method_name, + arg_serialization=(request_serializer, None), + return_serialization=(None, response_deserializer), + rpc_type=RpcTypes.SERVER_STREAM.value, + ) ) - def bidi_stream( + def bi_stream( self, method_name: str, request_serializer: Optional[SerializingFunction] = None, response_deserializer: Optional[DeserializingFunction] = None, ) -> RpcCallable: + # create method descriptor return self._callable( - BiStreamCallType, - method_name, - request_serializer, - response_deserializer, + MethodDescriptor( + method_name=method_name, + arg_serialization=(request_serializer, None), + return_serialization=(None, response_deserializer), + rpc_type=RpcTypes.BI_STREAM.value, + ) ) - def _callable( - self, - call_type: CallType, - method_name: str, - request_serializer: Optional[SerializingFunction] = None, - response_deserializer: Optional[DeserializingFunction] = None, - ) -> RpcCallable: + def _callable(self, method_descriptor: MethodDescriptor) -> RpcCallable: """ Generate a proxy for the given method - :param call_type: The call type. - :type call_type: str - :param method_name: The method name. - :type method_name: str - :param request_serializer: The request serializer. - :type request_serializer: Optional[SerializingFunction] - :param response_deserializer: The response deserializer. - :type response_deserializer: Optional[DeserializingFunction] + :param method_descriptor: The method descriptor. :return: The proxy. :rtype: RpcCallable """ @@ -167,13 +161,11 @@ def _callable( # clone url url = url.copy() - url.parameters[common_constants.METHOD_KEY] = method_name - # set call type - url.attributes[common_constants.CALL_KEY] = call_type - - # set serializer and deserializer - url.attributes[common_constants.SERIALIZER_KEY] = request_serializer - url.attributes[common_constants.DESERIALIZER_KEY] = response_deserializer + url.parameters[common_constants.METHOD_KEY] = ( + method_descriptor.get_method_name() + ) + # set method descriptor + url.attributes[common_constants.METHOD_DESCRIPTOR_KEY] = method_descriptor # create proxy - return MultipleRpcCallable(self._invoker, url) + return self._callable_factory.get_callable(self._invoker, url) diff --git a/dubbo/cluster/monitor/cpu.py b/dubbo/cluster/monitor/cpu.py index 774b224..33f9e5e 100644 --- a/dubbo/cluster/monitor/cpu.py +++ b/dubbo/cluster/monitor/cpu.py @@ -15,14 +15,13 @@ # limitations under the License. import threading from typing import Dict, List + from dubbo.cluster.directories import RegistryDirectory -from dubbo.constants import common_constants from dubbo.loggers import loggerFactory -from dubbo.protocol import Protocol, Invoker +from dubbo.protocol import Invoker, Protocol from dubbo.protocol.invocation import RpcInvocation -from dubbo.proxy.handlers import RpcServiceHandler, RpcMethodHandler +from dubbo.proxy.handlers import RpcMethodHandler, RpcServiceHandler from dubbo.registry import Registry -from dubbo.types import UnaryCallType from dubbo.url import URL from dubbo.utils import CpuUtils @@ -32,9 +31,6 @@ "org.apache.dubbo.MetricsService", "cpu", str(1).encode("utf-8"), - attributes={ - common_constants.CALL_KEY: UnaryCallType, - }, ) @@ -160,7 +156,11 @@ def get_service_handler() -> RpcServiceHandler: """ return RpcServiceHandler( "org.apache.dubbo.MetricsService", - {"cpu": RpcMethodHandler.unary(CpuInnerRpcHandler.get_cpu_usage)}, + [ + RpcMethodHandler.unary( + CpuInnerRpcHandler.get_cpu_usage, method_name="cpu" + ), + ], ) @staticmethod diff --git a/dubbo/configs.py b/dubbo/configs.py index 27899e9..3c081a8 100644 --- a/dubbo/configs.py +++ b/dubbo/configs.py @@ -24,6 +24,10 @@ registry_constants, ) +from dubbo.proxy.handlers import RpcServiceHandler +from dubbo.url import URL, create_url +from dubbo.utils import NetworkUtils + __all__ = [ "ApplicationConfig", "ReferenceConfig", @@ -32,9 +36,6 @@ "LoggerConfig", ] -from dubbo.proxy.handlers import RpcServiceHandler -from dubbo.url import URL, create_url - class AbstractConfig(abc.ABC): """ @@ -379,12 +380,18 @@ class ServiceConfig(AbstractConfig): def __init__( self, service_handler: RpcServiceHandler, + host: Optional[str] = None, port: Optional[int] = None, protocol: Optional[str] = None, ): super().__init__() self._service_handler = service_handler + self._host = ( + host + or NetworkUtils.get_local_address() + or common_constants.LOCAL_HOST_VALUE + ) self._port = port or common_constants.DEFAULT_PORT self._protocol = protocol or common_constants.TRIPLE_SHORT @@ -406,6 +413,24 @@ def service_handler(self, service_handler: RpcServiceHandler) -> None: """ self._service_handler = service_handler + @property + def host(self) -> str: + """ + Get the host of the service. + :return: The host of the service. + :rtype: str + """ + return self._host + + @host.setter + def host(self, host: str) -> None: + """ + Set the host of the service. + :param host: The host of the service. + :type host: str + """ + self._host = host + @property def port(self) -> int: """ @@ -450,7 +475,7 @@ def to_url(self) -> URL: """ return URL( scheme=self.protocol, - host=common_constants.LOCAL_HOST_VALUE, + host=self.host, port=self.port, parameters={ common_constants.SERVICE_KEY: self.service_handler.service_name diff --git a/dubbo/constants/common_constants.py b/dubbo/constants/common_constants.py index b98ed61..c997ac8 100644 --- a/dubbo/constants/common_constants.py +++ b/dubbo/constants/common_constants.py @@ -56,7 +56,9 @@ TRUE_VALUE = "true" FALSE_VALUE = "false" -CALL_KEY = "call" +RPC_TYPE_KEY = "rpc-type" + +METHOD_DESCRIPTOR_KEY = "method-descriptor" LOADBALANCE_KEY = "loadbalance" diff --git a/dubbo/protocol/triple/call/client_call.py b/dubbo/protocol/triple/call/client_call.py index ba1f417..1854efa 100644 --- a/dubbo/protocol/triple/call/client_call.py +++ b/dubbo/protocol/triple/call/client_call.py @@ -21,14 +21,19 @@ from dubbo.protocol.triple.call import ClientCall from dubbo.protocol.triple.constants import GRpcCode from dubbo.protocol.triple.metadata import RequestMetadata -from dubbo.protocol.triple.results import TriResult from dubbo.protocol.triple.status import TriRpcStatus from dubbo.protocol.triple.stream import ClientStream from dubbo.protocol.triple.stream.client_stream import TriClientStream from dubbo.remoting.aio.http2.stream_handler import StreamClientMultiplexHandler from dubbo.serialization import Deserializer, SerializationError, Serializer -__all__ = ["TripleClientCall", "DefaultClientCallListener"] +__all__ = [ + "TripleClientCall", + "FutureToClientCallListenerAdapter", + "ReadStreamToClientCallListenerAdapter", +] + +from dubbo.utils import FunctionHelper _LOGGER = loggerFactory.get_logger() @@ -81,7 +86,11 @@ def send_message(self, message: Any, last: bool) -> None: # send message try: - data = self._serializer.serialize(message) + data = ( + FunctionHelper.call_func(self._serializer.serialize, message) + if message + else b"" + ) compress_flag = ( 0 if self._compressor.get_message_encoding() @@ -160,19 +169,38 @@ def on_cancel_by_remote(self, status: TriRpcStatus) -> None: self.on_complete(status, {}) -class DefaultClientCallListener(ClientCall.Listener): +class FutureToClientCallListenerAdapter(ClientCall.Listener): """ - The default client call listener. + The future call listener. """ - def __init__(self, result: TriResult): - self._result = result + def __init__(self, future): + self._future = future + self._message = None def on_message(self, message: Any) -> None: - self._result.set_value(message) + self._message = message def on_close(self, status: TriRpcStatus, attachments: Dict[str, Any]) -> None: if status.code != GRpcCode.OK: - self._result.set_exception(status.as_exception()) + self._future.set_exception(status.as_exception()) + else: + self._future.set_result(self._message) + + +class ReadStreamToClientCallListenerAdapter(ClientCall.Listener): + """ + Adapter from stream to client call listener. + """ + + def __init__(self, read_stream): + self._read_stream = read_stream + + def on_message(self, message: Any) -> None: + self._read_stream.put(message) + + def on_close(self, status: TriRpcStatus, trailers: Dict[str, Any]) -> None: + if status.code != GRpcCode.OK: + self._read_stream.put_exception(status.as_exception()) else: - self._result.complete_value() + self._read_stream.put_eof() diff --git a/dubbo/protocol/triple/call/server_call.py b/dubbo/protocol/triple/call/server_call.py index 8b788f1..71fb52b 100644 --- a/dubbo/protocol/triple/call/server_call.py +++ b/dubbo/protocol/triple/call/server_call.py @@ -18,11 +18,8 @@ from concurrent.futures import ThreadPoolExecutor from typing import Any, Callable, Dict -from dubbo.deliverers import ( - MessageDeliverer, - MultiMessageDeliverer, - SingleMessageDeliverer, -) +from dubbo.classes import ReadWriteStream +from dubbo.loggers import loggerFactory from dubbo.protocol.triple.call import ServerCall from dubbo.protocol.triple.constants import ( GRpcCode, @@ -31,6 +28,11 @@ ) from dubbo.protocol.triple.status import TriRpcStatus from dubbo.protocol.triple.stream import ServerStream +from dubbo.protocol.triple.streams import ( + TriReadStream, + TriServerWriteStream, + TriReadWriteStream, +) from dubbo.proxy.handlers import RpcMethodHandler from dubbo.remoting.aio.http2.headers import Http2Headers from dubbo.remoting.aio.http2.registries import HttpStatus @@ -40,38 +42,53 @@ DirectDeserializer, DirectSerializer, ) +from dubbo.types import RpcType, RpcTypes +from dubbo.utils import FunctionHelper + + +_LOGGER = loggerFactory.get_logger() class TripleServerCall(ServerCall, ServerStream.Listener): def __init__( self, - stream: ServerStream, + server_stream: ServerStream, method_handler: RpcMethodHandler, executor: ThreadPoolExecutor, ): - self._stream = stream - self._method_runner: MethodRunner = MethodRunnerFactory.create( - method_handler, self - ) - + self._server_stream = server_stream self._executor = executor - # get serializer - serializing_function = method_handler.response_serializer - self._serializer = ( - CustomSerializer(serializing_function) - if serializing_function - else DirectSerializer() + # create read stream + self._read_stream = TriReadStream() + + # create write stream + write_stream = TriServerWriteStream(self) + read_write_stream = TriReadWriteStream(write_stream, self._read_stream) + + self._method_runner: MethodRunner = MethodRunnerFactory.create( + method_handler, read_write_stream ) - # get deserializer - deserializing_function = method_handler.request_deserializer + # get method descriptor + method_descriptor = method_handler.method_descriptor + + # get arguments deserializer + arg_deserializer = method_descriptor.get_arg_deserializer() self._deserializer = ( - CustomDeserializer(deserializing_function) - if deserializing_function + CustomDeserializer(arg_deserializer) + if arg_deserializer else DirectDeserializer() ) + # get return serializer + return_serializer = method_descriptor.get_return_serializer() + self._serializer = ( + CustomSerializer(return_serializer) + if return_serializer + else DirectSerializer() + ) + self._headers_sent = False def send_message(self, message: Any) -> None: @@ -82,34 +99,35 @@ def send_message(self, message: Any) -> None: TripleHeaderName.CONTENT_TYPE.value, TripleHeaderValue.APPLICATION_GRPC_PROTO.value, ) - self._stream.send_headers(headers) + self._server_stream.send_headers(headers) - serialized_data = self._serializer.serialize(message) + serialized_data = FunctionHelper.call_func(self._serializer.serialize, message) # TODO support compression - self._stream.send_message(serialized_data, False) + self._server_stream.send_message(serialized_data, False) def complete(self, status: TriRpcStatus, attachments: Dict[str, Any]) -> None: if not attachments.get(TripleHeaderName.CONTENT_TYPE.value): attachments[TripleHeaderName.CONTENT_TYPE.value] = ( TripleHeaderValue.APPLICATION_GRPC_PROTO.value ) - self._stream.complete(status, attachments) + self._server_stream.complete(status, attachments) def on_headers(self, headers: Dict[str, Any]) -> None: # start a new thread to run the method self._executor.submit(self._method_runner.run) def on_message(self, data: bytes) -> None: + if data == b"": + return deserialized_data = self._deserializer.deserialize(data) - self._method_runner.receive_arg(deserialized_data) + self._read_stream.put(deserialized_data) def on_complete(self) -> None: - self._method_runner.receive_complete() + self._read_stream.put_eof() def on_cancel_by_remote(self, status: TriRpcStatus) -> None: # cancel the method runner. - self._executor.shutdown() - self._executor = None + self._read_stream.put_exception(status.as_exception()) class MethodRunner(abc.ABC): @@ -117,22 +135,6 @@ class MethodRunner(abc.ABC): Interface for method runner. """ - @abc.abstractmethod - def receive_arg(self, arg: Any) -> None: - """ - Receive argument. - :param arg: argument - :type arg: Any - """ - raise NotImplementedError() - - @abc.abstractmethod - def receive_complete(self) -> None: - """ - Receive complete. - """ - raise NotImplementedError() - @abc.abstractmethod def run(self) -> None: """ @@ -167,32 +169,20 @@ class DefaultMethodRunner(MethodRunner): def __init__( self, func: Callable, - server_call: TripleServerCall, - client_stream: bool, - server_stream: bool, + read_write_stream: ReadWriteStream, + rpc_type: RpcType, ): - self._server_call: TripleServerCall = server_call + self._read_write_stream = read_write_stream self._func = func - self._deliverer: MessageDeliverer = ( - MultiMessageDeliverer() if client_stream else SingleMessageDeliverer() - ) - self._server_stream = server_stream - - self._completed = False - - def receive_arg(self, arg: Any) -> None: - self._deliverer.add(arg) - - def receive_complete(self) -> None: - self._deliverer.complete() + self._rpc_type = rpc_type def run(self) -> None: try: - if isinstance(self._deliverer, SingleMessageDeliverer): - result = self._func(self._deliverer.get()) + if self._rpc_type == RpcTypes.UNARY.value: + result = self._func(self._read_write_stream.read()) else: - result = self._func(self._deliverer) + result = self._func(self._read_write_stream) # handle the result self.handle_result(result) except Exception as e: @@ -201,28 +191,31 @@ def run(self) -> None: def handle_result(self, result: Any) -> None: try: - if not self._server_stream: + # check if the stream is completed + if not self._read_write_stream.can_write_more(): + return + + if not self._rpc_type.server_stream: # get single result - self._server_call.send_message(result) + self._read_write_stream.write(result) else: # get multi results for message in result: - self._server_call.send_message(message) + self._read_write_stream.write(message) - self._server_call.complete(TriRpcStatus(GRpcCode.OK), {}) - self._completed = True + self._read_write_stream.done_writing() except Exception as e: self.handle_exception(e) def handle_exception(self, e: Exception) -> None: - if not self._completed: + if self._read_write_stream.can_write_more(): + _LOGGER.exception("Invoke method failed: %s", e) status = TriRpcStatus( GRpcCode.INTERNAL, description=f"Invoke method failed: {str(e)}", cause=e, ) - self._server_call.complete(status, {}) - self._completed = True + self._read_write_stream.done_writing(tri_rpc_status=status) class MethodRunnerFactory: @@ -231,23 +224,24 @@ class MethodRunnerFactory: """ @staticmethod - def create(method_handler: RpcMethodHandler, server_call) -> MethodRunner: + def create( + method_handler: RpcMethodHandler, read_write_stream: ReadWriteStream + ) -> MethodRunner: """ Create a method runner. :param method_handler: method handler :type method_handler: RpcMethodHandler - :param server_call: server call - :type server_call: TripleServerCall + :param read_write_stream: read write stream + :type read_write_stream: ReadWriteStream :return: method runner :rtype: MethodRunner """ - call_type = method_handler.call_type + method_descriptor = method_handler.method_descriptor return DefaultMethodRunner( - method_handler.behavior, - server_call, - call_type.client_stream, - call_type.server_stream, + method_descriptor.get_method(), + read_write_stream, + method_descriptor.get_rpc_type(), ) diff --git a/dubbo/protocol/triple/constants.py b/dubbo/protocol/triple/constants.py index 98d71ad..24db2a0 100644 --- a/dubbo/protocol/triple/constants.py +++ b/dubbo/protocol/triple/constants.py @@ -118,7 +118,7 @@ class TripleHeaderValue(enum.Enum): TRAILERS = "trailers" HTTP = "http" HTTPS = "https" - APPLICATION_GRPC_PROTO = "application/grpc+proto" + APPLICATION_GRPC_PROTO = "application/grpc+data" APPLICATION_GRPC = "application/grpc" TEXT_PLAIN_UTF8 = "text/plain; encoding=utf-8" diff --git a/dubbo/protocol/triple/invoker.py b/dubbo/protocol/triple/invoker.py index de93e94..61b4a64 100644 --- a/dubbo/protocol/triple/invoker.py +++ b/dubbo/protocol/triple/invoker.py @@ -13,7 +13,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import concurrent +from collections.abc import Iterator +from dubbo.classes import MethodDescriptor from dubbo.compression import Compressor, Identity from dubbo.constants import common_constants from dubbo.extension import ExtensionError, extensionLoader @@ -21,10 +24,18 @@ from dubbo.protocol import Invoker, Result from dubbo.protocol.invocation import Invocation, RpcInvocation from dubbo.protocol.triple.call import TripleClientCall -from dubbo.protocol.triple.call.client_call import DefaultClientCallListener +from dubbo.protocol.triple.call.client_call import ( + ReadStreamToClientCallListenerAdapter, + FutureToClientCallListenerAdapter, +) from dubbo.protocol.triple.constants import TripleHeaderName, TripleHeaderValue from dubbo.protocol.triple.metadata import RequestMetadata from dubbo.protocol.triple.results import TriResult +from dubbo.protocol.triple.streams import ( + TriReadWriteStream, + TriClientWriteStream, + TriReadStream, +) from dubbo.remoting import Client from dubbo.remoting.aio.exceptions import RemotingError from dubbo.remoting.aio.http2.stream_handler import StreamClientMultiplexHandler @@ -34,11 +45,13 @@ DirectDeserializer, DirectSerializer, ) -from dubbo.types import CallType from dubbo.url import URL +from dubbo.utils import FunctionHelper + __all__ = ["TripleInvoker"] + _LOGGER = loggerFactory.get_logger() @@ -57,36 +70,65 @@ def __init__( self._stream_multiplexer = stream_multiplexer def invoke(self, invocation: RpcInvocation) -> Result: - call_type: CallType = invocation.get_attribute(common_constants.CALL_KEY) - result = TriResult(call_type) - + """ + Invoke the invocation. + :param invocation: The invocation to invoke. + :type invocation: RpcInvocation + :return: The result of the invocation. + :rtype: Result + """ + future = concurrent.futures.Future() + result = TriResult(future) if not self._client.is_connected(): result.set_exception( RemotingError("The client is not connected to the server.") ) return result - # get serializer - serializer = DirectSerializer() - serializing_function = invocation.get_attribute(common_constants.SERIALIZER_KEY) - if serializing_function: - serializer = CustomSerializer(serializing_function) + # get method descriptor + method_descriptor: MethodDescriptor = invocation.get_attribute( + common_constants.METHOD_DESCRIPTOR_KEY + ) + + # get arg_serializer + arg_serializing_function = method_descriptor.get_arg_serializer() + arg_serializer = ( + CustomSerializer(arg_serializing_function) + if arg_serializing_function + else DirectSerializer() + ) - # get deserializer - deserializer = DirectDeserializer() - deserializing_function = invocation.get_attribute( - common_constants.DESERIALIZER_KEY + # get return_deserializer + return_deserializing_function = method_descriptor.get_return_deserializer() + return_deserializer = ( + CustomDeserializer(return_deserializing_function) + if return_deserializing_function + else DirectDeserializer() ) - if deserializing_function: - deserializer = CustomDeserializer(deserializing_function) + + write_stream = TriClientWriteStream() + read_stream = TriReadStream() + + # create listener + rpc_type = method_descriptor.get_rpc_type() + is_unary = not rpc_type.client_stream and not rpc_type.server_stream + if is_unary: + listener = FutureToClientCallListenerAdapter(future) + else: + read_stream = TriReadStream() + listener = ReadStreamToClientCallListenerAdapter(read_stream) # Create a new TriClientCall tri_client_call = TripleClientCall( self._stream_multiplexer, - DefaultClientCallListener(result), - serializer, - deserializer, + listener, + arg_serializer, + return_deserializer, ) + write_stream.set_call(tri_client_call) + + if not is_unary: + write_stream = TriReadWriteStream(write_stream, read_stream) # start the call try: @@ -96,60 +138,31 @@ def invoke(self, invocation: RpcInvocation) -> Result: result.set_exception(e) return result - # invoke - if not call_type.client_stream: - self._invoke_unary(tri_client_call, invocation) + # write the message + if not rpc_type.client_stream: + # if the client call is not a stream, we send the message directly + FunctionHelper.call_func(write_stream.write, invocation.get_argument()) + write_stream.done_writing() else: - self._invoke_stream(tri_client_call, invocation) + # try to get first argument and check if it is an iterable + args, _ = invocation.get_argument() + if args and isinstance(args[0], Iterator): + # if the argument is an iterator, we need to write the stream + for arg in args[0]: + write_stream.write(arg) + write_stream.done_writing() + + # If the call is not unary, we need to return the stream + # server_stream -> return read_stream + # client_stream or bidirectional_stream -> return write_read_stream + if not is_unary: + if rpc_type.server_stream and not rpc_type.client_stream: + result.set_value(read_stream) + else: + result.set_value(write_stream) return result - def _invoke_unary(self, call: TripleClientCall, invocation: Invocation) -> None: - """ - Invoke a unary call. - :param call: The call to invoke. - :type call: TripleClientCall - :param invocation: The invocation to invoke. - :type invocation: Invocation - """ - try: - argument = invocation.get_argument() - if callable(argument): - argument = argument() - except Exception as e: - _LOGGER.exception(f"Invoke failed: {str(e)}", e) - call.cancel_by_local(e) - return - - # send the message - call.send_message(argument, last=True) - - def _invoke_stream(self, call: TripleClientCall, invocation: Invocation) -> None: - """ - Invoke a stream call. - :param call: The call to invoke. - :type call: TripleClientCall - :param invocation: The invocation to invoke. - :type invocation: Invocation - """ - try: - # get the argument - argument = invocation.get_argument() - iterator = argument() if callable(argument) else argument - - # send the messages - BEGIN_SIGNAL = object() - next_message = BEGIN_SIGNAL - for message in iterator: - if next_message is not BEGIN_SIGNAL: - call.send_message(next_message, last=False) - next_message = message - next_message = next_message if next_message is not BEGIN_SIGNAL else None - call.send_message(next_message, last=True) - except Exception as e: - _LOGGER.exception(f"Invoke failed: {str(e)}", e) - call.cancel_by_local(e) - def _create_metadata(self, invocation: Invocation) -> RequestMetadata: """ Create the metadata. diff --git a/dubbo/protocol/triple/results.py b/dubbo/protocol/triple/results.py index b9c9e00..a8ced5f 100644 --- a/dubbo/protocol/triple/results.py +++ b/dubbo/protocol/triple/results.py @@ -16,9 +16,7 @@ from typing import Any -from dubbo.deliverers import MultiMessageDeliverer, SingleMessageDeliverer from dubbo.protocol import Result -from dubbo.types import CallType class TriResult(Result): @@ -26,49 +24,31 @@ class TriResult(Result): The triple result. """ - __slots__ = ["_call_type", "_deliverer", "_exception"] + __slots__ = ["_future"] - def __init__(self, call_type: CallType): - self._call_type = call_type - - self._deliverer = ( - MultiMessageDeliverer() - if self._call_type.server_stream - else SingleMessageDeliverer() - ) - - self._exception = None + def __init__(self, future): + self._future = future def set_value(self, value: Any) -> None: """ Set the value. """ - self._deliverer.add(value) - - def complete_value(self) -> None: - """ - Complete the value. - """ - self._deliverer.complete() + self._future.set_result(value) def value(self) -> Any: """ Get the value. """ - if self._call_type.server_stream: - return self._deliverer - else: - return self._deliverer.get() + return self._future.result() def set_exception(self, exception: Exception) -> None: """ Set the exception. """ - self._exception = exception - self._deliverer.cancel(exception) + self._future.set_exception(exception) def exception(self) -> Exception: """ Get the exception. """ - return self._exception + return self._future.exception() diff --git a/dubbo/protocol/triple/streams.py b/dubbo/protocol/triple/streams.py new file mode 100644 index 0000000..3ef0b2c --- /dev/null +++ b/dubbo/protocol/triple/streams.py @@ -0,0 +1,286 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import queue +import threading +from typing import Any, Optional, Union + +from dubbo.classes import ReadStream, EOF, WriteStream, ReadWriteStream +from dubbo.protocol.triple.call import ClientCall, ServerCall +from dubbo.protocol.triple.constants import GRpcCode +from dubbo.protocol.triple.exceptions import RpcError +from dubbo.protocol.triple.status import TriRpcStatus + + +class TriReadStream(ReadStream): + """ + Triple read stream. Support reading data from the stream. + """ + + __slots__ = ["_storage", "_lock", "_read_done"] + + def __init__(self): + self._read_done = False + self._storage = queue.Queue(maxsize=1000) + self._lock = threading.RLock() + + def put(self, data: Any) -> None: + """ + Put data into the stream. It is private and should not be called by the user. + :param data: The data to put into the stream. + :type data: Any + """ + if self._read_done: + return + self._storage.put_nowait(data) + + def put_eof(self) -> None: + """ + Put EOF into the stream. It is private and should not be called by the user. + """ + if self._read_done: + return + self._read_done = True + self._storage.put_nowait(EOF) + + def put_exception(self, e: Exception) -> None: + """ + Set an exception to the stream. It is private and should not be called by the user. + :param e: The exception to set. + :type e: Exception + """ + # Stop the read stream + self.put_eof() + # Raise the exception + raise e + + def read(self, timeout: Optional[int] = None) -> Any: + """ + Read the stream. + :param timeout: + The timeout in seconds. If None, it will block until the data is available. + :type timeout: Optional[int] + :return: + The data read from the stream. + If no more data, return EOF. + If no data available within the timeout, return None. + :rtype: Any + """ + # If you can't read more data, return EOF + if self._read_done and self._storage.empty(): + return EOF + + try: + data = self._storage.get( + timeout=max(0, timeout) if timeout is not None else None + ) + return data + except queue.Empty: + return None + + def __iter__(self): + return self + + def __next__(self): + data = self.read() + if data is EOF: + raise StopIteration + return data + + +class TriClientWriteStream(WriteStream): + """ + Triple client write stream. Support writing data to the stream. + """ + + __slots__ = ["_call", "_write_done"] + + def __init__(self, call: Optional[ClientCall] = None): + self._call: Optional[ClientCall] = call + self._write_done = False + + def set_call(self, call: ClientCall): + self._call = call + + def can_write_more(self) -> bool: + """ + Check if the stream can write more data. + :return: True if the stream can write more data, False otherwise. + :rtype: bool + """ + return not self._write_done + + def write(self, *args, **kwargs) -> None: + """ + Write data to the stream. + :param args: The arguments to pass to the write method. + :param kwargs: The keyword arguments to pass to the write method. + :raises RpcError: If write after done writing. + """ + if self._write_done: + raise RpcError("Write after done writing") + self._call.send_message((args, kwargs), False) + + def done_writing(self, **kwargs) -> None: + """ + Done writing to the stream. + :raises RpcError: If done writing multiple times. + """ + if self._write_done: + raise RpcError("Done writing multiple times") + + self._call.send_message(None, True) + self._write_done = True + + +class TriServerWriteStream(WriteStream): + """ + Triple server write stream. Support writing data to the stream. + """ + + __slots__ = ["_call", "_write_done"] + + def __init__(self, call: ServerCall): + self._call = call + self._write_done = False + + def can_write_more(self) -> bool: + """ + Check if the stream can write more data. + :return: True if the stream can write more data, False otherwise. + :rtype: bool + """ + return not self._write_done + + def write(self, *args, **kwargs) -> None: + """ + Write data to the stream. + :param args: The arguments to pass to the write method. + :param kwargs: The keyword arguments to pass to the write method. + :raises RpcError: If write after done writing. + """ + if self._write_done: + raise RpcError("Write after done writing") + self._call.send_message((args, kwargs)) + + def done_writing(self, **kwargs) -> None: + """ + Done writing to the stream. + :param kwargs: The keyword arguments to pass to the done + :raises RpcError: If done writing multiple times. + """ + if self._write_done: + raise RpcError("Done writing multiple times") + + # try to get TriRpcStatus from kwargs + status = kwargs.get("tri_rpc_status") + if status is None: + status = TriRpcStatus(GRpcCode.OK) + elif not isinstance(status, TriRpcStatus): + raise RpcError("Invalid status type") + + # remove the status from kwargs + kwargs.pop("tri-rpc-status", None) + + self._call.complete(status, kwargs) + self._write_done = True + + +class TriReadWriteStream(ReadWriteStream): + """ + Triple client read write stream. Support reading and writing data from the stream. + """ + + __slots__ = ["_read_stream", "_write_stream"] + + def __init__( + self, + write_stream: Union[TriClientWriteStream, TriServerWriteStream], + read_stream: TriReadStream, + ): + """ + Initialize the read write stream. + :param write_stream: The write stream. + :type write_stream: TriClientWriteStream + :param read_stream: The read stream. + :type read_stream: TriReadStream + """ + self._read_stream = read_stream + self._write_stream = write_stream + + def can_write_more(self) -> bool: + """ + Check if the stream can write more data. + :return: True if the stream can write more data, False otherwise. + :rtype: bool + """ + if self._write_stream is None: + raise RpcError("Write stream is not set") + return self._write_stream.can_write_more() + + def can_read_more(self) -> bool: + """ + Check if there is more data to read. + :return: True if there is more data to read, False otherwise. + :rtype: bool + """ + if self._read_stream is None: + raise RpcError("Read stream is not set") + return self._read_stream.can_read_more() + + def read(self, timeout: Optional[int] = None) -> Any: + """ + Read the stream. + :param timeout: + The timeout in seconds. If None, it will block until the data is available. + :type timeout: Optional[int] + :return: + The data read from the stream. + If no more data, return EOF. + If no data available within the timeout, return None. + :rtype: Any + """ + if self._read_stream is None: + raise RpcError("Read stream is not set") + return self._read_stream.read(timeout) + + def write(self, *args, **kwargs) -> None: + """ + Write data to the stream. + :param args: The arguments to pass to the write method. + :param kwargs: The keyword arguments to pass to the write method. + :raises RpcError: If write after done writing. + """ + if self._write_stream is None: + raise RpcError("Write stream is not set") + self._write_stream.write(*args, **kwargs) + + def done_writing(self, **kwargs) -> None: + """ + Done writing to the stream. + :raises RpcError: If done writing multiple times. + """ + if self._write_stream is None: + raise RpcError("Write stream is not set") + self._write_stream.done_writing(**kwargs) + + def __iter__(self): + return self + + def __next__(self): + data = self.read() + if data is EOF: + raise StopIteration + return data diff --git a/dubbo/proxy/callables.py b/dubbo/proxy/callables.py index a079d1a..3d6fdb0 100644 --- a/dubbo/proxy/callables.py +++ b/dubbo/proxy/callables.py @@ -16,13 +16,14 @@ from typing import Any +from dubbo.classes import MethodDescriptor from dubbo.constants import common_constants from dubbo.protocol import Invoker from dubbo.protocol.invocation import RpcInvocation from dubbo.proxy import RpcCallable, RpcCallableFactory from dubbo.url import URL -__all__ = ["MultipleRpcCallable"] +__all__ = ["MultipleRpcCallable", "DefaultRpcCallableFactory"] from dubbo.proxy.handlers import RpcServiceHandler @@ -35,28 +36,25 @@ class MultipleRpcCallable(RpcCallable): def __init__(self, invoker: Invoker, url: URL): self._invoker = invoker self._url = url - self._service_name = self._url.path - self._method_name = self._url.parameters[common_constants.METHOD_KEY] - self._call_type = self._url.attributes[common_constants.CALL_KEY] - self._serializer = self._url.attributes[common_constants.SERIALIZER_KEY] - self._deserializer = self._url.attributes[common_constants.DESERIALIZER_KEY] + self._method_model: MethodDescriptor = url.attributes[ + common_constants.METHOD_DESCRIPTOR_KEY + ] + + self._service_name = url.path + self._method_name = self._method_model.get_method_name() def _create_invocation(self, argument: Any) -> RpcInvocation: return RpcInvocation( self._service_name, self._method_name, argument, - attributes={ - common_constants.CALL_KEY: self._call_type, - common_constants.SERIALIZER_KEY: self._serializer, - common_constants.DESERIALIZER_KEY: self._deserializer, - }, + attributes={common_constants.METHOD_DESCRIPTOR_KEY: self._method_model}, ) - def __call__(self, argument: Any) -> Any: + def __call__(self, *args, **kwargs) -> Any: # Create a new RpcInvocation - invocation = self._create_invocation(argument) + invocation = self._create_invocation((args, kwargs)) # Do invoke. result = self._invoker.invoke(invocation) return result.value() diff --git a/dubbo/proxy/handlers.py b/dubbo/proxy/handlers.py index 3190afc..78aa39b 100644 --- a/dubbo/proxy/handlers.py +++ b/dubbo/proxy/handlers.py @@ -14,16 +14,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Callable, Dict, Optional +from typing import Callable, Optional, List, Dict +from dubbo.classes import MethodDescriptor from dubbo.types import ( - BiStreamCallType, - CallType, - ClientStreamCallType, DeserializingFunction, SerializingFunction, - ServerStreamCallType, - UnaryCallType, + RpcTypes, ) __all__ = ["RpcMethodHandler", "RpcServiceHandler"] @@ -34,95 +31,147 @@ class RpcMethodHandler: Rpc method handler """ - def __init__( - self, - call_type: CallType, - behavior: Callable, - request_deserializer: Optional[DeserializingFunction] = None, - response_serializer: Optional[SerializingFunction] = None, - ): + __slots__ = ["_method_descriptor"] + + def __init__(self, method_descriptor: MethodDescriptor): """ Initialize the RpcMethodHandler - :param call_type: the call type. - :type call_type: CallType - :param behavior: the behavior of the method. - :type behavior: Callable - :param request_deserializer: the request deserializer. - :type request_deserializer: Optional[DeserializingFunction] - :param response_serializer: the response serializer. - :type response_serializer: Optional[SerializingFunction] + :param method_descriptor: the method descriptor. + :type method_descriptor: MethodDescriptor + """ + self._method_descriptor = method_descriptor + + @property + def method_descriptor(self) -> MethodDescriptor: + """ + Get the method descriptor + :return: the method descriptor + :rtype: MethodDescriptor """ - self.call_type = call_type - self.behavior = behavior - self.request_deserializer = request_deserializer - self.response_serializer = response_serializer + return self._method_descriptor @classmethod def unary( cls, - behavior: Callable, + method: Callable, + method_name: Optional[str] = None, request_deserializer: Optional[DeserializingFunction] = None, response_serializer: Optional[SerializingFunction] = None, - ): + ) -> "RpcMethodHandler": """ Create a unary method handler + :param method: the method. + :type method: Callable + :param method_name: the method name. If not provided, the method name will be used. + :type method_name: Optional[str] + :param request_deserializer: the request deserializer. + :type request_deserializer: Optional[DeserializingFunction] + :param response_serializer: the response serializer. + :type response_serializer: Optional[SerializingFunction] + :return: the unary method handler. + :rtype: RpcMethodHandler """ return cls( - UnaryCallType, - behavior, - request_deserializer, - response_serializer, + MethodDescriptor( + callable_method=method, + method_name=method_name or method.__name__, + arg_serialization=(None, request_deserializer), + return_serialization=(response_serializer, None), + rpc_type=RpcTypes.UNARY.value, + ) ) @classmethod def client_stream( cls, - behavior: Callable, + method: Callable, + method_name: Optional[str] = None, request_deserializer: Optional[DeserializingFunction] = None, response_serializer: Optional[SerializingFunction] = None, ): """ Create a client stream method handler + :param method: the method. + :type method: Callable + :param method_name: the method name. If not provided, the method name will be used. + :type method_name: Optional[str] + :param request_deserializer: the request deserializer. + :type request_deserializer: Optional[DeserializingFunction] + :param response_serializer: the response serializer. + :type response_serializer: Optional[SerializingFunction] + :return: the client stream method handler. + :rtype: RpcMethodHandler """ return cls( - ClientStreamCallType, - behavior, - request_deserializer, - response_serializer, + MethodDescriptor( + callable_method=method, + method_name=method_name or method.__name__, + arg_serialization=(None, request_deserializer), + return_serialization=(response_serializer, None), + rpc_type=RpcTypes.CLIENT_STREAM.value, + ) ) @classmethod def server_stream( cls, - behavior: Callable, + method: Callable, + method_name: Optional[str] = None, request_deserializer: Optional[DeserializingFunction] = None, response_serializer: Optional[SerializingFunction] = None, ): """ Create a server stream method handler + :param method: the method. + :type method: Callable + :param method_name: the method name. If not provided, the method name will be used. + :type method_name: Optional[str] + :param request_deserializer: the request deserializer. + :type request_deserializer: Optional[DeserializingFunction] + :param response_serializer: the response serializer. + :type response_serializer: Optional[SerializingFunction] + :return: the server stream method handler. + :rtype: RpcMethodHandler """ return cls( - ServerStreamCallType, - behavior, - request_deserializer, - response_serializer, + MethodDescriptor( + callable_method=method, + method_name=method_name or method.__name__, + arg_serialization=(None, request_deserializer), + return_serialization=(response_serializer, None), + rpc_type=RpcTypes.SERVER_STREAM.value, + ) ) @classmethod def bi_stream( cls, - behavior: Callable, + method: Callable, + method_name: Optional[str] = None, request_deserializer: Optional[DeserializingFunction] = None, response_serializer: Optional[SerializingFunction] = None, ): """ Create a bidi stream method handler + :param method: the method. + :type method: Callable + :param method_name: the method name. If not provided, the method name will be used. + :type method_name: Optional[str] + :param request_deserializer: the request deserializer. + :type request_deserializer: Optional[DeserializingFunction] + :param response_serializer: the response serializer. + :type response_serializer: Optional[SerializingFunction] + :return: the bidi stream method handler. + :rtype: RpcMethodHandler """ return cls( - BiStreamCallType, - behavior, - request_deserializer, - response_serializer, + MethodDescriptor( + callable_method=method, + method_name=method_name or method.__name__, + arg_serialization=(None, request_deserializer), + return_serialization=(response_serializer, None), + rpc_type=RpcTypes.BI_STREAM.value, + ) ) @@ -131,13 +180,37 @@ class RpcServiceHandler: Rpc service handler """ - def __init__(self, service_name: str, method_handlers: Dict[str, RpcMethodHandler]): + __slots__ = ["_service_name", "_method_handlers"] + + def __init__(self, service_name: str, method_handlers: List[RpcMethodHandler]): """ Initialize the RpcServiceHandler :param service_name: the name of the service. :type service_name: str :param method_handlers: the method handlers. - :type method_handlers: Dict[str, RpcMethodHandler] + :type method_handlers: List[RpcMethodHandler] + """ + self._service_name = service_name + self._method_handlers: Dict[str, RpcMethodHandler] = {} + + for method_handler in method_handlers: + method_name = method_handler.method_descriptor.get_method_name() + self._method_handlers[method_name] = method_handler + + @property + def service_name(self) -> str: + """ + Get the service name + :return: the service name + :rtype: str + """ + return self._service_name + + @property + def method_handlers(self) -> Dict[str, RpcMethodHandler]: + """ + Get the method handlers + :return: the method handlers + :rtype: Dict[str, RpcMethodHandler] """ - self.service_name = service_name - self.method_handlers = method_handlers + return self._method_handlers diff --git a/dubbo/registry/protocol.py b/dubbo/registry/protocol.py index ebee40f..52afad5 100644 --- a/dubbo/registry/protocol.py +++ b/dubbo/registry/protocol.py @@ -16,7 +16,7 @@ from dubbo.cluster.directories import RegistryDirectory from dubbo.cluster.failfast_cluster import FailfastCluster -from dubbo.cluster.monitor.cpu import CpuMonitor, CpuInnerRpcHandler +from dubbo.cluster.monitor.cpu import CpuInnerRpcHandler, CpuMonitor from dubbo.configs import RegistryConfig from dubbo.constants import common_constants from dubbo.extension import extensionLoader diff --git a/dubbo/registry/zookeeper/__init__.py b/dubbo/registry/zookeeper/__init__.py index 88487e2..8a9af4c 100644 --- a/dubbo/registry/zookeeper/__init__.py +++ b/dubbo/registry/zookeeper/__init__.py @@ -22,7 +22,6 @@ ZookeeperTransport, ) - __all__ = [ "ChildrenListener", "DataListener", diff --git a/dubbo/remoting/aio/aio_transporter.py b/dubbo/remoting/aio/aio_transporter.py index f0dd4eb..d650f6c 100644 --- a/dubbo/remoting/aio/aio_transporter.py +++ b/dubbo/remoting/aio/aio_transporter.py @@ -258,7 +258,11 @@ async def _inner_operation(_future: concurrent.futures.Future): raise RemotingError("Failed to export the server") from exc else: self._exported = True - _LOGGER.info("Exported the server. port: %s", self._url.port) + _LOGGER.info( + "Exported the server. host: %s, port: %s", + self._url.host, + self._url.port, + ) finally: self._exporting = False diff --git a/dubbo/serialization/_interfaces.py b/dubbo/serialization/_interfaces.py index 65e808d..5793dc1 100644 --- a/dubbo/serialization/_interfaces.py +++ b/dubbo/serialization/_interfaces.py @@ -61,11 +61,11 @@ class Serializer(abc.ABC): """ @abc.abstractmethod - def serialize(self, obj: Any) -> bytes: + def serialize(self, *args, **kwargs) -> bytes: """ Serialize an object to bytes. - :param obj: The object to serialize. - :type obj: Any + :param args: The arguments to serialize. + :param kwargs: The keyword arguments to serialize. :return: The serialized bytes. :rtype: bytes :raises SerializationError: If serialization fails. diff --git a/dubbo/serialization/custom_serializers.py b/dubbo/serialization/custom_serializers.py index 2b22b6a..83ee494 100644 --- a/dubbo/serialization/custom_serializers.py +++ b/dubbo/serialization/custom_serializers.py @@ -26,6 +26,8 @@ __all__ = ["CustomSerializer", "CustomDeserializer"] +from dubbo.utils import FunctionHelper + class CustomSerializer(Serializer): """ @@ -37,17 +39,17 @@ class CustomSerializer(Serializer): def __init__(self, serializer: SerializingFunction): self.serializer = serializer - def serialize(self, obj: Any) -> bytes: + def serialize(self, *args, **kwargs) -> bytes: """ Serialize an object to bytes. - :param obj: The object to serialize. - :type obj: Any + :param args: The arguments to serialize. + :param kwargs: The keyword arguments to serialize. :return: The serialized bytes. :rtype: bytes :raises SerializationError: If the object is not of type bytes, bytearray, or memoryview. """ try: - serialized_obj = self.serializer(obj) + serialized_obj = FunctionHelper.call_func(self.serializer, (args, kwargs)) except Exception as e: raise SerializationError( f"SerializationError: Failed to serialize object. Please check the serializer. \nDetails: {str(e)}", diff --git a/dubbo/serialization/direct_serializers.py b/dubbo/serialization/direct_serializers.py index 82585a8..0b41557 100644 --- a/dubbo/serialization/direct_serializers.py +++ b/dubbo/serialization/direct_serializers.py @@ -29,16 +29,16 @@ class DirectSerializer(Serializer, SingletonBase): expected types, a SerializationError is raised. This serializer is a singleton. """ - def serialize(self, obj: Any) -> bytes: + def serialize(self, *args, **kwargs) -> bytes: """ - Serialize an object to bytes. - :param obj: The object to serialize. - :type obj: Any + Serialize an object to bytes. we only return the first argument as bytes. + :param args: The arguments to serialize. + :param kwargs: The keyword arguments to serialize. :return: The serialized bytes. :rtype: bytes :raises SerializationError: If the object is not of type bytes, bytearray, or memoryview. """ - return ensure_bytes(obj) if obj is not None else b"" + return ensure_bytes(args[0]) if args else b"" class DirectDeserializer(Deserializer): diff --git a/dubbo/types.py b/dubbo/types.py index e1b3dad..6f0fd12 100644 --- a/dubbo/types.py +++ b/dubbo/types.py @@ -13,26 +13,50 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from collections import namedtuple +import enum +from dataclasses import dataclass from typing import Any, Callable __all__ = [ "SerializingFunction", "DeserializingFunction", - "CallType", - "UnaryCallType", - "ClientStreamCallType", - "ServerStreamCallType", - "BiStreamCallType", + "RpcType", + "RpcTypes", ] -SerializingFunction = Callable[[Any], bytes] +SerializingFunction = Callable[..., bytes] DeserializingFunction = Callable[[bytes], Any] -# CallType -CallType = namedtuple("CallType", ["name", "client_stream", "server_stream"]) -UnaryCallType = CallType("UnaryCall", False, False) -ClientStreamCallType = CallType("ClientStreamCall", True, False) -ServerStreamCallType = CallType("ServerStream", False, True) -BiStreamCallType = CallType("BiStreamCall", True, True) +@dataclass +class RpcType: + """ + RpcType + """ + + name: str + client_stream: bool + server_stream: bool + + +@enum.unique +class RpcTypes(enum.Enum): + UNARY = RpcType("Unary", False, False) + CLIENT_STREAM = RpcType("ClientStream", True, False) + SERVER_STREAM = RpcType("ServerStream", False, True) + BI_STREAM = RpcType("BiStream", True, True) + + @classmethod + def from_name(cls, name: str) -> RpcType: + """ + Get RpcType by name. Case-insensitive. + :param name: RpcType name + :return: RpcType + """ + for item in cls: + # ignore case + if item.value.name.lower() == name.lower(): + return item.value + if item.value.name == name: + return item.value + raise ValueError(f"Unknown RpcType name: {name}") diff --git a/dubbo/utils.py b/dubbo/utils.py index 6900237..a0a40d8 100644 --- a/dubbo/utils.py +++ b/dubbo/utils.py @@ -13,14 +13,15 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import os import socket - -__all__ = ["EventHelper", "FutureHelper", "NetworkUtils", "CpuUtils"] - -from typing import List, Tuple +from collections.abc import Callable +from typing import Any, List, Tuple, Optional import psutil +import inspect + +__all__ = ["EventHelper", "FutureHelper", "NetworkUtils", "CpuUtils", "FunctionHelper"] class EventHelper: @@ -141,24 +142,66 @@ class NetworkUtils: """ @staticmethod - def get_host_name(): + def is_address_reachable(ip: str, timeout: int = 0) -> bool: """ - Get the host name of the host machine. + Use ping to check if the IP address is reachable. - :return: The host name of the host machine. - :rtype: str + :param ip: The IP address. + :type ip: str + :param timeout: The timeout in seconds. + :type timeout: int + :return: True if the IP address is reachable, False otherwise. + :rtype: bool + """ + try: + # Use the ping command to check if the IP address is reachable + result = os.system(f"ping -c 1 -W {timeout} {ip} > /dev/null 2>&1") + return result == 0 + except Exception: + return False + + @staticmethod + def is_loopback_address(ip: str) -> bool: """ - return socket.gethostname() + Check if the IP address is a loopback address. + + :param ip: The IP address. + :type ip: str + :return: True if the IP address is a loopback address, False otherwise. + :rtype: bool + """ + return ip.startswith("127.") or ip == "localhost" @staticmethod - def get_host_ip(): + def get_local_address() -> Optional[str]: """ - Get the IP address of the host machine. + Find first valid IP from local network card. - :return: The IP address of the host machine. + :return: The local IP address. If not found, return None. :rtype: str """ - return socket.gethostbyname(NetworkUtils.get_host_name()) + try: + # use psutil to get the local IP address + for iface_name, iface_addrs in psutil.net_if_addrs().items(): + for addr in iface_addrs: + # only consider IPv4 address + if addr.family == socket.AF_INET: + # ignore the loopback address and check if the IP address is reachable + if not NetworkUtils.is_loopback_address( + addr.address + ) and NetworkUtils.is_address_reachable(addr.address): + return addr.address + except Exception: + pass + + # if the local IP address is not found, try to get the IP address using the socket + try: + local_host_ip = socket.gethostbyname(socket.gethostname()) + return local_host_ip + except Exception: + pass + + return None class CpuUtils: @@ -227,3 +270,122 @@ def get_cpu_freq(): :return: The current CPU frequency. """ return psutil.cpu_freq() + + +class FunctionHelper: + """ + Helper class for function operations. + """ + + @staticmethod + def is_callable(callable_func: Callable) -> bool: + """ + Check if the function is callable. + + :param callable_func: The callable function. + :type callable_func: Callable + :return: True if the function is callable, False otherwise. + :rtype: bool + """ + return inspect.isfunction(callable_func) or inspect.ismethod(callable_func) + + @staticmethod + def has_args(func: Callable) -> bool: + """ + Check if the function has arguments. + + :param func: The callable function. + :type func: Callable + :return: True if the function has arguments, False otherwise. + :rtype: bool + """ + return inspect.Parameter.VAR_POSITIONAL in [ + p.kind for p in inspect.signature(func).parameters.values() + ] + + @staticmethod + def has_kwargs(func: Callable) -> bool: + """ + Check if the function has keyword arguments. + + :param func: The callable function. + :type func: Callable + :return: True if the function has keyword arguments, False otherwise. + :rtype: bool + """ + return inspect.Parameter.VAR_KEYWORD in [ + p.kind for p in inspect.signature(func).parameters.values() + ] + + @staticmethod + def call_func(func: Callable, args_and_kwargs: Any = None) -> Any: + """ + Call the function with the given arguments and keyword arguments. + + :param func: + The callable function. + :type func: Callable + :param args_and_kwargs: + The arguments and keyword arguments. + + the provided values must follow these forms: + - No arguments required, pass -> None + - Multiple positional arguments -> Tuple (e.g., ((1, 2),{})) + - Multiple keyword arguments -> Dict (e.g., ((),{"a": 1, "b": 2})) + - Both positional and keyword arguments -> Tuple of length 2 + (e.g., ((1, 2), {"a": 1, "b": 2})) + + :type args_and_kwargs: Tuple + :return: The result of the function. + :rtype: Any + """ + + # split the arguments and keyword arguments + if isinstance(args_and_kwargs, tuple) and len(args_and_kwargs) == 2: + args, kwargs = args_and_kwargs + else: + raise ValueError( + "Invalid function arguments, the provided values must follow these forms:" + "1.No arguments required, pass -> None" + "2.Multiple positional arguments -> Tuple (e.g., ((1, 2),{}))" + "3.Multiple keyword arguments -> Dict (e.g., ((),{'a': 1, 'b': 2}))" + "4.Both positional and keyword arguments -> Tuple of length 2" + " (e.g., ((1, 2), {'a': 1, 'b': 2}))" + ) + + # If the function is not callable, try to call the function directly + try: + if not FunctionHelper.is_callable(func): + return func(*args, **kwargs) + except Exception as e: + raise e + + # Get the function signature + sig = inspect.signature(func) + + # Get the function parameters and check if the function supports *args and **kwargs + params = sig.parameters + param_kinds = [p.kind for p in params.values()] + has_var_positional = inspect.Parameter.VAR_POSITIONAL in param_kinds + has_var_keyword = inspect.Parameter.VAR_KEYWORD in param_kinds + + # If the function has no arguments or only one argument, call the function directly + if len(params) == 0 or args_and_kwargs is None: + return func() + + # If the function accepts both *args and **kwargs + if has_var_positional and has_var_keyword: + return func(*args, **kwargs) + + # If the function supports *args but not **kwargs + if has_var_positional: + return func(*args) + + # If the function supports **kwargs but not *args + if has_var_keyword: + return func(**kwargs) + + # common case + bound_args = sig.bind(*args, **kwargs) + bound_args.apply_defaults() + return func(*bound_args.args, **bound_args.kwargs) diff --git a/samples/README.md b/samples/README.md index 55500b7..efa5432 100644 --- a/samples/README.md +++ b/samples/README.md @@ -1,12 +1,5 @@ # Dubbo-python Samples -Before you begin, ensure that you have **`Python 3.11+`**. Then, install Dubbo-Python in your project using the following steps: - -```shell -git clone https://github.com/apache/dubbo-python.git -cd dubbo-python && pip install . -``` - ## What It Contains 1. [**helloworld**](./helloworld): The simplest usage example for quick start. diff --git a/samples/helloworld/client.py b/samples/helloworld/client.py index 074aa28..be0a5f7 100644 --- a/samples/helloworld/client.py +++ b/samples/helloworld/client.py @@ -21,17 +21,18 @@ class UnaryServiceStub: def __init__(self, client: dubbo.Client): self.unary = client.unary(method_name="unary") - def unary(self, request): - return self.unary(request) + def say_hello(self, message: bytes) -> bytes: + return self.unary(message) if __name__ == "__main__": + # Create a client reference_config = ReferenceConfig.from_url( "tri://127.0.0.1:50051/org.apache.dubbo.samples.HelloWorld" ) dubbo_client = dubbo.Client(reference_config) - unary_service_stub = UnaryServiceStub(dubbo_client) - result = unary_service_stub.unary("hello".encode("utf-8")) - print(result.decode("utf-8")) + # Call the remote method + result = unary_service_stub.say_hello(b"Hello from client") + print(result) diff --git a/samples/helloworld/server.py b/samples/helloworld/server.py index 4828080..3d54eeb 100644 --- a/samples/helloworld/server.py +++ b/samples/helloworld/server.py @@ -18,23 +18,31 @@ from dubbo.proxy.handlers import RpcMethodHandler, RpcServiceHandler -def handle_unary(request): - s = request.decode("utf-8") - print(f"Received request: {s}") - return (s + " world").encode("utf-8") +class UnaryServiceServicer: + def say_hello(self, message: bytes) -> bytes: + print(f"Received message from client: {message}") + return b"Hello from server" -if __name__ == "__main__": +def build_service_handler(): # build a method handler - method_handler = RpcMethodHandler.unary(handle_unary) + method_handler = RpcMethodHandler.unary( + method=UnaryServiceServicer().say_hello, method_name="unary" + ) # build a service handler service_handler = RpcServiceHandler( service_name="org.apache.dubbo.samples.HelloWorld", - method_handlers={"unary": method_handler}, + method_handlers=[method_handler], ) + return service_handler - service_config = ServiceConfig(service_handler) +if __name__ == "__main__": + # build service config + service_handler = build_service_handler() + service_config = ServiceConfig( + service_handler=service_handler, host="127.0.0.1", port=50051 + ) # start the server server = dubbo.Server(service_config).start() diff --git a/samples/registry/zookeeper/client.py b/samples/registry/zookeeper/client.py index 417551d..220f8e5 100644 --- a/samples/registry/zookeeper/client.py +++ b/samples/registry/zookeeper/client.py @@ -14,10 +14,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -from samples.proto import greeter_pb2 - import dubbo from dubbo.configs import ReferenceConfig, RegistryConfig +from samples.proto import greeter_pb2 class GreeterServiceStub: @@ -37,12 +36,12 @@ def say_hello(self, request): bootstrap = dubbo.Dubbo(registry_config=registry_config) reference_config = ReferenceConfig( - protocol="tri", service="org.apache.dubbo.samples.proto.Greeter" + protocol="tri", service="org.apache.dubbo.samples.data.Greeter" ) dubbo_client = bootstrap.create_client(reference_config) stub = GreeterServiceStub(dubbo_client) - result = stub.say_hello(greeter_pb2.GreeterRequest(name="hello")) + result = stub.say_hello(greeter_pb2.GreeterRequest(name="dubbo-python")) print(result.message) diff --git a/samples/registry/zookeeper/server.py b/samples/registry/zookeeper/server.py index e7db4b7..8e9b463 100644 --- a/samples/registry/zookeeper/server.py +++ b/samples/registry/zookeeper/server.py @@ -13,34 +13,40 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from samples.proto import greeter_pb2 - import dubbo -from dubbo.configs import ServiceConfig, RegistryConfig +from dubbo.configs import RegistryConfig, ServiceConfig from dubbo.proxy.handlers import RpcMethodHandler, RpcServiceHandler +from samples.proto import greeter_pb2 -def say_hello(request): - print(f"Received request: {request}") - return greeter_pb2.GreeterReply(message=f"{request.name} Dubbo!") +class GreeterServiceServicer: + def say_hello(self, request): + print(f"Received request: {request.name}") + return greeter_pb2.GreeterReply(message=f"Hello, {request.name}!") -if __name__ == "__main__": +def build_server_handler(): # build a method handler method_handler = RpcMethodHandler.unary( - say_hello, + GreeterServiceServicer().say_hello, + method_name="sayHello", request_deserializer=greeter_pb2.GreeterRequest.FromString, response_serializer=greeter_pb2.GreeterReply.SerializeToString, ) # build a service handler service_handler = RpcServiceHandler( - service_name="org.apache.dubbo.samples.proto.Greeter", - method_handlers={"sayHello": method_handler}, + service_name="org.apache.dubbo.samples.data.Greeter", + method_handlers=[method_handler], ) + return service_handler + +if __name__ == "__main__": registry_config = RegistryConfig.from_url("zookeeper://127.0.0.1:2181") bootstrap = dubbo.Dubbo(registry_config=registry_config) + # build a service config + service_handler = build_server_handler() service_config = ServiceConfig(service_handler) # start the server diff --git a/samples/serialization/README.md b/samples/serialization/README.md index d78e8a5..d711d4f 100644 --- a/samples/serialization/README.md +++ b/samples/serialization/README.md @@ -1,28 +1,24 @@ -## Defining and Using Serialization Functions +## Custom Serialization -Python is a dynamic language, and its flexibility makes it challenging to design a universal serialization layer as seen in other languages. Therefore, we have removed the "serialization layer" and left it to the users to implement (since users know the formats of the data they will pass). +Python is a dynamic language, and its flexibility makes it challenging to design a universal serialization layer for other languages. Therefore, we removed the framework-level serialization layer and instead exposed interfaces, allowing users to implement their own (as they know best the data format they will be passing). -Serialization typically consists of two parts: serialization and deserialization. We have defined the types for these functions, and custom serialization/deserialization functions must adhere to these "formats." +Serialization typically involves two parts: serialization and deserialization. We have defined the types for these functions, and custom serialization/deserialization functions must follow these "formats." - - -First, for serialization functions, we specify: +For serialization functions, we specify: ```python -# A function that takes an argument of any type and returns data of type bytes -SerializingFunction = Callable[[Any], bytes] +# A function that takes any number of arguments and returns data of type bytes +SerializingFunction = Callable[..., bytes] ``` -Next, for deserialization functions, we specify: +For deserialization functions, we specify: ```python # A function that takes an argument of type bytes and returns data of any type DeserializingFunction = Callable[[bytes], Any] ``` -Below, I'll demonstrate how to use custom functions with `protobuf` and `json`. - - +Below, I'll demonstrate how to use `protobuf` and `json` for serialization. ### [protobuf](./protobuf) @@ -47,37 +43,46 @@ Below, I'll demonstrate how to use custom functions with `protobuf` and `json`. if __name__ == "__main__": reference_config = ReferenceConfig.from_url( - "tri://127.0.0.1:50051/org.apache.dubbo.samples.proto.Greeter" + "tri://127.0.0.1:50051/org.apache.dubbo.samples.data.Greeter" ) dubbo_client = dubbo.Client(reference_config) stub = GreeterServiceStub(dubbo_client) - result = stub.say_hello(greeter_pb2.GreeterRequest(name="hello")) - print(result.message) + result = stub.say_hello(greeter_pb2.GreeterRequest(name="Dubbo-python")) + print(f"Received reply: {result.message}") ``` server ```python - def say_hello(request): - print(f"Received request: {request}") - return greeter_pb2.GreeterReply(message=f"{request.name} Dubbo!") + class GreeterServiceServicer: + def say_hello(self, request): + print(f"Received request: {request}") + return greeter_pb2.GreeterReply(message=f"Hello, {request.name}") - if __name__ == "__main__": + def build_service_handler(): # build a method handler method_handler = RpcMethodHandler.unary( - say_hello, + GreeterServiceServicer().say_hello, + method_name="sayHello", request_deserializer=greeter_pb2.GreeterRequest.FromString, response_serializer=greeter_pb2.GreeterReply.SerializeToString, ) # build a service handler service_handler = RpcServiceHandler( - service_name="org.apache.dubbo.samples.proto.Greeter", - method_handlers={"sayHello": method_handler}, + service_name="org.apache.dubbo.samples.data.Greeter", + method_handlers=[method_handler], ) + return service_handler - service_config = ServiceConfig(service_handler) + + if __name__ == "__main__": + # build a service handler + service_handler = build_service_handler() + service_config = ServiceConfig( + service_handler=service_handler, host="127.0.0.1", port=50051 + ) # start the server server = dubbo.Server(service_config).start() @@ -89,7 +94,7 @@ Below, I'll demonstrate how to use custom functions with `protobuf` and `json`. ### [Json](./json) -`protobuf` does not fully illustrate how to implement custom serialization and deserialization because its built-in functions perfectly meet the requirements. Instead, I'll demonstrate how to create custom serialization and deserialization functions using `orjson`: +We have already implemented single-parameter serialization and deserialization using `protobuf`. Now, I will demonstrate how to write a multi-parameter `Json` serialization and deserialization function to enable remote calls for methods with multiple parameters. 1. Install `orjson`: @@ -102,12 +107,13 @@ Below, I'll demonstrate how to use custom functions with `protobuf` and `json`. client ```python - def request_serializer(data: Dict) -> bytes: - return orjson.dumps(data) + def request_serializer(name: str, age: int) -> bytes: + return orjson.dumps({"name": name, "age": age}) - def response_deserializer(data: bytes) -> Dict: - return orjson.loads(data) + def response_deserializer(data: bytes) -> str: + json_dict = orjson.loads(data) + return json_dict["message"] class GreeterServiceStub: @@ -118,8 +124,8 @@ Below, I'll demonstrate how to use custom functions with `protobuf` and `json`. response_deserializer=response_deserializer, ) - def say_hello(self, request): - return self.unary(request) + def say_hello(self, name: str, age: int): + return self.unary(name, age) if __name__ == "__main__": @@ -129,40 +135,51 @@ Below, I'll demonstrate how to use custom functions with `protobuf` and `json`. dubbo_client = dubbo.Client(reference_config) stub = GreeterServiceStub(dubbo_client) - result = stub.say_hello({"name": "world"}) + result = stub.say_hello("dubbo-python", 18) print(result) ``` server ```python - def request_deserializer(data: bytes) -> Dict: - return orjson.loads(data) + def request_deserializer(data: bytes) -> Tuple[str, int]: + json_dict = orjson.loads(data) + return json_dict["name"], json_dict["age"] - def response_serializer(data: Dict) -> bytes: - return orjson.dumps(data) + def response_serializer(message: str) -> bytes: + return orjson.dumps({"message": message}) - def handle_unary(request): - print(f"Received request: {request}") - return {"message": f"Hello, {request['name']}"} + class GreeterServiceServicer: + def say_hello(self, request): + name, age = request + print(f"Received request: {name}, {age}") + return f"Hello, {name}, you are {age} years old." - if __name__ == "__main__": + def build_service_handler(): # build a method handler method_handler = RpcMethodHandler.unary( - handle_unary, + GreeterServiceServicer().say_hello, + method_name="unary", request_deserializer=request_deserializer, response_serializer=response_serializer, ) # build a service handler service_handler = RpcServiceHandler( - service_name="org.apache.dubbo.samples.HelloWorld", - method_handlers={"unary": method_handler}, + service_name="org.apache.dubbo.samples.serialization.json", + method_handlers=[method_handler], ) + return service_handler + - service_config = ServiceConfig(service_handler) + if __name__ == "__main__": + # build server config + service_handler = build_service_handler() + service_config = ServiceConfig( + service_handler=service_handler, host="127.0.0.1", port=50051 + ) # start the server server = dubbo.Server(service_config).start() diff --git a/samples/serialization/json/client.py b/samples/serialization/json/client.py index 84cc5c2..c7760b1 100644 --- a/samples/serialization/json/client.py +++ b/samples/serialization/json/client.py @@ -13,7 +13,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from typing import Dict import orjson @@ -21,12 +20,13 @@ from dubbo.configs import ReferenceConfig -def request_serializer(data: Dict) -> bytes: - return orjson.dumps(data) +def request_serializer(name: str, age: int) -> bytes: + return orjson.dumps({"name": name, "age": age}) -def response_deserializer(data: bytes) -> Dict: - return orjson.loads(data) +def response_deserializer(data: bytes) -> str: + json_dict = orjson.loads(data) + return json_dict["message"] class GreeterServiceStub: @@ -37,8 +37,8 @@ def __init__(self, client: dubbo.Client): response_deserializer=response_deserializer, ) - def say_hello(self, request): - return self.unary(request) + def say_hello(self, name: str, age: int): + return self.unary(name, age) if __name__ == "__main__": @@ -48,5 +48,5 @@ def say_hello(self, request): dubbo_client = dubbo.Client(reference_config) stub = GreeterServiceStub(dubbo_client) - result = stub.say_hello({"name": "world"}) + result = stub.say_hello("dubbo-python", 18) print(result) diff --git a/samples/serialization/json/server.py b/samples/serialization/json/server.py index 7701fca..eb1ba22 100644 --- a/samples/serialization/json/server.py +++ b/samples/serialization/json/server.py @@ -13,7 +13,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from typing import Dict +from typing import Tuple import orjson @@ -22,33 +22,44 @@ from dubbo.proxy.handlers import RpcMethodHandler, RpcServiceHandler -def request_deserializer(data: bytes) -> Dict: - return orjson.loads(data) +def request_deserializer(data: bytes) -> Tuple[str, int]: + json_dict = orjson.loads(data) + return json_dict["name"], json_dict["age"] -def response_serializer(data: Dict) -> bytes: - return orjson.dumps(data) +def response_serializer(message: str) -> bytes: + return orjson.dumps({"message": message}) -def handle_unary(request): - print(f"Received request: {request}") - return {"message": f"Hello, {request['name']}"} +class GreeterServiceServicer: + def say_hello(self, request): + name, age = request + print(f"Received request: {name}, {age}") + return f"Hello, {name}, you are {age} years old." -if __name__ == "__main__": +def build_service_handler(): # build a method handler method_handler = RpcMethodHandler.unary( - handle_unary, + GreeterServiceServicer().say_hello, + method_name="unary", request_deserializer=request_deserializer, response_serializer=response_serializer, ) # build a service handler service_handler = RpcServiceHandler( service_name="org.apache.dubbo.samples.serialization.json", - method_handlers={"unary": method_handler}, + method_handlers=[method_handler], ) + return service_handler + - service_config = ServiceConfig(service_handler) +if __name__ == "__main__": + # build server config + service_handler = build_service_handler() + service_config = ServiceConfig( + service_handler=service_handler, host="127.0.0.1", port=50051 + ) # start the server server = dubbo.Server(service_config).start() diff --git a/samples/serialization/protobuf/client.py b/samples/serialization/protobuf/client.py index 829ad26..a936928 100644 --- a/samples/serialization/protobuf/client.py +++ b/samples/serialization/protobuf/client.py @@ -13,9 +13,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from samples.proto import greeter_pb2 import dubbo from dubbo.configs import ReferenceConfig +from samples.proto import greeter_pb2 class GreeterServiceStub: @@ -32,10 +32,10 @@ def say_hello(self, request): if __name__ == "__main__": reference_config = ReferenceConfig.from_url( - "tri://127.0.0.1:50051/org.apache.dubbo.samples.proto.Greeter" + "tri://127.0.0.1:50051/org.apache.dubbo.samples.data.Greeter" ) dubbo_client = dubbo.Client(reference_config) stub = GreeterServiceStub(dubbo_client) - result = stub.say_hello(greeter_pb2.GreeterRequest(name="hello")) - print(result.message) + result = stub.say_hello(greeter_pb2.GreeterRequest(name="Dubbo-python")) + print(f"Received reply: {result.message}") diff --git a/samples/serialization/protobuf/server.py b/samples/serialization/protobuf/server.py index 935a011..fa39a6e 100644 --- a/samples/serialization/protobuf/server.py +++ b/samples/serialization/protobuf/server.py @@ -13,31 +13,40 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from samples.proto import greeter_pb2 import dubbo from dubbo.configs import ServiceConfig from dubbo.proxy.handlers import RpcMethodHandler, RpcServiceHandler +from samples.proto import greeter_pb2 -def say_hello(request): - print(f"Received request: {request}") - return greeter_pb2.GreeterReply(message=f"{request.name} Dubbo!") +class GreeterServiceServicer: + def say_hello(self, request): + print(f"Received request: {request}") + return greeter_pb2.GreeterReply(message=f"Hello, {request.name}") -if __name__ == "__main__": +def build_service_handler(): # build a method handler method_handler = RpcMethodHandler.unary( - say_hello, + GreeterServiceServicer().say_hello, + method_name="sayHello", request_deserializer=greeter_pb2.GreeterRequest.FromString, response_serializer=greeter_pb2.GreeterReply.SerializeToString, ) # build a service handler service_handler = RpcServiceHandler( - service_name="org.apache.dubbo.samples.proto.Greeter", - method_handlers={"sayHello": method_handler}, + service_name="org.apache.dubbo.samples.data.Greeter", + method_handlers=[method_handler], ) + return service_handler + - service_config = ServiceConfig(service_handler) +if __name__ == "__main__": + # build a service handler + service_handler = build_service_handler() + service_config = ServiceConfig( + service_handler=service_handler, host="127.0.0.1", port=50051 + ) # start the server server = dubbo.Server(service_config).start() diff --git a/samples/stream/README.md b/samples/stream/README.md index f17aab7..51100e0 100644 --- a/samples/stream/README.md +++ b/samples/stream/README.md @@ -1,70 +1,94 @@ ## Streaming Calls -Dubbo-python supports streaming calls, including `ClientStream`, `ServerStream`, and `BidirectionalStream`. The key difference in these calls is the use of iterators: passing an iterator as a parameter for `ClientStream`, receiving an iterator for `ServerStream`, or both passing and receiving iterators for `BidirectionalStream`. +Dubbo-Python supports streaming calls, including `ClientStream`, `ServerStream`, and `BidirectionalStream` modes. -When using `BidirectionalStream`, the client needs to pass an iterator as a parameter to send multiple data points, while also receiving an iterator to handle multiple responses from the server. +Streaming calls can be divided into write-streams and read-streams. For `ClientStream`, it’s multiple writes with a single read; for `ServerStream`, a single write with multiple reads; and `BidirectionalStream` allows multiple writes and reads. -Here’s an example of the client-side code: +### Write-Stream + +Write operations in streaming calls can be divided into single write (`ServerStream`) and multiple writes (`ClientStream` and `BidirectionalStream`). + +#### Single Write + +Single write calls are similar to unary mode. For example: ```python -class GreeterServiceStub: - def __init__(self, client: dubbo.Client): - self.bidi_stream = client.bidi_stream( - method_name="biStream", - request_serializer=greeter_pb2.GreeterRequest.SerializeToString, - response_deserializer=greeter_pb2.GreeterReply.FromString, - ) +stub.server_stream(greeter_pb2.GreeterRequest(name="hello world from dubbo-python")) +``` - def bi_stream(self, values): - return self.bidi_stream(values) +#### Multiple Writes +For multiple writes, users can write data using either an iterator or `writeStream` (only one of these options should be used). -if __name__ == "__main__": - reference_config = ReferenceConfig.from_url( - "tri://127.0.0.1:50051/org.apache.dubbo.samples.proto.Greeter" - ) - dubbo_client = dubbo.Client(reference_config) +1. **Iterator-based Write**: Writing via iterator is similar to unary mode, with the main difference being the use of an iterator for multiple writes. For example: - stub = GreeterServiceStub(dubbo_client) + ```python + # Use an iterator to send multiple requests + def request_generator(): + for i in ["hello", "world", "from", "dubbo-python"]: + yield greeter_pb2.GreeterRequest(name=str(i)) - # Iterator of request - def request_generator(): - for item in ["hello", "world", "from", "dubbo-python"]: - yield greeter_pb2.GreeterRequest(name=str(item)) + # Call the remote method and return a read_stream + stream = stub.client_stream(request_generator()) + ``` - result = stub.bi_stream(request_generator()) +2. **Using `writeStream`**: This method requires an empty argument, after which data is written incrementally using `write`, and `done_writing` is called to end the write-stream. For example: - for i in result: - print(f"Received response: {i.message}") -``` + ```python + stream = stub.bi_stream() + # Use the write method to send messages + stream.write(greeter_pb2.GreeterRequest(name="jock")) + stream.write(greeter_pb2.GreeterRequest(name="jane")) + stream.write(greeter_pb2.GreeterRequest(name="alice")) + stream.write(greeter_pb2.GreeterRequest(name="dave")) + # Call done_writing to notify the server that the client has finished writing + stream.done_writing() + ``` -And here’s the server-side code: +### Read-Stream + +Read operations for streaming calls can be single read (`ClientStream`) or multiple reads (`ServerStream` and `BidirectionalStream`). A `ReadStream` is returned in all cases, and data can be read using the `read` method or an iterator. When using `read`, please note: + +1. The `read` method supports a `timeout` parameter (in seconds). +2. The `read` method can return one of three values: the expected data, `None` (timeout exceeded), or `EOF` (end of the read-stream). + +#### Single Read + +A single call to the `read` method will retrieve the data, for example: ```python -def bi_stream(request_stream): - for request in request_stream: - print(f"Received message from {request.name}") - yield greeter_pb2.GreeterReply(message=request.name) - - -if __name__ == "__main__": - # build a method handler - method_handler = RpcMethodHandler.bi_stream( - bi_stream, - request_deserializer=greeter_pb2.GreeterRequest.FromString, - response_serializer=greeter_pb2.GreeterReply.SerializeToString, - ) - # build a service handler - service_handler = RpcServiceHandler( - service_name="org.apache.dubbo.samples.proto.Greeter", - method_handlers={"biStream": method_handler}, - ) - - service_config = ServiceConfig(service_handler) - - # start the server - server = dubbo.Server(service_config).start() - - input("Press Enter to stop the server...\n") +result = stream.read() +print(f"Received response: {result.message}") ``` +#### Multiple Reads + +Multiple reads can be done by repeatedly calling `read`, with handling for `None` and `EOF` values. Since `ReadStream` implements `__iter__` and `__next__`, an iterator-based approach can also be used, which automatically handles these values but doesn’t support a timeout. + +1. **Using Iterator (Recommended)**: + + ```python + def client_stream(self, request_iterator): + response = "" + for request in request_iterator: + print(f"Received request: {request.name}") + response += f"{request.name} " + return greeter_pb2.GreeterReply(message=response) + ``` + +2. **Multiple Calls to `read` Method**: + + ```python + # Use read method to receive messages + # If no message arrives within the specified time, returns None + # If the server has finished sending messages, returns EOF + while True: + i = stream.read(timeout=0.5) + if i is dubbo.classes.EOF: + break + elif i is None: + print("No message received") + continue + print(f"Received response: {i.message}") + ``` + diff --git a/samples/stream/bidi_stream/client.py b/samples/stream/bidi_stream/client.py index c069485..1d4f566 100644 --- a/samples/stream/bidi_stream/client.py +++ b/samples/stream/bidi_stream/client.py @@ -14,38 +14,54 @@ # See the License for the specific language governing permissions and # limitations under the License. -from samples.proto import greeter_pb2 - import dubbo +from dubbo.classes import EOF from dubbo.configs import ReferenceConfig +from samples.proto import greeter_pb2 class GreeterServiceStub: def __init__(self, client: dubbo.Client): - self.bidi_stream = client.bidi_stream( + self.bidi_stream = client.bi_stream( method_name="biStream", request_serializer=greeter_pb2.GreeterRequest.SerializeToString, response_deserializer=greeter_pb2.GreeterReply.FromString, ) - def bi_stream(self, values): - return self.bidi_stream(values) + def bi_stream(self, *args): + return self.bidi_stream(args) if __name__ == "__main__": + # Create a reference config reference_config = ReferenceConfig.from_url( - "tri://127.0.0.1:50051/org.apache.dubbo.samples.proto.Greeter" + "tri://127.0.0.1:50051/org.apache.dubbo.samples.data.Greeter" ) dubbo_client = dubbo.Client(reference_config) - stub = GreeterServiceStub(dubbo_client) - # Iterator of request - def request_generator(): - for item in ["hello", "world", "from", "dubbo-python"]: - yield greeter_pb2.GreeterRequest(name=str(item)) + stream = stub.bi_stream() + # use write method to send message + stream.write(greeter_pb2.GreeterRequest(name="jock")) + + # use read method to receive message + print(f"Received response: {stream.read().message}") - result = stub.bi_stream(request_generator()) + # continue to send message + stream.write(greeter_pb2.GreeterRequest(name="jane")) + stream.write(greeter_pb2.GreeterRequest(name="alice")) + stream.write(greeter_pb2.GreeterRequest(name="dave")) + # done_writing method must be called to notify the server that the client has finished writing + stream.done_writing() - for i in result: + # use read method to receive message + # If no message arrives within the specified time, returns None + # If the server has finished sending messages and the client has received all messages, returns EOF + while True: + i = stream.read(timeout=0.5) + if i is EOF: + break + elif i is None: + print("No message received") + continue print(f"Received response: {i.message}") diff --git a/samples/stream/bidi_stream/server.py b/samples/stream/bidi_stream/server.py index 8c295b9..b65c1c0 100644 --- a/samples/stream/bidi_stream/server.py +++ b/samples/stream/bidi_stream/server.py @@ -13,33 +13,47 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from samples.proto import greeter_pb2 - import dubbo from dubbo.configs import ServiceConfig from dubbo.proxy.handlers import RpcMethodHandler, RpcServiceHandler +from samples.proto import greeter_pb2 +import time -def bi_stream(request_stream): - for request in request_stream: - print(f"Received message from {request.name}") - yield greeter_pb2.GreeterReply(message=request.name) +class GreeterServiceServicer: + def bi_stream(self, stream): + counter = 0 + for request in stream: + print(f"Received request: {request.name}") + # simulate a long time to process + if counter == 1: + time.sleep(1) + counter += 1 -if __name__ == "__main__": - # build a method handler + stream.write(greeter_pb2.GreeterReply(message=f"Hello, {request.name}!")) + + stream.done_writing() + + +def build_server_handler(): method_handler = RpcMethodHandler.bi_stream( - bi_stream, + GreeterServiceServicer().bi_stream, + method_name="biStream", request_deserializer=greeter_pb2.GreeterRequest.FromString, response_serializer=greeter_pb2.GreeterReply.SerializeToString, ) - # build a service handler service_handler = RpcServiceHandler( - service_name="org.apache.dubbo.samples.proto.Greeter", - method_handlers={"biStream": method_handler}, + service_name="org.apache.dubbo.samples.data.Greeter", + method_handlers=[method_handler], ) + return service_handler - service_config = ServiceConfig(service_handler) + +if __name__ == "__main__": + # build a service config + service_handler = build_server_handler() + service_config = ServiceConfig(service_handler, host="127.0.0.1", port=50051) # start the server server = dubbo.Server(service_config).start() diff --git a/samples/stream/client_stream/client.py b/samples/stream/client_stream/client.py index 43af989..3206d76 100644 --- a/samples/stream/client_stream/client.py +++ b/samples/stream/client_stream/client.py @@ -13,10 +13,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from samples.proto import greeter_pb2 - import dubbo from dubbo.configs import ReferenceConfig +from samples.proto import greeter_pb2 class GreeterServiceStub: @@ -27,23 +26,23 @@ def __init__(self, client: dubbo.Client): response_deserializer=greeter_pb2.GreeterReply.FromString, ) - def client_stream(self, values): - return self.unary_stream(values) + def client_stream(self, request_iterator): + return self.unary_stream(request_iterator) if __name__ == "__main__": reference_config = ReferenceConfig.from_url( - "tri://127.0.0.1:50051/org.apache.dubbo.samples.proto.Greeter" + "tri://127.0.0.1:50051/org.apache.dubbo.samples.data.Greeter" ) dubbo_client = dubbo.Client(reference_config) - stub = GreeterServiceStub(dubbo_client) - # Iterator of request + # use iterator to send multiple requests def request_generator(): for i in ["hello", "world", "from", "dubbo-python"]: yield greeter_pb2.GreeterRequest(name=str(i)) - result = stub.client_stream(request_generator()) - - print(result.message) + # call the remote method and return a read_stream + stream = stub.client_stream(request_generator()) + # use read method to get the response + print(stream.read()) diff --git a/samples/stream/client_stream/server.py b/samples/stream/client_stream/server.py index 1cc6ba3..c473b2c 100644 --- a/samples/stream/client_stream/server.py +++ b/samples/stream/client_stream/server.py @@ -13,37 +13,43 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from samples.proto import greeter_pb2 - import dubbo from dubbo.configs import ServiceConfig from dubbo.proxy.handlers import RpcMethodHandler, RpcServiceHandler +from samples.proto import greeter_pb2 -def client_stream(request_stream): - response = "" - for request in request_stream: - print(f"Received request: {request.name}") - response += f"{request.name} " - - return greeter_pb2.GreeterReply(message=response) +class GreeterServiceServicer: + def client_stream(self, request_iterator): + response = "" + for request in request_iterator: + print(f"Received request: {request.name}") + response += f"{request.name} " + return greeter_pb2.GreeterReply(message=response) -if __name__ == "__main__": +def build_service_handler(): # build a method handler method_handler = RpcMethodHandler.client_stream( - client_stream, + GreeterServiceServicer().client_stream, + method_name="clientStream", request_deserializer=greeter_pb2.GreeterRequest.FromString, response_serializer=greeter_pb2.GreeterReply.SerializeToString, ) # build a service handler service_handler = RpcServiceHandler( - service_name="org.apache.dubbo.samples.proto.Greeter", - method_handlers={"clientStream": method_handler}, + service_name="org.apache.dubbo.samples.data.Greeter", + method_handlers=[method_handler], ) + return service_handler - service_config = ServiceConfig(service_handler) +if __name__ == "__main__": + # build server config + service_handler = build_service_handler() + service_config = ServiceConfig( + service_handler=service_handler, host="127.0.0.1", port=50051 + ) # start the server server = dubbo.Server(service_config).start() diff --git a/samples/stream/server_stream/client.py b/samples/stream/server_stream/client.py index 27780a7..03fed16 100644 --- a/samples/stream/server_stream/client.py +++ b/samples/stream/server_stream/client.py @@ -13,10 +13,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from samples.proto import greeter_pb2 - import dubbo from dubbo.configs import ReferenceConfig +from samples.proto import greeter_pb2 class GreeterServiceStub: @@ -33,15 +32,14 @@ def server_stream(self, values): if __name__ == "__main__": reference_config = ReferenceConfig.from_url( - "tri://127.0.0.1:50051/org.apache.dubbo.samples.proto.Greeter" + "tri://127.0.0.1:50051/org.apache.dubbo.samples.data.Greeter" ) dubbo_client = dubbo.Client(reference_config) - stub = GreeterServiceStub(dubbo_client) - request = greeter_pb2.GreeterRequest(name="hello world from dubbo-python") - - result = stub.server_stream(request) + stream = stub.server_stream( + greeter_pb2.GreeterRequest(name="hello world from dubbo-python") + ) - for i in result: + for i in stream: print(f"Received response: {i.message}") diff --git a/samples/stream/server_stream/server.py b/samples/stream/server_stream/server.py index 2ef5445..bce6aca 100644 --- a/samples/stream/server_stream/server.py +++ b/samples/stream/server_stream/server.py @@ -13,34 +13,41 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from samples.proto import greeter_pb2 - import dubbo from dubbo.configs import ServiceConfig from dubbo.proxy.handlers import RpcMethodHandler, RpcServiceHandler +from samples.proto import greeter_pb2 -def server_stream(request): - print(f"Received request: {request.name}") - response = request.name.split(" ") - for i in response: - yield greeter_pb2.GreeterReply(message=i) +class GreeterServiceServicer: + def server_stream(self, stream): + request = stream.read() + print(f"Received request: {request.name}") + response = request.name.split(" ") + for i in response: + yield greeter_pb2.GreeterReply(message=i) -if __name__ == "__main__": +def build_server_handler(): # build a method handler method_handler = RpcMethodHandler.server_stream( - server_stream, + GreeterServiceServicer().server_stream, + method_name="serverStream", request_deserializer=greeter_pb2.GreeterRequest.FromString, response_serializer=greeter_pb2.GreeterReply.SerializeToString, ) # build a service handler service_handler = RpcServiceHandler( - service_name="org.apache.dubbo.samples.proto.Greeter", - method_handlers={"serverStream": method_handler}, + service_name="org.apache.dubbo.samples.data.Greeter", + method_handlers=[method_handler], ) + return service_handler + - service_config = ServiceConfig(service_handler) +if __name__ == "__main__": + # build a service config + service_handler = build_server_handler() + service_config = ServiceConfig(service_handler, host="127.0.0.1", port=50051) # start the server server = dubbo.Server(service_config).start() diff --git a/setup.py b/setup.py index 4b91932..7db984d 100644 --- a/setup.py +++ b/setup.py @@ -15,7 +15,6 @@ # limitations under the License. from setuptools import find_packages, setup - # Read version from dubbo/__version__.py with open("dubbo/__version__.py", "r", encoding="utf-8") as f: global_vars = {} diff --git a/tests/common/__init__.py b/tests/common/__init__.py deleted file mode 100644 index bcba37a..0000000 --- a/tests/common/__init__.py +++ /dev/null @@ -1,15 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/tests/common/test_url.py b/tests/test_url.py similarity index 100% rename from tests/common/test_url.py rename to tests/test_url.py diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..67a10c6 --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,96 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import Tuple, List, Dict, Any + + +def test_func_helper(): + """ + Test the function helper. + """ + from dubbo.utils import FunctionHelper + + # zero arguments + def func_0(): + return 0 + + # one argument + def func_1(a): + return a + 1 + + # two arguments + def func_2(a, b): + return a + b + + # two arguments, one default + def func_3(a, b=2): + return a + b + + # two arguments, positional only + def func_4(a, b, /): + return a + b + + # two arguments, keyword only + def func_5(a, b, *, c): + return a + b + c + + # variable arguments + def func_6(*args): + return sum(args) + + # variable keyword arguments + def func_7(**kwargs): + return kwargs + + # variable arguments and keyword arguments + def func_8(*args, **kwargs): + return sum(args), kwargs + + # mixed arguments - 1 + def func_9(a: Tuple[int, int], b: List[int], c: Dict[str, Any]): + return f"a={a}, b={b}, c={c}" + + # class + class User: + def __init__(self, name, age): + self.name = name + self.age = age + + def func_10(user: User, name, age): + return user.name, user.age, name, age + + # test function helper + assert FunctionHelper.call_func(func_0) == 0 + assert FunctionHelper.call_func(func_1, ((1,), {})) == 2 + assert FunctionHelper.call_func(func_2, ((1, 2), {})) == 3 + assert FunctionHelper.call_func(func_3, ((1,), {})) == 3 + assert FunctionHelper.call_func(func_4, ((1, 2), {})) == 3 + assert FunctionHelper.call_func(func_5, ((1, 2), {"c": 3})) == 6 + assert FunctionHelper.call_func(func_6, ((1, 2, 3), {})) == 6 + assert FunctionHelper.call_func(func_7, ((), {"a": 1, "b": 2})) == {"a": 1, "b": 2} + assert FunctionHelper.call_func(func_8, ((1, 2, 3), {"a": 1, "b": 2})) == ( + 6, + {"a": 1, "b": 2}, + ) + assert ( + FunctionHelper.call_func( + func_9, + (((1, 2), [1, 2], {"a": 1, "b": 2}), {}), + ) + == "a=(1, 2), b=[1, 2], c={'a': 1, 'b': 2}" + ) + assert FunctionHelper.call_func( + func_10, ((User("Alice", 20),), {"name": "Bob", "age": 30}) + ) == ("Alice", 20, "Bob", 30)