feat: Update Rapid Evaluation Service QPS. Add a customizable evaluation service QPS parameter.

PiperOrigin-RevId: 656085181
jsondai authored and copybara-github committed Jul 25, 2024
1 parent 0621306 commit 9ee9289
Showing 6 changed files with 35 additions and 12 deletions.
2 changes: 2 additions & 0 deletions vertexai/preview/evaluation/_base.py
@@ -40,13 +40,15 @@ class EvaluationRunConfig:
metrics: The list of metric names, or metric bundle names, or Metric instances to evaluate.
column_map: The dictionary of column name overrides in the dataset.
client: The evaluation service client.
+ evaluation_service_qps: The custom QPS limit for the evaluation service.
retry_timeout: How long to keep retrying the evaluation requests, in seconds.
"""

dataset: "pd.DataFrame"
metrics: List[Union[str, metrics_base._Metric]]
column_map: Dict[str, str]
client: gapic_evaluation_services.EvaluationServiceClient
+ evaluation_service_qps: float
retry_timeout: float

def validate_dataset_column(self, column_name: str) -> None:
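For orientation, a minimal sketch of how the extended config might be populated; the dataset, metric name, and QPS value are illustrative, and in practice the SDK assembles this object inside `evaluate()` rather than accepting it directly:

```python
import pandas as pd

from vertexai.preview.evaluation import _base, utils

# Hypothetical construction of the run config with the new field; the SDK
# normally builds this internally from the arguments passed to `evaluate()`.
run_config = _base.EvaluationRunConfig(
    dataset=pd.DataFrame({"response": ["Paris is the capital of France."]}),
    metrics=["fluency"],  # illustrative metric name
    column_map={},
    client=utils.create_evaluation_service_client(),
    evaluation_service_qps=1.0,  # custom QPS cap for evaluation requests
    retry_timeout=600.0,
)
```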
9 changes: 9 additions & 0 deletions vertexai/preview/evaluation/_eval_tasks.py
@@ -255,6 +255,7 @@ def _evaluate_with_experiment(
prompt_template: Optional[str] = None,
experiment_run_name: Optional[str] = None,
response_column_name: Optional[str] = None,
+ evaluation_service_qps: Optional[float] = None,
retry_timeout: float = 600.0,
) -> EvalResult:
"""Runs an evaluation for the EvalTask with an experiment.
@@ -271,6 +272,7 @@ def _evaluate_with_experiment(
unique experiment run name is used.
response_column_name: The column name of model response in the dataset. If
provided, this will override the `response_column_name` of the `EvalTask`.
+ evaluation_service_qps: The custom QPS limit for the evaluation service.
retry_timeout: How long to keep retrying the evaluation requests for
the whole evaluation dataset, in seconds.
@@ -288,6 +290,7 @@ def _evaluate_with_experiment(
content_column_name=self.content_column_name,
reference_column_name=self.reference_column_name,
response_column_name=response_column_name,
+ evaluation_service_qps=evaluation_service_qps,
retry_timeout=retry_timeout,
)

@@ -308,6 +311,7 @@ def evaluate(
prompt_template: Optional[str] = None,
experiment_run_name: Optional[str] = None,
response_column_name: Optional[str] = None,
+ evaluation_service_qps: Optional[float] = None,
retry_timeout: float = 600.0,
) -> EvalResult:
"""Runs an evaluation for the EvalTask.
@@ -324,6 +328,7 @@ def evaluate(
unique experiment run name is used.
response_column_name: The column name of model response in the dataset. If
provided, this will override the `response_column_name` of the `EvalTask`.
+ evaluation_service_qps: The custom QPS limit for the evaluation service.
retry_timeout: How long to keep retrying the evaluation requests for
the whole evaluation dataset, in seconds.
@@ -350,6 +355,7 @@ def evaluate(
prompt_template,
experiment_run_name,
response_column_name,
+ evaluation_service_qps,
retry_timeout,
)
metadata._experiment_tracker.set_experiment(
@@ -364,6 +370,7 @@ def evaluate(
prompt_template,
experiment_run_name,
response_column_name,
+ evaluation_service_qps,
retry_timeout,
)
metadata._experiment_tracker.reset()
@@ -373,6 +380,7 @@ def evaluate(
prompt_template,
experiment_run_name,
response_column_name,
+ evaluation_service_qps,
retry_timeout,
)
else:
@@ -384,6 +392,7 @@ def evaluate(
content_column_name=self.content_column_name,
reference_column_name=self.reference_column_name,
response_column_name=response_column_name,
+ evaluation_service_qps=evaluation_service_qps,
retry_timeout=retry_timeout,
)
return eval_result
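A hedged usage sketch of the new parameter from the caller's side (assumes `vertexai.init(...)` has already been run; the metric name, dataset, and QPS/timeout values are placeholders):

```python
import pandas as pd

from vertexai.preview.evaluation import EvalTask

eval_dataset = pd.DataFrame(
    {
        "response": ["Paris is the capital of France."],
        "reference": ["The capital of France is Paris."],
    }
)

# Throttle evaluation service traffic to 0.5 requests per second instead of
# the library default; both values here are illustrative.
eval_task = EvalTask(dataset=eval_dataset, metrics=["exact_match"])
result = eval_task.evaluate(evaluation_service_qps=0.5, retry_timeout=300.0)
```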
19 changes: 12 additions & 7 deletions vertexai/preview/evaluation/_evaluation.py
@@ -295,9 +295,7 @@ def _generate_response_from_gemini_model(
evaluation_run_config: Evaluation Run Configurations.
is_baseline_model: Whether the model is a baseline model for PairwiseMetric.
"""
- max_workers = int(
-     constants.QuotaLimit.GEMINI_1_0_PRO_GENERATE_CONTENT_REQUESTS_PER_MINUTE / 2
- )

# Ensure thread safety and avoid race conditions.
df = evaluation_run_config.dataset.copy()

@@ -310,7 +308,7 @@ def _generate_response_from_gemini_model(
constants.Dataset.COMPLETED_PROMPT_COLUMN
in evaluation_run_config.dataset.columns
):
- with futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
+ with futures.ThreadPoolExecutor(max_workers=constants.MAX_WORKERS) as executor:
for _, row in df.iterrows():
tasks.append(
executor.submit(
@@ -323,7 +321,7 @@ def _generate_response_from_gemini_model(
content_column_name = evaluation_run_config.column_map[
constants.Dataset.CONTENT_COLUMN
]
- with futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
+ with futures.ThreadPoolExecutor(max_workers=constants.MAX_WORKERS) as executor:
for _, row in df.iterrows():
tasks.append(
executor.submit(
@@ -609,9 +607,10 @@ def _compute_metrics(

instance_list = []
futures_by_metric = collections.defaultdict(list)
- eval_max_workers = constants.QuotaLimit.EVAL_SERVICE_QPS

+ rate_limiter = utils.RateLimiter(evaluation_run_config.evaluation_service_qps)
with tqdm(total=api_request_count) as pbar:
- with futures.ThreadPoolExecutor(max_workers=eval_max_workers) as executor:
+ with futures.ThreadPoolExecutor(max_workers=constants.MAX_WORKERS) as executor:
for idx, row in evaluation_run_config.dataset.iterrows():
row_dict = _compute_custom_metrics(row.to_dict(), custom_metrics)

@@ -626,6 +625,7 @@ def _compute_metrics(
row_dict=row_dict,
evaluation_run_config=evaluation_run_config,
),
+ rate_limiter=rate_limiter,
retry_timeout=evaluation_run_config.retry_timeout,
)
future.add_done_callback(lambda _: pbar.update(1))
@@ -686,6 +686,7 @@ def evaluate(
response_column_name: str = "response",
context_column_name: str = "context",
instruction_column_name: str = "instruction",
+ evaluation_service_qps: Optional[float] = None,
retry_timeout: float = 600.0,
) -> evaluation_base.EvalResult:
"""Runs the evaluation for metrics.
@@ -712,6 +713,7 @@ def evaluate(
not set, default to `context`.
instruction_column_name: The column name of the instruction prompt in the
dataset. If not set, default to `instruction`.
+ evaluation_service_qps: The custom QPS limit for the evaluation service.
retry_timeout: How long to keep retrying the evaluation requests for the
whole evaluation dataset, in seconds.
Returns:
@@ -741,6 +743,9 @@ def evaluate(
constants.Dataset.INSTRUCTION_COLUMN: instruction_column_name,
},
client=utils.create_evaluation_service_client(),
+ evaluation_service_qps=evaluation_service_qps
+ if evaluation_service_qps
+ else constants.QuotaLimit.EVAL_SERVICE_QPS,
retry_timeout=retry_timeout,
)

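The pattern above (wide thread pool, single shared limiter) can be sketched in isolation like this; `utils.RateLimiter` and `sleep_and_advance()` come from this commit, while the worker body is a stand-in for the real evaluation request:

```python
from concurrent import futures

from vertexai.preview.evaluation import utils

# One limiter shared by all workers: the pool can stay wide (MAX_WORKERS)
# while aggregate request QPS stays bounded at the configured rate.
rate_limiter = utils.RateLimiter(0.25)  # the new default EVAL_SERVICE_QPS

def throttled_request(idx: int) -> int:
    rate_limiter.sleep_and_advance()  # block until the next request slot opens
    return idx  # stand-in for client.evaluate_instances(...)

with futures.ThreadPoolExecutor(max_workers=8) as executor:
    results = list(executor.map(throttled_request, range(4)))
```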
9 changes: 8 additions & 1 deletion vertexai/preview/evaluation/constants.py
@@ -17,6 +17,10 @@
"""Constants for evaluation."""
import dataclasses

+ # The number of concurrent workers to use for making model inference and
+ # evaluation requests.
+ MAX_WORKERS = 100


@dataclasses.dataclass(frozen=True)
class Metric:
@@ -193,4 +197,7 @@ class QuotaLimit:
# Default queries per minute (QPM) quota for `gemini-1.0-pro` base model.
GEMINI_1_0_PRO_GENERATE_CONTENT_REQUESTS_PER_MINUTE = 300

- EVAL_SERVICE_QPS = 10
+ # Evaluation Service QPS limit can be computed by
+ # (GEMINI_1_5_PRO_GENERATE_CONTENT_REQUESTS_QPM / 60 / Number of Samples)
+ # 0.25 = 300 / 60 / 4
+ EVAL_SERVICE_QPS = 0.25
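The new default follows directly from the quota arithmetic in that comment; a quick check, with the QPM figure and sample count taken as the assumptions stated there:

```python
# Assumed quota: 300 generate_content requests per minute for the autorater,
# spread across 4 evaluation samples per request, as the comment describes.
requests_per_minute = 300
num_samples = 4

eval_service_qps = requests_per_minute / 60 / num_samples
assert eval_service_qps == 0.25  # one evaluation request every 4 seconds
```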
6 changes: 3 additions & 3 deletions vertexai/preview/evaluation/metrics/_instance_evaluation.py
@@ -620,24 +620,24 @@ def handle_response(
return result


- # TODO(b/346659152): Add interface to customize rate limit.
- @utils.rate_limit(constants.QuotaLimit.EVAL_SERVICE_QPS)
def evaluate_instances(
client: gapic_evaluation_services.EvaluationServiceClient,
request: gapic_eval_service_types.EvaluateInstancesRequest,
+ rate_limiter: utils.RateLimiter,
retry_timeout: float,
) -> gapic_eval_service_types.EvaluateInstancesResponse:
"""Evaluates an instance.
Args:
client: The client to use for evaluation.
request: An EvaluateInstancesRequest.
+ rate_limiter: The rate limiter to use for evaluation service requests.
retry_timeout: How long to keep retrying the evaluation requests, in seconds.
Returns:
A response from the evaluation service.
"""

+ rate_limiter.sleep_and_advance()
return client.evaluate_instances(
request=request,
retry=api_core.retry.Retry(
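The truncated call above passes an `api_core.retry.Retry` built from `retry_timeout`; a sketch of what such a policy can look like, where the exception predicate is an assumption and not necessarily the SDK's exact retry set:

```python
from google.api_core import exceptions
from google.api_core import retry as api_core_retry

# Keep retrying transient evaluation service errors until `retry_timeout`
# seconds have elapsed; the retried exception types here are assumptions.
retry_policy = api_core_retry.Retry(
    timeout=600.0,
    predicate=api_core_retry.if_exception_type(
        exceptions.ResourceExhausted,
        exceptions.ServiceUnavailable,
    ),
)
```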
2 changes: 1 addition & 1 deletion vertexai/preview/evaluation/utils.py
@@ -73,7 +73,7 @@ def __init__(self, rate: Optional[float] = None):
Raises:
ValueError: If the rate is not positive.
"""
- if rate <= 0:
+ if not rate or rate <= 0:
raise ValueError("Rate must be a positive number")
self.seconds_per_event = 1.0 / rate
self.last = time.time() - self.seconds_per_event
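Putting the visible pieces together, a minimal limiter consistent with this constructor and with `sleep_and_advance()` as used in `_instance_evaluation.py` might look like the following; the lock is an assumption added here for thread safety and may differ from the SDK's implementation:

```python
import threading
import time

class RateLimiterSketch:
    """Allows one event per 1/rate seconds; a sketch, not the SDK class."""

    def __init__(self, rate: float):
        if not rate or rate <= 0:
            raise ValueError("Rate must be a positive number")
        self.seconds_per_event = 1.0 / rate
        # Backdate `last` so the very first caller is not delayed.
        self.last = time.time() - self.seconds_per_event
        self._lock = threading.Lock()  # assumption: serialize concurrent callers

    def sleep_and_advance(self) -> None:
        with self._lock:
            wait = self.last + self.seconds_per_event - time.time()
            if wait > 0:
                time.sleep(wait)
            self.last = time.time()

limiter = RateLimiterSketch(0.25)  # one event every 4 seconds
limiter.sleep_and_advance()        # first call returns immediately
```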
