Skip to content

Commit

Permalink
server: asyncio refactoring (#582)
Browse files Browse the repository at this point in the history
Refactored server and client to use `Streams` (higher-level abstraction compared to `sockets`)
  • Loading branch information
jarulraj authored Feb 14, 2023
1 parent 898fc7d commit 034a74d
Show file tree
Hide file tree
Showing 32 changed files with 1,005 additions and 1,263 deletions.
1 change: 0 additions & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ jobs:
command: |
source test_evadb/bin/activate
sh script/test/test.sh
coveralls
- save_cache:
key: v1-model_cache-{{ checksum "setup.py" }}
Expand Down
1 change: 0 additions & 1 deletion .coveragerc
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
[run]
omit =
eva/parser/evaql/*
eva/udfs/abstract/*
eva/udfs/emotion_detector.py
eva/experimental/*
Expand Down
36 changes: 12 additions & 24 deletions eva/eva_cmd_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import asyncio
import sys
from os.path import abspath, dirname, join

Expand All @@ -29,40 +29,28 @@
from eva.server.interpreter import start_cmd_client # noqa: E402


def eva_client(host="0.0.0.0", port=5432):
async def eva_client():
"""
Start the eva system
"""

# Sets up logger
config = ConfigurationManager() # noqa: F841
# Get the hostname and port information from the configuration file
config = ConfigurationManager()
host = config.get_value("server", "host")
port = config.get_value("server", "port")

# Launch server
# Launch client
try:
start_cmd_client(host=host, port=port)
await start_cmd_client(host, port)
except KeyboardInterrupt:
pass
except Exception as e:
logger.critical(e)


def parse_args(args):
parser = argparse.ArgumentParser(description="")
parser.add_argument(
"-H",
"--host",
dest="host",
type=str,
help="Host address for EVA server",
default="0.0.0.0",
)
parser.add_argument(
"-P", "--port", dest="port", type=int, help="Port for EVA server", default=5432
)
return parser.parse_args(args)
raise e


def main():
args = parse_args(sys.argv[1:])
eva_client(host=args.host, port=args.port)
asyncio.run(eva_client())


if __name__ == "__main__":
Expand Down
35 changes: 10 additions & 25 deletions eva/eva_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

from psutil import process_iter

from eva.utils.logging_manager import logger

"""
To allow running eva_server from any location
"""
Expand All @@ -30,36 +28,22 @@
sys.path.append(EVA_CODE_DIR)

from eva.configuration.configuration_manager import ConfigurationManager # noqa: E402
from eva.server.server import start_server # noqa: E402
from eva.server.server import EvaServer # noqa: E402
from eva.udfs.udf_bootstrap_queries import init_builtin_udfs # noqa: E402


def eva():
async def start_eva_server():
"""
Start the eva system
Start the eva server
"""
# Get the hostname and port information from the configuration file
config = ConfigurationManager()
hostname = config.get_value("server", "host")
host = config.get_value("server", "host")
port = config.get_value("server", "port")
socket_timeout = config.get_value("server", "socket_timeout")
loop = asyncio.new_event_loop()
stop_server_future = loop.create_future()

# Launch server
try:
asyncio.run(
start_server(
host=hostname,
port=port,
loop=loop,
socket_timeout=socket_timeout,
stop_server_future=stop_server_future,
)
)

except Exception as e:
logger.critical(e)

eva_server = EvaServer()

await eva_server.start_eva_server(host, port)


def stop_server():
Expand Down Expand Up @@ -101,7 +85,8 @@ def main():
if args.start:
mode = ConfigurationManager().get_value("core", "mode")
init_builtin_udfs(mode=mode)
eva()

asyncio.run(start_eva_server())


if __name__ == "__main__":
Expand Down
4 changes: 2 additions & 2 deletions eva/models/server/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ class Response:
Data model for EVA server response
"""

status: ResponseStatus
batch: Batch
status: ResponseStatus = ResponseStatus.FAIL
batch: Batch = None
error: Optional[str] = None
query_time: Optional[float] = None

Expand Down
118 changes: 0 additions & 118 deletions eva/server/async_protocol.py

This file was deleted.

10 changes: 6 additions & 4 deletions eva/server/command_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from eva.optimizer.plan_generator import PlanGenerator
from eva.optimizer.statement_to_opr_convertor import StatementToPlanConvertor
from eva.parser.parser import Parser
from eva.server.networking_utils import serialize_message
from eva.utils.logging_manager import logger
from eva.utils.timer import Timer

Expand Down Expand Up @@ -56,7 +55,7 @@ def execute_query_fetch_all(query, **kwargs) -> Optional[Batch]:


@asyncio.coroutine
def handle_request(transport, request_message):
def handle_request(client_writer, request_message):
"""
Reads a request from a client and processes it
Expand Down Expand Up @@ -91,8 +90,11 @@ def handle_request(transport, request_message):

query_runtime.log_elapsed_time("Query Response Time")

responseData = serialize_message(response)
logger.debug(response)

transport.write(responseData)
response_data = Response.serialize(response)

client_writer.write(b"%d\n" % len(response_data))
client_writer.write(response_data)

return response
Loading

0 comments on commit 034a74d

Please sign in to comment.