Skip to content

Commit

Permalink
Exclude PQ (YDS) Stats from Metering Records (ydb-platform#2440)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hor911 authored Mar 11, 2024
1 parent 1fcef17 commit 7f7dc0c
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 0 deletions.
5 changes: 5 additions & 0 deletions ydb/core/fq/libs/control_plane_storage/internal/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ std::vector<TString> GetMeteringRecords(const TString& statistics, bool billable
if (auto* ingressNode = graph.second.GetValueByPath("IngressBytes.sum")) {
ingress += ingressNode->GetIntegerSafe();
}
// special exclusion for PQ/YDS in YQv1
if (auto* pqIngressNode = graph.second.GetValueByPath("TaskRunner.Source=PqSource.Stage=Total.IngressBytes.sum")) {
ui64 pqIngress = pqIngressNode->GetIntegerSafe();
ingress = ingress > pqIngress ? (ingress - pqIngress) : 0;
}
}
}

Expand Down
50 changes: 50 additions & 0 deletions ydb/tests/fq/yds/test_kill_pq_bill.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
import time
import json

from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase

import ydb.public.api.protos.draft.fq_pb2 as fq


class TestKillPqBill(TestYdsBase):
@yq_v1
def test_do_not_bill_pq(self, kikimr, client):
self.init_topics("no_pq_bill")

sql = R'''
PRAGMA dq.MaxTasksPerStage="2";
INSERT INTO yds.`{output_topic}`
SELECT Data AS Data
FROM yds.`{input_topic}`;''' \
.format(
input_topic=self.input_topic,
output_topic=self.output_topic,
)

client.create_yds_connection(name="yds", database_id="FakeDatabaseId")
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING,
vcpu_time_limit=1).result.query_id
client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
kikimr.compute_plane.wait_zero_checkpoint(query_id)

data_1mb = ['1' * 1024 * 1024]
message_count = 15
for _ in range(0, message_count):
self.write_stream(data_1mb)
self.read_stream(message_count)

client.abort_query(query_id)
client.wait_query_status(query_id, fq.QueryMeta.ABORTED_BY_USER)

stat = json.loads(client.describe_query(query_id).result.query.statistics.json)

graph_name = "Graph=0"
ingress_bytes = stat[graph_name]["IngressBytes"]["sum"]

assert ingress_bytes >= 15 * 1024 * 1024, "Ingress must be >= 15MB"
assert sum(kikimr.control_plane.get_metering()) == 10
1 change: 1 addition & 0 deletions ydb/tests/fq/yds/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ TEST_SRCS(
test_cpu_quota.py
test_delete_read_rules_after_abort_by_system.py
test_eval.py
test_kill_pq_bill.py
test_mem_alloc.py
test_metrics_cleanup.py
test_pq_read_write.py
Expand Down

0 comments on commit 7f7dc0c

Please sign in to comment.