Skip to content

Commit

Permalink
Cherrypicks v3.3.0rc0 (#1991)
Browse files Browse the repository at this point in the history
* SNOW-1843926: SPCS service events & metrics (#1954)

* spcs service events&metrics

* snapshot and feature flag

* add new test

* fix test

* new table format and new test and new snapshot

* fix comment

* release notes

* fix comments

* snapshot update

* update snapshots

---------

Co-authored-by: Teja Kommineni <teja.kommineni@snowflake.com>
Co-authored-by: Patryk Czajka <patryk.czajka@snowflake.com>

* Refactor unreleased commands and hide released spcs flags instead of blocking (#1989)

* disable spcs commands if feature flag is disabled; hide logs instead of raising an error

* Hide flag instead of command, fix tests

* update snapshots

* remove streaming flag

---------

Co-authored-by: Abby Shen <abby.shen@snowflake.com>
Co-authored-by: Teja Kommineni <teja.kommineni@snowflake.com>
  • Loading branch information
3 people authored Jan 14, 2025
1 parent 2045936 commit 88c05f3
Show file tree
Hide file tree
Showing 9 changed files with 853 additions and 68 deletions.
7 changes: 6 additions & 1 deletion RELEASE-NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@
## Deprecations

## New additions
* Add `snow spcs service events` command to retrieve service-specific events:
* Supports filtering by service name, container name, instance ID, time intervals (`--since`, `--until`), and pagination (`--first`, `--last`).
* Use `--all` to fetch all columns.
* Add `snow spcs service metrics` command to fetch service metrics:
* Supports filtering by service name, container name, instance ID, and time intervals (`--since`, `--until`).
* Use `--all` to fetch all columns.

## Fixes and improvements

Expand Down Expand Up @@ -48,7 +54,6 @@
## Fixes and improvements
* Fixed inability to add patches to lowercase quoted versions.
* Fixed label being set to blank instead of None when not provided.
* Added a feature flag `ENABLE_SPCS_LOG_STREAMING` to control the rollout of the log streaming feature.


# v3.2.2
Expand Down
129 changes: 129 additions & 0 deletions src/snowflake/cli/_plugins/spcs/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

from __future__ import annotations

import json
import sys
from datetime import datetime
from typing import TextIO

from click import ClickException
Expand All @@ -23,6 +25,22 @@
from snowflake.cli.api.project.util import unquote_identifier
from snowflake.connector.errors import ProgrammingError

# Full set of columns in the SPCS event table. Per the release notes, the
# `--all` flag on `snow spcs service events/metrics` fetches all columns;
# without it a condensed view is built (see format_event_row/format_metric_row).
EVENT_COLUMN_NAMES = [
    "TIMESTAMP",
    "START_TIMESTAMP",
    "OBSERVED_TIMESTAMP",
    "TRACE",
    "RESOURCE",
    "RESOURCE_ATTRIBUTES",
    "SCOPE",
    "SCOPE_ATTRIBUTES",
    "RECORD_TYPE",
    "RECORD",
    "RECORD_ATTRIBUTES",
    "VALUE",
    "EXEMPLARS",
]

if not sys.stdout.closed and sys.stdout.isatty():
GREEN = "\033[32m"
BLUE = "\033[34m"
Expand Down Expand Up @@ -124,5 +142,116 @@ def new_logs_only(prev_log_records: list[str], new_log_records: list[str]) -> li
return new_log_records_sorted


def build_resource_clause(
    service_name: str, instance_id: str, container_name: str
) -> str:
    """Build the SQL predicate that narrows event-table rows to one resource.

    Each non-empty argument contributes one filter on the corresponding
    ``resource_attributes`` key; the filters are AND-ed together. With no
    arguments at all, the always-true predicate ``"1=1"`` is returned so the
    caller's WHERE clause stays valid.
    """
    predicates = []

    if service_name:
        predicates.append(
            f'resource_attributes:"snow.service.name" = \'{service_name}\''
        )

    if instance_id:
        # Older and newer attribute names for the instance id are both matched.
        predicates.append(
            f'(resource_attributes:"snow.service.instance" = \'{instance_id}\' '
            f'OR resource_attributes:"snow.service.container.instance" = \'{instance_id}\')'
        )

    if container_name:
        predicates.append(
            f'resource_attributes:"snow.service.container.name" = \'{container_name}\''
        )

    if not predicates:
        return "1=1"
    return " and ".join(predicates)


def build_time_clauses(
    since: str | datetime | None, until: str | datetime | None
) -> tuple[str, str]:
    """Translate --since/--until values into SQL timestamp predicates.

    A ``datetime`` value is compared against ``timestamp`` literally; a
    non-empty string is interpreted as a Snowflake interval measured back
    from ``sysdate()``. Empty/None values yield an empty clause.

    Returns:
        A ``(since_clause, until_clause)`` pair, each either ``""`` or a
        fragment beginning with ``"and timestamp ..."``.
    """

    def _clause(value: str | datetime | None, op: str) -> str:
        # Exact timestamps win over interval strings.
        if isinstance(value, datetime):
            return f"and timestamp {op} '{value}'"
        if isinstance(value, str) and value:
            return f"and timestamp {op} sysdate() - interval '{value}'"
        return ""

    return _clause(since, ">="), _clause(until, "<=")


def format_event_row(event_dict: dict) -> dict:
    """Convert a raw event-table row into the condensed dict shown by default.

    Args:
        event_dict: One event row keyed by upper-case column name. The
            RESOURCE_ATTRIBUTES, RECORD_ATTRIBUTES, and RECORD columns are
            expected to hold JSON-encoded strings.

    Returns:
        A dict of display-friendly columns (TIMESTAMP, DATABASE NAME, ...)
        with "N/A"/"Unknown ..." fallbacks for missing fields.

    Raises:
        RecordProcessingError: If a JSON column is malformed or a required
            key is missing.
    """
    # NOTE(review): a column present with a None value would raise TypeError
    # from json.loads, which is not caught here — confirm upstream guarantees
    # string values for these columns.
    try:
        resource_attributes = json.loads(event_dict.get("RESOURCE_ATTRIBUTES", "{}"))
        record_attributes = json.loads(event_dict.get("RECORD_ATTRIBUTES", "{}"))
        record = json.loads(event_dict.get("RECORD", "{}"))

        return {
            "TIMESTAMP": event_dict.get("TIMESTAMP", "N/A"),
            "DATABASE NAME": resource_attributes.get("snow.database.name", "N/A"),
            "SCHEMA NAME": resource_attributes.get("snow.schema.name", "N/A"),
            "SERVICE NAME": resource_attributes.get("snow.service.name", "N/A"),
            "INSTANCE ID": resource_attributes.get("snow.service.instance", "N/A"),
            "CONTAINER NAME": resource_attributes.get(
                "snow.service.container.name", "N/A"
            ),
            "SEVERITY": record.get("severity_text", "Unknown Severity"),
            "EVENT NAME": record_attributes.get("event.name", "Unknown Event"),
            "EVENT VALUE": event_dict.get("VALUE", "Unknown Value"),
        }
    except (json.JSONDecodeError, KeyError) as e:
        # Chain the original exception so the root cause stays visible; the
        # previous code discarded `e` and used an f-string with no placeholders.
        raise RecordProcessingError("Error processing event row.") from e


def format_metric_row(metric_dict: dict) -> dict:
    """Convert a raw metric-table row into the condensed dict shown by default.

    Args:
        metric_dict: One metric row keyed by upper-case column name. The
            RESOURCE_ATTRIBUTES and RECORD columns must be present and hold
            JSON-encoded strings; RECORD must contain a "metric" object.

    Returns:
        A dict of display-friendly columns (TIMESTAMP, METRIC NAME, ...)
        with "N/A"/"Unknown ..." fallbacks for missing optional fields.

    Raises:
        RecordProcessingError: If a JSON column is malformed or a required
            key (RESOURCE_ATTRIBUTES, RECORD, or RECORD["metric"]) is missing.
    """
    try:
        resource_attributes = json.loads(metric_dict["RESOURCE_ATTRIBUTES"])
        record = json.loads(metric_dict["RECORD"])

        return {
            "TIMESTAMP": metric_dict.get("TIMESTAMP", "N/A"),
            "DATABASE NAME": resource_attributes.get("snow.database.name", "N/A"),
            "SCHEMA NAME": resource_attributes.get("snow.schema.name", "N/A"),
            "SERVICE NAME": resource_attributes.get("snow.service.name", "N/A"),
            # Metrics use the container-scoped instance attribute (events use
            # "snow.service.instance" as well — see format_event_row).
            "INSTANCE ID": resource_attributes.get(
                "snow.service.container.instance", "N/A"
            ),
            "CONTAINER NAME": resource_attributes.get(
                "snow.service.container.name", "N/A"
            ),
            "METRIC NAME": record["metric"].get("name", "Unknown Metric"),
            "METRIC VALUE": metric_dict.get("VALUE", "Unknown Value"),
        }
    except (json.JSONDecodeError, KeyError) as e:
        # Chain the original exception so the root cause stays visible; the
        # previous code discarded `e` and used an f-string with no placeholders.
        raise RecordProcessingError("Error processing metric row.") from e


class RecordProcessingError(ClickException):
    """Raised when processing an event or metric record fails due to invalid data."""


class SPCSEventTableError(ClickException):
    """Raised when there is an issue related to the SPCS event table."""


class NoPropertiesProvidedError(ClickException):
    """Raised when no properties are provided to a command that expects them."""
149 changes: 130 additions & 19 deletions src/snowflake/cli/_plugins/spcs/services/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@
from snowflake.cli.api.commands.snow_typer import SnowTyperFactory
from snowflake.cli.api.constants import ObjectType
from snowflake.cli.api.exceptions import (
FeatureNotEnabledError,
IncompatibleParametersError,
)
from snowflake.cli.api.feature_flags import FeatureFlag
from snowflake.cli.api.identifiers import FQN
from snowflake.cli.api.output.types import (
CollectionResult,
CommandResult,
MessageResult,
QueryJsonValueResult,
Expand All @@ -59,6 +59,38 @@
short_help="Manages services.",
)

# Common CLI options shared by the service subcommands below
# (`logs`, `events`, `metrics`).

# Required: which container of the service to target.
container_name_option = typer.Option(
    ...,
    "--container-name",
    help="Name of the container.",
    show_default=False,
)

# Required: which instance of a (possibly multi-instance) service to target.
instance_id_option = typer.Option(
    ...,
    "--instance-id",
    help="ID of the service instance, starting with 0.",
    show_default=False,
)

# Lower bound of the time window, in Snowflake interval syntax (e.g. "2 hours").
# NOTE(review): help text says "events" but this option is also reused by
# the `metrics` command.
since_option = typer.Option(
    default="",
    help="Fetch events that are newer than this time ago, in Snowflake interval syntax.",
)

# Upper bound of the time window, in Snowflake interval syntax.
# NOTE(review): same reuse caveat as since_option.
until_option = typer.Option(
    default="",
    help="Fetch events that are older than this time ago, in Snowflake interval syntax.",
)

# When set, return every event-table column instead of the condensed view.
show_all_columns_option = typer.Option(
    False,
    "--all",
    is_flag=True,
    help="Fetch all columns.",
)


def _service_name_callback(name: FQN) -> FQN:
if not is_valid_object_name(name.identifier, max_depth=2, allow_quoted=False):
Expand Down Expand Up @@ -213,18 +245,8 @@ def status(name: FQN = ServiceNameArgument, **options) -> CommandResult:
@app.command(requires_connection=True)
def logs(
name: FQN = ServiceNameArgument,
container_name: str = typer.Option(
...,
"--container-name",
help="Name of the container.",
show_default=False,
),
instance_id: str = typer.Option(
...,
"--instance-id",
help="ID of the service instance, starting with 0.",
show_default=False,
),
container_name: str = container_name_option,
instance_id: str = instance_id_option,
num_lines: int = typer.Option(
DEFAULT_NUM_LINES, "--num-lines", help="Number of lines to retrieve."
),
Expand All @@ -241,24 +263,24 @@ def logs(
False, "--include-timestamps", help="Include timestamps in logs.", is_flag=True
),
follow: bool = typer.Option(
False, "--follow", help="Stream logs in real-time.", is_flag=True
False,
"--follow",
help="Stream logs in real-time.",
is_flag=True,
hidden=True,
),
follow_interval: int = typer.Option(
2,
"--follow-interval",
help="Set custom polling intervals for log streaming (--follow flag) in seconds.",
hidden=True,
),
**options,
):
"""
Retrieves local logs from a service container.
"""
if follow:
if FeatureFlag.ENABLE_SPCS_LOG_STREAMING.is_disabled():
raise FeatureNotEnabledError(
"ENABLE_SPCS_LOG_STREAMING",
"Streaming logs from spcs containers is disabled.",
)
if num_lines != DEFAULT_NUM_LINES:
raise IncompatibleParametersError(["--follow", "--num-lines"])
if previous_logs:
Expand Down Expand Up @@ -297,6 +319,95 @@ def logs(
return StreamResult(cast(Generator[CommandResult, None, None], stream))


@app.command(
    requires_connection=True,
    is_enabled=FeatureFlag.ENABLE_SPCS_SERVICE_EVENTS.is_enabled,
)
def events(
    name: FQN = ServiceNameArgument,
    container_name: str = container_name_option,
    instance_id: str = instance_id_option,
    since: str = since_option,
    until: str = until_option,
    first: int = typer.Option(
        default=None,
        show_default=False,
        help="Fetch only the first N events. Cannot be used with --last.",
    ),
    last: int = typer.Option(
        default=None,
        show_default=False,
        help="Fetch only the last N events. Cannot be used with --first.",
    ),
    show_all_columns: bool = show_all_columns_option,
    **options,
):
    """
    Retrieve platform events for a service container.

    Results can be narrowed by container, instance, and time window
    (--since/--until) and paginated with --first or --last, which are
    mutually exclusive.
    """
    # --first and --last describe opposite pagination directions; reject both.
    if first is not None and last is not None:
        raise IncompatibleParametersError(["--first", "--last"])

    manager = ServiceManager()
    # Named `event_rows` so the result does not shadow this command function.
    event_rows = manager.get_events(
        service_name=name.identifier,
        container_name=container_name,
        instance_id=instance_id,
        since=since,
        until=until,
        first=first,
        last=last,
        show_all_columns=show_all_columns,
    )

    if not event_rows:
        return MessageResult("No events found.")

    return CollectionResult(event_rows)


@app.command(
    requires_connection=True,
    is_enabled=FeatureFlag.ENABLE_SPCS_SERVICE_METRICS.is_enabled,
)
def metrics(
    name: FQN = ServiceNameArgument,
    container_name: str = container_name_option,
    instance_id: str = instance_id_option,
    since: str = since_option,
    until: str = until_option,
    show_all_columns: bool = show_all_columns_option,
    **options,
):
    """
    Retrieve platform metrics for a service container.

    Without --since/--until, only the latest metrics are returned; when a
    time window is given, all metrics within that window are returned.
    """
    manager = ServiceManager()
    # Named `metric_rows` so the result does not shadow this command function.
    if since or until:
        # An explicit time window means the caller wants the full history
        # inside that range, not just the most recent samples.
        metric_rows = manager.get_all_metrics(
            service_name=name.identifier,
            container_name=container_name,
            instance_id=instance_id,
            since=since,
            until=until,
            show_all_columns=show_all_columns,
        )
    else:
        metric_rows = manager.get_latest_metrics(
            service_name=name.identifier,
            container_name=container_name,
            instance_id=instance_id,
            show_all_columns=show_all_columns,
        )

    if not metric_rows:
        return MessageResult("No metrics found.")

    return CollectionResult(metric_rows)


@app.command(requires_connection=True)
def upgrade(
name: FQN = ServiceNameArgument,
Expand Down
Loading

0 comments on commit 88c05f3

Please sign in to comment.