Section 14: NATS Streaming Server - An Event Bus Implementation

Table of Contents

What Now?

⬆ back to top

Three Important Items

NATS Streaming Server

⬆ back to top

Creating a NATS Streaming Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: nats-depl
spec:
  replicas: 1
  selector:
    matchLabels:
      app: nats
  template:
    metadata:
      labels:
        app: nats
    spec:
      containers:
        - name: nats
          image: nats-streaming:0.17.0
          args:
            [
              '-p',
              '4222',
              '-m',
              '8222',
              '-hbi',
              '5s',
              '-hbt',
              '5s',
              '-hbf',
              '2',
              '-SD',
              '-cid',
              'ticketing',
            ]
---
apiVersion: v1
kind: Service
metadata:
  name: nats-srv
spec:
  selector:
    app: nats
  ports:
    - name: client
      protocol: TCP
      port: 4222
      targetPort: 4222
    - name: monitoring
      protocol: TCP
      port: 8222
      targetPort: 8222

cd section-14/ticketing
skaffold dev
kubectl get pods

⬆ back to top

Big Notes on NATS Streaming

Stan.js - Node.js client for NATS Streaming

⬆ back to top

Building a NATS Test Project

Short Term Goal

  • Create a new sub-project with TypeScript support
  • Install the node-nats-streaming library and connect to the NATS Streaming Server
  • Add two npm scripts: one to run the code that emits events, and one to run the code that listens for events
  • This program will be run outside of Kubernetes!

// publisher.ts
import nats from 'node-nats-streaming';

const stan = nats.connect('ticketing', 'abc', {
  url: 'http://localhost:4222',
});

stan.on('connect', () => {
  console.log('Publisher connected to NATS');
});

⬆ back to top

Port-Forwarding with Kubectl

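  • Option #3, port-forwarding, is selected for the small test program

# Find the name of the NATS pod (the generated suffix differs per cluster)
kubectl get pods
# Forward local port 4222 to port 4222 on that pod
kubectl port-forward nats-depl-7cf98f65b8-p8nk6 4222:4222
cd section-14/ticketing/nats-test
npm run publish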

⬆ back to top

Publishing Events

// publisher.ts
import nats from 'node-nats-streaming';

const stan = nats.connect('ticketing', 'abc', {
  url: 'http://localhost:4222',
});

stan.on('connect', () => {
  console.log('Publisher connected to NATS');

  const data = JSON.stringify({
    id: '123',
    title: 'concert',
    price: 20,
  });

  // The payload is sent as a string, so the object is serialized to JSON first
  stan.publish('ticket:created', data, () => {
    console.log('Event published');
  });
});
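
The publisher above hands `stan.publish` a JSON string rather than an object. As a minimal sketch of that round trip, the helpers below serialize and parse a payload; the `TicketCreatedData` interface and both function names are hypothetical and not part of node-nats-streaming.

```typescript
// Hypothetical shape of the payload published on 'ticket:created'.
interface TicketCreatedData {
  id: string;
  title: string;
  price: number;
}

// Serialize a payload into the string form handed to stan.publish().
function serializeEvent(data: TicketCreatedData): string {
  return JSON.stringify(data);
}

// Parse the string back into an object on the receiving side.
function deserializeEvent(raw: string): TicketCreatedData {
  return JSON.parse(raw) as TicketCreatedData;
}

const payload = serializeEvent({ id: '123', title: 'concert', price: 20 });
const roundTripped = deserializeEvent(payload);
```

Typing the payload in one place means the publisher and the listener can share a single definition of what a 'ticket:created' event contains.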

⬆ back to top

Listening For Data

// listener.ts
import nats from 'node-nats-streaming';

console.clear();

const stan = nats.connect('ticketing', '123', {
  url: 'http://localhost:4222',
});

stan.on('connect', () => {
  console.log('Listener connected to NATS');

  const subscription = stan.subscribe('ticket:created');

  subscription.on('message', (msg) => {
    console.log('Message received');
  });
});

  • Split the screen to watch both the publisher and the listener
  • Type rs and press Enter to restart the publisher

⬆ back to top

Accessing Event Data

import nats, { Message } from 'node-nats-streaming';

console.clear();

const stan = nats.connect('ticketing', '123', {
  url: 'http://localhost:4222',
});

stan.on('connect', () => {
  console.log('Listener connected to NATS');

  const subscription = stan.subscribe('ticket:created');

  subscription.on('message', (msg: Message) => {
    const data = msg.getData();

    if (typeof data === 'string') {
      console.log(`Received event #${msg.getSequence()}, with data: ${data}`);
    }
  });
});
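
The listener above only handles the case where `getData()` returns a string. Since the data may also arrive as a Buffer, a small parsing helper could cover both cases. This is a sketch only: `parseEventData` and the `BufferLike` stand-in type are hypothetical names, used here so the example has no dependencies.

```typescript
// Structural stand-in for a Node.js Buffer (assumption: only toString is needed).
interface BufferLike {
  toString(encoding?: string): string;
}

// Turn the raw message data into a parsed JSON value,
// whether it arrived as a string or as a buffer.
function parseEventData(data: string | BufferLike): unknown {
  const text = typeof data === 'string' ? data : data.toString('utf8');
  return JSON.parse(text);
}

const fromString = parseEventData('{"id":"123"}') as { id: string };
const fakeBuffer: BufferLike = { toString: () => '{"price":20}' };
const fromBuffer = parseEventData(fakeBuffer) as { price: number };
```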

⬆ back to top

Client ID Generation

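import nats from 'node-nats-streaming';
import { randomBytes } from 'crypto';

// Every connected client needs its own ID, so generate a random one per process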
const stan = nats.connect('ticketing', randomBytes(4).toString('hex'), {
  url: 'http://localhost:4222',
});
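
Random hex IDs are unique but hard to recognise on the monitoring page. One possible variation, sketched below, keeps the random suffix and adds a readable prefix; `makeClientId` is a hypothetical helper, not something provided by the library.

```typescript
import { randomBytes } from 'crypto';

// Hypothetical helper: builds a client ID such as 'listener-9f2c41ab'.
// 4 random bytes become 8 hex characters.
function makeClientId(prefix: string): string {
  return `${prefix}-${randomBytes(4).toString('hex')}`;
}

const listenerId = makeClientId('listener');
```

The result can be passed as the second argument to `nats.connect` in place of the bare `randomBytes(4).toString('hex')` call.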

⬆ back to top

Queue Groups

  const subscription = stan.subscribe(
    'ticket:created',
    'orders-service-queue-group'
  );

  • Each listener joins 'orders-service-queue-group'
  • The publisher sends an event
  • Each event is delivered to only one listener in 'orders-service-queue-group'
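
The delivery rule in the list above can be modelled in memory. The sketch below is not the NATS Streaming API: `FakeChannel` is a hypothetical stand-in, and the round-robin choice of group member is an assumption made only to keep the example deterministic.

```typescript
type Handler = (event: string) => void;

// In-memory stand-in for a channel; NOT the NATS Streaming API.
class FakeChannel {
  private groups = new Map<string, Handler[]>();
  private nextIndex = new Map<string, number>();

  // Register a listener as a member of the named queue group.
  subscribe(queueGroup: string, handler: Handler): void {
    const members = this.groups.get(queueGroup) ?? [];
    members.push(handler);
    this.groups.set(queueGroup, members);
  }

  // Deliver the event to exactly one member of every queue group.
  // Round-robin selection is an assumption of this sketch.
  publish(event: string): void {
    this.groups.forEach((members, name) => {
      const index = (this.nextIndex.get(name) ?? 0) % members.length;
      members[index](event);
      this.nextIndex.set(name, index + 1);
    });
  }
}

const channel = new FakeChannel();
const received: string[] = [];
channel.subscribe('orders-service-queue-group', (e) => received.push(`A:${e}`));
channel.subscribe('orders-service-queue-group', (e) => received.push(`B:${e}`));
channel.publish('event-1');
channel.publish('event-2');
```

Two listeners are in the same group, two events are published, and each event reaches only one of the listeners.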

⬆ back to top

Manual Ack Mode

  • With automatic acknowledgement, an event can be lost if an error occurs while it is being processed

  • With manual acknowledgement, the listener acknowledges a message only after it has processed it successfully

stan.on('connect', () => {
  console.log('Listener connected to NATS');

  const options = stan
    .subscriptionOptions()
    .setManualAckMode(true);
  const subscription = stan.subscribe(
    'ticket:created', 
    'orders-service-queue-group',
    options
  );

  subscription.on('message', (msg: Message) => {
    const data = msg.getData();

    if (typeof data === 'string') {
      console.log(`Received event #${msg.getSequence()}, with data: ${data}`);
    }

    msg.ack();
  });
});
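
The listener above calls `msg.ack()` unconditionally after logging. When real processing can fail, the acknowledgement should be skipped on failure so that the server redelivers the message once the ack wait elapses (30 seconds by default, as far as I know). A minimal sketch, with `AckableMessage` and `processThenAck` as hypothetical names standing in for the real message type:

```typescript
// Minimal stand-in for the parts of a NATS Streaming message used here.
interface AckableMessage {
  getData(): string;
  ack(): void;
}

// Runs the processing step and acknowledges only if it did not throw.
// Returns true when the message was acknowledged.
function processThenAck(
  msg: AckableMessage,
  handle: (data: string) => void
): boolean {
  try {
    handle(msg.getData());
  } catch {
    // No ack: the message stays unacknowledged and is expected to be redelivered.
    return false;
  }
  msg.ack();
  return true;
}

let ackCount = 0;
const fakeMsg: AckableMessage = {
  getData: () => '{"id":"123"}',
  ack: () => {
    ackCount += 1;
  },
};

const okResult = processThenAck(fakeMsg, () => {});
const failedResult = processThenAck(fakeMsg, () => {
  throw new Error('database unavailable');
});
```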

⬆ back to top

Client Health Checks

  • The monitoring port 8222 is useful for debugging

kubectl get pods
kubectl port-forward nats-depl-7cf98f65b8-p8nk6 8222:8222

  • Open a browser
  • Go to http://localhost:8222/streaming
  • Go to http://localhost:8222/streaming/channelsz?subs=1 to list each channel with its subscriptions

  • 2 listeners are shown
  • If one listener is restarted, 3 listeners are shown for about 30s, because the server has not yet noticed that the old connection is gone
  • After about 30s, the count drops back to 2 listeners

⬆ back to top

Graceful Client Shutdown

const stan = nats.connect('ticketing', randomBytes(4).toString('hex'), {
  url: 'http://localhost:4222',
});

stan.on('connect', () => {
  console.log('Listener connected to NATS');

  stan.on('close', () => {
    console.log('NATS connection closed!');
    process.exit();
  });
  
  const options = stan
    .subscriptionOptions()
    .setManualAckMode(true);
  const subscription = stan.subscribe(
    'ticket:created', 
    'orders-service-queue-group',
    options
  );

  subscription.on('message', (msg: Message) => {
    const data = msg.getData();

    if (typeof data === 'string') {
      console.log(`Received event #${msg.getSequence()}, with data: ${data}`);
    }

    msg.ack();
  });
});

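// On interrupt or terminate signals, close the connection so the server removes this client right away
process.on('SIGINT', () => stan.close());
process.on('SIGTERM', () => stan.close());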

⬆ back to top

Core Concurrency Issues

  • Success

  • Failure: the +$70 update to file storage fails

  • One listener might run more quickly than another
  • -$100 is processed before +$70 and +$40

  • NATS might think a client is still alive when it is dead

  • We might receive the same event twice
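
The ordering problem from the list above can be reproduced with plain arithmetic. In this sketch the rule that a withdrawal is rejected when it would overdraw the account is an assumption, and `applyEvents` is a hypothetical helper rather than code from the project.

```typescript
type AccountEvent = { kind: 'deposit' | 'withdraw'; amount: number };

// Applies events in the given order. A withdrawal that would overdraw the
// account is rejected (an assumed business rule for this sketch).
function applyEvents(events: AccountEvent[]): { balance: number; rejected: number } {
  let balance = 0;
  let rejected = 0;
  for (const event of events) {
    if (event.kind === 'deposit') {
      balance += event.amount;
    } else if (balance >= event.amount) {
      balance -= event.amount;
    } else {
      rejected += 1;
    }
  }
  return { balance, rejected };
}

const deposit70: AccountEvent = { kind: 'deposit', amount: 70 };
const deposit40: AccountEvent = { kind: 'deposit', amount: 40 };
const withdraw100: AccountEvent = { kind: 'withdraw', amount: 100 };

// Intended order: both deposits land before the withdrawal.
const inOrder = applyEvents([deposit70, deposit40, withdraw100]);
// A faster listener handles the withdrawal first.
const outOfOrder = applyEvents([withdraw100, deposit70, deposit40]);
```

The same three events produce a balance of $10 in the intended order, but a rejected withdrawal and a balance of $110 when -$100 is handled first.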

⬆ back to top

Common Questions

  • Async (event-based) communication sounds terrible, right?!?!
  • Oh, turns out this happens with sync communications
  • Oh, and it happens with classic monolith style apps too.

  • Instances A and B are busy
  • Instance C processes -$100 before +$70 and +$40 complete

  • The +$70, +$40 and -$100 events are received; any of them can still fail
  • The listener becomes a bottleneck
  • Hard to scale
    • vertically: increase the resources (CPU, memory) of each instance
    • horizontally: add more instances of the service

Solution that won't work #2 - Figure out every possible error case and write code to handle it

  • An infinite number of things can fail
  • Engineering time = $$$$$
  • Does it matter if two tweets are out of order?

⬆ back to top

[Optional] More Possible Concurrency Solutions

  • Share state between services of last event processed

  • Event #1 fails: +$70 cannot be applied to User A's account
  • Event #2 (+$40 to User B's account) is delayed as a result

  • Last event processed tracked by resource ID

  • Last Sequence ID
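
Tracking the last processed event per resource ID, as described above, means a stalled event for User A no longer holds up User B. A minimal in-memory sketch, where `SequenceGate` and the `ResourceEvent` fields are hypothetical names and each resource is assumed to number its events from 1:

```typescript
interface ResourceEvent {
  resourceId: string; // e.g. the account the event applies to
  sequence: number; // per-resource sequence number, starting at 1
}

class SequenceGate {
  private lastProcessed = new Map<string, number>();

  // Returns true and records the event if it is the next one expected for its
  // resource; returns false if it arrived early or is a duplicate.
  tryProcess(event: ResourceEvent): boolean {
    const last = this.lastProcessed.get(event.resourceId) ?? 0;
    if (event.sequence !== last + 1) {
      return false;
    }
    this.lastProcessed.set(event.resourceId, event.sequence);
    return true;
  }
}

const gate = new SequenceGate();
const results = [
  gate.tryProcess({ resourceId: 'user-a', sequence: 2 }), // arrived early
  gate.tryProcess({ resourceId: 'user-b', sequence: 1 }), // not blocked by user-a
  gate.tryProcess({ resourceId: 'user-a', sequence: 1 }), // now in order
  gate.tryProcess({ resourceId: 'user-a', sequence: 2 }), // redelivered, accepted
  gate.tryProcess({ resourceId: 'user-a', sequence: 2 }), // duplicate
];
```

An event that is rejected here would simply go unacknowledged, so that it is delivered again later.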

⬆ back to top

Solving Concurrency Issues

  • We are working with a poorly designed system and relying on NATS to somehow save us
  • We should revisit the service design.
  • If we redesign the system, a better solution to this concurrency stuff will present itself

⬆ back to top

Concurrency Control with the Tickets App

⬆ back to top

Event Redelivery

const options = stan
  .subscriptionOptions()
  .setManualAckMode(true)
  // Deliver every event stored in the channel when the subscription is created
  .setDeliverAllAvailable();

⬆ back to top

Durable Subscriptions

const options = stan
  .subscriptionOptions()
  .setManualAckMode(true)
  .setDeliverAllAvailable()
  // With a durable name, the server keeps track of which events this subscription has acknowledged
  .setDurableName('accounting-service');

const subscription = stan.subscribe(
  'ticket:created',
  'queue-group-name',
  options
);
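
The combined effect of `setDeliverAllAvailable` and `setDurableName` can be pictured with a simplified in-memory model: the first connection under a durable name receives the full history, and later connections receive only what has not been acknowledged yet. `FakeDurableChannel` is a hypothetical stand-in, not the NATS Streaming API, and it assumes every delivered event is acknowledged.

```typescript
// In-memory model of a channel with durable subscriptions; NOT the NATS API.
class FakeDurableChannel {
  private events: string[] = [];
  // Durable name -> number of events already acknowledged under that name.
  private acked = new Map<string, number>();

  publish(event: string): void {
    this.events.push(event);
  }

  // Returns the events not yet acknowledged under this durable name and
  // marks them as acknowledged (assumes processing always succeeds).
  connect(durableName: string): string[] {
    const alreadyAcked = this.acked.get(durableName) ?? 0;
    const pending = this.events.slice(alreadyAcked);
    this.acked.set(durableName, this.events.length);
    return pending;
  }
}

const durableChannel = new FakeDurableChannel();
durableChannel.publish('ticket-1');
durableChannel.publish('ticket-2');
// First connection: the whole history is delivered.
const firstRun = durableChannel.connect('accounting-service');
// Published while the service is offline.
durableChannel.publish('ticket-3');
// Reconnect under the same durable name: only the missed event is delivered.
const secondRun = durableChannel.connect('accounting-service');
// A different durable name starts from the beginning.
const otherService = durableChannel.connect('other-service');
```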

⬆ back to top