Skip to content

Commit 6582320

Browse files
authored
Merge pull request #186 from axiomhq/aisha/support-mpl
feat: Add MPL Query Support (Sync + Async)
2 parents fe796a3 + 59dab85 commit 6582320

14 files changed

Lines changed: 767 additions & 90 deletions

README.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,34 @@ edge_client = axiom_py.Client(
4444

4545
**Note:** Edge endpoints require API tokens (`xaat-`), not personal tokens.
4646
Edge configuration must be passed explicitly when creating the client.
47+
48+
## Metrics Queries (MPL)
49+
50+
To query OTel metrics using MPL (Metrics Processing Language), configure the
51+
client with your edge endpoint and use `mpl_query`:
52+
53+
```python
54+
import axiom_py
55+
from axiom_py import MplOptions
56+
from datetime import datetime, timedelta, timezone
57+
58+
client = axiom_py.Client(
59+
token="xaat-your-api-token",
60+
edge="us-east-1.aws.edge.axiom.co"
61+
)
62+
63+
end = datetime.now(timezone.utc)
64+
start = end - timedelta(hours=1)
65+
66+
result = client.mpl_query(
67+
"`my-metrics`:`http.server.duration` | align to 5m using avg",
68+
opts=MplOptions(start_time=start, end_time=end),
69+
)
70+
71+
for series in result.series:
72+
print(series.metric, series.tags, series.data)
73+
```
74+
4775
## Asynchronous Client
4876

4977
The library also provides an async client for use with asyncio:

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "axiom-py"
3-
version = "0.10.0"
3+
version = "0.12.0"
44
description = "Official bindings for the Axiom API"
55
readme = "README.md"
66
requires-python = ">=3.8"

src/axiom_py/__init__.py

Lines changed: 37 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,13 @@
1313
ContentEncoding,
1414
WrongQueryKindException,
1515
PersonalTokenNotSupportedForEdgeError,
16+
EdgeResolutionError,
1617
AplOptions,
18+
MplOptions,
1719
Client,
1820
AXIOM_URL,
1921
)
22+
from .query import MplResult, MplSeriesItem, MplMetadata
2023
from .datasets import (
2124
Dataset,
2225
TrimRequest,
@@ -40,35 +43,40 @@
4043
from .logging_async import AsyncAxiomHandler
4144
from .structlog_async import AsyncAxiomProcessor
4245

43-
_all_ = [
44-
AxiomError,
45-
IngestFailure,
46-
IngestStatus,
47-
IngestOptions,
48-
AplResultFormat,
49-
ContentType,
50-
ContentEncoding,
51-
WrongQueryKindException,
52-
PersonalTokenNotSupportedForEdgeError,
53-
AplOptions,
54-
AXIOM_URL,
55-
Dataset,
56-
TrimRequest,
57-
Annotation,
58-
AnnotationCreateRequest,
59-
AnnotationUpdateRequest,
46+
__all__ = [
47+
"AxiomError",
48+
"IngestFailure",
49+
"IngestStatus",
50+
"IngestOptions",
51+
"AplResultFormat",
52+
"ContentType",
53+
"ContentEncoding",
54+
"WrongQueryKindException",
55+
"PersonalTokenNotSupportedForEdgeError",
56+
"EdgeResolutionError",
57+
"AplOptions",
58+
"MplOptions",
59+
"MplResult",
60+
"MplSeriesItem",
61+
"MplMetadata",
62+
"AXIOM_URL",
63+
"Dataset",
64+
"TrimRequest",
65+
"Annotation",
66+
"AnnotationCreateRequest",
67+
"AnnotationUpdateRequest",
6068
# Sync API
61-
Client,
62-
DatasetsClient,
63-
AnnotationsClient,
64-
AxiomHandler,
65-
AxiomProcessor,
69+
"Client",
70+
"DatasetsClient",
71+
"AnnotationsClient",
72+
"AxiomHandler",
73+
"AxiomProcessor",
6674
# Async API
67-
AsyncClient,
68-
AsyncDatasetsClient,
69-
AsyncAnnotationsClient,
70-
AsyncTokensClient,
71-
AsyncUsersClient,
72-
AsyncAxiomHandler,
73-
AsyncAxiomProcessor,
75+
"AsyncClient",
76+
"AsyncDatasetsClient",
77+
"AsyncAnnotationsClient",
78+
"AsyncTokensClient",
79+
"AsyncUsersClient",
80+
"AsyncAxiomHandler",
81+
"AsyncAxiomProcessor",
7482
]

src/axiom_py/client.py

Lines changed: 109 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
from enum import Enum
1010
from humps import decamelize
11-
from typing import Optional, List, Dict, Callable
11+
from typing import Dict, List, Optional, Callable
1212
from dataclasses import dataclass, field, asdict
1313
from datetime import datetime
1414
from requests_toolbelt.sessions import BaseUrlSession
@@ -20,14 +20,19 @@
2020
QueryOptions,
2121
QueryLegacyResult,
2222
QueryKind,
23+
MplResult,
2324
)
2425
from .annotations import AnnotationsClient
2526
from .users import UsersClient
2627
from .version import __version__
27-
from .util import from_dict, handle_json_serialization, is_personal_token
28+
from .util import (
29+
from_dict,
30+
format_datetime_rfc3339_utc,
31+
handle_json_serialization,
32+
is_personal_token,
33+
)
2834
from .tokens import TokensClient
2935

30-
3136
AXIOM_URL = "https://api.axiom.co"
3237

3338

@@ -41,6 +46,12 @@ def __init__(self):
4146
)
4247

