No More Silos - Demo

The slides that accompany this demo can be found here: https://speakerdeck.com/rmoff/no-more-silos-integrating-databases-and-apache-kafka

Pre-reqs

Local:

  • curl

  • jq

  • Docker

Pre-Flight Setup

Start the environment

cd docker-compose
./scripts/setup.sh

Verify that the XStream capture process is running:

docker exec -it oracle bash -c 'sleep 1;rlwrap sqlplus sys/Admin123@//localhost:1521/ORCLCDB as sysdba'
COLUMN ACTION HEADING 'XStream Component' FORMAT A30
COLUMN SID HEADING 'Session ID' FORMAT 99999
COLUMN SERIAL# HEADING 'Session|Serial|Number' FORMAT 99999999
COLUMN PROCESS HEADING 'Operating System|Process ID' FORMAT A17
COLUMN PROCESS_NAME HEADING 'XStream|Program|Name' FORMAT A7

SELECT /*+PARAM('_module_action_old_length',0)*/ ACTION,
       SID,
       SERIAL#,
       PROCESS,
       SUBSTR(PROGRAM,INSTR(PROGRAM,'(')+1,4) PROCESS_NAME
  FROM V$SESSION
  WHERE MODULE ='XStream';

Expected output:

                                            Session                   XStream
                                             Serial Operating System  Program
XStream Component              Session ID    Number Process ID        Name
------------------------------ ---------- --------- ----------------- -------
DBZXOUT - Apply Reader                  9      6842 3380              AS01
DBZXOUT - Apply Server                 16     35968 1                 TNS
DBZXOUT - Apply Server                145     13290 3382              AS02
CAP$_DBZXOUT_1 - Capture              270     62826 3384              CP01
DBZXOUT - Apply Coordinator           396     27273 3378              AP01
DBZXOUT - Propagation Send/Rcv        399      6031 3386              CX01

6 rows selected.

If you don’t see these running (particularly the Capture process), refer to debezium-xstream-system-output.adoc.
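
The check for the Capture row can also be scripted. This is a minimal sketch, assuming the query output is available as text on stdin. The function name `xstream_capture_running` is hypothetical, and matching on the string ` - Capture` is an assumption based on the expected output shown above.

```shell
# Hypothetical helper: reads the V$SESSION query output on stdin and
# succeeds only if a row for the XStream Capture component is present.
xstream_capture_running() {
  # The expected output labels the capture row "<name> - Capture".
  grep -q -- ' - Capture'
}

# Sample rows copied from the expected output above.
sample='DBZXOUT - Apply Reader                  9      6842 3380              AS01
CAP$_DBZXOUT_1 - Capture              270     62826 3384              CP01'

if printf '%s\n' "$sample" | xstream_capture_running; then
  echo "Capture process found"
else
  echo "Capture process missing"
fi
```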

Check that Kafka Connect is running:

bash -c ' \
echo -e "\n\n=============\nWaiting for Kafka Connect to start listening on localhost ⏳\n=============\n"
while [ $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) -ne 200 ] ; do
  echo -e "\t" $(date) " Kafka Connect listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) " (waiting for 200)"
  sleep 5
done
echo -e $(date) "\n\n--------------\n\o/ Kafka Connect is ready! Listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) "\n--------------\n"
'
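
The loop above waits indefinitely. If a bounded wait is preferable (for example in a setup script), the same polling idea can be sketched as a reusable function. The names `wait_for_status` and `connect_status` are hypothetical; only the curl call itself is taken from the loop above.

```shell
# Hypothetical helper: poll a command that prints an HTTP status code
# until it prints the expected code, or give up after max_attempts.
# Usage: wait_for_status EXPECTED MAX_ATTEMPTS SLEEP_SECONDS COMMAND [ARGS...]
wait_for_status() {
  local expected="$1" max_attempts="$2" interval="$3"
  shift 3
  local attempt=1 status
  while [ "$attempt" -le "$max_attempts" ]; do
    # Run the probe and capture what it prints; a failing probe is not fatal.
    status=$("$@") || true
    if [ "$status" = "$expected" ]; then
      return 0
    fi
    echo "attempt $attempt: got '$status', waiting for $expected" >&2
    sleep "$interval"
    attempt=$((attempt + 1))
  done
  return 1  # never saw the expected status
}

# Probe for this demo: the same curl call as in the loop above.
connect_status() {
  curl -s -o /dev/null -w '%{http_code}' http://localhost:8083/connectors
}

# Example (not run here): wait up to 60 x 5 seconds for Kafka Connect.
# wait_for_status 200 60 5 connect_status && echo "Kafka Connect is ready"
```

Passing the probe as a command keeps the function usable for the ksqlDB listener as well, by swapping in a probe that calls its `/info` endpoint.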

Check that the required connectors are loaded:

curl -s localhost:8083/connector-plugins | jq '.[].class' | grep -E 'OracleConnector|JdbcSourceConnector'
"io.confluent.connect.jdbc.JdbcSourceConnector"
"io.debezium.connector.oracle.OracleConnector"

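The grep above prints whichever plugins are present, so a missing one is easy to overlook. A minimal sketch of a check that names the missing class and returns a non-zero exit code follows. The function name `missing_plugins` is hypothetical, and the sample response is trimmed to the `class` field only.

```shell
# Hypothetical helper: reads the /connector-plugins response on stdin and
# reports any required connector class that is not listed.
# Usage: curl -s localhost:8083/connector-plugins | missing_plugins CLASS...
missing_plugins() {
  local response class rc=0
  response=$(cat)
  for class in "$@"; do
    # Match the class name as a quoted JSON string, literally (no regex).
    if ! printf '%s' "$response" | grep -qF "\"$class\""; then
      echo "missing: $class"
      rc=1
    fi
  done
  return "$rc"
}

# Sample response, trimmed to the field this check looks at.
sample='[{"class":"io.confluent.connect.jdbc.JdbcSourceConnector"},
{"class":"io.debezium.connector.oracle.OracleConnector"}]'

printf '%s' "$sample" | missing_plugins \
  io.confluent.connect.jdbc.JdbcSourceConnector \
  io.debezium.connector.oracle.OracleConnector \
  && echo "all required connector plugins are loaded"
```
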
Run ksqlDB CLI and SQL*Plus

Optionally, use something like screen or tmux to keep both of these close to hand, or use multiple terminal tabs. Whatever works for you :)

  • ksqlDB CLI:

    docker exec -it ksqldb bash -c 'echo -e "\n\n  Waiting for ksqlDB to be available before launching CLI\n"; while : ; do curl_status=$(curl -s -o /dev/null -w %{http_code} http://ksqldb:8088/info) ; echo -e $(date) " ksqlDB server listener HTTP state: " $curl_status " (waiting for 200)" ; if [ $curl_status -eq 200 ] ; then  break ; fi ; sleep 5 ; done ; ksql http://ksqldb:8088'
  • SQL*Plus

    docker exec -it oracle bash -c 'sleep 1;rlwrap sqlplus Debezium/dbz@localhost:1521/ORCLPDB1'

    (The sleep is necessary to avoid the error "rlwrap: error: My terminal reports width=0 (is it emacs?) I can’t handle this, sorry!")

  • Oracle log

    docker logs -f oracle
  • Debezium log

    docker logs -f connect-debezium

Demo

Show Oracle table + contents

COL FIRST_NAME FOR A15
COL LAST_NAME FOR A15
COL ID FOR 999
COL CREATE_TS FOR A29
COL UPDATE_TS FOR A29
SET LINESIZE 200
SELECT ID, FIRST_NAME, LAST_NAME, CREATE_TS, UPDATE_TS FROM CUSTOMERS;
  ID FIRST_NAME      LAST_NAME       CREATE_TS                     UPDATE_TS
---- --------------- --------------- ----------------------------- -----------------------------
   1 Rica            Blaisdell       04-DEC-18 08.22.32.933376 PM  04-DEC-18 08.22.32.000000 PM
   2 Ruthie          Brockherst      04-DEC-18 08.22.32.953342 PM  04-DEC-18 08.22.32.000000 PM
   3 Mariejeanne     Cocci           04-DEC-18 08.22.32.965713 PM  04-DEC-18 08.22.32.000000 PM
   4 Hashim          Rumke           04-DEC-18 08.22.32.977417 PM  04-DEC-18 08.22.32.000000 PM
   5 Hansiain        Coda            04-DEC-18 08.22.32.979967 PM  04-DEC-18 08.22.32.000000 PM

Check status of connectors

curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
       jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
       column -s : -t| sed 's/\"//g'| sort

Expected:

source  |  ora-source-debezium-xstream  |  RUNNING  |  RUNNING  |  io.debezium.connector.oracle.OracleConnector
source  |  ora-source-jdbc              |  RUNNING  |  RUNNING  |  io.confluent.connect.jdbc.JdbcSourceConnector
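
To turn the table into a pass/fail check, the output of the command above can be piped through a small filter. This is a sketch only: the function name `not_running` is hypothetical, and it assumes the pipe-delimited layout shown in the expected output (type, name, connector state, one or more task states, class).

```shell
# Hypothetical helper: reads the pipe-delimited status table on stdin and
# prints the name of every connector whose connector or task state is not
# RUNNING. Exits non-zero if any such connector is found.
not_running() {
  awk -F'|' '
    {
      name = $2; gsub(/^[ \t]+|[ \t]+$/, "", name)
      bad = 0
      # Field 3 is the connector state; fields 4..NF-1 are task states.
      for (i = 3; i < NF; i++) {
        state = $i; gsub(/^[ \t]+|[ \t]+$/, "", state)
        if (state != "RUNNING") bad = 1
      }
      if (bad) { print name; failed = 1 }
    }
    END { exit (failed + 0) }
  '
}

# Illustrative input (made up for this example, not real output):
# the first connector has a task in the FAILED state.
sample='source  |  ora-source-debezium-xstream  |  RUNNING  |  FAILED   |  io.debezium.connector.oracle.OracleConnector
source  |  ora-source-jdbc              |  RUNNING  |  RUNNING  |  io.confluent.connect.jdbc.JdbcSourceConnector'

printf '%s\n' "$sample" | not_running || echo "one or more connectors need attention"
```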

Show Kafka topic has been created & populated

In ksqlDB:

ksql> list topics;

Show contents

PRINT 'asgard.DEBEZIUM.CUSTOMERS' FROM BEGINNING;

Insert a row in Oracle, observe it in Kafka

SET AUTOCOMMIT ON;

INSERT INTO CUSTOMERS (FIRST_NAME,LAST_NAME,CLUB_STATUS) VALUES ('Rick','Astley','Bronze');

Update a row in Oracle, observe it in Kafka

UPDATE CUSTOMERS SET CLUB_STATUS = 'Platinum' where ID=42;

ksqlDB for exploring CDC

Create a stream

SET 'auto.offset.reset' = 'earliest';
CREATE STREAM CUSTOMERS_STREAM_DBZ_SRC WITH (KAFKA_TOPIC='asgard.DEBEZIUM.CUSTOMERS', VALUE_FORMAT='AVRO');
CREATE STREAM CUSTOMERS_STREAM_JDBC_SRC WITH (KAFKA_TOPIC='ora-CUSTOMERS-jdbc', VALUE_FORMAT='AVRO');

LIST STREAMS;

ksqlDB also supports nested data:

DESCRIBE CUSTOMERS_STREAM_DBZ_SRC;

Pretty-print the source data to show why it is nested:

echo '{"before": {"ID": 42, "FIRST_NAME": "Rick", "LAST_NAME": "Astley", "EMAIL": null, "GENDER": null, "CLUB_STATUS": "Bronze", "COMMENTS": null, "CREATE_TS": 1544000706681769, "UPDATE_TS": 1544000706000000}, "after": {"ID": 42, "FIRST_NAME": "Rick", "LAST_NAME": "Astley", "EMAIL": null, "GENDER": null, "CLUB_STATUS": "Platinum", "COMMENTS": null, "CREATE_TS": 1544000706681769, "UPDATE_TS": 1544000742000000}, "source": {"version": "0.9.0.Alpha2", "connector": "oracle", "name": "asgard", "ts_ms": 1544000742000, "txId": "6.26.734", "scn": 2796831, "snapshot": false}, "op": "u", "ts_ms": 1544000745823, "messagetopic": "asgard.DEBEZIUM.CUSTOMERS", "messagesource": "Debezium CDC from Oracle on asgard"}'|jq '.'

or use kafkacat

docker exec kafkacat kafkacat -b kafka:29092 -t asgard.DEBEZIUM.CUSTOMERS -C -u -q -o-1 -c1 -r http://schema-registry:8081 -s key=s -s value=avro|jq '.'

Look at before & after:

SELECT OP, AFTER->ID, BEFORE->CLUB_STATUS, AFTER->CLUB_STATUS FROM CUSTOMERS_STREAM_DBZ_SRC EMIT CHANGES;
r | 1 | null | bronze
r | 2 | null | platinum
r | 3 | null | bronze
r | 4 | null | platinum
r | 5 | null | platinum
c | 42 | null | Bronze
u | 42 | Bronze | Platinum
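
The first column is Debezium's `op` field. BEFORE is null for `r` and `c` rows because there is no earlier row state to report. A small lookup for the four codes seen in this demo is sketched below; the function name `describe_op` is hypothetical.

```shell
# Hypothetical helper: translate a Debezium "op" code into a description.
describe_op() {
  case "$1" in
    r) echo "read (row captured by the initial snapshot)" ;;
    c) echo "create (insert)" ;;
    u) echo "update" ;;
    d) echo "delete" ;;
    *) echo "unknown op: $1"; return 1 ;;
  esac
}

# Print the legend for the codes that appear in the query output above.
for op in r c u d; do
  printf '%s = %s\n' "$op" "$(describe_op "$op")"
done
```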

The JDBC connector only shows the current state of each row:

SELECT ID, CLUB_STATUS FROM CUSTOMERS_STREAM_JDBC_SRC EMIT CHANGES;

Do an update in the database, then a delete, and compare the data you get with proper CDC against what the JDBC connector gives you:

UPDATE CUSTOMERS SET CLUB_STATUS='Silver' WHERE ID=2;
DELETE FROM CUSTOMERS WHERE ID=2;

Flattening data:

CREATE STREAM CUSTOMERS_STREAM_FLATTENED AS \
    SELECT AFTER->ID AS ID, \
           AFTER->FIRST_NAME AS FIRST_NAME, \
           AFTER->LAST_NAME AS LAST_NAME, \
           AFTER->EMAIL AS EMAIL, \
           AFTER->GENDER AS GENDER, \
           AFTER->CLUB_STATUS AS CLUB_STATUS, \
           AFTER->COMMENTS AS COMMENTS \
      FROM CUSTOMERS_STREAM_DBZ_SRC \
     EMIT CHANGES;
LIST TOPICS;
PRINT 'CUSTOMERS_STREAM_FLATTENED' FROM BEGINNING;

Checking lag

CREATE STREAM LAG_MONITOR_DBZ AS \
SELECT SOURCE->TS_MS, \
       ROWTIME - SOURCE->TS_MS AS LAG, \
       OP, \
       SOURCE->SNAPSHOT, \
       BEFORE->ID AS ID_BEFORE, \
       AFTER->ID AS ID_AFTER \
FROM CUSTOMERS_STREAM_DBZ_SRC EMIT CHANGES;

SELECT ID_BEFORE, ID_AFTER, TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss Z') AS SYSTEM_TIME, LAG, OP FROM LAG_MONITOR_DBZ EMIT CHANGES;
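
As a worked example of the LAG expression, the arithmetic can be reproduced with the two timestamps from the sample update event pretty-printed earlier in this document. One assumption to note: ROWTIME is the Kafka record timestamp and is not part of that payload, so the envelope-level `ts_ms` stands in for it here.

```shell
# Both values are epoch milliseconds taken from the sample update event.
source_ts_ms=1544000742000   # source.ts_ms: when the change was made in Oracle
event_ts_ms=1544000745823    # envelope ts_ms, used here in place of ROWTIME

# Same subtraction as "ROWTIME - SOURCE->TS_MS AS LAG" in the stream above.
lag_ms=$(( event_ts_ms - source_ts_ms ))
echo "lag: ${lag_ms} ms"
```

For this event the result is 3823 ms, a little under four seconds between the change in the database and the event being emitted.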

(what would be nice here is to hook up LAG_MONITOR_DBZ to Elasticsearch or InfluxDB and have a little monitoring chart)

More cool stuff with ksqlDB

ksql> SELECT OP, COUNT(*) AS OP_COUNT FROM CUSTOMERS_STREAM_DBZ_SRC GROUP BY OP EMIT CHANGES;
c | 1
r | 9
u | 5
d | 3

JDBC

Show Kafka topic has been created & populated

In ksqlDB:

ksql> list topics;

Show contents

PRINT 'ora-CUSTOMERS-jdbc' FROM BEGINNING;
SET 'auto.offset.reset' = 'earliest';
CREATE STREAM CUSTOMERS_STREAM_JDBC_SRC WITH (KAFKA_TOPIC='ora-CUSTOMERS-jdbc', VALUE_FORMAT='AVRO');

ksqlDB applies the schema to the data

DESCRIBE CUSTOMERS_STREAM_JDBC_SRC;

Lag

CREATE STREAM LAG_MONITOR_JDBC AS SELECT UPDATE_TS, ROWTIME-UPDATE_TS AS LAG, ID FROM CUSTOMERS_STREAM_JDBC_SRC EMIT CHANGES;

SELECT ID, TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss Z') AS SYSTEM_TIME, LAG FROM LAG_MONITOR_JDBC EMIT CHANGES;