Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: formula queries in EndpointTraceItemTable #6844

Merged
merged 9 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ python-rapidjson==1.8
redis==4.5.4
sentry-arroyo==2.19.12
sentry-kafka-schemas==0.1.129
sentry-protos==0.1.57
sentry-protos==0.1.58
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new protobuf definition with support for formulas, see getsentry/sentry-protos#105

sentry-redis-tools==0.3.0
sentry-relay==0.9.5
sentry-sdk==2.18.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
extract_response_meta,
setup_trace_query_settings,
)
from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException
from snuba.web.rpc.v1.resolvers import ResolverTimeSeries
from snuba.web.rpc.v1.resolvers.R_eap_spans.common.aggregation import (
ExtrapolationContext,
Expand Down Expand Up @@ -297,6 +298,9 @@ def trace_item_type(cls) -> TraceItemType.ValueType:
return TraceItemType.TRACE_ITEM_TYPE_SPAN

def resolve(self, in_msg: TimeSeriesRequest) -> TimeSeriesResponse:
if len(in_msg.expressions) > 0:
raise BadSnubaRPCRequestException("expressions field not yet implemented")

snuba_request = _build_snuba_request(in_msg)
res = run_query(
dataset=PluggableDataset(name="eap", all_entities=[]),
Expand Down
139 changes: 91 additions & 48 deletions snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import (
AggregationComparisonFilter,
AggregationFilter,
Column,
TraceItemColumnValues,
TraceItemTableRequest,
TraceItemTableResponse,
Expand Down Expand Up @@ -55,6 +56,13 @@

_DEFAULT_ROW_LIMIT = 10_000

# Maps the protobuf BinaryFormula arithmetic operators onto the snuba
# function-expression builders used when translating a formula column
# into a query expression (see _formula_to_expression).
OP_TO_EXPR = {
    Column.BinaryFormula.OP_ADD: f.plus,
    Column.BinaryFormula.OP_SUBTRACT: f.minus,
    Column.BinaryFormula.OP_MULTIPLY: f.multiply,
    Column.BinaryFormula.OP_DIVIDE: f.divide,
}


def aggregation_filter_to_expression(agg_filter: AggregationFilter) -> Expression:
op_to_expr = {
Expand Down Expand Up @@ -125,9 +133,81 @@ def _convert_order_by(
expression=aggregation_to_expression(x.column.aggregation),
)
)
elif x.column.HasField("formula"):
res.append(
OrderBy(
direction=direction,
expression=_formula_to_expression(x.column.formula),
kylemumma marked this conversation as resolved.
Show resolved Hide resolved
)
)
return res


def _get_reliability_context_columns(column: Column) -> list[SelectedExpression]:
    """
    Extrapolated aggregates need to request extra columns to calculate the
    reliability of the result. Return the list of those extra columns for
    the given column; non-aggregates and non-extrapolated aggregates get an
    empty list.
    """
    # Only aggregate columns can carry an extrapolation mode.
    if not column.HasField("aggregation"):
        return []

    aggregation = column.aggregation
    if (
        aggregation.extrapolation_mode
        != ExtrapolationMode.EXTRAPOLATION_MODE_SAMPLE_WEIGHTED
    ):
        return []

    extra_columns: list[SelectedExpression] = []

    # The confidence interval is only available for some aggregates.
    confidence_interval = get_confidence_interval_column(aggregation)
    if confidence_interval is not None:
        extra_columns.append(
            SelectedExpression(
                name=confidence_interval.alias,
                expression=confidence_interval,
            )
        )

    avg_sample_rate = get_average_sample_rate_column(aggregation)
    extra_columns.append(
        SelectedExpression(
            name=avg_sample_rate.alias,
            expression=avg_sample_rate,
        )
    )

    count_col = get_count_column(aggregation)
    extra_columns.append(
        SelectedExpression(name=count_col.alias, expression=count_col)
    )
    return extra_columns


def _formula_to_expression(formula: Column.BinaryFormula) -> Expression:
    """
    Translate a BinaryFormula protobuf into a snuba Expression by resolving
    its operator and recursively converting both operands.

    Raises BadSnubaRPCRequestException if the operator is not one of the
    supported arithmetic operators (add, subtract, multiply, divide) —
    e.g. an unset OP_UNSPECIFIED — instead of surfacing a bare KeyError.
    """
    op_expr = OP_TO_EXPR.get(formula.op)
    if op_expr is None:
        raise BadSnubaRPCRequestException(
            f"Unsupported binary formula operator: {formula.op}"
        )
    return op_expr(
        _column_to_expression(formula.left),
        _column_to_expression(formula.right),
    )


def _column_to_expression(column: Column) -> Expression:
    """
    Given a column protobuf object, translates it into a Expression object and returns it.
    """
    if column.HasField("key"):
        return attribute_key_to_expression(column.key)
    if column.HasField("aggregation"):
        # aggregation label may not be set and the column label takes
        # priority anyways, so always alias with the column label.
        return replace(
            aggregation_to_expression(column.aggregation), alias=column.label
        )
    if column.HasField("formula"):
        return replace(_formula_to_expression(column.formula), alias=column.label)
    raise BadSnubaRPCRequestException(
        "Column is not one of: aggregate, attribute key, or formula"
    )


def _build_query(request: TraceItemTableRequest) -> Query:
# TODO: This is hardcoded still
entity = Entity(
Expand All @@ -138,54 +218,15 @@ def _build_query(request: TraceItemTableRequest) -> Query:

selected_columns = []
for column in request.columns:
if column.HasField("key"):
key_col = attribute_key_to_expression(column.key)
# The key_col expression alias may differ from the column label. That is okay
# the attribute key name is used in the groupby, the column label is just the name of
# the returned attribute value
selected_columns.append(
SelectedExpression(name=column.label, expression=key_col)
)
elif column.HasField("aggregation"):
function_expr = aggregation_to_expression(column.aggregation)
# aggregation label may not be set and the column label takes priority anyways.
function_expr = replace(function_expr, alias=column.label)
selected_columns.append(
SelectedExpression(name=column.label, expression=function_expr)
)

if (
column.aggregation.extrapolation_mode
== ExtrapolationMode.EXTRAPOLATION_MODE_SAMPLE_WEIGHTED
):
confidence_interval_column = get_confidence_interval_column(
column.aggregation
)
if confidence_interval_column is not None:
selected_columns.append(
SelectedExpression(
name=confidence_interval_column.alias,
expression=confidence_interval_column,
)
)

average_sample_rate_column = get_average_sample_rate_column(
column.aggregation
)
count_column = get_count_column(column.aggregation)
selected_columns.append(
SelectedExpression(
name=average_sample_rate_column.alias,
expression=average_sample_rate_column,
)
)
selected_columns.append(
SelectedExpression(name=count_column.alias, expression=count_column)
)
else:
raise BadSnubaRPCRequestException(
"Column is neither an aggregate or an attribute"
# The key_col expression alias may differ from the column label. That is okay
# the attribute key name is used in the groupby, the column label is just the name of
# the returned attribute value
selected_columns.append(
SelectedExpression(
name=column.label, expression=_column_to_expression(column)
)
)
selected_columns.extend(_get_reliability_context_columns(column))

res = Query(
from_clause=entity,
Expand Down Expand Up @@ -255,9 +296,11 @@ def _convert_results(
converters[column.label] = lambda x: AttributeValue(val_double=float(x))
elif column.HasField("aggregation"):
converters[column.label] = lambda x: AttributeValue(val_double=float(x))
elif column.HasField("formula"):
converters[column.label] = lambda x: AttributeValue(val_double=float(x))
else:
raise BadSnubaRPCRequestException(
"column is neither an attribute or aggregation"
"column is not one of: attribute, aggregation, or formula"
)

res: defaultdict[str, TraceItemColumnValues] = defaultdict(TraceItemColumnValues)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def _build_query(request: TraceItemTableRequest) -> Query:
)
else:
raise BadSnubaRPCRequestException(
"requested attribute is not a column (aggregation not supported for logs)"
"requested attribute is not a column (aggregation and formula not supported for logs)"
)

res = Query(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ def _build_query(request: TraceItemTableRequest) -> Query:
selected_columns.append(
SelectedExpression(name=column.label, expression=function_expr)
)
elif column.HasField("formula"):
raise BadSnubaRPCRequestException(
"formulas are not supported for uptime checks"
)
else:
raise BadSnubaRPCRequestException(
"Column is neither an aggregate or an attribute"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2307,6 +2307,161 @@ def test_sparse_aggregate(self, setup_teardown: Any) -> None:
),
]

def test_agg_formula(self, setup_teardown: Any) -> None:
    """
    ensures formulas of aggregates work
    ex sum(my_attribute) / count(my_attribute)
    """
    span_ts = BASE_TIME - timedelta(minutes=1)
    # 10 spans of value 6 and 2 spans of value 7: sum = 74, count = 12.
    write_eap_span(span_ts, {"kyles_measurement": 6}, 10)
    write_eap_span(span_ts, {"kyles_measurement": 7}, 2)

    end_ts = Timestamp(seconds=int(BASE_TIME.timestamp()))
    start_ts = Timestamp(
        seconds=int((BASE_TIME - timedelta(hours=1)).timestamp())
    )

    measurement_key = AttributeKey(
        type=AttributeKey.TYPE_DOUBLE, name="kyles_measurement"
    )
    sum_col = Column(
        aggregation=AttributeAggregation(
            aggregate=Function.FUNCTION_SUM,
            key=measurement_key,
        ),
        label="sum(kyles_measurement)",
    )
    count_col = Column(
        aggregation=AttributeAggregation(
            aggregate=Function.FUNCTION_COUNT,
            key=measurement_key,
        ),
        label="count(kyles_measurement)",
    )

    request = TraceItemTableRequest(
        meta=RequestMeta(
            project_ids=[1, 2, 3],
            organization_id=1,
            cogs_category="something",
            referrer="something",
            start_timestamp=start_ts,
            end_timestamp=end_ts,
            trace_item_type=TraceItemType.TRACE_ITEM_TYPE_SPAN,
        ),
        filter=TraceItemFilter(
            exists_filter=ExistsFilter(key=measurement_key)
        ),
        columns=[
            Column(
                formula=Column.BinaryFormula(
                    op=Column.BinaryFormula.OP_DIVIDE,
                    left=sum_col,
                    right=count_col,
                ),
                label="sum(kyles_measurement) / count(kyles_measurement)",
            ),
        ],
        limit=1,
    )
    response = EndpointTraceItemTable().execute(request)
    assert response.column_values == [
        TraceItemColumnValues(
            attribute_name="sum(kyles_measurement) / count(kyles_measurement)",
            results=[
                AttributeValue(val_double=(74 / 12)),
            ],
        ),
    ]

def test_non_agg_formula(self, setup_teardown: Any) -> None:
    """
    ensures formulas of non-aggregates work
    ex: my_attribute + my_other_attribute
    """
    span_ts = BASE_TIME - timedelta(minutes=1)
    # Per-span sums: 4 spans of 0, 2 spans of 5, 1 span of 13.
    write_eap_span(span_ts, {"kyles_measurement": -1, "my_other_attribute": 1}, 4)
    write_eap_span(span_ts, {"kyles_measurement": 3, "my_other_attribute": 2}, 2)
    write_eap_span(span_ts, {"kyles_measurement": 10, "my_other_attribute": 3}, 1)

    end_ts = Timestamp(seconds=int(BASE_TIME.timestamp()))
    start_ts = Timestamp(
        seconds=int((BASE_TIME - timedelta(hours=1)).timestamp())
    )

    # The same formula is used both as the selected column and the order-by.
    add_formula = Column.BinaryFormula(
        op=Column.BinaryFormula.OP_ADD,
        left=Column(
            key=AttributeKey(
                type=AttributeKey.TYPE_DOUBLE, name="kyles_measurement"
            )
        ),
        right=Column(
            key=AttributeKey(
                type=AttributeKey.TYPE_DOUBLE, name="my_other_attribute"
            )
        ),
    )
    formula_label = "kyles_measurement + my_other_attribute"

    request = TraceItemTableRequest(
        meta=RequestMeta(
            project_ids=[1, 2, 3],
            organization_id=1,
            cogs_category="something",
            referrer="something",
            start_timestamp=start_ts,
            end_timestamp=end_ts,
            trace_item_type=TraceItemType.TRACE_ITEM_TYPE_SPAN,
        ),
        filter=TraceItemFilter(
            exists_filter=ExistsFilter(
                key=AttributeKey(
                    type=AttributeKey.TYPE_DOUBLE, name="kyles_measurement"
                )
            )
        ),
        columns=[
            Column(formula=add_formula, label=formula_label),
        ],
        order_by=[
            TraceItemTableRequest.OrderBy(
                column=Column(formula=add_formula, label=formula_label)
            ),
        ],
        limit=50,
    )
    response = EndpointTraceItemTable().execute(request)
    assert response.column_values == [
        TraceItemColumnValues(
            attribute_name="kyles_measurement + my_other_attribute",
            results=[
                AttributeValue(val_double=0),
                AttributeValue(val_double=0),
                AttributeValue(val_double=0),
                AttributeValue(val_double=0),
                AttributeValue(val_double=5),
                AttributeValue(val_double=5),
                AttributeValue(val_double=13),
            ],
        ),
    ]


class TestUtils:
def test_apply_labels_to_columns_backward_compat(self) -> None:
Expand Down
Loading
Loading