Skip to content

Commit

Permalink
feat(metrics): Add a meta table to counters (#5681)
Browse files Browse the repository at this point in the history
This creates everything needed to test  storing meta information about
metrics. The new table is meant to satisfy queries that are trying to find
metric_ids, tag keys and tag values, but are not interested in the values
associated with the metrics.

In theory this will eventually be done for all the metric types, but for now
this is being used just for counters to test how well this solution actually
solves the problems.

This does three things:

- Add a column to the raw table that can be used to control which data
goes to the meta table
- Create a new table for aggregated meta information
- Create a materialized view to populate that table from the raw data
  • Loading branch information
evanh committed Mar 22, 2024
1 parent 1eb33c5 commit f5f9208
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 0 deletions.
3 changes: 3 additions & 0 deletions snuba/migrations/group_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,9 @@ def get_migrations(self) -> Sequence[str]:
"0027_sets_add_raw_tags_column",
"0028_distributions_add_indexed_tags_column",
"0029_add_use_case_id_index",
"0030_add_record_meta_column",
"0031_counters_meta_table",
"0032_counters_meta_table_mv",
]


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from typing import Sequence

from snuba.clickhouse.columns import Column, UInt
from snuba.clusters.storage_sets import StorageSetKey
from snuba.migrations import migration, operations
from snuba.migrations.columns import MigrationModifiers as Modifiers


class Migration(migration.ClickhouseNodeMigration):
blocking = False
local_table_name = "generic_metric_counters_raw_local"
dist_table_name = "generic_metric_counters_raw_dist"
storage_set_key = StorageSetKey.GENERIC_METRICS_COUNTERS

def forwards_ops(self) -> Sequence[operations.SqlOperation]:
return [
operations.AddColumn(
storage_set=self.storage_set_key,
table_name=self.local_table_name,
column=Column("record_meta", UInt(8, Modifiers(default=str("0")))),
target=operations.OperationTarget.LOCAL,
after="materialization_version",
),
operations.AddColumn(
storage_set=self.storage_set_key,
table_name=self.dist_table_name,
column=Column("record_meta", UInt(8, Modifiers(default=str("0")))),
target=operations.OperationTarget.DISTRIBUTED,
after="materialization_version",
),
]

def backwards_ops(self) -> Sequence[operations.SqlOperation]:
return [
operations.DropColumn(
column_name="record_meta",
storage_set=self.storage_set_key,
table_name=self.dist_table_name,
target=operations.OperationTarget.DISTRIBUTED,
),
operations.DropColumn(
column_name="record_meta",
storage_set=self.storage_set_key,
table_name=self.local_table_name,
target=operations.OperationTarget.LOCAL,
),
]
68 changes: 68 additions & 0 deletions snuba/snuba_migrations/generic_metrics/0031_counters_meta_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from typing import Sequence

from snuba.clickhouse.columns import AggregateFunction, Column, DateTime, String, UInt
from snuba.clusters.storage_sets import StorageSetKey
from snuba.migrations import migration, operations, table_engines
from snuba.migrations.columns import MigrationModifiers as Modifiers
from snuba.migrations.operations import OperationTarget
from snuba.utils.schemas import Float


class Migration(migration.ClickhouseNodeMigration):
blocking = False
granularity = "2048"
local_table_name = "generic_metric_counters_meta_aggregated_local"
dist_table_name = "generic_metric_counters_meta_aggregated_dist"
storage_set_key = StorageSetKey.GENERIC_METRICS_COUNTERS
columns: Sequence[Column[Modifiers]] = [
Column("org_id", UInt(64)),
Column("project_id", UInt(64)),
Column("use_case_id", String(Modifiers(low_cardinality=True))),
Column("metric_id", UInt(64)),
Column("tag_key", String()),
Column("timestamp", DateTime(modifiers=Modifiers(codecs=["DoubleDelta"]))),
Column("retention_days", UInt(16)),
Column("tag_values", AggregateFunction("groupUniqArray", [String()])),
Column("count", AggregateFunction("sum", [Float(64)])),
]

def forwards_ops(self) -> Sequence[operations.SqlOperation]:
return [
operations.CreateTable(
storage_set=self.storage_set_key,
table_name=self.local_table_name,
engine=table_engines.AggregatingMergeTree(
storage_set=self.storage_set_key,
order_by="(org_id, project_id, use_case_id, metric_id, tag_key, timestamp)",
primary_key="(org_id, project_id, use_case_id, metric_id, tag_key, timestamp)",
partition_by="(retention_days, toMonday(timestamp))",
settings={"index_granularity": self.granularity},
ttl="timestamp + toIntervalDay(retention_days)",
),
columns=self.columns,
target=OperationTarget.LOCAL,
),
operations.CreateTable(
storage_set=self.storage_set_key,
table_name=self.dist_table_name,
engine=table_engines.Distributed(
local_table_name=self.local_table_name, sharding_key=None
),
columns=self.columns,
target=OperationTarget.DISTRIBUTED,
),
]

def backwards_ops(self) -> Sequence[operations.SqlOperation]:
return [
operations.DropTable(
storage_set=self.storage_set_key,
table_name=self.dist_table_name,
target=OperationTarget.DISTRIBUTED,
),
operations.DropTable(
storage_set=self.storage_set_key,
table_name=self.local_table_name,
target=OperationTarget.LOCAL,
),
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from typing import Sequence

from snuba.clickhouse.columns import AggregateFunction, Column, DateTime, String, UInt
from snuba.clusters.storage_sets import StorageSetKey
from snuba.migrations import migration, operations
from snuba.migrations.columns import MigrationModifiers as Modifiers
from snuba.migrations.operations import OperationTarget
from snuba.utils.schemas import Float


class Migration(migration.ClickhouseNodeMigration):
blocking = False
view_name = "generic_metric_counters_meta_aggregation_mv"
dest_table_name = "generic_metric_counters_meta_aggregated_local"
dest_table_columns: Sequence[Column[Modifiers]] = [
Column("org_id", UInt(64)),
Column("project_id", UInt(64)),
Column("use_case_id", String(Modifiers(low_cardinality=True))),
Column("metric_id", UInt(64)),
Column("tag_key", String()),
Column("timestamp", DateTime(modifiers=Modifiers(codecs=["DoubleDelta"]))),
Column("retention_days", UInt(16)),
Column("tag_values", AggregateFunction("groupUniqArray", [String()])),
Column("value", AggregateFunction("sum", [Float(64)])),
]
storage_set_key = StorageSetKey.GENERIC_METRICS_COUNTERS

def forwards_ops(self) -> Sequence[operations.SqlOperation]:
return [
operations.CreateMaterializedView(
storage_set=self.storage_set_key,
view_name=self.view_name,
columns=self.dest_table_columns,
destination_table_name=self.dest_table_name,
target=OperationTarget.LOCAL,
query="""
SELECT
org_id,
project_id,
use_case_id,
metric_id,
tag_key,
toStartOfWeek(timestamp) as timestamp,
retention_days,
groupUniqArrayState(tag_value) as `tag_values`,
sumState(count_value) as count
FROM generic_metric_counters_raw_local
ARRAY JOIN
tags.key AS tag_key, tags.raw_value AS tag_value
WHERE record_meta = 1
GROUP BY
org_id,
project_id,
use_case_id,
metric_id,
tag_key,
timestamp,
retention_days
""",
),
]

def backwards_ops(self) -> Sequence[operations.SqlOperation]:
return [
operations.DropTable(
storage_set=self.storage_set_key,
table_name=self.view_name,
target=OperationTarget.LOCAL,
)
]

0 comments on commit f5f9208

Please sign in to comment.