From 36ba7bdf595b6f03b23ccd7fc5abd14b5ef1456b Mon Sep 17 00:00:00 2001 From: Daniel Bachhuber Date: Mon, 3 Feb 2025 17:17:22 -0800 Subject: [PATCH 1/9] Attempt at a failing test case --- posthog/hogql/test/test_resolver.py | 113 ++++++++++++++++++++++++++++ 1 file changed, 113 insertions(+) diff --git a/posthog/hogql/test/test_resolver.py b/posthog/hogql/test/test_resolver.py index 0696c52acc09f..92fc2d320aa57 100644 --- a/posthog/hogql/test/test_resolver.py +++ b/posthog/hogql/test/test_resolver.py @@ -24,6 +24,9 @@ from posthog.hogql.test.utils import pretty_dataclasses from posthog.hogql.visitor import clone_expr from posthog.test.base import BaseTest +from posthog.warehouse.models.credential import DataWarehouseCredential +from posthog.warehouse.models.join import DataWarehouseJoin +from posthog.warehouse.models.table import DataWarehouseTable class TestResolver(BaseTest): @@ -694,3 +697,113 @@ def test_property_access_with_tuples_zero_index_error(self): ): node: ast.SelectQuery = self._select(query) resolve_types(node, context, dialect="clickhouse") + + def test_resolve_field_through_linear_joins(self): + credential = DataWarehouseCredential.objects.create( + team=self.team, access_key="_accesskey", access_secret="_secret" + ) + + DataWarehouseTable.objects.create( + team=self.team, + name="subscriptions", + columns={ + "id": "String", + "customer_id": "String", + }, + credential=credential, + url_pattern="", + ) + + DataWarehouseTable.objects.create( + team=self.team, + name="customers", + columns={ + "id": "String", + "email": "String", + }, + credential=credential, + url_pattern="", + ) + + DataWarehouseJoin.objects.create( + team=self.team, + source_table_name="subscriptions", + source_table_key="customer_id", + joining_table_name="customers", + joining_table_key="id", + field_name="customer", + ) + + DataWarehouseJoin.objects.create( + team=self.team, + source_table_name="customers", + source_table_key="email", + joining_table_name="events", + joining_table_key="person.properties.$email", + field_name="events", + ) + + self.database = create_hogql_database(self.team.pk) + self.context = HogQLContext(database=self.database, team_id=self.team.pk, enable_select_queries=True) + + node = self._select(""" + SELECT + customer.events.distinct_id + FROM subscriptions + """) + node = cast(ast.SelectQuery, resolve_types(node, self.context, dialect="clickhouse")) + + def test_resolve_field_through_nested_joins(self): + credential = DataWarehouseCredential.objects.create( + team=self.team, access_key="_accesskey", access_secret="_secret" + ) + + DataWarehouseTable.objects.create( + team=self.team, + name="subscriptions", + columns={ + "id": "String", + "customer_id": "String", + }, + credential=credential, + url_pattern="", + ) + + DataWarehouseTable.objects.create( + team=self.team, + name="customers", + columns={ + "id": "String", + "email": "String", + }, + credential=credential, + url_pattern="", + ) + + DataWarehouseJoin.objects.create( + team=self.team, + source_table_name="subscriptions", + source_table_key="customer_id", + joining_table_name="customers", + joining_table_key="id", + field_name="customer", + ) + + DataWarehouseJoin.objects.create( + team=self.team, + source_table_name="subscriptions", + source_table_key="customer.email", + joining_table_name="events", + joining_table_key="person.properties.$email", + field_name="events", + ) + + self.database = create_hogql_database(self.team.pk) + self.context = HogQLContext(database=self.database, team_id=self.team.pk, enable_select_queries=True) + + node = self._select(""" + SELECT + events.distinct_id + FROM subscriptions + """) + node = cast(ast.SelectQuery, resolve_types(node, self.context, dialect="clickhouse")) From 2a3f445c39ea9cee31cf163b4e20e989045ac5dd Mon Sep 17 00:00:00 2001 From: Daniel Bachhuber Date: Tue, 4 Feb 2025 05:07:17 -0800 Subject: [PATCH 2/9] Revert "Attempt at a failing test case" This reverts commit 36ba7bdf595b6f03b23ccd7fc5abd14b5ef1456b. --- posthog/hogql/test/test_resolver.py | 113 ---------------------------- 1 file changed, 113 deletions(-) diff --git a/posthog/hogql/test/test_resolver.py b/posthog/hogql/test/test_resolver.py index 92fc2d320aa57..0696c52acc09f 100644 --- a/posthog/hogql/test/test_resolver.py +++ b/posthog/hogql/test/test_resolver.py @@ -24,9 +24,6 @@ from posthog.hogql.test.utils import pretty_dataclasses from posthog.hogql.visitor import clone_expr from posthog.test.base import BaseTest -from posthog.warehouse.models.credential import DataWarehouseCredential -from posthog.warehouse.models.join import DataWarehouseJoin -from posthog.warehouse.models.table import DataWarehouseTable class TestResolver(BaseTest): @@ -697,113 +694,3 @@ def test_property_access_with_tuples_zero_index_error(self): ): node: ast.SelectQuery = self._select(query) resolve_types(node, context, dialect="clickhouse") - - def test_resolve_field_through_linear_joins(self): - credential = DataWarehouseCredential.objects.create( - team=self.team, access_key="_accesskey", access_secret="_secret" - ) - - DataWarehouseTable.objects.create( - team=self.team, - name="subscriptions", - columns={ - "id": "String", - "customer_id": "String", - }, - credential=credential, - url_pattern="", - ) - - DataWarehouseTable.objects.create( - team=self.team, - name="customers", - columns={ - "id": "String", - "email": "String", - }, - credential=credential, - url_pattern="", - ) - - DataWarehouseJoin.objects.create( - team=self.team, - source_table_name="subscriptions", - source_table_key="customer_id", - joining_table_name="customers", - joining_table_key="id", - field_name="customer", - ) - - DataWarehouseJoin.objects.create( - team=self.team, - source_table_name="customers", - source_table_key="email", - joining_table_name="events", - joining_table_key="person.properties.$email", - field_name="events", - ) - - self.database = create_hogql_database(self.team.pk) - self.context = HogQLContext(database=self.database, team_id=self.team.pk, enable_select_queries=True) - - node = self._select(""" - SELECT - customer.events.distinct_id - FROM subscriptions - """) - node = cast(ast.SelectQuery, resolve_types(node, self.context, dialect="clickhouse")) - - def test_resolve_field_through_nested_joins(self): - credential = DataWarehouseCredential.objects.create( - team=self.team, access_key="_accesskey", access_secret="_secret" - ) - - DataWarehouseTable.objects.create( - team=self.team, - name="subscriptions", - columns={ - "id": "String", - "customer_id": "String", - }, - credential=credential, - url_pattern="", - ) - - DataWarehouseTable.objects.create( - team=self.team, - name="customers", - columns={ - "id": "String", - "email": "String", - }, - credential=credential, - url_pattern="", - ) - - DataWarehouseJoin.objects.create( - team=self.team, - source_table_name="subscriptions", - source_table_key="customer_id", - joining_table_name="customers", - joining_table_key="id", - field_name="customer", - ) - - DataWarehouseJoin.objects.create( - team=self.team, - source_table_name="subscriptions", - source_table_key="customer.email", - joining_table_name="events", - joining_table_key="person.properties.$email", - field_name="events", - ) - - self.database = create_hogql_database(self.team.pk) - self.context = HogQLContext(database=self.database, team_id=self.team.pk, enable_select_queries=True) - - node = self._select(""" - SELECT - events.distinct_id - FROM subscriptions - """) - node = cast(ast.SelectQuery, resolve_types(node, self.context, dialect="clickhouse")) From 510e0a02b29ab5648d47f12eb973d32b873863b2 Mon Sep 17 00:00:00 2001 From: Daniel Bachhuber Date: Tue, 4 Feb 2025 05:09:38 -0800 Subject: [PATCH 3/9] Failing test cases for linear and nested joins --- posthog/hogql/database/test/test_database.py | 116 +++++++++++++++++++ 1 file changed, 116 insertions(+) diff --git a/posthog/hogql/database/test/test_database.py b/posthog/hogql/database/test/test_database.py index 83c5f64121544..d74b8f1573f95 100644 --- a/posthog/hogql/database/test/test_database.py +++ b/posthog/hogql/database/test/test_database.py @@ -707,3 +707,119 @@ def test_database_warehouse_person_id_field_with_events_join(self): assert person_id_field.chain == ["events_data", "person_id"] print_ast(parse_select("SELECT person_id FROM warehouse_table"), context, dialect="clickhouse") + + def test_database_warehouse_experiments_optimized_events_join_resolve_field_through_linear_joins(self): + credentials = DataWarehouseCredential.objects.create( + access_key="test_key", access_secret="test_secret", team=self.team + ) + + DataWarehouseTable.objects.create( + team=self.team, + name="subscriptions", + columns={ + "id": "String", + "customer_id": "String", + }, + credential=credentials, + url_pattern="s3://test/*", + format=DataWarehouseTable.TableFormat.Parquet, + ) + + DataWarehouseTable.objects.create( + team=self.team, + name="customers", + columns={ + "id": "String", + "email": "String", + }, + credential=credentials, + url_pattern="s3://test/*", + format=DataWarehouseTable.TableFormat.Parquet, + ) + + DataWarehouseJoin.objects.create( + team=self.team, + source_table_name="subscriptions", + source_table_key="customer_id", + joining_table_name="customers", + joining_table_key="id", + field_name="customer", + ) + + DataWarehouseJoin.objects.create( + team=self.team, + source_table_name="customers", + source_table_key="email", + joining_table_name="events", + joining_table_key="person.properties.$email", + field_name="events", + configuration={"experiments_optimized": True, "experiments_timestamp_key": "created_at"}, + ) + + db = create_hogql_database(team_id=self.team.pk) + + context = HogQLContext( + team_id=self.team.pk, + enable_select_queries=True, + database=db, + ) + + print_ast(parse_select("SELECT customer.events.distinct_id FROM subscriptions"), context, dialect="clickhouse") + + def test_database_warehouse_experiments_optimized_events_join_resolve_field_through_nested_joins(self): + credentials = DataWarehouseCredential.objects.create( + access_key="test_key", access_secret="test_secret", team=self.team + ) + + DataWarehouseTable.objects.create( + team=self.team, + name="subscriptions", + columns={ + "id": "String", + "customer_id": "String", + }, + credential=credentials, + url_pattern="s3://test/*", + format=DataWarehouseTable.TableFormat.Parquet, + ) + + DataWarehouseTable.objects.create( + team=self.team, + name="customers", + columns={ + "id": "String", + "email": "String", + }, + credential=credentials, + url_pattern="s3://test/*", + format=DataWarehouseTable.TableFormat.Parquet, + ) + + DataWarehouseJoin.objects.create( + team=self.team, + source_table_name="subscriptions", + source_table_key="customer_id", + joining_table_name="customers", + joining_table_key="id", + field_name="customer", + ) + + DataWarehouseJoin.objects.create( + team=self.team, + source_table_name="subscriptions", + source_table_key="customer.email", + joining_table_name="events", + joining_table_key="person.properties.$email", + field_name="events", + configuration={"experiments_optimized": True, "experiments_timestamp_key": "created_at"}, + ) + + db = create_hogql_database(team_id=self.team.pk) + + context = HogQLContext( + team_id=self.team.pk, + enable_select_queries=True, + database=db, + ) + + print_ast(parse_select("SELECT events.distinct_id FROM subscriptions"), context, dialect="clickhouse") From 92a970428ee1feaf1bc3c2da5e9da01531cf8177 Mon Sep 17 00:00:00 2001 From: Daniel Bachhuber Date: Tue, 4 Feb 2025 05:16:50 -0800 Subject: [PATCH 4/9] Add test cases for the succeeding scenarios --- posthog/hogql/database/test/test_database.py | 118 ++++++++++++++++++- 1 file changed, 116 insertions(+), 2 deletions(-) diff --git a/posthog/hogql/database/test/test_database.py b/posthog/hogql/database/test/test_database.py index d74b8f1573f95..86dbb400b482a 100644 --- a/posthog/hogql/database/test/test_database.py +++ b/posthog/hogql/database/test/test_database.py @@ -708,7 +708,64 @@ def test_database_warehouse_person_id_field_with_events_join(self): print_ast(parse_select("SELECT person_id FROM warehouse_table"), context, dialect="clickhouse") - def test_database_warehouse_experiments_optimized_events_join_resolve_field_through_linear_joins(self): + def test_database_warehouse_resolve_field_through_linear_joins_basic_join(self): + credentials = DataWarehouseCredential.objects.create( + access_key="test_key", access_secret="test_secret", team=self.team + ) + + DataWarehouseTable.objects.create( + team=self.team, + name="subscriptions", + columns={ + "id": "String", + "customer_id": "String", + }, + credential=credentials, + url_pattern="s3://test/*", + format=DataWarehouseTable.TableFormat.Parquet, + ) + + DataWarehouseTable.objects.create( + team=self.team, + name="customers", + columns={ + "id": "String", + "email": "String", + }, + credential=credentials, + url_pattern="s3://test/*", + format=DataWarehouseTable.TableFormat.Parquet, + ) + + DataWarehouseJoin.objects.create( + team=self.team, + source_table_name="subscriptions", + source_table_key="customer_id", + joining_table_name="customers", + joining_table_key="id", + field_name="customer", + ) + + DataWarehouseJoin.objects.create( + team=self.team, + source_table_name="customers", + source_table_key="email", + joining_table_name="events", + joining_table_key="person.properties.$email", + field_name="events", + ) + + db = create_hogql_database(team_id=self.team.pk) + + context = HogQLContext( + team_id=self.team.pk, + enable_select_queries=True, + database=db, + ) + + print_ast(parse_select("SELECT customer.events.distinct_id FROM subscriptions"), context, dialect="clickhouse") + + def test_database_warehouse_resolve_field_through_linear_joins_experiments_optimized_events_join(self): credentials = DataWarehouseCredential.objects.create( access_key="test_key", access_secret="test_secret", team=self.team ) @@ -766,7 +823,64 @@ def test_database_warehouse_experiments_optimized_events_join_resolve_field_thro print_ast(parse_select("SELECT customer.events.distinct_id FROM subscriptions"), context, dialect="clickhouse") - def test_database_warehouse_experiments_optimized_events_join_resolve_field_through_nested_joins(self): + def test_database_warehouse_resolve_field_through_nested_joins_basic_join(self): + credentials = DataWarehouseCredential.objects.create( + access_key="test_key", access_secret="test_secret", team=self.team + ) + + DataWarehouseTable.objects.create( + team=self.team, + name="subscriptions", + columns={ + "id": "String", + "customer_id": "String", + }, + credential=credentials, + url_pattern="s3://test/*", + format=DataWarehouseTable.TableFormat.Parquet, + ) + + DataWarehouseTable.objects.create( + team=self.team, + name="customers", + columns={ + "id": "String", + "email": "String", + }, + credential=credentials, + url_pattern="s3://test/*", + format=DataWarehouseTable.TableFormat.Parquet, + ) + + DataWarehouseJoin.objects.create( + team=self.team, + source_table_name="subscriptions", + source_table_key="customer_id", + joining_table_name="customers", + joining_table_key="id", + field_name="customer", + ) + + DataWarehouseJoin.objects.create( + team=self.team, + source_table_name="subscriptions", + source_table_key="customer.email", + joining_table_name="events", + joining_table_key="person.properties.$email", + field_name="events", + ) + + db = create_hogql_database(team_id=self.team.pk) + + context = HogQLContext( + team_id=self.team.pk, + enable_select_queries=True, + database=db, + ) + + print_ast(parse_select("SELECT events.distinct_id FROM subscriptions"), context, dialect="clickhouse") + + def test_database_warehouse_resolve_field_through_nested_joins_experiments_optimized_events_join(self): credentials = DataWarehouseCredential.objects.create( access_key="test_key", access_secret="test_secret", team=self.team ) From aafcc0f7b722f4940ac4ae3758d9d409caa6bb54 Mon Sep 17 00:00:00 2001 From: Daniel Bachhuber Date: Tue, 4 Feb 2025 05:29:54 -0800 Subject: [PATCH 5/9] Field fixes --- posthog/hogql/database/test/test_database.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/posthog/hogql/database/test/test_database.py b/posthog/hogql/database/test/test_database.py index 86dbb400b482a..fc6a4509b8ba8 100644 --- a/posthog/hogql/database/test/test_database.py +++ b/posthog/hogql/database/test/test_database.py @@ -718,6 +718,7 @@ def test_database_warehouse_resolve_field_through_linear_joins_basic_join(self): name="subscriptions", columns={ "id": "String", + "created_at": "DateTime64(3, 'UTC')", "customer_id": "String", }, credential=credentials, @@ -751,7 +752,7 @@ def test_database_warehouse_resolve_field_through_linear_joins_basic_join(self): source_table_name="customers", source_table_key="email", joining_table_name="events", - joining_table_key="person.properties.$email", + joining_table_key="person.properties.email", field_name="events", ) @@ -775,6 +776,7 @@ def test_database_warehouse_resolve_field_through_linear_joins_experiments_optim name="subscriptions", columns={ "id": "String", + "created_at": "DateTime64(3, 'UTC')", "customer_id": "String", }, credential=credentials, @@ -808,7 +810,7 @@ def test_database_warehouse_resolve_field_through_linear_joins_experiments_optim source_table_name="customers", source_table_key="email", joining_table_name="events", - joining_table_key="person.properties.$email", + joining_table_key="person.properties.email", field_name="events", configuration={"experiments_optimized": True, "experiments_timestamp_key": "created_at"}, ) @@ -833,6 +835,7 @@ def test_database_warehouse_resolve_field_through_nested_joins_basic_join(self): name="subscriptions", columns={ "id": "String", + "created_at": "DateTime64(3, 'UTC')", "customer_id": "String", }, credential=credentials, @@ -866,7 +869,7 @@ def test_database_warehouse_resolve_field_through_nested_joins_basic_join(self): source_table_name="subscriptions", source_table_key="customer.email", joining_table_name="events", - joining_table_key="person.properties.$email", + joining_table_key="person.properties.email", field_name="events", ) @@ -890,6 +893,7 @@ def test_database_warehouse_resolve_field_through_nested_joins_experiments_optim name="subscriptions", columns={ "id": "String", + "created_at": "DateTime64(3, 'UTC')", "customer_id": "String", }, credential=credentials, @@ -923,7 +927,7 @@ def test_database_warehouse_resolve_field_through_nested_joins_experiments_optim source_table_name="subscriptions", source_table_key="customer.email", joining_table_name="events", - joining_table_key="person.properties.$email", + joining_table_key="person.properties.email", field_name="events", configuration={"experiments_optimized": True, "experiments_timestamp_key": "created_at"}, ) From 8e2dd3cc289fd9dd98b84fd196393053b7a27c6d Mon Sep 17 00:00:00 2001 From: Daniel Bachhuber Date: Tue, 4 Feb 2025 06:07:22 -0800 Subject: [PATCH 6/9] Properly persist full left and right chain for compare eq --- posthog/warehouse/models/join.py | 40 ++++++++++++++------------------ 1 file changed, 18 insertions(+), 22 deletions(-) diff --git a/posthog/warehouse/models/join.py b/posthog/warehouse/models/join.py index 29419d1cb74e7..c41f9d00d8eb4 100644 --- a/posthog/warehouse/models/join.py +++ b/posthog/warehouse/models/join.py @@ -64,21 +64,8 @@ def _join_function( if not join_to_add.fields_accessed: raise ResolutionError(f"No fields requested from {join_to_add.to_table}") - left = parse_expr(_source_table_key) - if isinstance(left, ast.Field): - left.chain = [join_to_add.from_table, *left.chain] - elif isinstance(left, ast.Call) and isinstance(left.args[0], ast.Field): - left.args[0].chain = [join_to_add.from_table, *left.args[0].chain] - else: - raise ResolutionError("Data Warehouse Join HogQL expression should be a Field or Call node") - - right = parse_expr(_joining_table_key) - if isinstance(right, ast.Field): - right.chain = [join_to_add.to_table, *right.chain] - elif isinstance(right, ast.Call) and isinstance(right.args[0], ast.Field): - right.args[0].chain = [join_to_add.to_table, *right.args[0].chain] - else: - raise ResolutionError("Data Warehouse Join HogQL expression should be a Field or Call node") + left = self.__parse_table_key_expression(_source_table_key, join_to_add.from_table) + right = self.__parse_table_key_expression(_joining_table_key, join_to_add.to_table) join_expr = ast.JoinExpr( table=ast.SelectQuery( @@ -119,6 +106,9 @@ def _join_function_for_experiments( if not timestamp_key: raise ResolutionError("experiments_timestamp_key is not set for this join") + left = self.__parse_table_key_expression(self.source_table_key, join_to_add.from_table) + right = self.__parse_table_key_expression(self.joining_table_key, join_to_add.to_table) + whereExpr: list[ast.Expr] = [ ast.CompareOperation( op=ast.CompareOperationOp.Eq, @@ -184,14 +174,9 @@ def _join_function_for_experiments( right=ast.Constant(value="$feature_flag_called"), ), ast.CompareOperation( - left=ast.Field( - chain=[ - join_to_add.from_table, - self.source_table_key, - ] - ), + left=left, op=ast.CompareOperationOp.Eq, - right=ast.Field(chain=[join_to_add.to_table, *self.joining_table_key.split(".")]), + right=right, ), ast.CompareOperation( left=ast.Field( @@ -210,3 +195,14 @@ def _join_function_for_experiments( ) return _join_function_for_experiments + + def __parse_table_key_expression(self, table_key: str, table_name: str) -> ast.Expr: + expr = parse_expr(table_key) + if isinstance(expr, ast.Field): + expr.chain = [table_name, *expr.chain] + elif isinstance(expr, ast.Call) and isinstance(expr.args[0], ast.Field): + expr.args[0].chain = [table_name, *expr.args[0].chain] + else: + raise ResolutionError("Data Warehouse Join HogQL expression should be a Field or Call node") + + return expr From 2d10c40ba3c16487707ab6fa632a2732a59953a1 Mon Sep 17 00:00:00 2001 From: Daniel Bachhuber Date: Tue, 4 Feb 2025 06:24:10 -0800 Subject: [PATCH 7/9] Add trends test case for 8e2dd3cc289fd9dd98b84fd196393053b7a27c6d --- .../test_experiment_trends_query_runner.py | 251 ++++++++++++++++++ 1 file changed, 251 insertions(+) diff --git a/posthog/hogql_queries/experiments/test/test_experiment_trends_query_runner.py b/posthog/hogql_queries/experiments/test/test_experiment_trends_query_runner.py index d4b419f2d7fda..e583760b5ad5f 100644 --- a/posthog/hogql_queries/experiments/test/test_experiment_trends_query_runner.py +++ b/posthog/hogql_queries/experiments/test/test_experiment_trends_query_runner.py @@ -285,6 +285,162 @@ def create_data_warehouse_table_with_usage(self): ) return table_name + def create_data_warehouse_table_with_subscriptions(self): + if not OBJECT_STORAGE_ACCESS_KEY_ID or not OBJECT_STORAGE_SECRET_ACCESS_KEY: + raise Exception("Missing vars") + + fs = s3fs.S3FileSystem( + client_kwargs={ + "region_name": "us-east-1", + "endpoint_url": OBJECT_STORAGE_ENDPOINT, + "aws_access_key_id": OBJECT_STORAGE_ACCESS_KEY_ID, + "aws_secret_access_key": OBJECT_STORAGE_SECRET_ACCESS_KEY, + }, + ) + + path_to_s3_object = "s3://" + OBJECT_STORAGE_BUCKET + f"/{TEST_BUCKET}" + + credential = DataWarehouseCredential.objects.create( + access_key=OBJECT_STORAGE_ACCESS_KEY_ID, + access_secret=OBJECT_STORAGE_SECRET_ACCESS_KEY, + team=self.team, + ) + + subscription_table_data = [ + { + "subscription_id": "1", + "subscription_created_at": datetime(2023, 1, 2), + "subscription_customer_id": "user_control_0", + "subscription_amount": 100, + }, + { + "subscription_id": "2", + "subscription_created_at": datetime(2023, 1, 3), + "subscription_customer_id": "user_test_1", + "subscription_amount": 50, + }, + { + "subscription_id": "3", + "subscription_created_at": datetime(2023, 1, 4), + "subscription_customer_id": "user_test_2", + "subscription_amount": 75, + }, + { + "subscription_id": "4", + "subscription_created_at": datetime(2023, 1, 5), + "subscription_customer_id": "user_test_3", + "subscription_amount": 80, + }, + { + "subscription_id": "5", + "subscription_created_at": datetime(2023, 1, 6), + "subscription_customer_id": "user_extra", + "subscription_amount": 90, + }, + ] + + pq.write_to_dataset( + pa.Table.from_pylist(subscription_table_data), + path_to_s3_object, + filesystem=fs, + use_dictionary=True, + compression="snappy", + ) + + subscription_table_name = "subscriptions" + + DataWarehouseTable.objects.create( + name=subscription_table_name, + url_pattern=f"http://host.docker.internal:19000/{OBJECT_STORAGE_BUCKET}/{TEST_BUCKET}/*.parquet", + format=DataWarehouseTable.TableFormat.Parquet, + team=self.team, + columns={ + "subscription_id": "String", + "subscription_created_at": "DateTime64(3, 'UTC')", + "subscription_customer_id": "String", + "subscription_amount": "Int64", + }, + credential=credential, + ) + + customer_table_data = [ + { + "customer_id": "user_control_0", + "customer_created_at": datetime(2023, 1, 1), + "customer_name": "John Doe", + "customer_email": "john.doe@example.com", + }, + { + "customer_id": "user_test_1", + "customer_created_at": datetime(2023, 1, 2), + "customer_name": "Jane Doe", + "customer_email": "jane.doe@example.com", + }, + { + "customer_id": "user_test_2", + "customer_created_at": datetime(2023, 1, 3), + "customer_name": "John Smith", + "customer_email": "john.smith@example.com", + }, + { + "customer_id": "user_test_3", + "customer_created_at": datetime(2023, 1, 6), + "customer_name": "Jane Smith", + "customer_email": "jane.smith@example.com", + }, + { + "customer_id": "user_extra", + "customer_created_at": datetime(2023, 1, 7), + "customer_name": "John Doe Jr", + "customer_email": "john.doejr@example.com", + }, + ] + + pq.write_to_dataset( + pa.Table.from_pylist(customer_table_data), + path_to_s3_object, + filesystem=fs, + use_dictionary=True, + compression="snappy", + ) + + customer_table_name = "customers" + + DataWarehouseTable.objects.create( + name=customer_table_name, + url_pattern=f"http://host.docker.internal:19000/{OBJECT_STORAGE_BUCKET}/{TEST_BUCKET}/*.parquet", + format=DataWarehouseTable.TableFormat.Parquet, + team=self.team, + columns={ + "customer_id": "String", + "customer_created_at": "DateTime64(3, 'UTC')", + "customer_name": "String", + "customer_email": "String", + }, + credential=credential, + ) + + DataWarehouseJoin.objects.create( + team=self.team, + source_table_name=subscription_table_name, + source_table_key="subscription_customer_id", + joining_table_name=customer_table_name, + joining_table_key="customer_id", + field_name="subscription_customer", + ) + + DataWarehouseJoin.objects.create( + team=self.team, + source_table_name=subscription_table_name, + source_table_key="subscription_customer.customer_email", + joining_table_name="events", + joining_table_key="person.properties.email", + field_name="events", + configuration={"experiments_optimized": True, "experiments_timestamp_key": "subscription_created_at"}, + ) + + return subscription_table_name + @freeze_time("2020-01-01T12:00:00Z") def test_query_runner(self): feature_flag = self.create_feature_flag() @@ -2203,6 +2359,101 @@ def test_query_runner_with_data_warehouse_series_expected_query(self): self.assertEqual(control_result.absolute_exposure, 7) self.assertEqual(test_result.absolute_exposure, 9) + def test_query_runner_with_data_warehouse_subscriptions_table(self): + table_name = self.create_data_warehouse_table_with_subscriptions() + + feature_flag = self.create_feature_flag() + experiment = self.create_experiment( + feature_flag=feature_flag, + start_date=datetime(2023, 1, 1), + end_date=datetime(2023, 1, 10), + ) + + feature_flag_property = f"$feature/{feature_flag.key}" + + count_query = TrendsQuery( + series=[ + DataWarehouseNode( + id=table_name, + distinct_id_field="subscription_customer_id", + id_field="id", + table_name=table_name, + timestamp_field="subscription_created_at", + math="total", + ) + ] + ) + + experiment_query = ExperimentTrendsQuery( + experiment_id=experiment.id, + kind="ExperimentTrendsQuery", + count_query=count_query, + exposure_query=None, + ) + + experiment.metrics = [{"type": "primary", "query": experiment_query.model_dump()}] + experiment.save() + + # Populate exposure events + for variant, count in [("control", 7), ("test", 9)]: + for i in range(count): + _create_event( + team=self.team, + event="$feature_flag_called", + distinct_id=f"user_{variant}_{i}", + properties={ + "$feature_flag_response": variant, + feature_flag_property: variant, + "$feature_flag": feature_flag.key, + }, + timestamp=datetime(2023, 1, i + 1), + ) + + _create_person( + team=self.team, + distinct_ids=["user_control_0"], + properties={"email": "john.doe@example.com"}, + ) + + _create_person( + team=self.team, + distinct_ids=["user_test_1"], + properties={"email": "jane.doe@example.com"}, + ) + + _create_person( + team=self.team, + distinct_ids=["user_test_2"], + properties={"email": "john.smith@example.com"}, + ) + + _create_person( + team=self.team, + distinct_ids=["user_test_3"], + properties={"email": "jane.smith@example.com"}, + ) + + flush_persons_and_events() + + query_runner = ExperimentTrendsQueryRunner( + query=ExperimentTrendsQuery(**experiment.metrics[0]["query"]), team=self.team + ) + + with freeze_time("2023-01-10"): + result = query_runner.calculate() + + trend_result = cast(ExperimentTrendsQueryResponse, result) + + self.assertEqual(len(result.variants), 2) + + control_result = next(variant for variant in trend_result.variants if variant.key == "control") + test_result = next(variant for variant in trend_result.variants if variant.key == "test") + + self.assertEqual(control_result.count, 1) + self.assertEqual(test_result.count, 3) + self.assertEqual(control_result.absolute_exposure, 7) + self.assertEqual(test_result.absolute_exposure, 9) + def test_query_runner_with_invalid_data_warehouse_table_name(self): # parquet file isn't created, so we'll get an error table_name = "invalid_table_name" From c9ffd213efbfd3bf166ae80896aa6ae380ded745 Mon Sep 17 00:00:00 2001 From: Daniel Bachhuber Date: Tue, 4 Feb 2025 06:49:32 -0800 Subject: [PATCH 8/9] Fix timestamp expression --- posthog/hogql/database/test/test_database.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/posthog/hogql/database/test/test_database.py b/posthog/hogql/database/test/test_database.py index fc6a4509b8ba8..62f34477f42f8 100644 --- a/posthog/hogql/database/test/test_database.py +++ b/posthog/hogql/database/test/test_database.py @@ -812,7 +812,7 @@ def test_database_warehouse_resolve_field_through_linear_joins_experiments_optim joining_table_name="events", joining_table_key="person.properties.email", field_name="events", - configuration={"experiments_optimized": True, "experiments_timestamp_key": "created_at"}, + configuration={"experiments_optimized": True, "experiments_timestamp_key": "subscriptions.created_at"}, ) db = create_hogql_database(team_id=self.team.pk) From 22860343a03d3fe93f94b241ab485dee2870a07a Mon Sep 17 00:00:00 2001 From: Daniel Bachhuber Date: Tue, 4 Feb 2025 07:09:27 -0800 Subject: [PATCH 9/9] Drop test we don't actually want to support --- posthog/hogql/database/test/test_database.py | 59 -------------------- 1 file changed, 59 deletions(-) diff --git a/posthog/hogql/database/test/test_database.py b/posthog/hogql/database/test/test_database.py index 62f34477f42f8..aff2438ad96bd 100644 --- a/posthog/hogql/database/test/test_database.py +++ b/posthog/hogql/database/test/test_database.py @@ -766,65 +766,6 @@ def test_database_warehouse_resolve_field_through_linear_joins_basic_join(self): print_ast(parse_select("SELECT customer.events.distinct_id FROM subscriptions"), context, dialect="clickhouse") - def test_database_warehouse_resolve_field_through_linear_joins_experiments_optimized_events_join(self): - credentials = DataWarehouseCredential.objects.create( - access_key="test_key", access_secret="test_secret", team=self.team - ) - - DataWarehouseTable.objects.create( - team=self.team, - name="subscriptions", - columns={ - "id": "String", - "created_at": "DateTime64(3, 'UTC')", - "customer_id": "String", - }, - credential=credentials, - url_pattern="s3://test/*", - format=DataWarehouseTable.TableFormat.Parquet, - ) - - DataWarehouseTable.objects.create( - team=self.team, - name="customers", - columns={ - "id": "String", - "email": "String", - }, - credential=credentials, - url_pattern="s3://test/*", - format=DataWarehouseTable.TableFormat.Parquet, - ) - - DataWarehouseJoin.objects.create( - team=self.team, - source_table_name="subscriptions", - source_table_key="customer_id", - joining_table_name="customers", - joining_table_key="id", - field_name="customer", - ) - - DataWarehouseJoin.objects.create( - team=self.team, - source_table_name="customers", - source_table_key="email", - joining_table_name="events", - joining_table_key="person.properties.email", - field_name="events", - configuration={"experiments_optimized": True, "experiments_timestamp_key": "subscriptions.created_at"}, - ) - - db = create_hogql_database(team_id=self.team.pk) - - context = HogQLContext( - team_id=self.team.pk, - enable_select_queries=True, - database=db, - ) - - print_ast(parse_select("SELECT customer.events.distinct_id FROM subscriptions"), context, dialect="clickhouse") - def test_database_warehouse_resolve_field_through_nested_joins_basic_join(self): credentials = DataWarehouseCredential.objects.create( access_key="test_key", access_secret="test_secret", team=self.team