diff --git a/requirements.txt b/requirements.txt index daecffff40..94f53a82fb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -29,7 +29,7 @@ python-rapidjson==1.8 redis==4.5.4 sentry-arroyo==2.19.12 sentry-kafka-schemas==0.1.129 -sentry-protos==0.1.57 +sentry-protos==0.1.58 sentry-redis-tools==0.3.0 sentry-relay==0.9.5 sentry-sdk==2.18.0 diff --git a/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_time_series.py b/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_time_series.py index 9312a4d7c1..88a01321d4 100644 --- a/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_time_series.py +++ b/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_time_series.py @@ -37,6 +37,7 @@ extract_response_meta, setup_trace_query_settings, ) +from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException from snuba.web.rpc.v1.resolvers import ResolverTimeSeries from snuba.web.rpc.v1.resolvers.R_eap_spans.common.aggregation import ( ExtrapolationContext, @@ -297,6 +298,9 @@ def trace_item_type(cls) -> TraceItemType.ValueType: return TraceItemType.TRACE_ITEM_TYPE_SPAN def resolve(self, in_msg: TimeSeriesRequest) -> TimeSeriesResponse: + if len(in_msg.expressions) > 0: + raise BadSnubaRPCRequestException("expressions field not yet implemented") + snuba_request = _build_snuba_request(in_msg) res = run_query( dataset=PluggableDataset(name="eap", all_entities=[]), diff --git a/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_table.py b/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_table.py index 5408e56075..87635b7c24 100644 --- a/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_table.py +++ b/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_table.py @@ -7,6 +7,7 @@ from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import ( AggregationComparisonFilter, AggregationFilter, + Column, TraceItemColumnValues, TraceItemTableRequest, TraceItemTableResponse, @@ -55,6 +56,13 @@ _DEFAULT_ROW_LIMIT = 10_000 +OP_TO_EXPR = { + 
Column.BinaryFormula.OP_ADD: f.plus, + Column.BinaryFormula.OP_SUBTRACT: f.minus, + Column.BinaryFormula.OP_MULTIPLY: f.multiply, + Column.BinaryFormula.OP_DIVIDE: f.divide, +} + def aggregation_filter_to_expression(agg_filter: AggregationFilter) -> Expression: op_to_expr = { @@ -125,9 +133,81 @@ def _convert_order_by( expression=aggregation_to_expression(x.column.aggregation), ) ) + elif x.column.HasField("formula"): + res.append( + OrderBy( + direction=direction, + expression=_formula_to_expression(x.column.formula), + ) + ) return res +def _get_reliability_context_columns(column: Column) -> list[SelectedExpression]: + """ + extrapolated aggregates need to request extra columns to calculate the reliability of the result. + this function returns the list of columns that need to be requested. + """ + if not column.HasField("aggregation"): + return [] + + if ( + column.aggregation.extrapolation_mode + == ExtrapolationMode.EXTRAPOLATION_MODE_SAMPLE_WEIGHTED + ): + context_columns = [] + confidence_interval_column = get_confidence_interval_column(column.aggregation) + if confidence_interval_column is not None: + context_columns.append( + SelectedExpression( + name=confidence_interval_column.alias, + expression=confidence_interval_column, + ) + ) + + average_sample_rate_column = get_average_sample_rate_column(column.aggregation) + count_column = get_count_column(column.aggregation) + context_columns.append( + SelectedExpression( + name=average_sample_rate_column.alias, + expression=average_sample_rate_column, + ) + ) + context_columns.append( + SelectedExpression(name=count_column.alias, expression=count_column) + ) + return context_columns + return [] + + +def _formula_to_expression(formula: Column.BinaryFormula) -> Expression: + return OP_TO_EXPR[formula.op]( + _column_to_expression(formula.left), + _column_to_expression(formula.right), + ) + + +def _column_to_expression(column: Column) -> Expression: + """ + Given a column protobuf object, translates it into a 
Expression object and returns it. + """ + if column.HasField("key"): + return attribute_key_to_expression(column.key) + elif column.HasField("aggregation"): + function_expr = aggregation_to_expression(column.aggregation) + # aggregation label may not be set and the column label takes priority anyways. + function_expr = replace(function_expr, alias=column.label) + return function_expr + elif column.HasField("formula"): + formula_expr = _formula_to_expression(column.formula) + formula_expr = replace(formula_expr, alias=column.label) + return formula_expr + else: + raise BadSnubaRPCRequestException( + "Column is not one of: aggregate, attribute key, or formula" + ) + + def _build_query(request: TraceItemTableRequest) -> Query: # TODO: This is hardcoded still entity = Entity( @@ -138,54 +218,15 @@ def _build_query(request: TraceItemTableRequest) -> Query: selected_columns = [] for column in request.columns: - if column.HasField("key"): - key_col = attribute_key_to_expression(column.key) - # The key_col expression alias may differ from the column label. That is okay - # the attribute key name is used in the groupby, the column label is just the name of - # the returned attribute value - selected_columns.append( - SelectedExpression(name=column.label, expression=key_col) - ) - elif column.HasField("aggregation"): - function_expr = aggregation_to_expression(column.aggregation) - # aggregation label may not be set and the column label takes priority anyways. 
- function_expr = replace(function_expr, alias=column.label) - selected_columns.append( - SelectedExpression(name=column.label, expression=function_expr) - ) - - if ( - column.aggregation.extrapolation_mode - == ExtrapolationMode.EXTRAPOLATION_MODE_SAMPLE_WEIGHTED - ): - confidence_interval_column = get_confidence_interval_column( - column.aggregation - ) - if confidence_interval_column is not None: - selected_columns.append( - SelectedExpression( - name=confidence_interval_column.alias, - expression=confidence_interval_column, - ) - ) - - average_sample_rate_column = get_average_sample_rate_column( - column.aggregation - ) - count_column = get_count_column(column.aggregation) - selected_columns.append( - SelectedExpression( - name=average_sample_rate_column.alias, - expression=average_sample_rate_column, - ) - ) - selected_columns.append( - SelectedExpression(name=count_column.alias, expression=count_column) - ) - else: - raise BadSnubaRPCRequestException( - "Column is neither an aggregate or an attribute" + # The key_col expression alias may differ from the column label. 
That is okay + # the attribute key name is used in the groupby, the column label is just the name of + # the returned attribute value + selected_columns.append( + SelectedExpression( + name=column.label, expression=_column_to_expression(column) ) + ) + selected_columns.extend(_get_reliability_context_columns(column)) res = Query( from_clause=entity, @@ -255,9 +296,11 @@ def _convert_results( converters[column.label] = lambda x: AttributeValue(val_double=float(x)) elif column.HasField("aggregation"): converters[column.label] = lambda x: AttributeValue(val_double=float(x)) + elif column.HasField("formula"): + converters[column.label] = lambda x: AttributeValue(val_double=float(x)) else: raise BadSnubaRPCRequestException( - "column is neither an attribute or aggregation" + "column is not one of: attribute, aggregation, or formula" ) res: defaultdict[str, TraceItemColumnValues] = defaultdict(TraceItemColumnValues) diff --git a/snuba/web/rpc/v1/resolvers/R_ourlogs/resolver_trace_item_table.py b/snuba/web/rpc/v1/resolvers/R_ourlogs/resolver_trace_item_table.py index c44ae6aaf4..1f8d32049d 100644 --- a/snuba/web/rpc/v1/resolvers/R_ourlogs/resolver_trace_item_table.py +++ b/snuba/web/rpc/v1/resolvers/R_ourlogs/resolver_trace_item_table.py @@ -78,7 +78,7 @@ def _build_query(request: TraceItemTableRequest) -> Query: ) else: raise BadSnubaRPCRequestException( - "requested attribute is not a column (aggregation not supported for logs)" + "requested attribute is not a column (aggregation and formula not supported for logs)" ) res = Query( diff --git a/snuba/web/rpc/v1/resolvers/R_uptime_checks/resolver_trace_item_table.py b/snuba/web/rpc/v1/resolvers/R_uptime_checks/resolver_trace_item_table.py index 1e541b0ea0..d8aa2f42d9 100644 --- a/snuba/web/rpc/v1/resolvers/R_uptime_checks/resolver_trace_item_table.py +++ b/snuba/web/rpc/v1/resolvers/R_uptime_checks/resolver_trace_item_table.py @@ -144,6 +144,10 @@ def _build_query(request: TraceItemTableRequest) -> Query: 
selected_columns.append( SelectedExpression(name=column.label, expression=function_expr) ) + elif column.HasField("formula"): + raise BadSnubaRPCRequestException( + "formulas are not supported for uptime checks" + ) else: raise BadSnubaRPCRequestException( "Column is neither an aggregate or an attribute" diff --git a/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table.py b/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table.py index b223f6c324..6e3a83e546 100644 --- a/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table.py +++ b/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table.py @@ -2307,6 +2307,161 @@ def test_sparse_aggregate(self, setup_teardown: Any) -> None: ), ] + def test_agg_formula(self, setup_teardown: Any) -> None: + """ + ensures formulas of aggregates work + ex sum(my_attribute) / count(my_attribute) + """ + span_ts = BASE_TIME - timedelta(minutes=1) + write_eap_span(span_ts, {"kyles_measurement": 6}, 10) + write_eap_span(span_ts, {"kyles_measurement": 7}, 2) + + ts = Timestamp(seconds=int(BASE_TIME.timestamp())) + hour_ago = int((BASE_TIME - timedelta(hours=1)).timestamp()) + message = TraceItemTableRequest( + meta=RequestMeta( + project_ids=[1, 2, 3], + organization_id=1, + cogs_category="something", + referrer="something", + start_timestamp=Timestamp(seconds=hour_ago), + end_timestamp=ts, + trace_item_type=TraceItemType.TRACE_ITEM_TYPE_SPAN, + ), + filter=TraceItemFilter( + exists_filter=ExistsFilter( + key=AttributeKey( + type=AttributeKey.TYPE_DOUBLE, name="kyles_measurement" + ) + ) + ), + columns=[ + Column( + formula=Column.BinaryFormula( + op=Column.BinaryFormula.OP_DIVIDE, + left=Column( + aggregation=AttributeAggregation( + aggregate=Function.FUNCTION_SUM, + key=AttributeKey( + type=AttributeKey.TYPE_DOUBLE, + name="kyles_measurement", + ), + ), + label="sum(kyles_measurement)", + ), + right=Column( + 
aggregation=AttributeAggregation( + aggregate=Function.FUNCTION_COUNT, + key=AttributeKey( + type=AttributeKey.TYPE_DOUBLE, + name="kyles_measurement", + ), + ), + label="count(kyles_measurement)", + ), + ), + label="sum(kyles_measurement) / count(kyles_measurement)", + ), + ], + limit=1, + ) + response = EndpointTraceItemTable().execute(message) + assert response.column_values == [ + TraceItemColumnValues( + attribute_name="sum(kyles_measurement) / count(kyles_measurement)", + results=[ + AttributeValue(val_double=(74 / 12)), + ], + ), + ] + + def test_non_agg_formula(self, setup_teardown: Any) -> None: + """ + ensures formulas of non-aggregates work + ex: my_attribute + my_other_attribute + """ + span_ts = BASE_TIME - timedelta(minutes=1) + write_eap_span(span_ts, {"kyles_measurement": -1, "my_other_attribute": 1}, 4) + write_eap_span(span_ts, {"kyles_measurement": 3, "my_other_attribute": 2}, 2) + write_eap_span(span_ts, {"kyles_measurement": 10, "my_other_attribute": 3}, 1) + + ts = Timestamp(seconds=int(BASE_TIME.timestamp())) + hour_ago = int((BASE_TIME - timedelta(hours=1)).timestamp()) + message = TraceItemTableRequest( + meta=RequestMeta( + project_ids=[1, 2, 3], + organization_id=1, + cogs_category="something", + referrer="something", + start_timestamp=Timestamp(seconds=hour_ago), + end_timestamp=ts, + trace_item_type=TraceItemType.TRACE_ITEM_TYPE_SPAN, + ), + filter=TraceItemFilter( + exists_filter=ExistsFilter( + key=AttributeKey( + type=AttributeKey.TYPE_DOUBLE, name="kyles_measurement" + ) + ) + ), + columns=[ + Column( + formula=Column.BinaryFormula( + op=Column.BinaryFormula.OP_ADD, + left=Column( + key=AttributeKey( + type=AttributeKey.TYPE_DOUBLE, name="kyles_measurement" + ) + ), + right=Column( + key=AttributeKey( + type=AttributeKey.TYPE_DOUBLE, name="my_other_attribute" + ) + ), + ), + label="kyles_measurement + my_other_attribute", + ), + ], + order_by=[ + TraceItemTableRequest.OrderBy( + column=Column( + formula=Column.BinaryFormula( + 
op=Column.BinaryFormula.OP_ADD, + left=Column( + key=AttributeKey( + type=AttributeKey.TYPE_DOUBLE, + name="kyles_measurement", + ) + ), + right=Column( + key=AttributeKey( + type=AttributeKey.TYPE_DOUBLE, + name="my_other_attribute", + ) + ), + ), + label="kyles_measurement + my_other_attribute", + ) + ), + ], + limit=50, + ) + response = EndpointTraceItemTable().execute(message) + assert response.column_values == [ + TraceItemColumnValues( + attribute_name="kyles_measurement + my_other_attribute", + results=[ + AttributeValue(val_double=0), + AttributeValue(val_double=0), + AttributeValue(val_double=0), + AttributeValue(val_double=0), + AttributeValue(val_double=5), + AttributeValue(val_double=5), + AttributeValue(val_double=13), + ], + ), + ] + class TestUtils: def test_apply_labels_to_columns_backward_compat(self) -> None: diff --git a/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table_extrapolation.py b/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table_extrapolation.py index 7129f2b781..8499e67ded 100644 --- a/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table_extrapolation.py +++ b/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table_extrapolation.py @@ -7,22 +7,28 @@ from google.protobuf.timestamp_pb2 import Timestamp from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import ( Column, + TraceItemColumnValues, TraceItemTableRequest, ) from sentry_protos.snuba.v1.request_common_pb2 import RequestMeta, TraceItemType from sentry_protos.snuba.v1.trace_item_attribute_pb2 import ( AttributeAggregation, AttributeKey, + AttributeValue, ExtrapolationMode, Function, Reliability, ) +from sentry_protos.snuba.v1.trace_item_filter_pb2 import ExistsFilter, TraceItemFilter from snuba.datasets.storages.factory import get_storage from snuba.datasets.storages.storage_key import StorageKey from snuba.web.rpc.v1.endpoint_trace_item_table import 
EndpointTraceItemTable from tests.base import BaseApiTest from tests.helpers import write_raw_unprocessed_events +from tests.web.rpc.v1.test_endpoint_trace_item_table.test_endpoint_trace_item_table import ( + write_eap_span, +) _RELEASE_TAG = "backend@24.7.0.dev0+c45b49caed1e5fcbf70097ab3f434b487c359b6b" _SERVER_NAME = "D23CXQ4GK2.local" @@ -708,3 +714,73 @@ def test_count_reliability_with_group_by(self) -> None: assert measurement_reliabilities == [ Reliability.RELIABILITY_LOW, ] # low reliability due to low sample count + + def test_formula(self) -> None: + """ + This test ensures that formulas work with extrapolation. + Reliabilities will not be returned. + """ + span_ts = BASE_TIME - timedelta(minutes=1) + write_eap_span(span_ts, {"kyles_measurement": 6, "server_sample_rate": 0.5}, 10) + write_eap_span(span_ts, {"kyles_measurement": 7}, 2) + + ts = Timestamp(seconds=int(BASE_TIME.timestamp())) + hour_ago = int((BASE_TIME - timedelta(hours=1)).timestamp()) + message = TraceItemTableRequest( + meta=RequestMeta( + project_ids=[1, 2, 3], + organization_id=1, + cogs_category="something", + referrer="something", + start_timestamp=Timestamp(seconds=hour_ago), + end_timestamp=ts, + trace_item_type=TraceItemType.TRACE_ITEM_TYPE_SPAN, + ), + filter=TraceItemFilter( + exists_filter=ExistsFilter( + key=AttributeKey( + type=AttributeKey.TYPE_DOUBLE, name="kyles_measurement" + ) + ) + ), + columns=[ + Column( + formula=Column.BinaryFormula( + op=Column.BinaryFormula.OP_DIVIDE, + left=Column( + aggregation=AttributeAggregation( + aggregate=Function.FUNCTION_SUM, + key=AttributeKey( + type=AttributeKey.TYPE_DOUBLE, + name="kyles_measurement", + ), + extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_SAMPLE_WEIGHTED, + ), + label="sum(kyles_measurement)", + ), + right=Column( + aggregation=AttributeAggregation( + aggregate=Function.FUNCTION_COUNT, + key=AttributeKey( + type=AttributeKey.TYPE_DOUBLE, + name="kyles_measurement", + ), + 
extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_SAMPLE_WEIGHTED, + ), + label="count(kyles_measurement)", + ), + ), + label="sum(kyles_measurement) / count(kyles_measurement)", + ), + ], + limit=1, + ) + response = EndpointTraceItemTable().execute(message) + assert response.column_values == [ + TraceItemColumnValues( + attribute_name="sum(kyles_measurement) / count(kyles_measurement)", + results=[ + AttributeValue(val_double=(134 / 22)), + ], + ), + ]