Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add message broker module #11

Merged
merged 31 commits into from
Jun 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
dc43f02
Add message broker module
yoavnash May 16, 2022
f59eaa5
Add example
yoavnash May 16, 2022
0dfbe42
Update URL
yoavnash May 16, 2022
b5ed5a2
Minor revision
yoavnash May 23, 2022
e58d6ab
Compute queue name and add response model
yoavnash May 27, 2022
e06aa33
Rename code field
yoavnash May 27, 2022
788d9a3
Fix encoding
yoavnash May 27, 2022
fb0f382
Remove headers from response model
yoavnash May 27, 2022
bca89b2
Restore headers in models
yoavnash May 27, 2022
1024810
Remove base64 encoding; rename callback to handler
yoavnash May 31, 2022
5a11cd5
Fix comment text
yoavnash May 31, 2022
4ab3bd4
Add comment
yoavnash May 31, 2022
7286d48
Update comment text
yoavnash May 31, 2022
263bd6b
Import marketplace-standard-app-api from PyPI
yoavnash Jun 1, 2022
6ada3ed
Add message broker module
yoavnash May 16, 2022
99bdaa2
Add example
yoavnash May 16, 2022
b15af99
Update URL
yoavnash May 16, 2022
954b397
Minor revision
yoavnash May 23, 2022
810ddd8
Compute queue name and add response model
yoavnash May 27, 2022
66af553
Rename code field
yoavnash May 27, 2022
7dead82
Fix encoding
yoavnash May 27, 2022
cfe1341
Remove headers from response model
yoavnash May 27, 2022
2d7356d
Restore headers in models
yoavnash May 27, 2022
7d720d9
Remove base64 encoding; rename callback to handler
yoavnash May 31, 2022
a204e3c
Fix comment text
yoavnash May 31, 2022
d1e7161
Add comment
yoavnash May 31, 2022
570ce9a
Update comment text
yoavnash May 31, 2022
94b6796
Import marketplace-standard-app-api from PyPI
yoavnash Jun 1, 2022
73b2550
Merge branch 'message_broker' of github.com:materials-marketplace/pyt…
yoavnash Jun 2, 2022
615538d
Merge branch 'main' into message_broker
yoavnash Jun 2, 2022
00289c6
Remove pydantic from dependency list
yoavnash Jun 2, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion MANIFEST.in
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
exclude tests
recursive-exclude tests *
exclude examples
recursive-exclude examples *

exclude .pre-commit-config.yaml
exclude .gitlab-ci.yml

include LICENSE
include README.md
include example.py
recursive-include logos *.png
File renamed without changes.
33 changes: 33 additions & 0 deletions examples/rpc_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import json

from marketplace_standard_app_api.models.message_broker import (
MessageBrokerRequestModel,
MessageBrokerResponseModel,
)

from marketplace.message_broker.rpc_server import RpcServer


def my_message_handler(
request_message: MessageBrokerRequestModel,
) -> MessageBrokerResponseModel:
print("Routing to endpoint %r..." % request_message.endpoint)
payload = json.loads(request_message.body) if request_message.body else {}
result = len(payload)
response = {"numberOfKeysInPayload": str(result)}
print("Done!")
response_message = MessageBrokerResponseModel(
status_code=200,
body=json.dumps(response),
headers={"Content-Type": "application/json"},
)
return response_message


rpc_server = RpcServer(
host="www.materials-marketplace.eu",
application_id="<application-id>",
application_secret="<application-secret>",
message_handler=my_message_handler,
)
rpc_server.consume_messages()
Empty file.
43 changes: 43 additions & 0 deletions marketplace/message_broker/rpc_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import json
import logging

import pika
from marketplace_standard_app_api.models.message_broker import MessageBrokerRequestModel

from .utils import calc_queue_name

logger = logging.getLogger(__name__)


class RpcServer:
def __init__(self, host, application_id, application_secret, message_handler):
self.queue_name = calc_queue_name(application_id, application_secret)
self.message_handler = message_handler
connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
self.channel = connection.channel()

self.channel.queue_declare(queue=self.queue_name)

def consume_messages(self):
def callback(ch, method, properties, body):
request_message = MessageBrokerRequestModel.parse_obj(
json.loads(body.decode())
)

response_message = self.message_handler(request_message)

ch.basic_publish(
exchange="",
routing_key=properties.reply_to,
properties=pika.BasicProperties(
correlation_id=properties.correlation_id,
),
body=response_message.json(),
)
ch.basic_ack(delivery_tag=method.delivery_tag)

self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(queue=self.queue_name, on_message_callback=callback)

logger.info("Waiting for messages. To exit press CTRL+C")
self.channel.start_consuming()
11 changes: 11 additions & 0 deletions marketplace/message_broker/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import hashlib


def calc_queue_name(application_id: str, application_secret: str) -> str:
h = int(
hashlib.sha256(
(application_id + application_secret).encode("utf-8")
).hexdigest(),
16,
) % (10**16)
return f"{application_id}:{h}"
2 changes: 2 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ classifiers =
[options]
packages = find:
install_requires =
marketplace-standard-app-api==0.1.0
pika==1.2.1
requests~=2.26.0
python_requires = >=3.8

Expand Down