Skip to content

Commit

Permalink
Add monitoring worker (#3677)
Browse files Browse the repository at this point in the history
* Add monitoring worker

* Add locks

* Add tenant id to lock

* Remove unneeded tenant postfix
  • Loading branch information
Weves authored Jan 15, 2025
1 parent b195773 commit 3b76955
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 6 deletions.
2 changes: 1 addition & 1 deletion backend/onyx/background/celery/configs/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@

# Monitoring worker specific settings
worker_concurrency = 1 # Single worker is sufficient for monitoring
worker_pool = "solo"
worker_pool = "threads"
worker_prefetch_multiplier = 1
34 changes: 29 additions & 5 deletions backend/onyx/background/celery/tasks/monitoring/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,18 @@

from celery import shared_task
from celery import Task
from celery.exceptions import SoftTimeLimitExceeded
from pydantic import BaseModel
from redis import Redis
from redis.lock import Lock as RedisLock
from sqlalchemy import select
from sqlalchemy.orm import Session

from onyx.background.celery.apps.app_base import task_logger
from onyx.background.celery.tasks.vespa.tasks import celery_get_queue_length
from onyx.configs.app_configs import JOB_TIMEOUT
from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisLocks
from onyx.db.engine import get_db_current_time
from onyx.db.engine import get_session_with_tenant
from onyx.db.enums import IndexingStatus
Expand All @@ -28,6 +30,8 @@
from onyx.utils.telemetry import optional_telemetry
from onyx.utils.telemetry import RecordType

_MONITORING_SOFT_TIME_LIMIT = 60 * 5 # 5 minutes
_MONITORING_TIME_LIMIT = _MONITORING_SOFT_TIME_LIMIT + 60 # 6 minutes

_CONNECTOR_INDEX_ATTEMPT_START_LATENCY_KEY_FMT = (
"monitoring_connector_index_attempt_start_latency:{cc_pair_id}:{index_attempt_id}"
Expand Down Expand Up @@ -385,7 +389,8 @@ def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric]

@shared_task(
name=OnyxCeleryTask.MONITOR_BACKGROUND_PROCESSES,
soft_time_limit=JOB_TIMEOUT,
soft_time_limit=_MONITORING_SOFT_TIME_LIMIT,
time_limit=_MONITORING_TIME_LIMIT,
queue=OnyxCeleryQueues.MONITORING,
bind=True,
)
Expand All @@ -397,7 +402,18 @@ def monitor_background_processes(self: Task, *, tenant_id: str | None) -> None:
- Syncing speed metrics
- Worker status and task counts
"""
task_logger.info("Starting background process monitoring")
task_logger.info("Starting background monitoring")
r = get_redis_client(tenant_id=tenant_id)

lock_monitoring: RedisLock = r.lock(
OnyxRedisLocks.MONITOR_BACKGROUND_PROCESSES_LOCK,
timeout=_MONITORING_SOFT_TIME_LIMIT,
)

# these tasks should never overlap
if not lock_monitoring.acquire(blocking=False):
task_logger.info("Skipping monitoring task because it is already running")
return None

try:
# Get Redis client for Celery broker
Expand All @@ -420,8 +436,16 @@ def monitor_background_processes(self: Task, *, tenant_id: str | None) -> None:
if metric.key:
_mark_metric_as_emitted(redis_std, metric.key)

task_logger.info("Successfully collected background process metrics")

task_logger.info("Successfully collected background metrics")
except SoftTimeLimitExceeded:
task_logger.info(
"Soft time limit exceeded, task is being terminated gracefully."
)
except Exception as e:
task_logger.exception("Error collecting background process metrics")
raise e
finally:
if lock_monitoring.owned():
lock_monitoring.release()

task_logger.info("Background monitoring task finished")
1 change: 1 addition & 0 deletions backend/onyx/configs/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ class OnyxRedisLocks:
"da_lock:check_connector_external_group_sync_beat"
)
MONITOR_VESPA_SYNC_BEAT_LOCK = "da_lock:monitor_vespa_sync_beat"
MONITOR_BACKGROUND_PROCESSES_LOCK = "da_lock:monitor_background_processes"

CONNECTOR_DOC_PERMISSIONS_SYNC_LOCK_PREFIX = (
"da_lock:connector_doc_permissions_sync"
Expand Down
19 changes: 19 additions & 0 deletions deployment/cloud_kubernetes/hpa/workers_hpa.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,22 @@ spec:
target:
type: Utilization
averageUtilization: 70
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: celery-worker-monitoring-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: celery-worker-indexing
minReplicas: 1
maxReplicas: 4
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
62 changes: 62 additions & 0 deletions deployment/cloud_kubernetes/workers/monitoring.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: celery-worker-monitoring
spec:
replicas: 2
selector:
matchLabels:
app: celery-worker-monitoring
template:
metadata:
labels:
app: celery-worker-monitoring
spec:
containers:
- name: celery-worker-monitoring
image: onyxdotapp/onyx-backend-cloud:v0.14.0-cloud.beta.21
imagePullPolicy: IfNotPresent
command:
[
"celery",
"-A",
"onyx.background.celery.versioned_apps.monitoring",
"worker",
"--loglevel=INFO",
"--hostname=monitoring@%n",
"-Q",
"monitoring",
"--prefetch-multiplier=8",
"--concurrency=8",
]
env:
- name: REDIS_PASSWORD
valueFrom:
secretKeyRef:
name: onyx-secrets
key: redis_password
- name: ONYX_VERSION
value: "v0.11.0-cloud.beta.8"
envFrom:
- configMapRef:
name: env-configmap
volumeMounts:
- name: vespa-certificates
mountPath: "/app/certs"
readOnly: true
resources:
requests:
cpu: "1000m"
memory: "1Gi"
limits:
cpu: "1000m"
memory: "1Gi"
volumes:
- name: vespa-certificates
secret:
secretName: vespa-certificates
items:
- key: cert.pem
path: cert.pem
- key: key.pem
path: key.pem

0 comments on commit 3b76955

Please sign in to comment.