Skip to content

Commit

Permalink
[Feature] Dubbo-Python Optimization (#40)
Browse files Browse the repository at this point in the history
* feat: New stream-related features

* docs: update some samples

* docs: update some samples

* docs: update some samples
  • Loading branch information
cnzakii authored Oct 29, 2024
1 parent e5e9f7b commit aff0858
Show file tree
Hide file tree
Showing 43 changed files with 1,625 additions and 629 deletions.
65 changes: 40 additions & 25 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,76 +29,91 @@ Visit [the official website](https://dubbo.apache.org/) for more information.
- **Serialization**: Customizable(protobuf, json...)


## Getting started

Before you begin, ensure that you have **`python 3.11+`**. Then, install Dubbo-Python in your project using the following steps:
## Installation

Before you start, make sure you have **`python 3.11+`** installed.

```shell
git clone https://github.com/apache/dubbo-python.git
cd dubbo-python && pip install .
```
1. Install from source

```sh
git clone https://github.com/apache/dubbo-python.git
cd dubbo-python && pip install .
```


## Getting started

Get started with Dubbo-Python in just 5 minutes by following our [Quick Start Guide](https://github.com/apache/dubbo-python/tree/main/samples).
Get up and running with Dubbo-Python in just 5 minutes by following our [Quick Start Guide](https://github.com/apache/dubbo-python/tree/main/samples).

It's as simple as the following code snippet. With just a few lines of code, you can launch a fully functional point-to-point RPC service :
It's as simple as the code snippet below. With just a few lines of code, you can launch a fully functional point-to-point RPC service:

1. Build and start the Server
1. Build and start the server

```python
import dubbo
from dubbo.configs import ServiceConfig
from dubbo.proxy.handlers import RpcServiceHandler, RpcMethodHandler
from dubbo.proxy.handlers import RpcMethodHandler, RpcServiceHandler


def handle_unary(request):
s = request.decode("utf-8")
print(f"Received request: {s}")
return (s + " world").encode("utf-8")
class UnaryServiceServicer:
def say_hello(self, message: bytes) -> bytes:
print(f"Received message from client: {message}")
return b"Hello from server"


if __name__ == "__main__":
def build_service_handler():
# build a method handler
method_handler = RpcMethodHandler.unary(handle_unary)
method_handler = RpcMethodHandler.unary(
method=UnaryServiceServicer().say_hello, method_name="unary"
)
# build a service handler
service_handler = RpcServiceHandler(
service_name="org.apache.dubbo.samples.HelloWorld",
method_handlers={"unary": method_handler},
method_handlers=[method_handler],
)
return service_handler

service_config = ServiceConfig(service_handler)

if __name__ == "__main__":
# build service config
service_handler = build_service_handler()
service_config = ServiceConfig(
service_handler=service_handler, host="127.0.0.1", port=50051
)
# start the server
server = dubbo.Server(service_config).start()

input("Press Enter to stop the server...\n")
```

2. Build and start the Client
1. Build and start the Client

```python
import dubbo
from dubbo.configs import ReferenceConfig


class UnaryServiceStub:

def __init__(self, client: dubbo.Client):
self.unary = client.unary(method_name="unary")

def unary(self, request):
return self.unary(request)
def say_hello(self, message: bytes) -> bytes:
return self.unary(message)


if __name__ == "__main__":
# Create a client
reference_config = ReferenceConfig.from_url(
"tri://127.0.0.1:50051/org.apache.dubbo.samples.HelloWorld"
)
dubbo_client = dubbo.Client(reference_config)

unary_service_stub = UnaryServiceStub(dubbo_client)

result = unary_service_stub.unary("hello".encode("utf-8"))
print(result.decode("utf-8"))
# Call the remote method
result = unary_service_stub.say_hello(b"Hello from client")
print(result)

```


Expand Down
215 changes: 213 additions & 2 deletions dubbo/classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,44 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import abc
import threading
from typing import Callable, Optional, Tuple, Any, Union

from dubbo.types import DeserializingFunction, RpcType, RpcTypes, SerializingFunction

__all__ = [
"EOF",
"SingletonBase",
"MethodDescriptor",
"ReadStream",
"WriteStream",
"ReadWriteStream",
]


class _EOF:
"""
EOF is a class representing the end flag.
"""

_repr_str = "<dubbo.classes.EOF>"

def __bool__(self):
return False

def __len__(self):
return 0

__all__ = ["SingletonBase"]
def __repr__(self) -> str:
return self._repr_str

def __str__(self) -> str:
return self._repr_str


# The EOF object -> global constant
EOF = _EOF()


class SingletonBase:
Expand All @@ -39,3 +73,180 @@ def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super(SingletonBase, cls).__new__(cls)
return cls._instance


class MethodDescriptor:
"""
MethodDescriptor is a descriptor for a method.
It contains the method name, the method, and the method's serialization and deserialization methods.
"""

__slots__ = [
"_callable_method",
"_method_name",
"_rpc_type",
"_arg_serialization",
"_return_serialization",
]

def __init__(
self,
method_name: str,
arg_serialization: Tuple[
Optional[SerializingFunction], Optional[DeserializingFunction]
],
return_serialization: Tuple[
Optional[SerializingFunction], Optional[DeserializingFunction]
],
rpc_type: Union[RpcType, RpcTypes, str] = RpcTypes.UNARY.value,
callable_method: Optional[Callable] = None,
):
"""
Initialize the method model.
:param method_name:
The name of the method.
:type method_name: str
:param arg_serialization:
A tuple containing serialization and deserialization methods for the function's arguments.
:type arg_serialization: Optional[Tuple[SerializingFunction, DeserializingFunction]]
:param return_serialization:
A tuple containing serialization and deserialization methods for the function's return values.
:type return_serialization: Optional[Tuple[SerializingFunction, DeserializingFunction]]
:param rpc_type:
The RPC type. default is RpcTypes.UNARY.
:type rpc_type: RpcType
:param callable_method:
The main callable method to be executed.
:type callable_method: Optional[Callable]
"""
self._method_name = method_name
self._arg_serialization = arg_serialization
self._return_serialization = return_serialization
self._callable_method = callable_method

if isinstance(rpc_type, str):
rpc_type = RpcTypes.from_name(rpc_type)
elif isinstance(rpc_type, RpcTypes):
rpc_type = rpc_type.value
elif not isinstance(rpc_type, RpcType):
raise TypeError(
f"rpc_type must be of type RpcType, RpcTypes, or str, not {type(rpc_type)}"
)
self._rpc_type = rpc_type

def get_method(self) -> Callable:
"""
Get the callable method.
:return: The callable method.
:rtype: Callable
"""
return self._callable_method

def get_method_name(self) -> str:
"""
Get the method name.
:return: The method name.
:rtype: str
"""
return self._method_name

def get_rpc_type(self) -> RpcType:
"""
Get the RPC type.
:return: The RPC type.
:rtype: RpcType
"""
return self._rpc_type

def get_arg_serializer(self) -> Optional[SerializingFunction]:
"""
Get the argument serializer.
:return: The argument serializer. If not set, return None.
:rtype: Optional[SerializingFunction]
"""
return self._arg_serialization[0] if self._arg_serialization else None

def get_arg_deserializer(self) -> Optional[DeserializingFunction]:
"""
Get the argument deserializer.
:return: The argument deserializer. If not set, return None.
:rtype: Optional[DeserializingFunction]
"""
return self._arg_serialization[1] if self._arg_serialization else None

def get_return_serializer(self) -> Optional[SerializingFunction]:
"""
Get the return value serializer.
:return: The return value serializer. If not set, return None.
:rtype: Optional[SerializingFunction]
"""
return self._return_serialization[0] if self._return_serialization else None

def get_return_deserializer(self) -> Optional[DeserializingFunction]:
"""
Get the return value deserializer.
:return: The return value deserializer. If not set, return None.
:rtype: Optional[DeserializingFunction]
"""
return self._return_serialization[1] if self._return_serialization else None


class ReadStream(abc.ABC):
"""
ReadStream is an abstract class for reading streams.
"""

@abc.abstractmethod
def read(self, *args, **kwargs) -> Any:
"""
Read the stream.
:param args: The arguments to pass to the read method.
:param kwargs: The keyword arguments to pass to the read method.
:return: The read value.
"""
raise NotImplementedError()


class WriteStream(abc.ABC):
"""
WriteStream is an abstract class for writing streams.
"""

@abc.abstractmethod
def can_write_more(self) -> bool:
"""
Check if the stream can write more data.
:return: True if the stream can write more data, False otherwise.
:rtype: bool
"""
raise NotImplementedError()

@abc.abstractmethod
def write(self, *args, **kwargs) -> None:
"""
Write to the stream.
:param args: The arguments to pass to the write method.
:param kwargs: The keyword arguments to pass to the write method.
"""
raise NotImplementedError()

@abc.abstractmethod
def done_writing(self, **kwargs) -> None:
"""
Done writing to the stream.
:param kwargs: The keyword arguments to pass to the done
"""
raise NotImplementedError()


class ReadWriteStream(ReadStream, WriteStream, abc.ABC):
"""
ReadWriteStream is an abstract class for reading and writing streams.
"""

pass
Loading

0 comments on commit aff0858

Please sign in to comment.