Chapter under construction. |
In this chapter:
We’ll see how to use events to communicate between multiple microservices.
We’ll use Redis as a publish-subscribe service
In the last chapter we never actually spoke about how we would receive the "batch quantity changed" events, or indeed, how we might notify the outside world about reallocations.
We’ve got a microservice with a web API, but what about other ways of talking to other systems? How does it know if, say, a shipment is delayed or the quantity is amended? How does it communicate to our warehouse system to say that an order has been allocated and needs to be sent to a customer?
In this chapter we’d like to show how the events metaphor can be extended to encompass the way that we handle incoming and outgoing messages from the system.
To avoid the "distributed BBOM" antipattern, instead of temporally coupled HTTP API calls, we want to use some sort of asynchronous messaging layer to integrate between systems. We want our "batch quantity changed" messages to come in as external events from upstream systems, and we want our system to publish "allocated" events for downstream systems to listen to.
When moving towards events as an integration solution, you need to choose some sort of technology for passing those events from one system to another. We need to be able to publish events to some central service, and we need some way for other systems to be able to "subscribe" to different types of messages, and pick them up asynchronously from some sort of queue.
At we use Eventstore; Kafka or RabbitMQ are valid alternatives. A lightweight solution based on Redis pubsub channels can also work just fine, and since Redis is much more generally familiar to people, we thought we’d use it for this book.
We’re glossing over the complexity involved in choosing the right messaging platform. Concerns like message ordering, failure handling and idempotency all need to be thought through. For a few pointers, see the Footguns section in [epilogue_1_how_to_get_there_from_here]. |
Our new flow will look like this:
[plantuml, reallocation_sequence_diagram] @startuml Redis -> MessageBus : BatchQuantityChanged event group BatchQuantityChanged Handler + Unit of Work 1 MessageBus -> Domain_Model : change batch quantity Domain_Model -> MessageBus : emit AllocationRequired event(s) end group AllocationRequired Handler + Unit of Work 2 (or more) MessageBus -> Domain_Model : allocate Domain_Model -> MessageBus : emit Allocated event(s) end MessageBus -> Redis : publish to line_allocated channel @enduml
Here’s how we might start with an end-to-end test. We can use our existing API to create batches, and then we’ll test both inbound and outbound messages:
def test_change_batch_quantity_leading_to_reallocation():
# start with two batches and an order allocated to one of them #(1)
orderid, sku = random_orderid(), random_sku()
earlier_batch, later_batch = random_batchref('old'), random_batchref('newer')
api_client.post_to_add_batch(earlier_batch, sku, qty=10, eta='2011-01-02') #(2)
api_client.post_to_add_batch(later_batch, sku, qty=10, eta='2011-01-02') #(2)
response = api_client.post_to_allocate(orderid, sku, 10) #(2)
assert response.json()['batchref'] == earlier_batch
subscription = redis_client.subscribe_to('line_allocated') #(3)
# change quantity on allocated batch so it's less than our order #(1)
redis_client.publish_message('change_batch_quantity', { #(3)
'batchref': earlier_batch, 'qty': 5
# wait until we see a message saying the order has been reallocated #(1)
messages = []
def assert_new_allocation_published(): #(4)
messages.append(wait_for(subscription.get_message)) #(4)
data = json.loads(messages[-1]['data'])
assert data['orderid'] == orderid
assert data['batchref'] == later_batch
return True
wait_for(assert_new_allocation_published) #(4)
You can read the story of what’s going on in this test from the comments: we want to send an event into the system that causes an order line to be reallocated, and we see that reallocation come out as an event in redis too.
is a little helper that we refactored out to share between our two test types, it wraps our calls
is another test little test helper, the details of which don’t really matter; its job is to be able to send and receive messages from various Redis channels. We’ll use a channel calledchange_batch_quantity
to send in our request to change the quantity for a batch, and we’ll listen to another channel calledline_allocated
to look out for the expected reallocation. -
The last little test helper is a
function. Because we’re moving to asynchronous model, we need our tests to be able to wait until something happens. To do that, we wrap our assertions inside a function. We’ll show the code forwait_for
below, for the curious:
def wait_for(fn):
Keep retrying a function, catching any exceptions, until it returns something truthy,
or we hit a timeout.
timeout = time.time() + 3
while time.time() < timeout:
r = fn()
if r:
return r
if time.time() > timeout:
time.sleep(0.1)'function {fn} never returned anything truthy')
Check out the tenacity library if you don’t fancy hand-rolling your own retry helpers. |
Our Redis pubsub client is very much like flask: it translates from the outside world to our events:
r = redis.Redis(**config.get_redis_host_and_port())
def main():
pubsub = r.pubsub(ignore_subscribe_messages=True)
pubsub.subscribe('change_batch_quantity') #(1)
for m in pubsub.listen():
def handle_change_batch_quantity(m):
logging.debug('handling %s', m)
data = json.loads(m['data']) #(2)
cmd = commands.ChangeBatchQuantity(ref=data['batchref'], qty=data['qty'])
messagebus.handle(cmd, uow=unit_of_work.SqlAlchemyUnitOfWork())
def publish(channel, event: events.Event): #(3)
logging.debug('publishing: channel=%s, event=%s', channel, event)
r.publish(channel, json.dumps(asdict(event)))
subscribes us to thechange_batch_quantity
channel on load -
And our main job as an entrypoint to the system is to deserialize JSON, and pass it to the service layer, much like the Flask adapter does.
We also provide a helper function to publish events back into Redis.
Here’s what the Allocated
event will look like:
class Allocated(Event):
orderid: str
sku: str
qty: int
batchref: str
It captures everything we need to know about an allocation: the details of the order line, and which batch it was allocated to.
We use add it into our model’s allocate()
method (having added a test
first, naturally)
class Product:
def allocate(self, line: OrderLine) -> str:
self.version_number += 1
orderid=line.orderid, sku=line.sku, qty=line.qty,
return batch.reference
The handler for ChangeBatchQuantity
already exists, so all we need to add
is a handler that publishes the outgoing event:
events.Allocated: [handlers.publish_allocated_event],
events.OutOfStock: [handlers.send_out_of_stock_notification],
} # type: Dict[Type[events.Event], List[Callable]]
Publishing the event uses our helper function from the redis wrapper:
def publish_allocated_event(
event: events.Allocated, uow: unit_of_work.AbstractUnitOfWork,
redis_pubsub.publish('line_allocated', event)
Outbound events are one of the places it’s important to apply some validation. See [appendix_validation] for some validation philosophy and examples. |
It’s a good idea to keep the distinction between internal and external events clear. Some events may come from the outside, and some events may get upgraded and published externally, but not all of them. This is particularly important if you get into [event sourcing]( (very much a topic for another book though).