|
| 1 | +# Copyright 2024-2026 The DNS-AID Authors |
| 2 | +# SPDX-License-Identifier: Apache-2.0 |
| 3 | + |
| 4 | +"""Enumerate the DNS-AID records published in a zone, via a DNS backend. |
| 5 | +
|
| 6 | +Under draft-mozleywilliams-dnsop-dnsaid-02 an agent's primary owner is the |
| 7 | +**flat** FQDN ``{name}.{domain}`` (SVCB + companion TXT). The pre-flat |
| 8 | +listing logic filtered records by the substring ``_agents`` in the owner |
| 9 | +label, which only ever matched the organization index (``_index._agents``) |
| 10 | +and walkable aliases (``{name}._agents``) — it silently missed every flat |
| 11 | +agent owner. This module identifies DNS-AID records by *structure* instead of |
| 12 | +by a name substring, so flat owners are surfaced. |
| 13 | +
|
| 14 | +A record is a DNS-AID record when it is: |
| 15 | +
|
| 16 | +* an **SVCB** record — every published agent (flat owner) and every walkable |
| 17 | + alias has exactly one; |
| 18 | +* a **TXT** record sharing an owner with an SVCB record — the companion |
| 19 | + capability/metadata TXT of a flat agent owner; |
| 20 | +* a record in the ``_agents`` bookkeeping namespace — the organization index |
| 21 | + (``_index._agents``) and walkable-alias owners. |
| 22 | +
|
| 23 | +System records (SOA/NS/A/AAAA/CNAME/...) are excluded. Used by ``dns-aid |
| 24 | +list`` (CLI) and the ``list_published_agents`` MCP tool so both interfaces |
| 25 | +report the same, correct set. |
| 26 | +""" |
| 27 | + |
| 28 | +from __future__ import annotations |
| 29 | + |
| 30 | +from typing import TYPE_CHECKING, Any |
| 31 | + |
| 32 | +if TYPE_CHECKING: |
| 33 | + from dns_aid.backends.base import DNSBackend |
| 34 | + |
| 35 | +# The label that marks the DNS-AID bookkeeping namespace (index + walkable |
| 36 | +# aliases). Agent *primary* owners are flat and do NOT contain it — that is |
| 37 | +# exactly why a substring filter on it is insufficient on its own. |
| 38 | +_AGENTS_LABEL = "_agents" |
| 39 | + |
| 40 | + |
| 41 | +def _owner(record: dict[str, Any]) -> str: |
| 42 | + """Stable owner key for a record: name-in-zone, falling back to the FQDN.""" |
| 43 | + return record.get("name") or record.get("fqdn", "") |
| 44 | + |
| 45 | + |
| 46 | +async def list_dns_aid_records(backend: DNSBackend, domain: str) -> list[dict[str, Any]]: |
| 47 | + """Return the DNS-AID records published under ``domain``. |
| 48 | +
|
| 49 | + Issues two type-filtered backend queries (SVCB, TXT) so unrelated system |
| 50 | + records are never fetched. The result preserves the backend's record-dict |
| 51 | + shape (``name``, ``fqdn``, ``type``, ``ttl``, ``values``, ``id``) and |
| 52 | + de-duplicates by ``(fqdn, type)``. |
| 53 | +
|
| 54 | + Args: |
| 55 | + backend: DNS backend to query. |
| 56 | + domain: Zone to enumerate. |
| 57 | +
|
| 58 | + Returns: |
| 59 | + DNS-AID records (SVCB owners + walkable aliases + companion TXT + |
| 60 | + the ``_agents`` bookkeeping records), in backend order. |
| 61 | + """ |
| 62 | + svcb_records = [r async for r in backend.list_records(domain, record_type="SVCB")] |
| 63 | + txt_records = [r async for r in backend.list_records(domain, record_type="TXT")] |
| 64 | + |
| 65 | + svcb_owners = {_owner(r) for r in svcb_records} |
| 66 | + |
| 67 | + out: list[dict[str, Any]] = list(svcb_records) |
| 68 | + for record in txt_records: |
| 69 | + owner = _owner(record) |
| 70 | + # Companion TXT of a flat agent owner, or an `_agents` bookkeeping TXT. |
| 71 | + if owner in svcb_owners or _AGENTS_LABEL in owner: |
| 72 | + out.append(record) |
| 73 | + |
| 74 | + # De-duplicate defensively (a backend could surface a record under both |
| 75 | + # queries); keep first occurrence and preserve order. |
| 76 | + seen: set[tuple[str, str]] = set() |
| 77 | + deduped: list[dict[str, Any]] = [] |
| 78 | + for record in out: |
| 79 | + key = (record.get("fqdn", ""), record.get("type", "")) |
| 80 | + if key in seen: |
| 81 | + continue |
| 82 | + seen.add(key) |
| 83 | + deduped.append(record) |
| 84 | + return deduped |
0 commit comments