Spark-приложение для потоковой обработки данных олимпийских достижений и рекордов
Все настройки приложения находятся в файле config.properties
в ресурсной директории проетка.
Для успешного запуска приложение требует подключение к очереди Google Pub/Sub.
Для локальной разработки рекомендуется использовать эмулятор данной очереди.
Эмулятор устанавливается отдельным модулем к утилите gcloud
и запускается
как веб-сервер на 8085 порту по умолчанию. Команды для запуска сервера
и остальное описание CLI можно найти тут.
Для обработки сообщений из очереди ее нужно наполнить данными. Для этого используется датасет
отсюда.
CSV-файл athlete_events.csv
достаточно положить в ресурсную директорию.
Рекомендуемым способом работы с эмулятором является официальный образ docker-контейнера google/cloud-sdk.
Эмулятор запускается в интерактивном режиме (-it
) и показывает свой access-лог.
После отправки прерывания или завершения процесса иным способом контейнер удалится.
docker run --rm -it --volumes-from gcloud-config --network host --name pub-sub \
google/cloud-sdk gcloud beta emulators pubsub start --project=olympics-269511
Важно учесть, что здесь и далее при запуске контейнеров образа google/cloud-sdk
рекомендуется указывать ключ --volumes-from gcloud-config
, где gcloud-config
-
контейнер с данными об авторизации в сервисах Google.
Если приведенная выше команда не работает, попробуйте запустить образ так (на Windows
также придется удалить символы \
в конце строк и объединить их в одну):
docker run --rm -it --volumes-from gcloud-config -p "8085:8085" --name pub-sub \
google/cloud-sdk gcloud beta emulators pubsub start --project=olympics-269511 \
--host-port=0.0.0.0:8085
docker run -it --name gcloud-config google/cloud-sdk gcloud auth login
Для запуска эмулятора вне контейнера нужно установить его модуль для утилиты gcloud
:
gcloud components install pubsub-emulator
gcloud components update
После этого будут доступны команды beta emulators
. Для запуска установленного
эмулятора введите следующую команду:
gcloud beta emulators pubsub start --project=olympics-269511
Рассматривается также вариант работы с Pub/Sub напрямую без эмулятора. Для этого вы должны быть членом проекта Google Cloud и иметь JSON-файл, содержащий приватный ключ для аутентификации сервиса. Этот файл можно найти на странице проекта: APIs & Services > Credentials.
После скачивания JSON-файла из раздела Service Accounts этой же страницы его нужно поместить
в любую удобную вам директорию и указать на его путь переменную среды GOOGLE_APPLICATION_CREDENTIALS
.
Подробнее об этом можно прочесть тут.
Для управления зависимостями используется Maven. После mvn install
вам должны быть доступны
все необходимые для запуска и разработки пакеты.
Чтобы запустить приложение на Dataproc нужно выполнить ряд действий в Cloud Shell для подготовки окружения.
Включение необходимых сервисов:
gcloud services enable \
dataproc.googleapis.com \
pubsub.googleapis.com
Создания топика и подписки в очереди Pub/Sub:
export TOPIC=olympics-topic
export SUBSCRIPTION=olympics-sub
gcloud pubsub topics create $TOPIC
gcloud pubsub subscriptions create $SUBSCRIPTION --topic=$TOPIC
Создание датасета в BigQuery:
export DATASET=dataset
bq --location=europe-west3 mk \
--dataset \
$DATASET
Создание сервисного аккаунта и добавление необходимых привилегий:
export PROJECT=$(gcloud info --format='value(config.project)')
export SERVICE_ACCOUNT_NAME=dataproc-service-account
export SERVICE_ACCOUNT_ADDRESS=$SERVICE_ACCOUNT_NAME@$PROJECT.iam.gserviceaccount.com
gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
gcloud projects add-iam-policy-binding $PROJECT \
--role roles/dataproc.worker \
--member="serviceAccount:$SERVICE_ACCOUNT_ADDRESS"
gcloud beta pubsub subscriptions add-iam-policy-binding \
$SUBSCRIPTION \
--role roles/pubsub.subscriber \
--member="serviceAccount:$SERVICE_ACCOUNT_ADDRESS"
gcloud projects add-iam-policy-binding $PROJECT \
--role roles/bigquery.dataEditor \
--member="serviceAccount:$SERVICE_ACCOUNT_ADDRESS"
gcloud projects add-iam-policy-binding $PROJECT \
--role roles/bigquery.jobUser \
--member="serviceAccount:$SERVICE_ACCOUNT_ADDRESS"
Запуск Dataproc кластеров:
export CLUSTER=demo-cluster
gcloud dataproc clusters create demo-cluster \
--region=europe-west3 \
--zone=europe-west3-a \
--master-machine-type=n1-standard-4 \
--num-workers=2 \
--worker-machine-type=n1-standard-2 \
--scopes=pubsub,bigquery \
--image-version=1.2 \
--service-account=$SERVICE_ACCOUNT_ADDRESS
После успешного выполнения предыдущих действий вам нужно подготовить хранилище для датасета и исполняемых jar-файлов. Для этого:
- создайте bucket (
gsutil mb -l europe-west3 gs://my-very-own-bucket-1
) - загрузите туда датасет в формате csv
- загрузите также необходимые jar-файлы
Используемый коннектор для BigQuery требует временного хранилища, которое должно находится в том же регионе, что и хранилище с jar-файлом. Про компиляцию в jar-файл подробнее смотрите далее.
export TMP_BUCKET=some-bucket-543645434
gsutil mb -l europe-west3 gs://$TMP_BUCKET
Spark-job получает сообщения из Pub/Sub подписки, которая была создана на этапе подготовки проекта. На момент создания подписки она пуста. Чтобы загрузить в очередь датасет используйте генератор. Для этого:
- загрузите исходный код генератора из папки
generator
в корне проекта на VM instance - создайте виртуальное окружение для python
- установите требуемые зависимости из файла
requirements.txt
(may take some time) - запустите генератор в фоне
export PROJECT=$(gcloud info --format='value(config.project)')
export GENERATOR_PATH=~/generator
export TOPIC=olympics-topic
export BUCKET=my-very-own-bucket-1
export DIRECTORY=data
cd $GENERATOR_PATH
virtualenv venv
source venv/bin/activate
pip install -r requirements.txt
python generatord.py $PROJECT $TOPIC $BUCKET $DIRECTORY 5 50000 &
deactivate
NOTE: Здесь переменные $BUCKET
и $GENERATOR_PATH
могут разниться с тем,
что есть у вас.
Для того, чтобы скомпилировать jar-файл для запуска, клонируйте этот репозиторий и соберите package при помощи Maven:
- задайте нужные настройки в файле
config.properties
в папкеresources
- установите зависимости (
mvn install
) - соберите package (
mvn clean package
)
Исполняемый файл готов и находится в папке target
. Перенесите его в свой bucket.
Еще раз убедитесь, что проект Google Cloud собран правильно, а именно:
- в подписке Pub/Sub есть сообщения
- jar-файл находится в bucket
- датасет в BigQuery создан
- временный bucket создан и находится в том же регионе
Запустите Spark-job через Cloud Sell:
export PROJECT=$(gcloud info --format='value(config.project)')
export CLUSTER=demo-cluster
export BUCKET=my-very-own-bucket-1
export JAR_NAME=olympics-spark-1.0-SNAPSHOT.jar
export JAR="gs://$BUCKET/jars/$JAR_NAME"
export SPARK_PROPERTIES="spark.dynamicAllocation.enabled=false,spark.streaming.receiver.writeAheadLog.enabled=true"
export ARGUMENTS="10 10 5 hdfs:///user/checkpoint"
gcloud dataproc jobs submit spark \
--region=europe-west3 \
--cluster $CLUSTER \
--async \
--jar $JAR \
--max-failures-per-hour 10 \
--properties $SPARK_PROPERTIES \
-- $ARGUMENTS