Background processes

This example illustrates setting up and deploying persistence components as well as various background processes (scrapers, workers, and scorers), and using their output in the ranker’s serving path to deliver the final results to the user.

Specifically, we show the following components:

  • a posts database (Postgres) with users’ historical feeds
  • a job queue (Celery) for ad hoc and scheduled jobs that perform background processing/analytics on this database
  • a Redis instance to store these processing results for use by the ranking server and/or other stateful algorithms
  • scoring infrastructure (another Celery job queue)
  • scraping infrastructure (another set of Celery scheduled tasks)
  • a job scheduler (utility functions to schedule Celery tasks)
  • the ranking server itself

Infrastructure

Ranking server

The ranking server is a FastAPI application that serves ranking requests. It is very similar to the FastAPI NLTK example, but simpler: it reads values from Redis (common named entities extracted from the posts database) and uses them to compute a simple post ranking, showing entries that do not contain any of those words first.
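
A minimal sketch of this logic, assuming the background task stores the entities as a JSON-encoded list under a Redis key such as top_entities (the key name and payload format here are assumptions, not the actual ones used by the worker):

import json

import redis

r = redis.Redis(host="localhost", port=6379, db=0)

def rank(items: list[dict]) -> list[dict]:
    # Assumed: the background task stored a JSON-encoded list of entities.
    entities = set(json.loads(r.get("top_entities") or "[]"))

    def contains_entity(item: dict) -> bool:
        return bool(set(item["text"].split()) & entities)

    # False sorts before True, so entity-free posts come first.
    return sorted(items, key=contains_entity)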

To run just the ranking server outside of Docker:

cd ranking_server
uvicorn ranking_server:app --reload

Posts database

The posts database is stored in SQLite and mirrored in Postgres to better emulate the production deployment. In its most basic form, it can be generated by running the preprocessing script followed by an ETL script, as shown under “Seeding the database from sample data” below.

Data models

The posts database has the following schema:

CREATE TABLE IF NOT EXISTS posts (
  id SERIAL PRIMARY KEY,
  post_id TEXT,
  session_timestamp TIMESTAMP WITH TIME ZONE,
  session_user_id TEXT,
  platform TEXT,
  type TEXT,
  author_name_hash TEXT,
  created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
  post_blob JSONB
);

CREATE INDEX IF NOT EXISTS idx_created_at ON posts(created_at);
CREATE INDEX IF NOT EXISTS idx_post_id ON posts(post_id);
CREATE INDEX IF NOT EXISTS idx_session_user_id ON posts(session_user_id);

We index on created_at, post_id, and session_user_id.

The post_blob field contains a JSON representation of the ContentItem pydantic model. The other fields are metadata derived from either the ContentItem or Session models.
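
For illustration, a field can be pulled out of post_blob directly in SQL using the ->> operator; the text field name below is an assumption, so substitute an actual ContentItem field:

import os

import psycopg2

conn = psycopg2.connect(os.environ["POSTS_DB_URI"])
with conn, conn.cursor() as cur:
    # ->> extracts a JSONB field as text; 'text' is an assumed field name.
    cur.execute(
        "SELECT post_id, post_blob->>'text' FROM posts "
        "WHERE created_at > NOW() - INTERVAL '1 day'"
    )
    for post_id, text in cur.fetchall():
        print(post_id, text)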

Seeding the database from sample data

export POSTS_DB_URI=postgres://postgres:postgres@localhost:5432/posts?sslmode=disable
# ^ use the connection string appropriate for your postgres instance
cd $PROJECT_ROOT/sample_data
python preprocessing.py
python seed_post_db.py --no-user-pool

This will bulk-populate the database with posts from the sample data, using a single dummy user and leaving timestamps unmodified.

If the posts table already exists, you will need to supply either the --drop-table flag to drop and recreate it, or the --upsert flag to upsert the data.

It is also possible to simulate a user pool that emulates many aspects of the expected ranking requests from a cohort of users. The following options customize this process:

  • n-users (int): The number of users to generate (default 500).
  • baseline-sessions-per-day (int): Default number of sessions per user per day (default 1).
  • items-per-session (int): Number of items (posts) per session (default 10).
  • activity-distribution (string-valued dict, e.g. '0.2:20,1:65,5:15'): A mapping from relative activity levels to the relative proportion of users at each level. The distribution is normalized, so the values may be probabilities or unnormalized relative weights. Each key is an activity level, expressed as a multiple of baseline-sessions-per-day: for instance, with a baseline of 1, users at level 5 will average 5 sessions per day (default '1:1', i.e. all users have the same baseline activity level of 1).
  • platform-distribution (string-valued dict, e.g. 'facebook:11,reddit:7.4,twitter:1.25'): Relative number of users for each platform. When omitted, the seed script allocates users in proportion to the number of sample posts per platform in the seed data, so that the sample feeds for different platforms cover roughly the same time interval even with data imbalances.

The sample feeds exhibit the following artificial constraints:

  • We employ a simplifying assumption that a user belongs to one platform only, which allows us to easily specify the distribution and sample from it.
  • A sample post will occur once in a feed. In reality, posts may be duplicated across multiple users’ feeds and/or be shown in multiple sessions for a given user.
  • When different activity levels are specified, the number of users may deviate from n-users somewhat due to rejection sampling.
  • Likewise, the earliest feed time for any given platform will vary somewhat due to sampling variance.

It is important to note that when creating the simulated feeds, synthetic timestamps are generated for the sessions and posts. They maintain the chronological order of the feed and are computed such that the newest timestamps are (approximately) the current time.

Note that in order to get the posts database into Postgres for use by the other examples, it is necessary to have a Postgres instance running and provide its URI in the POSTS_DB_URI environment variable. The easiest way to do this is to use the Docker container provided with the combined infrastructure example.

Finally, it is possible to perform the ETL from the SQLite file into Postgres. Just run the script as follows:

python seed_post_db.py --dbname=sample_posts.db --seed-postgres

Job queue

We use Celery for running async and scheduled jobs. A Redis instance is used both as a broker and result persistence backend. As per standard Celery practices, we define the tasks in a file tasks.py, and use the @app.task decorator to register them.

As per best practices, we avoid passing complex data structures as task arguments, since doing so can introduce serialization issues.

Ad hoc tasks are enqueued using the .delay() method. In addition, we provide an example that sets up a scheduled task via a hook. Consult Celery documentation for more details.
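
A minimal sketch of this pattern (the task below is illustrative, not one of the actual tasks in tasks.py):

# tasks.py
from celery import Celery

app = Celery(
    "tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0",
)

@app.task
def add(x: int, y: int) -> int:
    # Simple scalar arguments avoid serialization headaches.
    return x + y

# Elsewhere, in the caller, enqueue an ad hoc job:
result = add.delay(2, 3)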

We illustrate two ways of returning results from tasks:

  • using the AsyncResult object and its blocking .get() method
  • explicitly storing the result in Redis
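
Both patterns in sketch form, reusing the illustrative add task above (the Redis key here is hypothetical; a real task would agree on the key with its consumer):

import json

import redis
from tasks import add

r = redis.Redis(host="localhost", port=6379, db=0)

# 1. Block on the AsyncResult until the worker finishes.
print(add.delay(2, 3).get(timeout=10))

# 2. Read a result the task wrote to Redis itself, e.g. via
#    r.set(result_key, json.dumps(value)) inside the task body.
raw = r.get("my_result_key")  # hypothetical key
if raw is not None:
    print(json.loads(raw))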

Job scheduler

Scheduled tasks need to be implemented as Celery tasks. A separate scheduler service triggers their execution. Task definitions and schedules are persisted in Redis. The tasks are bound to the queue specified in the app definition associated with the particular task.

We provide a simple interface to the Redis-backed scheduler in util/scheduler.py. The following functions are available:

  • schedule_tasks(app: Celery, tasks: list[ScheduledTask]): Schedules a list of tasks to run.
  • clear_old_tasks(app: Celery, queue=None): Removes all scheduled tasks bound to app.conf.task_default_queue (default) or queue (if provided).

The scheduled tasks are created using the ScheduledTask class as follows:

task = ScheduledTask(
    count_top_named_entities,
    args=(10, "2017-05-31", "2017-06-01", result_key),
    kwargs=None,
    options=None,
    interval_seconds=60,
)

Note that clear_old_tasks is called by schedule_tasks to ensure that deleted or renamed tasks are not left in the scheduler. If finer-grained control is needed (e.g. when all tasks are deleted and no new tasks are provided, so the schedule_tasks call is never made), the user should call clear_old_tasks directly.
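
For example, scheduling the task defined above might look like this (assuming app is the Celery application from tasks.py):

from util.scheduler import schedule_tasks

# schedule_tasks clears stale entries via clear_old_tasks, then
# registers the task to run every interval_seconds.
schedule_tasks(app, [task])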

Parallel scorers

We assume that post-level parallelism can be used when performing certain types of scoring. In order to take advantage of that, we provide a separate Celery job queue.

This scoring is expected to be performed in the serving path, so both the individual scoring tasks as well as queue management need to be optimized for low latency. As such, we expect the queue worker capacity and worker types (e.g. CPU vs GPU) to be provisioned in accordance with anticipated load.

The implementer’s job is to provide both the worker tasks (tasks.py) and the driver code suitable for performing the scoring. Depending on the scoring algorithm(s), there are multiple considerations that may be addressed, such as:

  • running a heterogeneous workload, e.g. with multiple task types
  • gracefully handling failures, e.g. enabling partial results
  • fine-grained control over task execution time limits
  • tuning and profiling

Given the flexibility of Celery and Python, there are multiple valid ways to achieve the above goals. We provide two examples to illustrate possible ways to set up the job queue, and briefly discuss alternatives in the module and function documentation.

  • scorer_worker/scorer_basic.py
  • scorer_worker/scorer_advanced.py

The advanced example, in particular, attempts to address the complexities mentioned above.

Note that we keep the Celery application definition and celery task definitions in separate modules, and enqueue the registered tasks using string-valued names. This allows us to simplify deployment by avoiding importing the task dependencies in the server.
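
For example, the driver might enqueue a scoring task by its registered string name as sketched below (the task and queue names are assumptions; use the names actually registered by your worker):

from celery import Celery

app = Celery(broker="redis://localhost:6379/0", backend="redis://localhost:6379/0")

# send_task enqueues by string name, so tasks.py is never imported here.
async_result = app.send_task(
    "scorer_worker.tasks.score_post",  # assumed registered task name
    args=[{"id": "123", "text": "example post"}],
    queue="scorer",  # assumed queue name
)
score = async_result.get(timeout=1.0)  # keep the serving path low-latency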

To ensure that the scoring examples work as expected on your system, we provide tests:

cd scorer_worker
make test

Scrapers

Please see scraper_worker/README.org

Redis

A Redis instance is used not only as part of the Celery deployment, but also as a general-purpose data store for persisting and consuming the results of async processing. Because of memory constraints, storing large result sets in Redis is discouraged.

Redis is the primary way that your offline tasks will communicate with the components in the serving path.

If your Celery jobs need to store large intermediate results, consider changing the result backend so as not to overload the Redis instance. For example, you could use the SQLAlchemy backend to store results in a local database file. You can assume a single worker host, so the local filesystem is a fine place for data that is not needed outside the worker.
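
For example, pointing the result backend at a local SQLite file via Celery’s SQLAlchemy backend might look like this (the file path is illustrative; sqlalchemy must be installed):

from celery import Celery

app = Celery(
    "tasks",
    broker="redis://localhost:6379/0",
    # The db+ prefix selects the SQLAlchemy result backend; results are
    # stored in a local SQLite file instead of Redis.
    backend="db+sqlite:///results.sqlite",
)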

Setting up your environment

The combined example relies on the following:

  • docker compose to run Redis, Celery and Postgres
  • an assortment of Python libraries, including celery, redis-py, pytest, pandas, nltk, etc.
  • sample data (see “Running tests” below)

You will need a virtual environment; use your preferred tool to create one. We recommend Poetry, as this is what the PRC infrastructure team is currently using. Please ensure your Poetry version is at least 1.8.3.

When setting up this project for the first time, at the repo root, run

poetry install --no-root

This will install only the dependencies listed in pyproject.toml without trying to install the current project as a package.

For Conda users, you may refer to environment.yml. To use it:

conda env create -f environment.yml

To re-export dependencies:

conda env export > environment.yml

Please note that Conda flows are no longer maintained and we don’t guarantee that this file is up to date.

Running tests

We provide a makefile for running tests: run make test at the repo root.

This will spin up Redis + db containers, run the tests, and tear down the containers. Running this command is a good way to ensure that your environment is set up correctly.

Be aware that due to the way pytest interacts with Celery, you need to ensure that no other Celery workers are running when you run the tests.

Note also that test code runs outside Docker; Redis and Postgres are therefore made available on localhost via port mappings in the docker compose file.

Tests rely on sample data being present in a test database (posts_test_db). Note that this is a different database from the one discussed in the sample data section above; it contains a small subset of test records.

We provide a script that seeds both the test database and the sample posts database (with a simulated user pool). You are welcome to adjust the sample post database seeding process to fit your needs. This script is available as ci.sh and is best executed via make ci at the repo root, as it relies on the db container.

A simplified version of the script is reproduced below for reference:

# Postgres connection setup. This will depend on your specific port mappings
# and other settings.
PGHOST=localhost
PGPORT=5435

PGDB_PYTEST_TESTS=posts_test_db
PGDB_SAMPLE_DATA=posts

POSTS_DB_URI_PYTEST_TESTS=postgresql://postgres:postgres@${PGHOST}:${PGPORT}/${PGDB_PYTEST_TESTS}
export POSTS_DB_URI=postgresql://postgres:postgres@${PGHOST}:${PGPORT}/${PGDB_SAMPLE_DATA}

cd "${PROJECT_ROOT}/sample_data" || exit 1

POSTS_DB_URI=${POSTS_DB_URI_PYTEST_TESTS} poetry run python seed_post_db.py --dbname ${TEST_POSTS_DB} --seed-postgres

# Setup sample posts database
# This simulates a pool of 500 users with varying activity levels.

poetry run python seed_post_db.py --n-users=500 --activity-distribution=0.2:20,1:65,5:15

Running examples

  1. Launch Celery, Redis, and FastAPI using make run or docker compose up --build.
  2. Run the sample tasks via python worker.py.