This repo is an example of using Apache Pulsar. It's made to help you better understand how Pulsar works in a local environment so you can test all kinds of Pulsar features.
- Python 3.7 installed (the pulsar-client package requires 3.7)
- Docker & Docker Compose installed
git clone https://github.com/codingforentrepreneurs/getting-started-with-pulsar
cd getting-started-with-pulsar
Yes, you have to use Python 3.7 to use the Python pulsar-client, so create the virtual environment with python3.7:
python3.7 -m venv venv
Activate it (mac/linux)
source venv/bin/activate
Windows:
.\venv\Scripts\activate
$(venv) python3.7 -m pip install pip --upgrade
$(venv) python3.7 -m pip install -r requirements.txt
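Because this setup depends on Python 3.7 and the pulsar-client package, a quick sanity check inside the activated venv can save some confusion later. This is a minimal sketch; check_environment is a hypothetical helper, not part of the repo, and it only reports whether the pulsar package is importable, not which version is installed.

```python
import importlib.util
import sys


def check_environment(version_info=sys.version_info):
    """Return a list of setup problems; an empty list means the venv looks ready."""
    problems = []
    major, minor = version_info[0], version_info[1]
    # This guide pins Python 3.7 because of the pulsar-client requirement.
    if (major, minor) != (3, 7):
        problems.append(f"expected Python 3.7, found {major}.{minor}")
    # find_spec() returns None when the package cannot be imported.
    if importlib.util.find_spec("pulsar") is None:
        problems.append("pulsar-client is not installed in this environment")
    return problems
```

Printing check_environment() from inside the activated venv should give an empty list once both requirements are met.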
docker compose up --build
- Replace docker compose with docker-compose where necessary.
- Run with -d once you have the whole repo working.
- Our Pulsar container is named gs_pulsar so we can more easily run commands against it.
Here are the details for our tenant:
- tenant name: cfe-tenant
- namespace: example-namespace (think of it as cfe-tenant/example-namespace)
- topics:
  - input-topic (think of this as persistent://cfe-tenant/example-namespace/input-topic)
  - output-topic (think of this as persistent://cfe-tenant/example-namespace/output-topic)
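The "think of this as" names above follow Pulsar's fully qualified topic format, {persistent|non-persistent}://tenant/namespace/topic. A small sketch that builds those names from the pieces above; full_topic_name is a hypothetical helper for illustration only.

```python
def full_topic_name(tenant, namespace, topic, persistent=True):
    """Build a fully qualified Pulsar topic name such as persistent://t/ns/topic."""
    # Pulsar topic names have the form {persistent|non-persistent}://tenant/namespace/topic
    domain = "persistent" if persistent else "non-persistent"
    return f"{domain}://{tenant}/{namespace}/{topic}"


TENANT = "cfe-tenant"
NAMESPACE = "example-namespace"
INPUT_TOPIC = full_topic_name(TENANT, NAMESPACE, "input-topic")
OUTPUT_TOPIC = full_topic_name(TENANT, NAMESPACE, "output-topic")
```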
Here are the commands:
docker exec -it gs_pulsar /pulsar/bin/pulsar-admin tenants create cfe-tenant
docker exec -it gs_pulsar /pulsar/bin/pulsar-admin namespaces create cfe-tenant/example-namespace
docker exec -it gs_pulsar /pulsar/bin/pulsar-admin topics create persistent://cfe-tenant/example-namespace/input-topic
docker exec -it gs_pulsar /pulsar/bin/pulsar-admin topics create persistent://cfe-tenant/example-namespace/output-topic
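If you prefer to script these four setup commands, here is a minimal Python sketch that shells out to the same pulsar-admin calls in the gs_pulsar container. It drops the -it flags because no interactive terminal is needed (or available) when docker exec is called from a script. setup_commands and run_setup are hypothetical names, and re-running the script after the resources exist will likely fail on the first create.

```python
import subprocess

CONTAINER = "gs_pulsar"  # container name used throughout this guide
ADMIN = "/pulsar/bin/pulsar-admin"


def setup_commands(tenant="cfe-tenant", namespace="example-namespace",
                   topics=("input-topic", "output-topic")):
    """Return the docker exec commands that create the tenant, namespace and topics."""
    base = ["docker", "exec", CONTAINER, ADMIN]  # no -it: not needed for scripts
    cmds = [
        base + ["tenants", "create", tenant],
        base + ["namespaces", "create", f"{tenant}/{namespace}"],
    ]
    for topic in topics:
        cmds.append(base + ["topics", "create",
                            f"persistent://{tenant}/{namespace}/{topic}"])
    return cmds


def run_setup():
    """Run each command in order; check=True stops at the first failing command."""
    for cmd in setup_commands():
        subprocess.run(cmd, check=True)
```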
The Pulsar/Docker command references are below.
Functions give us the ability to modify data from an input topic and send it to an output topic.
In our local repo, the example_functions directory contains cfemain.py, which defines a class called EchoFunction with a method called process. Please review this method to understand the logic behind it.
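For orientation, a class-based Python Pulsar Function generally subclasses pulsar.Function and implements process(self, input, context); whatever process returns is published to the function's output topic. The sketch below is a hypothetical echo function in that style, not the repo's actual cfemain.py. The import fallback exists only so the sketch can be exercised without pulsar-client installed.

```python
try:
    from pulsar import Function  # Pulsar Functions SDK base class
except ImportError:  # fallback so the sketch runs without pulsar-client
    Function = object


class EchoFunction(Function):
    """Hypothetical echo function: returns each input message unchanged."""

    def process(self, input, context):
        # The return value is what Pulsar publishes to the output topic.
        if context is not None:
            context.get_logger().info(f"echoing: {input!r}")
        return input
```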
Recall from above that we have:
- tenant name: cfe-tenant
- namespace: example-namespace (think of it as cfe-tenant/example-namespace)
- topics:
  - input-topic (think of this as persistent://cfe-tenant/example-namespace/input-topic)
  - output-topic (think of this as persistent://cfe-tenant/example-namespace/output-topic)
To create a function, we need all of the above plus:
- An actual Python class or function to handle input data (it can be a Java function too)
- The path to the function inside the Docker container (/pulsar/example_functions/cfemain.py). Remember that example_functions/ is mounted within our Docker container at /pulsar/example_functions/; we set up this mount ourselves in docker-compose.yaml.
Here's the command:
docker exec -it gs_pulsar /pulsar/bin/pulsar-admin functions create \
--py /pulsar/example_functions/cfemain.py \
--classname cfemain.EchoFunction \
--tenant cfe-tenant \
--namespace example-namespace \
--name cfe-echo-function \
--inputs persistent://cfe-tenant/example-namespace/input-topic \
--output persistent://cfe-tenant/example-namespace/output-topic
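The --classname value is the Python module name (the file name without .py) followed by the class name, which is why cfemain.py plus EchoFunction becomes cfemain.EchoFunction. A sketch of a hypothetical helper that assembles the same functions create command from those pieces; pass the result to subprocess.run(cmd, check=True) to execute it.

```python
from pathlib import PurePosixPath


def create_function_command(py_path, class_name, name,
                            tenant="cfe-tenant", namespace="example-namespace",
                            input_topic="input-topic", output_topic="output-topic"):
    """Build the docker exec + pulsar-admin 'functions create' argument list."""
    # Container paths are POSIX paths: ".../cfemain.py" -> module "cfemain"
    module = PurePosixPath(py_path).stem
    prefix = f"persistent://{tenant}/{namespace}/"
    return [
        "docker", "exec", "gs_pulsar", "/pulsar/bin/pulsar-admin",
        "functions", "create",
        "--py", py_path,
        "--classname", f"{module}.{class_name}",
        "--tenant", tenant,
        "--namespace", namespace,
        "--name", name,
        "--inputs", prefix + input_topic,
        "--output", prefix + output_topic,
    ]
```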
Personally, when updating this function, I find it easiest to just delete it and start over. To delete it just use:
docker exec -it gs_pulsar /pulsar/bin/pulsar-admin functions delete \
--tenant cfe-tenant \
--namespace example-namespace \
--name cfe-echo-function
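The delete-and-start-over workflow can be scripted as well. A minimal sketch, assuming the same container, paths and names as the commands above: the delete step uses check=False so a first run (when the function does not exist yet) does not abort, while the create step must succeed. redeploy and redeploy_commands are hypothetical helpers; the run parameter only exists so the sequence can be tested without Docker.

```python
import subprocess

ADMIN = ["docker", "exec", "gs_pulsar", "/pulsar/bin/pulsar-admin"]
SCOPE = ["--tenant", "cfe-tenant", "--namespace", "example-namespace",
         "--name", "cfe-echo-function"]


def redeploy_commands():
    """Return (delete_cmd, create_cmd) for the echo function."""
    delete_cmd = ADMIN + ["functions", "delete"] + SCOPE
    create_cmd = ADMIN + ["functions", "create",
                          "--py", "/pulsar/example_functions/cfemain.py",
                          "--classname", "cfemain.EchoFunction"] + SCOPE + [
        "--inputs", "persistent://cfe-tenant/example-namespace/input-topic",
        "--output", "persistent://cfe-tenant/example-namespace/output-topic",
    ]
    return delete_cmd, create_cmd


def redeploy(run=subprocess.run):
    """Delete the function (ignoring failure if it isn't deployed), then recreate it."""
    delete_cmd, create_cmd = redeploy_commands()
    run(delete_cmd, check=False)  # tolerate "function doesn't exist" on a first run
    run(create_cmd, check=True)   # raise CalledProcessError if creation fails
```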
Assuming you completed everything above, we can now run our consumer and producer.
Run the producer as many times as you like; as long as Pulsar is running (i.e., docker compose up --build), the producer can send data (messages).
$(venv) python3.7 app/producer.py
This producer sends messages directly to our default input topic, cfe-tenant/example-namespace/input-topic.
To send to a different topic, pass it as an argument:
$(venv) python3.7 app/producer.py cfe-tenant/example-namespace/topic-that-does-not-yet-exist
If you send messages to this topic, you can pass cfe-tenant/example-namespace/topic-that-does-not-yet-exist to a consumer too!
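For orientation, a producer like this needs only a few calls from the pulsar-client library: pulsar.Client, create_producer and send. This is a hypothetical sketch, not the repo's app/producer.py. It assumes the broker is reachable at pulsar://localhost:6650 (Pulsar's default binary port, which we assume the Docker setup exposes) and mirrors the optional topic argument described above.

```python
import sys

DEFAULT_TOPIC = "cfe-tenant/example-namespace/input-topic"
SERVICE_URL = "pulsar://localhost:6650"  # assumption: default port exposed by Docker


def topic_from_argv(argv=None):
    """Use the first CLI argument as the topic, else fall back to the default."""
    argv = sys.argv if argv is None else argv
    return argv[1] if len(argv) > 1 else DEFAULT_TOPIC


def produce(topic, messages, service_url=SERVICE_URL):
    """Send each string message to `topic` as UTF-8 bytes."""
    import pulsar  # imported here so topic_from_argv works without pulsar-client

    client = pulsar.Client(service_url)
    try:
        producer = client.create_producer(topic)
        for text in messages:
            producer.send(text.encode("utf-8"))  # send() blocks until acknowledged
    finally:
        client.close()
```

With Pulsar running, produce(topic_from_argv(), ["hello", "world"]) would send two messages to whichever topic was given on the command line.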
Let's boot up our consumer. It will run and listen for messages that a producer sends to Pulsar.
By default, it listens to cfe-tenant/example-namespace/input-topic:
$(venv) python3.7 app/consumer.py
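The default consumer listens to the input topic, so it bypasses our Pulsar function: it receives the producer's messages as they were sent, not the function's output.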
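A matching consumer subscribes to a topic, then loops on receive() and acknowledges each message. Again, this is a hypothetical sketch rather than the repo's app/consumer.py; the subscription name cfe-subscription and the service URL pulsar://localhost:6650 are assumptions.

```python
import sys

DEFAULT_TOPIC = "cfe-tenant/example-namespace/input-topic"
SERVICE_URL = "pulsar://localhost:6650"  # assumption: default port exposed by Docker


def topic_from_argv(argv=None):
    """Return the topic passed on the command line, or the default input topic."""
    argv = sys.argv if argv is None else argv
    return argv[1] if len(argv) > 1 else DEFAULT_TOPIC


def consume(topic, subscription="cfe-subscription", service_url=SERVICE_URL,
            handle=print):
    """Loop until interrupted, passing each decoded message to `handle`."""
    import pulsar  # imported lazily so topic_from_argv works without pulsar-client

    client = pulsar.Client(service_url)
    try:
        consumer = client.subscribe(topic, subscription_name=subscription)
        while True:
            msg = consumer.receive()  # blocks until a message arrives
            handle(msg.data().decode("utf-8"))
            consumer.acknowledge(msg)  # tell the broker this message is done
    finally:
        client.close()  # runs on Ctrl+C as well
```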
The output topic receives data from the Pulsar Function we implemented above in Python. To listen to that data, we must pass the output topic in:
$(venv) python3.7 app/consumer.py cfe-tenant/example-namespace/output-topic
Notice that cfe-tenant/example-namespace/output-topic is exactly the topic we passed to --output when creating the function above.
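To see why a consumer on input-topic never sees the function's output while one on output-topic does, here is a toy, in-memory model of the flow. Plain Python lists stand in for topics and nothing here talks to Pulsar. The exclaim transform is a hypothetical stand-in for a function that changes its input; with an echo function, both topics would simply contain the same messages.

```python
from collections import defaultdict

topics = defaultdict(list)  # topic name -> list of messages (toy stand-in for Pulsar)


def deploy(function, input_topic, output_topic):
    """Return a publish() that also runs `function`, roughly like a Pulsar Function."""
    def publish(topic, message):
        topics[topic].append(message)
        if topic == input_topic:
            result = function(message)
            if result is not None:  # only non-None results reach the output topic
                topics[output_topic].append(result)
    return publish


def exclaim(message):
    """Hypothetical transform; an echo function would just return message."""
    return message + "!"


publish = deploy(exclaim, "input-topic", "output-topic")
for text in ["hello", "pulsar"]:
    publish("input-topic", text)
```

After the loop, a reader of input-topic sees hello and pulsar, while a reader of output-topic sees hello! and pulsar!.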
List Tenants
docker exec -it gs_pulsar /pulsar/bin/pulsar-admin tenants list
We should see:
"public"
"pulsar"
"sample"
Create a new Tenant
docker exec -it gs_pulsar /pulsar/bin/pulsar-admin tenants create cfe-example
The format is
docker exec -it gs_pulsar /pulsar/bin/pulsar-admin tenants create <tenant name>
Get Tenant
docker exec -it gs_pulsar /pulsar/bin/pulsar-admin tenants get public
The format is
docker exec -it gs_pulsar /pulsar/bin/pulsar-admin tenants get <tenant name>
Delete Tenant
docker exec -it gs_pulsar /pulsar/bin/pulsar-admin tenants delete cfe-example
The format is
docker exec -it gs_pulsar /pulsar/bin/pulsar-admin tenants delete <tenant name>
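Creating a tenant that already exists should make pulsar-admin report an error, so a setup script can check the tenants list output first. tenants list prints one quoted name per line, as shown above. A minimal sketch combining tenants list and tenants create; ensure_tenant is a hypothetical helper, and its run parameter is only there so the logic can be exercised without Docker.

```python
import subprocess

ADMIN = ["docker", "exec", "gs_pulsar", "/pulsar/bin/pulsar-admin"]


def ensure_tenant(name, run=subprocess.run):
    """Create tenant `name` unless `tenants list` already shows it; True if created."""
    listed = run(ADMIN + ["tenants", "list"], capture_output=True, text=True, check=True)
    # Output lines look like "public" (with quotes), so strip whitespace and quotes.
    existing = {line.strip().strip('"') for line in listed.stdout.splitlines()}
    if name in existing:
        return False
    run(ADMIN + ["tenants", "create", name], check=True)
    return True
```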
Create Function
docker exec -it gs_pulsar /pulsar/bin/pulsar-admin functions create \
--py /pulsar/example_functions/cfemain.py \
--classname cfemain.EchoFunction \
--tenant cfe-tenant \
--namespace example-namespace \
--name cfe-echo-function \
--inputs persistent://cfe-tenant/example-namespace/input-topic \
--output persistent://cfe-tenant/example-namespace/output-topic
Delete Function
docker exec -it gs_pulsar /pulsar/bin/pulsar-admin functions delete \
--tenant cfe-tenant \
--namespace example-namespace \
--name cfe-echo-function