Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spring cleaning #120

Merged
merged 12 commits into from
Sep 9, 2024
4 changes: 0 additions & 4 deletions axiom/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
DatasetCreateRequest,
DatasetUpdateRequest,
TrimRequest,
Field,
DatasetInfo,
DatasetsClient,
)
from .annotations import (
Expand All @@ -45,8 +43,6 @@
DatasetCreateRequest,
DatasetUpdateRequest,
TrimRequest,
Field,
DatasetInfo,
DatasetsClient,
Annotation,
AnnotationCreateRequest,
Expand Down
30 changes: 25 additions & 5 deletions axiom/annotations.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,22 @@ def __init__(self, session: Session, logger: Logger):
self.logger = logger

def get(self, id: str) -> Annotation:
"""Get a annotation by id."""
"""
Get a annotation by id.

See https://axiom.co/docs/restapi/endpoints/getAnnotation
"""
path = "/v2/annotations/%s" % id
res = self.session.get(path)
decoded_response = res.json()
return Util.from_dict(Annotation, decoded_response)

def create(self, req: AnnotationCreateRequest) -> Annotation:
"""Create an annotation with the given properties."""
"""
Create an annotation with the given properties.

See https://axiom.co/docs/restapi/endpoints/createAnnotation
"""
path = "/v2/annotations"
res = self.session.post(path, data=ujson.dumps(asdict(req)))
annotation = Util.from_dict(Annotation, res.json())
Expand All @@ -80,7 +88,11 @@ def list(
start: Optional[datetime] = None,
end: Optional[datetime] = None,
) -> List[Annotation]:
"""List all annotations."""
"""
List all annotations.

See https://axiom.co/docs/restapi/endpoints/getAnnotations
"""
query_params = {}
if len(datasets) > 0:
query_params["datasets"] = ",".join(datasets)
Expand All @@ -100,14 +112,22 @@ def list(
return annotations

def update(self, id: str, req: AnnotationUpdateRequest) -> Annotation:
"""Update an annotation with the given properties."""
"""
Update an annotation with the given properties.

See https://axiom.co/docs/restapi/endpoints/updateAnnotation
"""
path = "/v2/annotations/%s" % id
res = self.session.put(path, data=ujson.dumps(asdict(req)))
annotation = Util.from_dict(Annotation, res.json())
self.logger.info(f"updated annotation({annotation.id})")
return annotation

def delete(self, id: str):
"""Deletes an annotation with the given id."""
"""
Deletes an annotation with the given id.

See https://axiom.co/docs/restapi/endpoints/deleteAnnotation
"""
path = "/v2/annotations/%s" % id
self.session.delete(path)
76 changes: 54 additions & 22 deletions axiom/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@
from requests_toolbelt.sessions import BaseUrlSession
from requests.adapters import HTTPAdapter, Retry
from .datasets import DatasetsClient
from .query import QueryLegacy, QueryResult, QueryOptions, QueryLegacyResult, QueryKind
from .query import (
QueryLegacy,
QueryResult,
QueryOptions,
QueryLegacyResult,
QueryKind,
)
from .annotations import AnnotationsClient
from .users import UsersClient
from .version import __version__
Expand Down Expand Up @@ -177,23 +183,25 @@ def ingest(
enc: ContentEncoding,
opts: Optional[IngestOptions] = None,
) -> IngestStatus:
"""Ingest the events into the named dataset and returns the status."""
path = "/v1/datasets/%s/ingest" % dataset

# check if passed content type and encoding are correct
if not contentType:
raise ValueError("unknown content-type, choose one of json,x-ndjson or csv")
"""
Ingest the payload into the named dataset and returns the status.

if not enc:
raise ValueError("unknown content-encoding")
See https://axiom.co/docs/restapi/endpoints/ingestIntoDataset
"""
path = "/v1/datasets/%s/ingest" % dataset

# set headers
headers = {"Content-Type": contentType.value, "Content-Encoding": enc.value}
headers = {
"Content-Type": contentType.value,
"Content-Encoding": enc.value,
}
# prepare query params
params = self._prepare_ingest_options(opts)

# override the default header and set the value from the passed parameter
res = self.session.post(path, data=payload, headers=headers, params=params)
res = self.session.post(
path, data=payload, headers=headers, params=params
)
status_snake = decamelize(res.json())
return Util.from_dict(IngestStatus, status_snake)

Expand All @@ -203,11 +211,15 @@ def ingest_events(
events: List[dict],
opts: Optional[IngestOptions] = None,
) -> IngestStatus:
"""Ingest the events into the named dataset and returns the status."""
"""
Ingest the events into the named dataset and returns the status.

See https://axiom.co/docs/restapi/endpoints/ingestIntoDataset
"""
# encode request payload to NDJSON
content = ndjson.dumps(events, default=Util.handle_json_serialization).encode(
"UTF-8"
)
content = ndjson.dumps(
events, default=Util.handle_json_serialization
).encode("UTF-8")
gzipped = gzip.compress(content)

return self.ingest(
Expand All @@ -217,15 +229,21 @@ def ingest_events(
def query_legacy(
self, id: str, query: QueryLegacy, opts: QueryOptions
) -> QueryLegacyResult:
"""Executes the given query on the dataset identified by its id."""
"""
Executes the given structured query on the dataset identified by its id.

See https://axiom.co/docs/restapi/endpoints/queryDataset
"""
if not opts.saveAsKind or (opts.saveAsKind == QueryKind.APL):
raise WrongQueryKindException(
"invalid query kind %s: must be %s or %s"
% (opts.saveAsKind, QueryKind.ANALYTICS, QueryKind.STREAM)
)

path = "/v1/datasets/%s/query" % id
payload = ujson.dumps(asdict(query), default=Util.handle_json_serialization)
payload = ujson.dumps(
asdict(query), default=Util.handle_json_serialization
)
self.logger.debug("sending query %s" % payload)
params = self._prepare_query_options(opts)
res = self.session.post(path, data=payload, params=params)
Expand All @@ -236,12 +254,24 @@ def query_legacy(
result.savedQueryID = query_id
return result

def apl_query(self, apl: str, opts: Optional[AplOptions] = None) -> QueryResult:
"""Executes the given apl query on the dataset identified by its id."""
def apl_query(
self, apl: str, opts: Optional[AplOptions] = None
) -> QueryResult:
"""
Executes the given apl query on the dataset identified by its id.

See https://axiom.co/docs/restapi/endpoints/queryApl
"""
return self.query(apl, opts)

def query(self, apl: str, opts: Optional[AplOptions] = None) -> QueryResult:
"""Executes the given apl query on the dataset identified by its id."""
def query(
self, apl: str, opts: Optional[AplOptions] = None
) -> QueryResult:
"""
Executes the given apl query on the dataset identified by its id.

See https://axiom.co/docs/restapi/endpoints/queryApl
"""
path = "/v1/datasets/_apl"
payload = ujson.dumps(
self._prepare_apl_payload(apl, opts),
Expand Down Expand Up @@ -293,7 +323,9 @@ def _prepare_ingest_options(

return params

def _prepare_apl_options(self, opts: Optional[AplOptions]) -> Dict[str, object]:
def _prepare_apl_options(
self, opts: Optional[AplOptions]
) -> Dict[str, object]:
"""Prepare the apl query options for the request."""
params = {"format": AplResultFormat.Legacy.value}

Expand Down
93 changes: 43 additions & 50 deletions axiom/datasets.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
"""This package provides dataset models and methods as well as a DatasetClient"""
"""
This package provides dataset models and methods as well as a DatasetClient
"""

import ujson
from logging import Logger
from requests import Session
from typing import List
from dataclasses import dataclass, asdict, field
from datetime import datetime, timedelta
from datetime import timedelta
from .util import Util


Expand Down Expand Up @@ -37,41 +39,14 @@ class DatasetUpdateRequest:

@dataclass
class TrimRequest:
"""MaxDuration marks the oldest timestamp an event can have before getting deleted."""
"""
MaxDuration marks the oldest timestamp an event can have before getting
deleted.
"""

maxDuration: str


@dataclass
class Field:
"""A field of a dataset"""

name: str
description: str
type: str
unit: str
hidden: bool


@dataclass
class DatasetInfo:
"""Information and statistics stored inside a dataset"""

name: str
numBlocks: int
numEvents: int
numFields: int
inputBytes: int
inputBytesHuman: str
compressedBytes: int
compressedBytesHuman: str
minTime: datetime
maxTime: datetime
fields: List[Field]
who: str
created: datetime


class DatasetsClient: # pylint: disable=R0903
"""DatasetsClient has methods to manipulate datasets."""

Expand All @@ -82,22 +57,34 @@ def __init__(self, session: Session, logger: Logger):
self.logger = logger

def get(self, id: str) -> Dataset:
"""Get a dataset by id."""
"""
Get a dataset by id.

See https://axiom.co/docs/restapi/endpoints/getDataset
"""
path = "/v1/datasets/%s" % id
res = self.session.get(path)
decoded_response = res.json()
return Util.from_dict(Dataset, decoded_response)

def create(self, req: DatasetCreateRequest) -> Dataset:
"""Create a dataset with the given properties."""
"""
Create a dataset with the given properties.

See https://axiom.co/docs/restapi/endpoints/createDataset
"""
path = "/v1/datasets"
res = self.session.post(path, data=ujson.dumps(asdict(req)))
ds = Util.from_dict(Dataset, res.json())
self.logger.info(f"created new dataset: {ds.name}")
return ds

def get_list(self) -> List[Dataset]:
"""List all available datasets."""
"""
List all available datasets.

See https://axiom.co/docs/restapi/endpoints/getDatasets
"""
path = "/v1/datasets"
res = self.session.get(path)

Expand All @@ -109,32 +96,38 @@ def get_list(self) -> List[Dataset]:
return datasets

def update(self, id: str, req: DatasetUpdateRequest) -> Dataset:
"""Update a dataset with the given properties."""
"""
Update a dataset with the given properties.

See https://axiom.co/docs/restapi/endpoints/updateDataset
"""
path = "/v1/datasets/%s" % id
res = self.session.put(path, data=ujson.dumps(asdict(req)))
ds = Util.from_dict(Dataset, res.json())
self.logger.info(f"updated dataset({ds.name}) with new desc: {ds.description}")
self.logger.info(
f"updated dataset({ds.name}) with new desc: {ds.description}"
)
return ds

def delete(self, id: str):
"""Deletes a dataset with the given id."""
"""
Deletes a dataset with the given id.

See https://axiom.co/docs/restapi/endpoints/deleteDataset
"""
path = "/v1/datasets/%s" % id
self.session.delete(path)

def trim(self, id: str, maxDuration: timedelta):
"""
Trim the dataset identified by its id to a given length. The max duration
given will mark the oldest timestamp an event can have. Older ones will be
deleted from the dataset.
Trim the dataset identified by its id to a given length. The max
duration given will mark the oldest timestamp an event can have.
Older ones will be deleted from the dataset.

See https://axiom.co/docs/restapi/endpoints/trimDataset
"""
path = "/v1/datasets/%s/trim" % id
# prepare request payload and format masDuration to append time unit at the end, e.g `1s`
# prepare request payload and format masDuration to append time unit at
# the end, e.g `1s`
req = TrimRequest(f"{maxDuration.seconds}s")
self.session.post(path, data=ujson.dumps(asdict(req)))

def info(self, id: str) -> DatasetInfo:
"""Returns the info about a dataset."""
path = "/v1/datasets/%s/info" % id
res = self.session.get(path)
decoded_response = res.json()
return Util.from_dict(DatasetInfo, decoded_response)
5 changes: 4 additions & 1 deletion axiom/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ def __init__(self, client: Client, dataset: str, level=NOTSET, interval=1):
def emit(self, record):
"""emit sends a log to Axiom."""
self.buffer.append(record.__dict__)
if len(self.buffer) >= 1000 or time.monotonic() - self.last_run > self.interval:
if (
len(self.buffer) >= 1000
or time.monotonic() - self.last_run > self.interval
):
self.flush()

def flush(self):
Expand Down
Loading