In this lab we will experiment with Kafka Changefeed out of CockroachDB using Confluent.
- DBeaver
- ./cockroach sql --insecure
- psql
Student, Database, pgurl, adminurl
The lab cluster is configured in Google Clould in a singe zone:
- us-east4-b
https://github.com/glennfawcett/roachcrib
# Start Consumer for Avro
./bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic student2_pets
# List Topics
./bin/kafka-topics --list --bootstrap-server localhost:9092
# Delete Topics
./bin/kafka-topics --bootstrap-server localhost:9092 --delete --topic student2_pets
-- Enable rangefeed on CDC cluster
--
SHOW cluster setting kv.rangefeed.enabled;
SET CLUSTER SETTING kv.rangefeed.enabled='true';
-- Connect to your Database
--
use student0;
CREATE TABLE pets (
id UUID NOT NULL DEFAULT gen_random_uuid(),
person_name string,
email string,
pet_name string
);
- How do you verify the CHANGEFEED is running?
Create the changefeed but make sure to change topic_prefix to your database name:
-- Connect to your Database
--
use student2;
-- Create CHANGEFEED... set topic_prefix to your database name!!
--
CREATE CHANGEFEED FOR TABLE pets
INTO 'kafka://10.142.0.33:9092?topic_prefix=student2_'
WITH updated, resolved='20s',
confluent_schema_registry = 'http://10.142.0.33:8081',
format = 'experimental_avro',
diff,
schema_change_policy=backfill;
Connect to Confluent/Kafka machine and run the consumer:
## Start a Avro consumer in a SHELL on the kafka cluster
##
ssh -i ./bench-ssh-key bench@glenn-confluent-0001.roachprod.crdb.io
cd confluent-5.5.0
./bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic student2_pets
From another window, connect to the database and insert some values into the table you created:
-- Insert some values to students table
--
INSERT INTO pets (person_name, email, pet_name) VALUES ('Christopher', 'crobin@100acrewoods.com', 'Pooh');
INSERT INTO pets (person_name, email, pet_name) VALUES ('Christopher', 'crobin@100acrewoods.com', 'Tigger');
INSERT INTO pets (person_name, email, pet_name) VALUES ('Christopher', 'crobin@100acrewoods.com', 'Piglet');
INSERT INTO pets (person_name, email, pet_name) VALUES ('Walt', 'walt@disney.com', 'Mickey');
INSERT INTO pets (person_name, email, pet_name) VALUES ('Walt', 'walt@disney.com', 'Minnie');
- What does
{"before": null,
mean?
- What columns are sent to the changefeed?
- What does the
{"resolved":
timestamp mean?
-- Alter table to add City
--
ALTER TABLE pets ADD COLUMN city STRING;
- What values are submitted to the CHANGEFEED?
UPDATE pets SET city='Hundred Acre Woods' where person_name='Christopher';
UPDATE pets SET city='Anaheim' where person_name='Walt';
- What values for EACH row are sent to the CHANGEFEED?
- What is the
"updated":
value?
- How do you create the CHANGEFEED so the before value isn't sent?
This activity will have you cancel the changefeed and restart without the before values.
- How do you cancel the running CHANGEFEED?
- Show the
CREATE CHANGEFEED
statement such that the before values are not included.
This activity will have you cancel the changefeed and restart without the before values. Additionally, the changefeed will be restarted such that changes made before the current timestamp are NOT included.
- Show the
CREATE CHANGEFEED
statement such that changes before the current timestamp are NOT included and before values are not included. Test theCHANGEFEED
by updating and inserting rows to the table.