Skip to content

Commit

Permalink
SNOW-1843926: SPCS service events & metrics (#1954)
Browse files Browse the repository at this point in the history
* spcs service events&metrics

* snapshot and feature flag

* add new test

* fix test

* new table format and new test and new snapshot

* fix comment

* release notes

* fix comments

* snapshot update

* update snapshots

---------

Co-authored-by: Teja Kommineni <teja.kommineni@snowflake.com>
Co-authored-by: Patryk Czajka <patryk.czajka@snowflake.com>
  • Loading branch information
3 people authored Jan 14, 2025
1 parent 30e43a3 commit c3a145d
Show file tree
Hide file tree
Showing 8 changed files with 1,069 additions and 13 deletions.
6 changes: 6 additions & 0 deletions RELEASE-NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@
* Add publish command to make it easier to manage publishing versions to release channels and updating release directives: `snow app publish`
* Add support for restricting Snowflake user authentication policy to Snowflake CLI-only.
* Added a new command: `snow helpers import-snowsql-connections`, which imports connection configurations from SnowSQL.
* Add `snow spcs service events` command to retrieve service-specific events:
  * Supports filtering by service name, container name, instance ID, time intervals (`--since`, `--until`), and pagination (`--first`, `--last`).
  * Use `--all` to fetch all columns.
* Add `snow spcs service metrics` command to fetch service metrics:
  * Supports filtering by service name, container name, instance ID, and time intervals (`--since`, `--until`).
  * Use `--all` to fetch all columns.

## Fixes and improvements
* Fixed inability to add patches to lowercase quoted versions
Expand Down
129 changes: 129 additions & 0 deletions src/snowflake/cli/_plugins/spcs/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

from __future__ import annotations

import json
import sys
from datetime import datetime
from typing import TextIO

from click import ClickException
Expand All @@ -23,6 +25,22 @@
from snowflake.cli.api.project.util import unquote_identifier
from snowflake.connector.errors import ProgrammingError

# Upper-case column names of an event-table row, in a fixed order.
# NOTE(review): the names match the documented Snowflake event table columns;
# the consumer of this list is not visible in this section -- presumably it
# labels the raw rows returned when `--all` is requested. Confirm at call site.
EVENT_COLUMN_NAMES = [
"TIMESTAMP",
"START_TIMESTAMP",
"OBSERVED_TIMESTAMP",
"TRACE",
"RESOURCE",
"RESOURCE_ATTRIBUTES",
"SCOPE",
"SCOPE_ATTRIBUTES",
"RECORD_TYPE",
"RECORD",
"RECORD_ATTRIBUTES",
"VALUE",
"EXEMPLARS",
]

if not sys.stdout.closed and sys.stdout.isatty():
GREEN = "\033[32m"
BLUE = "\033[34m"
Expand Down Expand Up @@ -124,5 +142,116 @@ def new_logs_only(prev_log_records: list[str], new_log_records: list[str]) -> li
return new_log_records_sorted


def build_resource_clause(
    service_name: str, instance_id: str, container_name: str
) -> str:
    """Build the SQL predicate that selects rows for one service resource.

    Every non-empty argument contributes one filter on ``resource_attributes``;
    the filters are joined with ``and``.  Values are escaped before being
    placed inside single-quoted SQL string literals, so a stray quote in a
    CLI argument can neither break the query nor inject SQL.

    Args:
        service_name: Value matched against ``snow.service.name``.
        instance_id: Value matched against either ``snow.service.instance``
            or ``snow.service.container.instance``.
        container_name: Value matched against ``snow.service.container.name``.

    Returns:
        The combined predicate, or ``"1=1"`` when every argument is empty so
        the result can always be dropped into a ``where`` clause.
    """

    def sql_literal(value: str) -> str:
        # Backslash introduces escape sequences in Snowflake single-quoted
        # literals and a quote terminates them; neutralise both.
        return str(value).replace("\\", "\\\\").replace("'", "''")

    resource_filters = []
    if service_name:
        resource_filters.append(
            f"resource_attributes:\"snow.service.name\" = '{sql_literal(service_name)}'"
        )
    if instance_id:
        instance = sql_literal(instance_id)
        # The instance id is stored under a different key depending on the
        # kind of row, so accept a match on either attribute.
        resource_filters.append(
            f"(resource_attributes:\"snow.service.instance\" = '{instance}' "
            f"OR resource_attributes:\"snow.service.container.instance\" = '{instance}')"
        )
    if container_name:
        resource_filters.append(
            f"resource_attributes:\"snow.service.container.name\" = '{sql_literal(container_name)}'"
        )
    return " and ".join(resource_filters) if resource_filters else "1=1"


def build_time_clauses(
    since: str | datetime | None, until: str | datetime | None
) -> tuple[str, str]:
    """Build the optional lower/upper ``timestamp`` bounds of a query.

    Args:
        since: Lower bound.  A ``datetime`` is compared directly; a non-empty
            string is treated as an interval subtracted from ``sysdate()``.
            ``None`` or ``""`` means no lower bound.
        until: Upper bound, interpreted the same way as ``since``.

    Returns:
        ``(since_clause, until_clause)``.  Each element is either an empty
        string or a fragment starting with ``and`` that can be appended to an
        existing ``where`` clause.
    """

    def bound(operator: str, value: str | datetime | None) -> str:
        if isinstance(value, datetime):
            # str(datetime) contains only digits and punctuation, so it is
            # safe to embed as-is.
            return f"and timestamp {operator} '{value}'"
        if isinstance(value, str) and value:
            # The interval text comes from the command line; escape backslash
            # and quote so it cannot terminate the SQL string literal.
            interval = value.replace("\\", "\\\\").replace("'", "''")
            return f"and timestamp {operator} sysdate() - interval '{interval}'"
        return ""

    return bound(">=", since), bound("<=", until)


def format_event_row(event_dict: dict) -> dict:
    """Flatten one raw event-table row into the columns shown to the user.

    Args:
        event_dict: Row keyed by upper-case column name.  The
            ``RESOURCE_ATTRIBUTES``, ``RECORD_ATTRIBUTES`` and ``RECORD``
            values are JSON-encoded objects; a missing key or a ``None``
            (SQL NULL) value is treated as an empty object.

    Returns:
        A dict with the keys ``TIMESTAMP``, ``DATABASE NAME``, ``SCHEMA NAME``,
        ``SERVICE NAME``, ``INSTANCE ID``, ``CONTAINER NAME``, ``SEVERITY``,
        ``EVENT NAME`` and ``EVENT VALUE``; absent values fall back to
        ``"N/A"`` / ``"Unknown ..."`` placeholders.

    Raises:
        RecordProcessingError: If a JSON column cannot be decoded or does not
            hold a JSON object.
    """

    def load_json_column(column: str) -> dict:
        # NULL columns arrive as None; json.loads(None) would raise TypeError,
        # so handle them the same way as a column that is absent altogether.
        raw = event_dict.get(column)
        return {} if raw is None else json.loads(raw)

    try:
        resource_attributes = load_json_column("RESOURCE_ATTRIBUTES")
        record_attributes = load_json_column("RECORD_ATTRIBUTES")
        record = load_json_column("RECORD")

        return {
            "TIMESTAMP": event_dict.get("TIMESTAMP", "N/A"),
            "DATABASE NAME": resource_attributes.get("snow.database.name", "N/A"),
            "SCHEMA NAME": resource_attributes.get("snow.schema.name", "N/A"),
            "SERVICE NAME": resource_attributes.get("snow.service.name", "N/A"),
            # Events read the instance from "snow.service.instance".
            "INSTANCE ID": resource_attributes.get("snow.service.instance", "N/A"),
            "CONTAINER NAME": resource_attributes.get(
                "snow.service.container.name", "N/A"
            ),
            "SEVERITY": record.get("severity_text", "Unknown Severity"),
            "EVENT NAME": record_attributes.get("event.name", "Unknown Event"),
            "EVENT VALUE": event_dict.get("VALUE", "Unknown Value"),
        }
    except (json.JSONDecodeError, KeyError, TypeError, AttributeError) as e:
        # TypeError / AttributeError cover columns that are not JSON text or
        # that decode to something other than an object.
        raise RecordProcessingError("Error processing event row.") from e


def format_metric_row(metric_dict: dict) -> dict:
    """Flatten one raw metric row of the event table into display columns.

    Args:
        metric_dict: Row keyed by upper-case column name.  ``RESOURCE_ATTRIBUTES``
            and ``RECORD`` are required and must hold JSON-encoded objects;
            ``RECORD`` must contain a ``"metric"`` object.

    Returns:
        A dict with the keys ``TIMESTAMP``, ``DATABASE NAME``, ``SCHEMA NAME``,
        ``SERVICE NAME``, ``INSTANCE ID``, ``CONTAINER NAME``, ``METRIC NAME``
        and ``METRIC VALUE``; absent optional values fall back to ``"N/A"`` /
        ``"Unknown ..."`` placeholders.

    Raises:
        RecordProcessingError: If a required column is missing, is NULL, is not
            valid JSON, or does not have the expected object shape.
    """
    try:
        resource_attributes = json.loads(metric_dict["RESOURCE_ATTRIBUTES"])
        metric = json.loads(metric_dict["RECORD"])["metric"]

        return {
            "TIMESTAMP": metric_dict.get("TIMESTAMP", "N/A"),
            "DATABASE NAME": resource_attributes.get("snow.database.name", "N/A"),
            "SCHEMA NAME": resource_attributes.get("snow.schema.name", "N/A"),
            "SERVICE NAME": resource_attributes.get("snow.service.name", "N/A"),
            # Metrics read the instance from "snow.service.container.instance".
            "INSTANCE ID": resource_attributes.get(
                "snow.service.container.instance", "N/A"
            ),
            "CONTAINER NAME": resource_attributes.get(
                "snow.service.container.name", "N/A"
            ),
            "METRIC NAME": metric.get("name", "Unknown Metric"),
            "METRIC VALUE": metric_dict.get("VALUE", "Unknown Value"),
        }
    except (json.JSONDecodeError, KeyError, TypeError, AttributeError) as e:
        # TypeError: a column is None / not JSON text, or RECORD is not an
        # object.  AttributeError: a decoded value is not a dict.  Both are
        # "invalid data" and must surface as a clean CLI error, not a traceback.
        raise RecordProcessingError("Error processing metric row.") from e


class RecordProcessingError(ClickException):
    """Signals that an event or metric record held invalid data and could not be processed."""


class SPCSEventTableError(ClickException):
    """Signals a problem concerning the SPCS event table."""


class NoPropertiesProvidedError(ClickException):
    """Click-level error with no extra behaviour of its own.

    NOTE(review): judging by the name, presumably raised when a command that
    sets or unsets properties is invoked without any -- confirm against callers.
    """
140 changes: 128 additions & 12 deletions src/snowflake/cli/_plugins/spcs/services/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from snowflake.cli.api.feature_flags import FeatureFlag
from snowflake.cli.api.identifiers import FQN
from snowflake.cli.api.output.types import (
CollectionResult,
CommandResult,
MessageResult,
QueryJsonValueResult,
Expand All @@ -59,6 +60,38 @@
short_help="Manages services.",
)

# Option objects shared by several service commands (logs, events, metrics),
# defined once so their flag names and help texts stay identical.

# Required option: the Ellipsis default marks it as mandatory in Typer.
container_name_option = typer.Option(
...,
"--container-name",
help="Name of the container.",
show_default=False,
)

# Required option (Ellipsis default), passed through as a string.
instance_id_option = typer.Option(
...,
"--instance-id",
help="ID of the service instance, starting with 0.",
show_default=False,
)

# No explicit flag name is given; presumably Typer derives `--since` from the
# parameter it is bound to. The empty-string default means "no lower bound".
# NOTE(review): the help text says "events" but `metrics` reuses this option.
since_option = typer.Option(
default="",
help="Fetch events that are newer than this time ago, in Snowflake interval syntax.",
)

# Counterpart of `since_option` for the upper bound; same caveats apply.
until_option = typer.Option(
default="",
help="Fetch events that are older than this time ago, in Snowflake interval syntax.",
)

# Boolean switch exposed as `--all`; off by default.
show_all_columns_option = typer.Option(
False,
"--all",
is_flag=True,
help="Fetch all columns.",
)


def _service_name_callback(name: FQN) -> FQN:
if not is_valid_object_name(name.identifier, max_depth=2, allow_quoted=False):
Expand Down Expand Up @@ -213,18 +246,8 @@ def status(name: FQN = ServiceNameArgument, **options) -> CommandResult:
@app.command(requires_connection=True)
def logs(
name: FQN = ServiceNameArgument,
container_name: str = typer.Option(
...,
"--container-name",
help="Name of the container.",
show_default=False,
),
instance_id: str = typer.Option(
...,
"--instance-id",
help="ID of the service instance, starting with 0.",
show_default=False,
),
container_name: str = container_name_option,
instance_id: str = instance_id_option,
num_lines: int = typer.Option(
DEFAULT_NUM_LINES, "--num-lines", help="Number of lines to retrieve."
),
Expand Down Expand Up @@ -297,6 +320,99 @@ def logs(
return StreamResult(cast(Generator[CommandResult, None, None], stream))


@app.command(requires_connection=True)
def events(
    name: FQN = ServiceNameArgument,
    container_name: str = container_name_option,
    instance_id: str = instance_id_option,
    since: str = since_option,
    until: str = until_option,
    first: int = typer.Option(
        default=None,
        show_default=False,
        help="Fetch only the first N events. Cannot be used with --last.",
    ),
    last: int = typer.Option(
        default=None,
        show_default=False,
        help="Fetch only the last N events. Cannot be used with --first.",
    ),
    show_all_columns: bool = show_all_columns_option,
    **options,
):
    """
    Retrieve platform events for a service container.
    """
    # The docstring above is presumably rendered as the command's --help text
    # (Typer convention), so it is kept verbatim; notes for maintainers go here.

    # The command is gated behind a feature flag.
    if FeatureFlag.ENABLE_SPCS_SERVICE_EVENTS.is_disabled():
        raise FeatureNotEnabledError(
            "ENABLE_SPCS_SERVICE_EVENTS",
            "Service events collection from SPCS event table is disabled.",
        )

    # --first and --last are mutually exclusive ways of limiting the result.
    both_limits_given = first is not None and last is not None
    if both_limits_given:
        raise IncompatibleParametersError(["--first", "--last"])

    event_rows = ServiceManager().get_events(
        service_name=name.identifier,
        container_name=container_name,
        instance_id=instance_id,
        since=since,
        until=until,
        first=first,
        last=last,
        show_all_columns=show_all_columns,
    )

    # An empty result gets a plain message instead of an empty table.
    return (
        CollectionResult(event_rows)
        if event_rows
        else MessageResult("No events found.")
    )


@app.command(requires_connection=True)
def metrics(
    name: FQN = ServiceNameArgument,
    container_name: str = container_name_option,
    instance_id: str = instance_id_option,
    since: str = since_option,
    until: str = until_option,
    show_all_columns: bool = show_all_columns_option,
    **options,
):
    """
    Retrieve platform metrics for a service container.
    """
    # The docstring above is presumably rendered as the command's --help text
    # (Typer convention), so it is kept verbatim; notes for maintainers go here.

    # The command is gated behind a feature flag.
    if FeatureFlag.ENABLE_SPCS_SERVICE_METRICS.is_disabled():
        raise FeatureNotEnabledError(
            "ENABLE_SPCS_SERVICE_METRICS",
            "Service metrics collection from SPCS event table is disabled.",
        )

    manager = ServiceManager()
    # Arguments common to both manager calls.
    target = dict(
        service_name=name.identifier,
        container_name=container_name,
        instance_id=instance_id,
        show_all_columns=show_all_columns,
    )

    # With a time bound, every metric in the window is fetched; without one,
    # only the latest metrics are requested.
    if since or until:
        metric_rows = manager.get_all_metrics(since=since, until=until, **target)
    else:
        metric_rows = manager.get_latest_metrics(**target)

    # An empty result gets a plain message instead of an empty table.
    return (
        CollectionResult(metric_rows)
        if metric_rows
        else MessageResult("No metrics found.")
    )


@app.command(requires_connection=True)
def upgrade(
name: FQN = ServiceNameArgument,
Expand Down
Loading

0 comments on commit c3a145d

Please sign in to comment.