4348

49+
class EdgeResolutionError(Exception):
50+
"""Raised when the edge domain for a dataset cannot be resolved."""
51+
52+
pass
53+
54+
4455
@dataclass
4556
class IngestFailure:
4657
"""The ingestion failure of a single event"""
@@ -124,6 +135,21 @@ class AplOptions:
124135
limit: Optional[int] = field(default=None)
125136

126137

138+
@dataclass
139+
class MplOptions:
140+
"""Options for an MPL (Metrics Processing Language) query."""
141+
142+
# Start time for the query range (required by the API).
143+
start_time: datetime
144+
# End time for the query range (required by the API).
145+
end_time: datetime
146+
# Additional parameters forwarded to MetricsDB (e.g. filterbar bindings).
147+
params: Optional[Dict[str, str]] = field(default=None)
148+
# Optional metrics query options passed through to the API payload.
149+
query_options: Optional[Dict[str, str]] = field(default=None)
150+
nocache: bool = field(default=False)
151+
152+
127153
class AxiomError(Exception):
128154
"""This exception is raised on request errors."""
129155

@@ -187,8 +213,8 @@ def __init__(
187213
"https://eu-central-1.aws.edge.axiom.co"). When set, ingest
188214
requests use `/v1/ingest/{dataset}` and query requests use
189215
`/v1/query/_apl`.
190-
Must be passed explicitly (not read from environment).
191216
Takes precedence over `edge`.
217+
Must be passed explicitly (not read from environment).
192218
edge: Edge domain for ingest/query operations (e.g.,
193219
"eu-central-1.aws.edge.axiom.co"). When set, ingest
194220
requests use `https://{edge}/v1/ingest/{dataset}` and query
@@ -204,8 +230,8 @@ def __init__(
204230
url = os.getenv("AXIOM_URL")
205231
# Note: edge_url and edge are NOT auto-read from environment.
206232
# Edge configuration must be explicit to avoid accidentally routing
207-
# all requests through edge when AXIOM_EDGE_URL or AXIOM_EDGE is set for
208-
# edge-specific tests. Create a separate Client with edge_url
233+
# all requests through edge when AXIOM_EDGE_URL/AXIOM_EDGE are set
234+
# for edge-specific tests. Create a separate Client with edge params
209235
# for edge operations.
210236

211237
# Normalize empty strings to None for edge config
@@ -269,49 +295,49 @@ def is_edge_configured(self) -> bool:
269295
"""Check if edge is configured."""
270296
return self._edge_url is not None or self._edge is not None
271297

272-
def _get_edge_ingest_url(self, dataset: str) -> Optional[str]:
298+
def _resolve_edge_url(self, path_suffix: str) -> Optional[str]:
273299
"""
274-
Get the full edge ingest URL for a dataset.
275-
Returns None if edge is not configured.
300+
Resolve an edge URL for the given path suffix.
301+
302+
If edge_url includes a custom path, it is used as-is.
303+
If edge_url is just a host/base URL, the suffix is appended.
304+
If only edge domain is configured, https://{edge}/{suffix} is used.
276305
"""
277306
if self._edge_url is not None:
278307
url = self._edge_url.rstrip("/")
279308
parsed = urlparse(url)
280309
path = parsed.path
281310

282-
# If path is empty or just "/", append edge ingest format
283311
if path == "" or path == "/":
284-
return f"{parsed.scheme}://{parsed.netloc}/v1/ingest/{dataset}"
312+
return f"{parsed.scheme}://{parsed.netloc}/{path_suffix}"
285313

286-
# edge_url has a custom path, use as-is
287314
return url
288315

289316
if self._edge is not None:
290-
return f"https://{self._edge.rstrip('/')}/v1/ingest/{dataset}"
317+
return f"https://{self._edge.rstrip('/')}/{path_suffix}"
291318

292319
return None
293320

321+
def _get_edge_ingest_url(self, dataset: str) -> Optional[str]:
322+
"""
323+
Get the full edge ingest URL for a dataset.
324+
Returns None if edge is not configured.
325+
"""
326+
return self._resolve_edge_url(f"v1/ingest/{dataset}")
327+
294328
def _get_edge_query_url(self) -> Optional[str]:
295329
"""
296330
Get the full edge query URL.
297331
Returns None if edge is not configured.
298332
"""
299-
if self._edge_url is not None:
300-
url = self._edge_url.rstrip("/")
301-
parsed = urlparse(url)
302-
path = parsed.path
303-
304-
# If path is empty or just "/", append edge query format
305-
if path == "" or path == "/":
306-
return f"{parsed.scheme}://{parsed.netloc}/v1/query/_apl"
307-
308-
# edge_url has a custom path, use as-is
309-
return url
333+
return self._resolve_edge_url("v1/query/_apl")
310334

311-
if self._edge is not None:
312-
return f"https://{self._edge.rstrip('/')}/v1/query/_apl"
313-
314-
return None
335+
def _get_edge_mpl_url(self) -> Optional[str]:
336+
"""
337+
Get the full edge MPL query URL.
338+
Returns None if edge is not configured.
339+
"""
340+
return self._resolve_edge_url("v1/query/_mpl")
315341

316342
def before_shutdown(self, func: Callable):
317343
self.before_shutdown_funcs.append(func)
@@ -452,6 +478,61 @@ def query(
452478

453479
return result
454480

481+
def mpl_query(self, mpl: str, opts: MplOptions) -> MplResult:
482+
"""
483+
Execute an MPL (Metrics Processing Language) query.
484+
485+
MPL queries are edge-only. Configure the client with edge_url or
486+
edge to specify the endpoint. Edge endpoints require API tokens
487+
(xaat-), not personal tokens.
488+
489+
Args:
490+
mpl: MPL query string (e.g.
491+
"`my-dataset`:`http.requests` | align to 5m using avg")
492+
opts: Query options including required start_time and end_time.
493+
494+
Returns:
495+
MplResult containing the time-series data.
496+
"""
497+
url = self._get_edge_mpl_url()
498+
if url is None:
499+
raise EdgeResolutionError(
500+
"MPL queries require an edge endpoint; "
501+
"set edge_url or edge when creating the Client"
502+
)
503+
if is_personal_token(self._token):
504+
raise PersonalTokenNotSupportedForEdgeError()
505+
506+
payload = ujson.dumps(
507+
self._prepare_mpl_payload(mpl, opts),
508+
default=handle_json_serialization,
509+
)
510+
params = self._prepare_mpl_params(opts)
511+
res = self.session.post(url, data=payload, params=params)
512+
result = from_dict(MplResult, res.json())
513+
result.savedQueryID = res.headers.get("X-Axiom-History-Query-Id")
514+
return result
515+
516+
def _prepare_mpl_payload(
517+
self, mpl: str, opts: MplOptions
518+
) -> Dict[str, object]:
519+
payload: Dict[str, object] = {
520+
"mpl": mpl,
521+
"startTime": format_datetime_rfc3339_utc(opts.start_time),
522+
"endTime": format_datetime_rfc3339_utc(opts.end_time),
523+
}
524+
if opts.params:
525+
payload["params"] = opts.params
526+
if opts.query_options:
527+
payload["queryOptions"] = opts.query_options
528+
return payload
529+
530+
def _prepare_mpl_params(self, opts: MplOptions) -> Dict[str, object]:
531+
params: Dict[str, object] = {"format": "metrics-v2"}
532+
if opts.nocache:
533+
params["nocache"] = "true"
534+
return params
535+
455536
def _prepare_query_options(self, opts: QueryOptions) -> Dict[str, object]:
456537
"""returns the query options as a Dict, handles any renaming for key
457538
fields."""

0 commit comments

Comments
 (0)