Added an example that demonstrates sending event data from CSV files #57

Open · wants to merge 3 commits into `main`
Empty file added examples/__init__.py
60 changes: 60 additions & 0 deletions examples/processing_large_csv/cio_event_manager.py
@@ -0,0 +1,60 @@
"""
Sample code to demonstrate sending data, contained in CSVs, to CustomerIO as events. This script uses Python3.
"""
import csv
import time
import logging
import os
from multiprocessing.pool import ThreadPool as Pool
from customerio import CustomerIO, CustomerIOException

logger = logging.getLogger(__name__)


class CIOEventManager(object):
def __init__(self, cio_site_id, cio_api_key, csv_full_file_path):
self.site_id = cio_site_id
self.api_key = cio_api_key
self.csv_full_file_path = os.path.normpath(csv_full_file_path)
self.cio_api_client = CustomerIO(self.site_id, self.api_key, retries=5)
self.event_name = None

def _read_csv(self):
with open(self.csv_full_file_path, 'r') as csv_file:
data = csv.DictReader(csv_file)
for line in data:
yield line

def _send_data_to_cio(self, data):
try:
# Assume each CSV row looks like
# {'age': '23', 'created_at': '', 'location': 'Boston', 'id': 'id: 1', 'unsubscribed': 'false'}
self.cio_api_client.track(customer_id=data['id'], name=self.event_name)
except CustomerIOException as e:
# log the CSV row and exception and move to processing the next row
logger.exception("Failed to send the row data: {row}. Error message={message}".format(row=data,
message=e.message))

def pre_process(self):
"""Use this method to do some tasks before the CSV is processed"""
pass

def send_events(self, no_of_processes=4):
"""
First runs some pre-processing. Read the CSV and send data from each row to CustomerIO. Then it finally
runs some post-processing.
"""
self.pre_process()
csv = self._read_csv()
pool = Pool(processes=no_of_processes)
for row in csv:
pool.map(self._send_data_to_cio, (row,))
rate_limit_in_secs = 0.03 # rate limiting 30 requests per sec (i.e. 1 / 30 = 0.03)
time.sleep(rate_limit_in_secs)
self._post_process()
pool.close()
pool.join()

def _post_process(self):
"""Use this method to do some tasks after the CSV is processed"""
pass
25 changes: 25 additions & 0 deletions examples/processing_large_csv/readme.MD
@@ -0,0 +1,25 @@
# Sending event data to CustomerIO from CSVs


Let's say you have a large amount of data in CSV files, and you need to process
each row and send its data to CustomerIO as an event.

There are many ways to do this; the file `cio_event_manager.py` demonstrates one of them.

To run the code, you will need the following data:
* Site ID and API key from your CustomerIO workspace
* Event name
* Full CSV file path


And then run the code like this:
```python
cio_event_manager = CIOEventManager(site_id, api_key, full_file_path_in_str)
cio_event_manager.event_name = 'purchase'
cio_event_manager.send_events()
```
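The script only requires that each row expose an `id` column, which it uses as the `customer_id`; the other column names below are hypothetical, just to show the shape of the dicts that `csv.DictReader` hands to the script:

```python
import csv
import io

# Hypothetical CSV content; only the 'id' column is required by the script.
sample = "id,age,location,unsubscribed\n1,23,Boston,false\n2,31,Austin,true\n"

rows = list(csv.DictReader(io.StringIO(sample)))
print(rows[0]["id"], rows[0]["location"])  # 1 Boston
```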

Behind the scenes, the CSV is read row by row and each row's data is sent to CustomerIO. To speed
up the process, rows are dispatched concurrently using a `ThreadPool` from Python's `multiprocessing.pool` module.
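One way to combine a thread pool with per-row rate limiting can be sketched as follows, with a stand-in task in place of the real CustomerIO call (all names here are illustrative, not part of the library):

```python
import time
from multiprocessing.pool import ThreadPool

def send(row):
    # Stand-in for the real track() call.
    return row["id"]

pool = ThreadPool(processes=4)
results = []
for i in range(5):
    # Queue the row without blocking, then throttle submissions.
    results.append(pool.apply_async(send, ({"id": str(i)},)))
    time.sleep(1 / 30)
pool.close()
pool.join()
print([r.get() for r in results])  # ['0', '1', '2', '3', '4']
```

Because `apply_async` returns immediately, the `sleep` paces how fast rows are *submitted*, while up to four rows are in flight at once.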