feat(tracer): add impact and synchronisation in tracer (#26)
* feat(tracer): add impact and synchronisation in tracer

* fix: update types to remove additional proposed fields, API is not stateful

* fix: rename include_impact_response to sync_mode and other things

Stripped documentation to what is working at the moment
Fixed tests with removed attribute request_id
Fixed huggingface_hub with their `post_init` that does a
`self.update(asdict(self))` which was breaking serialization of
Scope3AIContext
Fixed tracer attachment to context

* docs: update readme to what is working today
tito authored Dec 26, 2024
1 parent e5754da commit f48b63c
Showing 27 changed files with 750 additions and 289 deletions.
80 changes: 33 additions & 47 deletions README.md
Original file line number Diff line number Diff line change
@@ -23,8 +23,8 @@ from scope3ai import Scope3AI
scope3 = Scope3AI.init(
    api_key="YOUR_API_KEY",  # Replace "YOUR_API_KEY" with your actual key
    api_url="https://api.scope3.ai/v1",  # Optional: Specify the API URL
    include_impact_response=False,  # Include impact in responses (default: False)
    enable_debug_logging=False,  # Enable debug logging (default: False)
    sync_mode=False,  # Enable synchronous mode when sending telemetry to the API (default: False)
)
```

@@ -34,7 +34,7 @@ You can also use environment variables to set up the SDK:

- `SCOPE3AI_API_KEY`: Your Scope3AI API key
- `SCOPE3AI_API_URL`: The API endpoint URL. Default: `https://api.scope3.ai/v1`
- `SCOPE3AI_INCLUDE_IMPACT_RESPONSE`: If `True`, every interaction will include its impact in the response. Default: `False`
- `SCOPE3AI_SYNC_MODE`: If `True`, every interaction is sent to the API synchronously; otherwise a background worker is used. Default: `False`
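For example, assuming a POSIX shell, the variables can be exported before starting your application:

```shell
# Configure the Scope3AI SDK entirely through the environment
export SCOPE3AI_API_KEY="YOUR_API_KEY"
export SCOPE3AI_API_URL="https://api.scope3.ai/v1"
export SCOPE3AI_SYNC_MODE="True"
```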

```python
from scope3ai import Scope3AI
@@ -46,71 +46,57 @@ scope3 = Scope3AI.init()

### 1. Using Context Management for Tracing

You can record interactions using a `trace()` context. This allows you to analyze the sustainability impact of all interactions within the context.
Within a `trace()` context, all interactions are recorded, and you can query the aggregated impact of the trace.
Because interactions are captured and sent to Scope3 AI for analysis, the impact is calculated and returned asynchronously.
Calling `tracer.impact()` automatically waits for all traces to be processed before returning the impact.

```python
with scope3.trace() as tracer:
    # Perform your interactions
    interact()

# Print the impact of the recorded trace
print(tracer.impact())
```

### 2. Recording `trace_id` for Later Usage

Store the `trace_id` during the interaction for querying the impact later.

```python
trace_id = None
with scope3.trace() as tracer:
    trace_id = tracer.trace_id
    interact()

# Fetch and print the impact using the stored trace_id
print(scope3.impact(trace_id=trace_id))
# Print the impact of the recorded trace
impact = tracer.impact()
print(f"Total Energy Wh: {impact.total_energy_wh}")
print(f"Total GCO2e: {impact.total_gco2e}")
print(f"Total MLH2O: {impact.total_mlh2o}")
```

### 3. Using `record_id` from the Interaction Response
### 2. Single Interaction

Retrieve the `record_id` from the interaction response and query the impact.
For a single interaction, the response is augmented with a `scope3ai` attribute that contains the
`request` and `impact` data. Because the impact is calculated asynchronously, you need to wait
for the calculation to finish before the attribute is ready.

```python
response = interact()
print(scope3.impact(record_id=response.scope3ai.record_id))
```

#### Alternative: Fetch Impact for Multiple Records

You can query impacts for multiple `record_id`s simultaneously:
client = OpenAI()
response = client.chat.completions.create(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Hello world"}],
    stream=False,
)

```python
record_ids = [response.scope3ai.record_id]
print(scope3.impact_many(record_ids=record_ids))
response.scope3ai.wait_impact()
impact = response.scope3ai.impact
print(f"Total Energy Wh: {impact.total_energy_wh}")
print(f"Total GCO2e: {impact.total_gco2e}")
print(f"Total MLH2O: {impact.total_mlh2o}")
```

### 4. Enabling Synchronous Mode for Immediate Impact Response
### 3. Enabling Synchronous Mode for Immediate Impact Response

In synchronous mode, the SDK will include the impact data directly in the interaction response. This ensures that every interaction immediately returns its impact data.
In synchronous mode, the SDK will include the impact data directly in the interaction response.
This is useful when you want to get the impact data immediately after the interaction without waiting.

```python
scope3.include_impact_response = True
scope3.sync_mode = True

response = interact()
print(response.scope3ai.impact)
```

### 4. Specify a Name for Grouping

You can specify a name to group interactions. This is useful for attributing impact to a specific workflow or context.

```python
with scope3.trace(name="my_workflow"):
    interact()
with scope3.trace(name="image_generation") as tracer:
    generate_image()
    save_to_s3()
    interact()

impact = tracer.impact()
print(f"Total Energy Wh: {impact.total_energy_wh}")
print(f"Total GCO2e: {impact.total_gco2e}")
print(f"Total MLH2O: {impact.total_mlh2o}")
```

## Development
6 changes: 2 additions & 4 deletions examples/api-async.py
@@ -24,12 +24,10 @@ async def list_gpus():


async def send_impact():
    from scope3ai.api.types import ImpactRequestRow, Model
    from scope3ai.api.types import ImpactRow, Model

    print("Sending impact")
    impact = ImpactRequestRow(
        model=Model(id="gpt_4o"), input_tokens=100, output_tokens=100
    )
    impact = ImpactRow(model=Model(id="gpt_4o"), input_tokens=100, output_tokens=100)
    response = await client.impact(rows=[impact])
    print(response)

6 changes: 2 additions & 4 deletions examples/api-sync.py
@@ -24,12 +24,10 @@ def list_gpus():


def send_impact():
    from scope3ai.api.types import ImpactRequestRow, Model
    from scope3ai.api.types import ImpactRow, Model

    print("Sending impact")
    impact = ImpactRequestRow(
        model=Model(id="gpt_4o"), input_tokens=100, output_tokens=100
    )
    impact = ImpactRow(model=Model(id="gpt_4o"), input_tokens=100, output_tokens=100)
    response = client.impact(rows=[impact])
    print(response)

49 changes: 20 additions & 29 deletions examples/openai-sync-chat.py
@@ -9,40 +9,31 @@ def interact() -> None:
        messages=[{"role": "user", "content": "Hello world"}],
        stream=False,
    )
    print(response.choices[0].message.content)
    print(response)
    return response


if __name__ == "__main__":
    scope3 = Scope3AI.init(enable_debug_logging=True)

    # 1. Using context
    # trace() will create a "tracer" that will record all the interactions
    # with a specific trace_id (UUID)
    # it can be used later to get the impact of the interactions
    with scope3.trace() as tracer:
        interact()
    print(tracer.impact())

    # # 2. Using context, but record trace_id for usage on global scope
    # # you could keep the trace_id and use it later
    # trace_id = None
    # with scope3.trace() as tracer:
    #     trace_id = tracer.trace_id
    #     interact()

    # print(scope3.impact(trace_id=trace_id))

    # 1. Impact calculations are done via the Scope3AI API in the background,
    # so you need to wait for the impact to be calculated
    response = interact()
    response.scope3ai.wait_impact()
    print(response.scope3ai.impact)

    # # 3. Using record_id from the response
    # response = interact()
    # print(scope3.impact(record_id=response.scope3ai.record_id))

    # # 3.1 Alternative with many record_id
    # print(scope3.impact_many(record_ids=[response.scope3ai.record_id]))

    # 2. A tracer will automatically wait for the impact response
    with scope3.trace() as tracer:
        response = interact()
    impact = tracer.impact()
    print(f"Total Energy Wh: {impact.total_energy_wh}")
    print(f"Total GCO2e: {impact.total_gco2e}")
    print(f"Total MLH2O: {impact.total_mlh2o}")

    # # 4. Using sync mode to extend the response with the impact
    # # it always includes the impact in the response by querying the API on every call
    # scope3.include_impact_response = True
    # response = interact()
    # print(response.scope3ai.impact)

    # 3. A tracer can be used to calculate the impact of multiple requests
    with scope3.trace() as tracer:
        response = interact()
        response = interact()
    impact = tracer.impact()
    print(f"Total Energy Wh: {impact.total_energy_wh}")
    print(f"Total GCO2e: {impact.total_gco2e}")
    print(f"Total MLH2O: {impact.total_mlh2o}")
52 changes: 0 additions & 52 deletions examples/possible-usages.py

This file was deleted.

120 changes: 120 additions & 0 deletions scope3ai/api/tracer.py
@@ -0,0 +1,120 @@
from typing import List, Optional
from .typesgen import ImpactResponse, ModeledRow, ImpactMetrics


class Tracer:
    def __init__(
        self,
        name: str = None,
    ) -> None:
        from scope3ai.lib import Scope3AI

        self.scope3ai = Scope3AI.get_instance()
        self.name = name
        self.children: List[Tracer] = []
        self.rows: List[ModeledRow] = []
        self.traces = []  # type: List[Scope3AIContext]

    def impact(self, timeout: Optional[int] = None) -> ImpactResponse:
        """
        Return an aggregated impact response for the current tracer and its children.
        As the impact is computed asynchronously, this method will wait for the
        impact response to be available before returning it.
        """
        for trace in self.traces:
            trace.wait_impact(timeout)
        return self._impact()

    async def aimpact(self, timeout: Optional[int] = None) -> ImpactResponse:
        """
        Async version of Tracer::impact.
        """
        for trace in self.traces:
            await trace.await_impact(timeout)
        return self._impact()

    def _impact(self) -> ImpactResponse:
        """
        Return an aggregated impact response for the current tracer and its children.
        """
        all_rows = self.get_all_rows()
        return ImpactResponse(
            rows=all_rows,
            total_energy_wh=sum([row.total_impact.usage_energy_wh for row in all_rows]),
            total_gco2e=sum(
                [row.total_impact.usage_emissions_gco2e for row in all_rows]
            ),
            total_mlh2o=sum([row.total_impact.usage_water_ml for row in all_rows]),
            has_errors=any([row.error is not None for row in all_rows]),
        )

    def add_impact(self, impact: ModeledRow) -> None:
        self.rows.append(impact)

    def get_all_rows(self) -> List[ModeledRow]:
        all_rows = self.rows[:]
        for child in self.children:
            all_rows.extend(child.get_all_rows())
        return all_rows

    def _sum_modeled_rows(self, rows: List[ModeledRow]) -> ModeledRow:
        if not rows:
            raise Exception("No rows to sum")
        result = ModeledRow(
            inference_impact=self._sum_impact_metrics(
                [row.inference_impact for row in rows]
            ),
            training_impact=self._sum_impact_metrics(
                [row.training_impact for row in rows]
            ),
            fine_tuning_impact=self._sum_impact_metrics(
                [row.fine_tuning_impact for row in rows]
            ),
            total_impact=self._sum_impact_metrics([row.total_impact for row in rows]),
        )
        return result

    def _sum_impact_metrics(self, metrics: List[ImpactMetrics]) -> ImpactMetrics:
        # Initialize totals
        total_usage_energy_wh = 0.0
        total_usage_emissions_gco2e = 0.0
        total_usage_water_ml = 0.0
        total_embodied_emissions_gco2e = 0.0
        total_embodied_water_ml = 0.0

        # Aggregate values
        for metric in metrics:
            if not isinstance(metric, ImpactMetrics):
                raise TypeError(
                    "All items in the list must be instances of ImpactMetrics."
                )
            total_usage_energy_wh += metric.usage_energy_wh
            total_usage_emissions_gco2e += metric.usage_emissions_gco2e
            total_usage_water_ml += metric.usage_water_ml
            total_embodied_emissions_gco2e += metric.embodied_emissions_gco2e
            total_embodied_water_ml += metric.embodied_water_ml

        # Return a new instance with summed values
        return ImpactMetrics(
            usage_energy_wh=total_usage_energy_wh,
            usage_emissions_gco2e=total_usage_emissions_gco2e,
            usage_water_ml=total_usage_water_ml,
            embodied_emissions_gco2e=total_embodied_emissions_gco2e,
            embodied_water_ml=total_embodied_water_ml,
        )

    def _link_parent(self, parent: Optional["Tracer"]) -> None:
        if parent and (self not in parent.children):
            parent.children.append(self)

    def _unlink_parent(self, parent: Optional["Tracer"]) -> None:
        pass

    def _link_trace(self, trace) -> None:
        if trace not in self.traces:
            self.traces.append(trace)

    def _unlink_trace(self, trace) -> None:
        if trace in self.traces:
            self.traces.remove(trace)
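The aggregation in `_sum_impact_metrics` is a plain field-wise sum over metric rows. A minimal, self-contained sketch of the same idea, using a stand-in dataclass rather than the real `ImpactMetrics` from `typesgen` (the `Metrics` class and `sum_metrics` helper here are illustrative assumptions, not part of the SDK):

```python
from dataclasses import dataclass, fields
from typing import List


# Stand-in for scope3ai.api.typesgen.ImpactMetrics (illustrative only)
@dataclass
class Metrics:
    usage_energy_wh: float = 0.0
    usage_emissions_gco2e: float = 0.0
    usage_water_ml: float = 0.0
    embodied_emissions_gco2e: float = 0.0
    embodied_water_ml: float = 0.0


def sum_metrics(items: List[Metrics]) -> Metrics:
    # Field-wise sum, mirroring Tracer._sum_impact_metrics
    total = Metrics()
    for item in items:
        for f in fields(Metrics):
            setattr(total, f.name, getattr(total, f.name) + getattr(item, f.name))
    return total


rows = [
    Metrics(usage_energy_wh=1.5, usage_water_ml=10.0),
    Metrics(usage_energy_wh=2.5, usage_water_ml=5.0),
]
total = sum_metrics(rows)
print(total.usage_energy_wh)  # 4.0
print(total.usage_water_ml)   # 15.0
```

Iterating over `fields()` keeps the sum in lockstep with the dataclass definition; the SDK version instead spells out each field explicitly, which makes the type check against `ImpactMetrics` straightforward.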
