Event-driven Architecture: Using Events To Integrate Microservices

Note
Chapter under construction.

In this chapter:

  • We’ll see how to use events to communicate between multiple microservices.

  • We’ll use Redis as a publish-subscribe service.

TODO: DIAGRAM GOES HERE

How Do We Talk To The Outside World?

In the last chapter we never actually spoke about how we would receive the "batch quantity changed" events, or indeed, how we might notify the outside world about reallocations.

We’ve got a microservice with a web API, but what about other ways of talking to other systems? How does it know if, say, a shipment is delayed or the quantity is amended? How does it communicate to our warehouse system to say that an order has been allocated and needs to be sent to a customer?

In this chapter we’d like to show how the events metaphor can be extended to encompass the way that we handle incoming and outgoing messages from the system.

Using A Redis Pubsub Channel For Integration

To avoid the "distributed BBOM" antipattern, instead of temporally coupled HTTP API calls, we want to use some sort of asynchronous messaging layer to integrate between systems. We want our "batch quantity changed" messages to come in as external events from upstream systems, and we want our system to publish "allocated" events for downstream systems to listen to.

When moving towards events as an integration solution, you need to choose some sort of technology for passing those events from one system to another. We need to be able to publish events to some central service, and we need some way for other systems to be able to "subscribe" to different types of messages, and pick them up asynchronously from some sort of queue.

At MADE.com we use Eventstore; Kafka or RabbitMQ are valid alternatives. A lightweight solution based on Redis pubsub channels can also work just fine, and since Redis is much more widely familiar, we thought we’d use it for this book.

Note
We’re glossing over the complexity involved in choosing the right messaging platform. Concerns like message ordering, failure handling and idempotency all need to be thought through. For a few pointers, see the Footguns section in [epilogue_1_how_to_get_there_from_here].
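To give a flavour of what Redis pubsub involves, here’s a minimal sketch using the redis-py client. The channel name and payload are illustrative, and we assume a Redis server on localhost:

# Minimal Redis pub/sub sketch using redis-py (illustrative only;
# assumes a Redis server listening on localhost:6379).
import redis

r = redis.Redis(host='localhost', port=6379)

# Subscriber side: register interest in a named channel.
pubsub = r.pubsub(ignore_subscribe_messages=True)
pubsub.subscribe('change_batch_quantity')

# Publisher side: fire-and-forget a message onto that channel.
r.publish('change_batch_quantity', '{"batchref": "batch-001", "qty": 5}')

# The subscriber picks messages up asynchronously from its queue.
for message in pubsub.listen():
    print(message['data'])  # raw bytes of the published payload
    break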

Our new flow will look like this:

Figure 1. Sequence diagram for reallocation flow
[plantuml, reallocation_sequence_diagram]
@startuml
Redis -> MessageBus : BatchQuantityChanged event

group BatchQuantityChanged Handler + Unit of Work 1
    MessageBus -> Domain_Model : change batch quantity
    Domain_Model -> MessageBus : emit AllocationRequired event(s)
end


group AllocationRequired Handler + Unit of Work 2 (or more)
    MessageBus -> Domain_Model : allocate
    Domain_Model -> MessageBus : emit Allocated event(s)
end

MessageBus -> Redis : publish to line_allocated channel
@enduml

Test-driving It All Using An End-to-end Test

Here’s how we might start with an end-to-end test. We can use our existing API to create batches, and then we’ll test both inbound and outbound messages:

Example 1. An end-to-end test for our pubsub model (tests/e2e/test_external_events.py)
def test_change_batch_quantity_leading_to_reallocation():
    # start with two batches and an order allocated to one of them  #(1)
    orderid, sku = random_orderid(), random_sku()
    earlier_batch, later_batch = random_batchref('old'), random_batchref('newer')
    api_client.post_to_add_batch(earlier_batch, sku, qty=10, eta='2011-01-02')  #(2)
    api_client.post_to_add_batch(later_batch, sku, qty=10, eta='2011-01-02')  #(2)
    response = api_client.post_to_allocate(orderid, sku, 10)  #(2)
    assert response.json()['batchref'] == earlier_batch

    subscription = redis_client.subscribe_to('line_allocated')  #(3)

    # change quantity on allocated batch so it's less than our order  #(1)
    redis_client.publish_message('change_batch_quantity', {  #(3)
        'batchref': earlier_batch, 'qty': 5
    })

    # wait until we see a message saying the order has been reallocated  #(1)
    messages = []
    def assert_new_allocation_published():  #(4)
        messages.append(wait_for(subscription.get_message))  #(4)
        print(messages)
        data = json.loads(messages[-1]['data'])
        assert data['orderid'] == orderid
        assert data['batchref'] == later_batch
        return True

    wait_for(assert_new_allocation_published)  #(4)
  1. You can read the story of what’s going on in this test from the comments: we want to send an event into the system that causes an order line to be reallocated, and we expect to see that reallocation come out as an event in Redis too.

  2. api_client is a little helper that we refactored out to share between our two test types; it wraps our calls to requests.post.

  3. redis_client is another little test helper, the details of which don’t really matter (we sketch a possible implementation just after this list); its job is to be able to send and receive messages on various Redis channels. We’ll use a channel called change_batch_quantity to send in our request to change the quantity for a batch, and we’ll listen to another channel called line_allocated to look out for the expected reallocation.

  4. The last little test helper is a wait_for function. Because we’re moving to an asynchronous model, we need our tests to be able to wait until something happens. To do that, we wrap our assertions inside a function; the code for wait_for is shown in the next example, for the curious.
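As promised, here’s one plausible shape for the redis_client helper. This is our own sketch (the module path and the reuse of the app’s config are assumptions), since the book doesn’t show it:

# A plausible redis_client test helper (our sketch; the details are
# unimportant to the chapter). Wraps redis-py for use in e2e tests.
import json

import redis

from allocation import config  # reusing the app's config is an assumption

r = redis.Redis(**config.get_redis_host_and_port())


def subscribe_to(channel):
    pubsub = r.pubsub(ignore_subscribe_messages=True)
    pubsub.subscribe(channel)
    return pubsub


def publish_message(channel, message):
    r.publish(channel, json.dumps(message))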

Example 2. A helper function for testing asynchronous behaviour (tests/e2e/wait_for.py)
import time

import pytest


def wait_for(fn):
    """
    Keep retrying a function, catching any exceptions, until it returns
    something truthy, or we hit a timeout.
    """
    timeout = time.time() + 3
    while time.time() < timeout:
        try:
            r = fn()
            if r:
                return r
        except Exception:  # a bare except would also swallow KeyboardInterrupt
            if time.time() > timeout:
                raise
        time.sleep(0.1)
    pytest.fail(f'function {fn} never returned anything truthy')
Tip
Check out the tenacity library if you don’t fancy hand-rolling your own retry helpers.
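For example, the retry-on-exception-or-falsy-result behaviour of wait_for can be approximated with tenacity like this (a sketch; the function name is ours):

# Roughly equivalent retry behaviour using tenacity (sketch):
# retry on any exception or falsy result, every 0.1s, for up to 3s.
from tenacity import (
    retry, retry_any, retry_if_exception_type, retry_if_result,
    stop_after_delay, wait_fixed,
)

@retry(
    retry=retry_any(
        retry_if_exception_type(Exception),
        retry_if_result(lambda r: not r),
    ),
    stop=stop_after_delay(3),
    wait=wait_fixed(0.1),
)
def fetch_allocation_message(subscription):
    return subscription.get_message()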

Redis Is Another Thin Adapter Around Our Message Bus

Our Redis pubsub client is very much like the Flask adapter: it translates from the outside world to our events:

Example 3. A first cut of a redis message listener (src/allocation/redis_pubsub.py)
import json
import logging
from dataclasses import asdict

import redis

from allocation import commands, config, events, messagebus, orm, unit_of_work

r = redis.Redis(**config.get_redis_host_and_port())


def main():
    orm.start_mappers()
    pubsub = r.pubsub(ignore_subscribe_messages=True)
    pubsub.subscribe('change_batch_quantity')  #(1)

    for m in pubsub.listen():
        handle_change_batch_quantity(m)


def handle_change_batch_quantity(m):
    logging.debug('handling %s', m)
    data = json.loads(m['data'])  #(2)
    cmd = commands.ChangeBatchQuantity(ref=data['batchref'], qty=data['qty'])
    messagebus.handle(cmd, uow=unit_of_work.SqlAlchemyUnitOfWork())


def publish(channel, event: events.Event):  #(3)
    logging.debug('publishing: channel=%s, event=%s', channel, event)
    r.publish(channel, json.dumps(asdict(event)))


if __name__ == '__main__':
    main()
  1. main() subscribes us to the change_batch_quantity channel on load.

  2. Our main job as an entrypoint to the system is to deserialize the JSON and pass it to the message bus, much like the Flask adapter does.

  3. We also provide a helper function to publish events back into Redis.

Our new outgoing event

Here’s what the Allocated event will look like:

Example 4. New event (src/allocation/events.py)
@dataclass
class Allocated(Event):
    orderid: str
    sku: str
    qty: int
    batchref: str

It captures everything we need to know about an allocation: the details of the order line, and which batch it was allocated to.
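For illustration, serializing one of these events the way our publish helper does (via dataclasses.asdict, with made-up values) produces a JSON payload like this:

import json
from dataclasses import asdict

# Made-up values, just to show the wire format:
event = Allocated(orderid='order-123', sku='RED-CHAIR', qty=10, batchref='batch-001')
print(json.dumps(asdict(event)))
# {"orderid": "order-123", "sku": "RED-CHAIR", "qty": 10, "batchref": "batch-001"}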

We add it to our model’s allocate() method (having added a test first, naturally; a sketch of such a test follows the example):

Example 5. Product.allocate() emits new event to record what happened (src/allocation/model.py)
class Product:
    ...
    def allocate(self, line: OrderLine) -> str:
        ...

            batch.allocate(line)
            self.version_number += 1
            self.events.append(events.Allocated(
                orderid=line.orderid, sku=line.sku, qty=line.qty,
                batchref=batch.reference,
            ))
            return batch.reference
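That test might look something like this. It’s our own sketch, with made-up SKUs and references, and the constructor signatures are assumed to follow the model from earlier chapters:

# Sketch of a unit test for the new event (our illustration).
from allocation import events
from allocation.model import Batch, OrderLine, Product


def test_allocate_records_allocated_event():
    batch = Batch('batchref1', 'RETRO-LAMPSHADE', 100, eta=None)
    line = OrderLine('oref1', 'RETRO-LAMPSHADE', 10)
    product = Product(sku='RETRO-LAMPSHADE', batches=[batch])

    result = product.allocate(line)

    assert result == 'batchref1'
    assert product.events[-1] == events.Allocated(
        orderid='oref1', sku='RETRO-LAMPSHADE', qty=10, batchref='batchref1',
    )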

The handler for ChangeBatchQuantity already exists, so all we need to add is a handler that publishes the outgoing event:

Example 6. The messagebus grows (src/allocation/messagebus.py)
HANDLERS = {
    events.Allocated: [handlers.publish_allocated_event],
    events.OutOfStock: [handlers.send_out_of_stock_notification],
}  # type: Dict[Type[events.Event], List[Callable]]
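As a reminder, the message bus dispatches each event to all of its registered handlers, roughly like this (a simplified sketch; the real handle() also deals with commands and collects newly raised events):

def handle_event(event: events.Event, uow: unit_of_work.AbstractUnitOfWork):
    # Simplified dispatch: call every handler registered for this event type.
    for handler in HANDLERS[type(event)]:
        handler(event, uow=uow)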

Publishing the event uses our helper function from the redis wrapper:

Example 7. Publish to redis (src/allocation/handlers.py)
def publish_allocated_event(
        event: events.Allocated, uow: unit_of_work.AbstractUnitOfWork,
):
    redis_pubsub.publish('line_allocated', event)
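Such a handler is easy to unit test in isolation by patching out the Redis wrapper (a sketch using unittest.mock; the test name and values are ours):

from unittest import mock


def test_publish_allocated_event_sends_to_redis():
    event = events.Allocated(orderid='o1', sku='RED-CHAIR', qty=10, batchref='b1')
    # Patch the module-level publish function so no real Redis is needed.
    with mock.patch('allocation.redis_pubsub.publish') as mock_publish:
        handlers.publish_allocated_event(event, uow=mock.Mock())
        mock_publish.assert_called_once_with('line_allocated', event)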
Tip
Outbound events are one of the places it’s important to apply some validation. See [appendix_validation] for some validation philosophy and examples.
Internal vs External Events

It’s a good idea to keep the distinction between internal and external events clear. Some events may come from the outside, and some events may get upgraded and published externally, but not all of them. This is particularly important if you get into event sourcing (see https://io.made.com/eventsourcing-101/), though that’s very much a topic for another book.
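One way to keep that boundary explicit (our own illustration, not code from the project) is to translate internal domain events into a separate, versioned public schema at the edge, so downstream systems never couple to your internal classes:

from dataclasses import dataclass


# Hypothetical public message schema, distinct from the internal
# events.Allocated domain event.
@dataclass
class LineAllocated_v1:
    orderid: str
    sku: str
    batchref: str


def to_external(event: events.Allocated) -> LineAllocated_v1:
    # Deliberately exposes only the fields downstream systems
    # should depend on.
    return LineAllocated_v1(
        orderid=event.orderid, sku=event.sku, batchref=event.batchref,
    )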

Wrap-up

  • Events can come from the outside, but they can also be published externally: our publish handler converts an event to a message on a Redis channel. We use events to talk to the outside world.

TODO: more here