|
| 1 | +# SPDX-License-Identifier: MIT |
| 2 | +"""Multi-encoding evasion decoder for untrusted text. |
| 3 | +
|
| 4 | +Opt-in pre-processor — not part of the default ``clean()`` pipeline. |
| 5 | +Callers compose it with ``clean()`` explicitly:: |
| 6 | +
|
| 7 | + text = decode_evasion(user_input) # peel encoding layers |
| 8 | + cleaned = clean(text, escaper=...) # sanitize |
| 9 | +
|
| 10 | +Iteratively decodes URL encoding, HTML entities, and hex escapes |
| 11 | +(``\\xHH``). Stops when a full pass produces no changes or |
| 12 | +*max_layers* is reached. |
| 13 | +""" |
| 14 | + |
| 15 | +from __future__ import annotations |
| 16 | + |
| 17 | +import html |
| 18 | +import logging |
| 19 | +import re |
| 20 | +import urllib.parse |
| 21 | + |
# Logger named "navi_sanitize"; decode_evasion() uses it to report how many
# encoding layers were peeled (never the decoded content itself).
logger = logging.getLogger("navi_sanitize")

# Default upper bound on the number of decode passes decode_evasion() performs.
MAX_DECODE_LAYERS: int = 3

# Matches a literal backslash, the letter "x", and exactly two hex digits
# (e.g. the four characters ``\x41``); group 1 captures the two digits.
_HEX_RE = re.compile(r"\\x([0-9a-fA-F]{2})")
| 27 | + |
| 28 | + |
# One or more consecutive ``%XX`` triplets (either hex case).
_PERCENT_RUN_RE = re.compile(r"(?:%[0-9A-Fa-f]{2})+")


def _decode_url(s: str) -> str:
    """Decode URL percent-encoding, preserving malformed byte sequences.

    Each maximal run of ``%XX`` triplets is converted to bytes and decoded
    as UTF-8. Bytes that do not form valid UTF-8 (e.g. ``%FF``, a lone
    ``%80``, a truncated multi-byte sequence) are kept as the *exact*
    triplet text found in the input -- including its original hex case --
    instead of being replaced with U+FFFD or re-cased. Leaving them
    verbatim matters to callers that compare input and output to detect
    whether anything was decoded.

    Only the ``%XX`` runs are ever converted to bytes, so every other
    character (including a stray ``%``, ``+``, or a lone surrogate code
    point) passes through untouched and cannot trigger an encoding error.

    Args:
        s: Text that may contain percent-encoded sequences.

    Returns:
        *s* with one layer of valid percent-encoding removed.
    """

    def _decode_run(m: re.Match[str]) -> str:
        run = m.group(0)
        raw = bytes.fromhex(run.replace("%", ""))
        pieces: list[str] = []
        # Index into ``raw`` of the first byte behind the current character;
        # the matching triplet in ``run`` starts at ``3 * byte_pos``.
        byte_pos = 0
        # surrogateescape maps every undecodable byte 0x80-0xFF to a lone
        # surrogate U+DC80-U+DCFF, one surrogate per byte.
        for ch in raw.decode("utf-8", errors="surrogateescape"):
            if "\udc80" <= ch <= "\udcff":
                # Invalid byte: copy the original triplet verbatim.
                pieces.append(run[3 * byte_pos : 3 * byte_pos + 3])
                byte_pos += 1
            else:
                pieces.append(ch)
                byte_pos += len(ch.encode("utf-8"))
        return "".join(pieces)

    return _PERCENT_RUN_RE.sub(_decode_run, s)
| 49 | + |
| 50 | + |
def _decode_html_entities(s: str) -> str:
    """Return *s* with HTML character references expanded.

    Thin wrapper over :func:`html.unescape`, which converts named
    (``&lt;``) and numeric (``&#60;``, ``&#x3c;``) references to the
    characters they stand for.
    """
    unescaped = html.unescape(s)
    return unescaped
| 54 | + |
| 55 | + |
def _decode_hex_escapes(s: str) -> str:
    r"""Replace each literal ``\xHH`` sequence in *s* with its character.

    ``HH`` is parsed as a base-16 number and turned into the character
    with that code point via :func:`chr`. Text that does not match the
    ``\xHH`` pattern is left as it is.
    """
    return _HEX_RE.sub(lambda hit: chr(int(hit.group(1), 16)), s)
| 63 | + |
| 64 | + |
def decode_evasion(text: str, *, max_layers: int = MAX_DECODE_LAYERS) -> str:
    """Peel nested encodings off *text*, one pass at a time.

    A pass applies URL percent-decoding, HTML entity unescaping, and
    ``\\xHH`` hex-escape decoding, in that order. If the pass changed
    the text it counts as one layer and another pass is attempted;
    the loop ends as soon as a pass is a no-op or *max_layers* passes
    have been applied.

    When at least one layer was removed, a warning carrying only the
    layer count is logged -- the decoded content itself is never put
    in the log message.

    Args:
        text: Untrusted input that may be encoded several times over.
        max_layers: Maximum number of passes. Values ``<= 0`` return
            *text* untouched.

    Returns:
        The text after the last pass that changed it (or *text* itself
        when nothing was decoded).
    """
    if max_layers <= 0:
        return text

    # Order matters: each stage feeds the next within a single pass.
    stages = (_decode_url, _decode_html_entities, _decode_hex_escapes)

    current = text
    peeled = 0
    for _ in range(max_layers):
        candidate = current
        for stage in stages:
            candidate = stage(candidate)
        # A pass that leaves the text untouched means we hit a fixed point.
        if candidate == current:
            break
        current, peeled = candidate, peeled + 1

    if peeled:
        logger.warning("Decoded %d encoding layer(s) from value", peeled)

    return current
0 commit comments