Skip to content

Commit 704a0e1

Browse files
committed
add MPL query support
1 parent fe796a3 commit 704a0e1

11 files changed

Lines changed: 770 additions & 99 deletions

File tree

README.md

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,37 @@ edge_client = axiom_py.Client(
4343
```
4444

4545
**Note:** Edge endpoints require API tokens (`xaat-`), not personal tokens.
46-
Edge configuration must be passed explicitly when creating the client.
46+
Edge configuration can be passed explicitly (`edge_url` / `edge`) or through
47+
environment variables (`AXIOM_EDGE_URL` / `AXIOM_EDGE`). Explicit constructor
48+
arguments take precedence.
49+
50+
## Metrics Queries (MPL)
51+
52+
To query OTel metrics using MPL (Metrics Processing Language), configure the
53+
client with your edge endpoint and use `mpl_query`:
54+
55+
```python
56+
import axiom_py
57+
from axiom_py import MplOptions
58+
from datetime import datetime, timedelta, timezone
59+
60+
client = axiom_py.Client(
61+
token="xaat-your-api-token",
62+
edge="us-east-1.aws.edge.axiom.co"
63+
)
64+
65+
end = datetime.now(timezone.utc)
66+
start = end - timedelta(hours=1)
67+
68+
result = client.mpl_query(
69+
"`my-metrics`:`http.server.duration` | align to 5m using avg",
70+
opts=MplOptions(start_time=start, end_time=end),
71+
)
72+
73+
for series in result.series:
74+
print(series.metric, series.tags, series.data)
75+
```
76+
4777
## Asynchronous Client
4878

4979
The library also provides an async client for use with asyncio:

src/axiom_py/__init__.py

Lines changed: 37 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,13 @@
1313
ContentEncoding,
1414
WrongQueryKindException,
1515
PersonalTokenNotSupportedForEdgeError,
16+
EdgeResolutionError,
1617
AplOptions,
18+
MplOptions,
1719
Client,
1820
AXIOM_URL,
1921
)
22+
from .query import MplResult, MplSeriesItem, MplMetadata
2023
from .datasets import (
2124
Dataset,
2225
TrimRequest,
@@ -40,35 +43,40 @@
4043
from .logging_async import AsyncAxiomHandler
4144
from .structlog_async import AsyncAxiomProcessor
4245

43-
_all_ = [
44-
AxiomError,
45-
IngestFailure,
46-
IngestStatus,
47-
IngestOptions,
48-
AplResultFormat,
49-
ContentType,
50-
ContentEncoding,
51-
WrongQueryKindException,
52-
PersonalTokenNotSupportedForEdgeError,
53-
AplOptions,
54-
AXIOM_URL,
55-
Dataset,
56-
TrimRequest,
57-
Annotation,
58-
AnnotationCreateRequest,
59-
AnnotationUpdateRequest,
46+
__all__ = [
47+
"AxiomError",
48+
"IngestFailure",
49+
"IngestStatus",
50+
"IngestOptions",
51+
"AplResultFormat",
52+
"ContentType",
53+
"ContentEncoding",
54+
"WrongQueryKindException",
55+
"PersonalTokenNotSupportedForEdgeError",
56+
"EdgeResolutionError",
57+
"AplOptions",
58+
"MplOptions",
59+
"MplResult",
60+
"MplSeriesItem",
61+
"MplMetadata",
62+
"AXIOM_URL",
63+
"Dataset",
64+
"TrimRequest",
65+
"Annotation",
66+
"AnnotationCreateRequest",
67+
"AnnotationUpdateRequest",
6068
# Sync API
61-
Client,
62-
DatasetsClient,
63-
AnnotationsClient,
64-
AxiomHandler,
65-
AxiomProcessor,
69+
"Client",
70+
"DatasetsClient",
71+
"AnnotationsClient",
72+
"AxiomHandler",
73+
"AxiomProcessor",
6674
# Async API
67-
AsyncClient,
68-
AsyncDatasetsClient,
69-
AsyncAnnotationsClient,
70-
AsyncTokensClient,
71-
AsyncUsersClient,
72-
AsyncAxiomHandler,
73-
AsyncAxiomProcessor,
75+
"AsyncClient",
76+
"AsyncDatasetsClient",
77+
"AsyncAnnotationsClient",
78+
"AsyncTokensClient",
79+
"AsyncUsersClient",
80+
"AsyncAxiomHandler",
81+
"AsyncAxiomProcessor",
7482
]

src/axiom_py/client.py

Lines changed: 112 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
from enum import Enum
1010
from humps import decamelize
11-
from typing import Optional, List, Dict, Callable
11+
from typing import Dict, List, Optional, Callable
1212
from dataclasses import dataclass, field, asdict
1313
from datetime import datetime
1414
from requests_toolbelt.sessions import BaseUrlSession
@@ -20,14 +20,19 @@
2020
QueryOptions,
2121
QueryLegacyResult,
2222
QueryKind,
23+
MplResult,
2324
)
2425
from .annotations import AnnotationsClient
2526
from .users import UsersClient
2627
from .version import __version__
27-
from .util import from_dict, handle_json_serialization, is_personal_token
28+
from .util import (
29+
from_dict,
30+
format_datetime_rfc3339_utc,
31+
handle_json_serialization,
32+
is_personal_token,
33+
)
2834
from .tokens import TokensClient
2935

30-
3136
AXIOM_URL = "https://api.axiom.co"
3237

3338

@@ -41,6 +46,12 @@ def __init__(self):
4146
)
4247

4348

49+
class EdgeResolutionError(Exception):
50+
"""Raised when the edge domain for a dataset cannot be resolved."""
51+
52+
pass
53+
54+
4455
@dataclass
4556
class IngestFailure:
4657
"""The ingestion failure of a single event"""
@@ -124,6 +135,21 @@ class AplOptions:
124135
limit: Optional[int] = field(default=None)
125136

126137

138+
@dataclass
139+
class MplOptions:
140+
"""Options for an MPL (Metrics Processing Language) query."""
141+
142+
# Start time for the query range (required by the API).
143+
start_time: datetime
144+
# End time for the query range (required by the API).
145+
end_time: datetime
146+
# Additional parameters forwarded to MetricsDB (e.g. filterbar bindings).
147+
params: Optional[Dict[str, str]] = field(default=None)
148+
# Optional metrics query options passed through to the API payload.
149+
query_options: Optional[Dict[str, str]] = field(default=None)
150+
nocache: bool = field(default=False)
151+
152+
127153
class AxiomError(Exception):
128154
"""This exception is raised on request errors."""
129155

@@ -187,13 +213,13 @@ def __init__(
187213
"https://eu-central-1.aws.edge.axiom.co"). When set, ingest
188214
requests use `/v1/ingest/{dataset}` and query requests use
189215
`/v1/query/_apl`.
190-
Must be passed explicitly (not read from environment).
216+
Falls back to AXIOM_EDGE_URL env var.
191217
Takes precedence over `edge`.
192218
edge: Edge domain for ingest/query operations (e.g.,
193219
"eu-central-1.aws.edge.axiom.co"). When set, ingest
194220
requests use `https://{edge}/v1/ingest/{dataset}` and query
195221
requests use `https://{edge}/v1/query/_apl`.
196-
Must be passed explicitly (not read from environment).
222+
Falls back to AXIOM_EDGE env var.
197223
"""
198224
# fallback to env variables if not provided
199225
if token is None:
@@ -202,11 +228,10 @@ def __init__(
202228
org_id = os.getenv("AXIOM_ORG_ID")
203229
if url is None:
204230
url = os.getenv("AXIOM_URL")
205-
# Note: edge_url and edge are NOT auto-read from environment.
206-
# Edge configuration must be explicit to avoid accidentally routing
207-
# all requests through edge when AXIOM_EDGE_URL or AXIOM_EDGE is set for
208-
# edge-specific tests. Create a separate Client with edge_url
209-
# for edge operations.
231+
if edge_url is None:
232+
edge_url = os.getenv("AXIOM_EDGE_URL")
233+
if edge is None:
234+
edge = os.getenv("AXIOM_EDGE")
210235

211236
# Normalize empty strings to None for edge config
212237
edge_url = edge_url or None
@@ -269,49 +294,49 @@ def is_edge_configured(self) -> bool:
269294
"""Check if edge is configured."""
270295
return self._edge_url is not None or self._edge is not None
271296

272-
def _get_edge_ingest_url(self, dataset: str) -> Optional[str]:
297+
def _resolve_edge_url(self, path_suffix: str) -> Optional[str]:
273298
"""
274-
Get the full edge ingest URL for a dataset.
275-
Returns None if edge is not configured.
299+
Resolve an edge URL for the given path suffix.
300+
301+
If edge_url includes a custom path, it is used as-is.
302+
If edge_url is just a host/base URL, the suffix is appended.
303+
If only edge domain is configured, https://{edge}/{suffix} is used.
276304
"""
277305
if self._edge_url is not None:
278306
url = self._edge_url.rstrip("/")
279307
parsed = urlparse(url)
280308
path = parsed.path
281309

282-
# If path is empty or just "/", append edge ingest format
283310
if path == "" or path == "/":
284-
return f"{parsed.scheme}://{parsed.netloc}/v1/ingest/{dataset}"
311+
return f"{parsed.scheme}://{parsed.netloc}/{path_suffix}"
285312

286-
# edge_url has a custom path, use as-is
287313
return url
288314

289315
if self._edge is not None:
290-
return f"https://{self._edge.rstrip('/')}/v1/ingest/{dataset}"
316+
return f"https://{self._edge.rstrip('/')}/{path_suffix}"
291317

292318
return None
293319

320+
def _get_edge_ingest_url(self, dataset: str) -> Optional[str]:
321+
"""
322+
Get the full edge ingest URL for a dataset.
323+
Returns None if edge is not configured.
324+
"""
325+
return self._resolve_edge_url(f"v1/ingest/{dataset}")
326+
294327
def _get_edge_query_url(self) -> Optional[str]:
295328
"""
296329
Get the full edge query URL.
297330
Returns None if edge is not configured.
298331
"""
299-
if self._edge_url is not None:
300-
url = self._edge_url.rstrip("/")
301-
parsed = urlparse(url)
302-
path = parsed.path
303-
304-
# If path is empty or just "/", append edge query format
305-
if path == "" or path == "/":
306-
return f"{parsed.scheme}://{parsed.netloc}/v1/query/_apl"
332+
return self._resolve_edge_url("v1/query/_apl")
307333

308-
# edge_url has a custom path, use as-is
309-
return url
310-
311-
if self._edge is not None:
312-
return f"https://{self._edge.rstrip('/')}/v1/query/_apl"
313-
314-
return None
334+
def _get_edge_mpl_url(self) -> Optional[str]:
335+
"""
336+
Get the full edge MPL query URL.
337+
Returns None if edge is not configured.
338+
"""
339+
return self._resolve_edge_url("v1/query/_mpl")
315340

316341
def before_shutdown(self, func: Callable):
317342
self.before_shutdown_funcs.append(func)
@@ -452,6 +477,61 @@ def query(
452477

453478
return result
454479

480+
def mpl_query(self, mpl: str, opts: MplOptions) -> MplResult:
481+
"""
482+
Execute an MPL (Metrics Processing Language) query.
483+
484+
MPL queries are edge-only. Configure the client with edge_url or
485+
edge to specify the endpoint. Edge endpoints require API tokens
486+
(xaat-), not personal tokens.
487+
488+
Args:
489+
mpl: MPL query string (e.g.
490+
"`my-dataset`:`http.requests` | align to 5m using avg")
491+
opts: Query options including required start_time and end_time.
492+
493+
Returns:
494+
MplResult containing the time-series data.
495+
"""
496+
url = self._get_edge_mpl_url()
497+
if url is None:
498+
raise EdgeResolutionError(
499+
"MPL queries require an edge endpoint; "
500+
"set edge_url or edge when creating the Client"
501+
)
502+
if is_personal_token(self._token):
503+
raise PersonalTokenNotSupportedForEdgeError()
504+
505+
payload = ujson.dumps(
506+
self._prepare_mpl_payload(mpl, opts),
507+
default=handle_json_serialization,
508+
)
509+
params = self._prepare_mpl_params(opts)
510+
res = self.session.post(url, data=payload, params=params)
511+
result = from_dict(MplResult, res.json())
512+
result.savedQueryID = res.headers.get("X-Axiom-History-Query-Id")
513+
return result
514+
515+
def _prepare_mpl_payload(
516+
self, mpl: str, opts: MplOptions
517+
) -> Dict[str, object]:
518+
payload: Dict[str, object] = {
519+
"mpl": mpl,
520+
"startTime": format_datetime_rfc3339_utc(opts.start_time),
521+
"endTime": format_datetime_rfc3339_utc(opts.end_time),
522+
}
523+
if opts.params:
524+
payload["params"] = opts.params
525+
if opts.query_options:
526+
payload["queryOptions"] = opts.query_options
527+
return payload
528+
529+
def _prepare_mpl_params(self, opts: MplOptions) -> Dict[str, object]:
530+
params: Dict[str, object] = {"format": "metrics-v2"}
531+
if opts.nocache:
532+
params["nocache"] = "true"
533+
return params
534+
455535
def _prepare_query_options(self, opts: QueryOptions) -> Dict[str, object]:
456536
"""returns the query options as a Dict, handles any renaming for key
457537
fields."""

0 commit comments

Comments
 (0)