#!/usr/bin/env python3
"""
review-bundle.py -- comprehensive pre-deploy review of the Charmed OpenStack
Caracal 2024.1 IPv4-only bundle (VR0 / DC0 / Omega test cloud).

READ-ONLY. Encodes every lesson learned from the 2026-05-28/29/30 deploy
sessions as a fail-closed check. Superset of audit-bundle-fixes.py.

Severities:
  FAIL  deploy-blocker or known regression       -> exit 1
  WARN  review item / possible issue             -> exit 1 only under --strict
  INFO  informational summary                    -> never affects exit

Dependencies: PyYAML only (already used by the existing fix scripts); rest stdlib.
ASCII-only output by design (non-ASCII has caused silent daemon failures here).

Usage:
  python3 review-bundle.py [BUNDLE] [--strict] [--quiet]
    BUNDLE   path to bundle.yaml (default: ./bundle.yaml)
    --strict treat WARN as failing for exit code
    --quiet  suppress PASS/INFO lines (show only WARN/FAIL)
"""

import argparse
import ipaddress
import re
import sys

try:
    import yaml
except ImportError:
    sys.stderr.write("ERROR: PyYAML not installed (pip install pyyaml --break-system-packages)\n")
    sys.exit(2)

# --------------------------------------------------------------------------- #
# Config -- the known-good baseline. Adjust here if the design changes.
# --------------------------------------------------------------------------- #
# Baseline object counts of the known-good bundle.
EXPECTED_APPS = 51
EXPECTED_RELATIONS = 98

# Address plan for the two spaces the VIP checks validate against.
PROVIDER_NET = ipaddress.ip_network("10.12.4.0/22")
METAL_NET = ipaddress.ip_network("10.12.8.0/22")

# MAAS reserved metal VIP range 10.12.8.224-254 (D-020), as last-octet bounds.
VIP_OCTET_MIN = 224
VIP_OCTET_MAX = 254

# BUNDLEFIX-001: the 7 per-endpoint binding keys that were phantom and removed.
# Final anchors are {"":metal} and {"":metal, public:provider}, so none of
# these should reappear in any app's effective bindings.
PHANTOM_BINDING_KEYS = {
    "admin",
    "internal",
    "shared-db",
    "amqp",
    "certificates",
    "cluster",
    "ha",
}

# D-020 clustered-API charm -> provider VIP last octet (metal mirrors it).
EXPECTED_CLUSTERED = {
    "barbican": 224,
    "cinder": 226,
    "glance": 228,
    "keystone": 229,
    "magnum": 230,
    "neutron-api": 231,
    "nova-cloud-controller": 232,
    "octavia": 233,
    "openstack-dashboard": 234,
    "placement": 235,
}

# Verified Caracal channel matrix (from prior charmhub verification).
# WARN-only: channels can be intentionally pinned; flag deviation, do not block.
OPENSTACK_CORE_CHANNEL = "2024.1/stable"
OPENSTACK_CORE_CHARMS = {
    "keystone",
    "glance",
    "cinder",
    "cinder-ceph",
    "nova-cloud-controller",
    "nova-compute",
    "neutron-api",
    "neutron-api-plugin-ovn",
    "placement",
    "octavia",
    "barbican",
    "magnum",
    "magnum-dashboard",
    "openstack-dashboard",
    "ceph-radosgw",
}
CHANNEL_MATRIX = {
    "ovn-central": "24.03/stable",
    "ovn-chassis": "24.03/stable",
    "ceph-mon": "squid/stable",
    "ceph-osd": "squid/stable",
    "ceph-fs": "squid/stable",
    "mysql-innodb-cluster": "8.0/stable",
    "mysql-router": "8.0/stable",
    "rabbitmq-server": "3.9/stable",
    "vault": "1.8/stable",
}

# jammy; Caracal-bundle paradigm (not noble)
EXPECTED_BASE = "ubuntu@22.04"

# Matches a colon-separated MAC address (six two-digit hex groups) anywhere in
# a string; used to tell MAC-based OVN bridge mappings from NIC-name ones.
MAC_RE = re.compile(r"([0-9a-fA-F]{2}:){5}[0-9a-fA-F]{2}")

# --------------------------------------------------------------------------- #
# Duplicate-key-detecting YAML loader (PyYAML silently keeps the last dup).
# --------------------------------------------------------------------------- #
# Module-level sink: one (key_as_str, 1-based line) tuple per repeated key seen
# while DupKeyLoader builds mappings.
_DUP_KEYS = []


class DupKeyLoader(yaml.SafeLoader):
    """SafeLoader variant that records repeated scalar keys of a mapping.

    Detection is a side effect only: the mapping itself is still built by the
    parent class, so the loaded document is whatever SafeLoader would return.
    """

    def construct_mapping(self, node, deep=False):
        """Scan *node* for repeated keys, then delegate to SafeLoader.

        Each repeated key is appended to the module-level ``_DUP_KEYS`` list
        as ``(str(key), line)`` where ``line`` is the 1-based line of the
        repeated occurrence.
        """
        scalar_types = (str, int, float)  # bool is an int subclass
        already = set()
        for key_node, _value_node in node.value:
            try:
                built = self.construct_object(key_node, deep=deep)
            except Exception:
                # Best effort: a key node that cannot be constructed on its own
                # is skipped here and left for the parent implementation.
                continue
            if built is not None and not isinstance(built, scalar_types):
                continue
            if built in already:
                _DUP_KEYS.append((str(built), key_node.start_mark.line + 1))
            else:
                already.add(built)
        return super().construct_mapping(node, deep)


# --------------------------------------------------------------------------- #
# Reporter
# --------------------------------------------------------------------------- #
class Reporter:
    """Collects check results and prints them grouped by section.

    Attributes:
        quiet:  when true, emit() hides PASS and INFO rows.
        rows:   list of (section, level, code, msg) tuples in insertion order.
        counts: per-level tally of every row added (hidden rows included).
    """

    def __init__(self, quiet=False):
        self.quiet = quiet
        self.rows = []  # (section, level, code, msg)
        self.counts = dict.fromkeys(("PASS", "WARN", "FAIL", "INFO"), 0)

    def add(self, section, level, code, msg):
        """Record one result row and bump the tally for its level."""
        self.rows.append((section, level, code, msg))
        self.counts.setdefault(level, 0)
        self.counts[level] += 1

    def emit(self):
        """Print all visible rows under section headers, then the summary."""
        hidden = ("PASS", "INFO") if self.quiet else ()
        current = None
        for row in self.rows:
            sec, level, code, msg = row
            if level in hidden:
                continue
            # A header is printed only when a visible row opens a new section.
            if sec != current:
                current = sec
                print("\n--- %s ---" % sec)
            print("  [%-4s] %-10s %s" % (level, code, msg))
        tally = self.counts
        print("\n==================== SUMMARY ====================")
        print("  PASS=%d  WARN=%d  FAIL=%d  INFO=%d"
              % (tally["PASS"], tally["WARN"], tally["FAIL"], tally["INFO"]))


# --------------------------------------------------------------------------- #
# Helpers
# --------------------------------------------------------------------------- #
def ep_app(endpoint):
    """Return the application part of an 'app:endpoint' string.

    'keystone:shared-db' -> 'keystone'; a string without a colon is returned
    whole. Anything that is not a str yields None.
    """
    if isinstance(endpoint, str):
        app, _sep, _rest = endpoint.partition(":")
        return app
    return None


def in_net(addr, net):
    """Return True when *addr* parses as an IP address contained in *net*.

    Values that ipaddress.ip_address() rejects with ValueError count as
    "not in the network" instead of raising.
    """
    try:
        member = ipaddress.ip_address(addr) in net
    except ValueError:
        member = False
    return member


# --------------------------------------------------------------------------- #
# Checks
# --------------------------------------------------------------------------- #
def check_ascii(R, text):
    """Report every line of *text* that contains a non-ASCII character.

    Args:
        R:    reporter exposing add(section, level, code, msg).
        text: full bundle text.

    Adds one WARN per offending line (first offending character only, at most
    20 lines itemised, the remainder summarised), or a single PASS when the
    text is pure ASCII.
    """
    sec = "0. Structure / integrity"
    bad = []
    # Split on "\n" only. str.splitlines() also breaks on -- and discards --
    # non-ASCII separators such as U+0085, U+2028 and U+2029, which would hide
    # exactly the characters this check exists to find.
    for lineno, line in enumerate(text.split("\n"), 1):
        offender = next((ch for ch in line if ord(ch) > 127), None)
        if offender is not None:
            # ascii() keeps the report itself ASCII-only (repr() would echo
            # the raw character back into the output).
            bad.append((lineno, ascii(offender)))
    if not bad:
        R.add(sec, "PASS", "ASCII", "file is pure ASCII")
        return
    for ln, ch in bad[:20]:
        R.add(sec, "WARN", "NON-ASCII",
              "non-ASCII char %s on line %d (non-ASCII has caused silent daemon failures here)" % (ch, ln))
    if len(bad) > 20:
        R.add(sec, "WARN", "NON-ASCII", "...and %d more non-ASCII line(s)" % (len(bad) - 20))


def check_structure(R, doc):
    """Validate the top-level shape of the bundle and extract its two sections.

    Returns (apps, rels). Both are None when the document is not a mapping;
    otherwise a missing or mistyped section is reported as FAIL and replaced
    by an empty dict / list so later checks can still run.
    """
    sec = "0. Structure / integrity"
    if not isinstance(doc, dict):
        R.add(sec, "FAIL", "STRUCT-00", "top-level YAML is not a mapping")
        return None, None

    # Duplicate keys are gathered by the loader while parsing.
    if not _DUP_KEYS:
        R.add(sec, "PASS", "DUPKEY", "no duplicate keys")
    for dup_key, dup_line in _DUP_KEYS:
        R.add(sec, "FAIL", "DUPKEY", "duplicate key '%s' near line %d" % (dup_key, dup_line))

    apps = doc.get("applications")
    if not isinstance(apps, dict):
        R.add(sec, "FAIL", "STRUCT-APPS", "no 'applications' mapping")
        apps = {}
    rels = doc.get("relations")
    if not isinstance(rels, list):
        R.add(sec, "FAIL", "STRUCT-RELS", "no 'relations' list")
        rels = []

    # Count drift against the baseline is a review item, never a blocker.
    for code, template, actual, baseline in (
        ("APP-COUNT", "applications=%d (baseline %d)", len(apps), EXPECTED_APPS),
        ("REL-COUNT", "relations=%d (baseline %d)", len(rels), EXPECTED_RELATIONS),
    ):
        level = "INFO" if actual == baseline else "WARN"
        R.add(sec, level, code, template % (actual, baseline))
    return apps, rels


def check_relations(R, apps, rels):
    """Check that every relation is a 2-element list of 'app:endpoint' strings
    whose app part names an application present in *apps*.

    Each violation is a FAIL; one PASS is added when there are none.
    """
    sec = "1. Relation integrity"
    problems = 0
    for rel in rels:
        if not isinstance(rel, list) or len(rel) != 2:
            R.add(sec, "FAIL", "REL-SHAPE", "relation not a 2-element list: %r" % (rel,))
            problems += 1
            continue
        for endpoint in rel:
            if isinstance(endpoint, str) and ":" in endpoint:
                owner = ep_app(endpoint)
                if owner in apps:
                    continue
                R.add(sec, "FAIL", "REL-DANGLE",
                      "endpoint references unknown app '%s' in %r" % (owner, rel))
            else:
                R.add(sec, "FAIL", "REL-COLON", "endpoint missing colon: %r in %r" % (endpoint, rel))
            problems += 1
    if problems == 0:
        R.add(sec, "PASS", "REL-INT",
              "all relations well-formed, colon-explicit, both ends resolve to apps")


def check_bindings_phantom(R, apps):
    """FAIL every app whose bindings mapping contains a key listed in
    PHANTOM_BINDING_KEYS; add one PASS when no app does.

    Apps without a bindings mapping are skipped.
    """
    sec = "2. BUNDLEFIX-001 (phantom binding keys)"
    clean = True
    for name, spec in apps.items():
        bindings = (spec or {}).get("bindings")
        if not isinstance(bindings, dict):
            continue
        phantom = sorted(PHANTOM_BINDING_KEYS.intersection(bindings))
        if not phantom:
            continue
        clean = False
        R.add(sec, "FAIL", "PHANTOM",
              "%s has phantom per-endpoint binding key(s): %s" % (name, ", ".join(phantom)))
    if clean:
        R.add(sec, "PASS", "PHANTOM",
              "no app reintroduces a removed phantom binding key (%s)"
              % ", ".join(sorted(PHANTOM_BINDING_KEYS)))


def check_vault(R, apps, rels):
    """Verify that vault stays de-HA'd.

    Checks, in order: no 'vip' option, no 'os-public-hostname' option (WARN
    only), no 'vault-hacluster' application, and no relation with an endpoint
    starting with 'vault:ha'. A missing vault app is a WARN and ends the check.
    """
    sec = "3. BUNDLEFIX-002 (vault de-HA)"
    vault = apps.get("vault")
    if vault is None:
        R.add(sec, "WARN", "VAULT", "no 'vault' app found")
        return
    options = (vault or {}).get("options") or {}

    if "vip" not in options:
        R.add(sec, "PASS", "VAULT-VIP", "vault has no vip")
    else:
        R.add(sec, "FAIL", "VAULT-VIP", "vault has a 'vip' option (must be de-HA'd): %r" % options["vip"])

    if "os-public-hostname" in options:
        R.add(sec, "WARN", "VAULT-HOST", "vault has os-public-hostname (expected removed)")

    if "vault-hacluster" not in apps:
        R.add(sec, "PASS", "VAULT-HA", "no vault-hacluster application")
    else:
        R.add(sec, "FAIL", "VAULT-HA", "vault-hacluster application is present (must be removed)")

    ha_relations = [
        rel for rel in rels
        if isinstance(rel, list)
        and any(isinstance(end, str) and end.startswith("vault:ha") for end in rel)
    ]
    for rel in ha_relations:
        R.add(sec, "FAIL", "VAULT-HAREL", "vault:ha relation present: %r" % (rel,))


def map_hacluster(apps, rels):
    """Map each clustered principal app to its hacluster application.

    An app is a hacluster app when its spec has charm == "hacluster". A
    principal is the non-hacluster side of a relation whose hacluster-side
    endpoint is "ha" (or left implicit). Relations on any other hacluster
    endpoint (for example nrpe-external-master) are ignored, so monitoring
    charms are not mistaken for clustered principals. Principals that are not
    defined in *apps* are ignored too, so callers can index apps[name] safely.

    Args:
        apps: application name -> spec mapping from the bundle.
        rels: list of relations; malformed entries are skipped.

    Returns:
        (hac_apps, principal_of): the set of hacluster app names and a dict
        principal app name -> hacluster app name.
    """
    hac_apps = {
        name for name, spec in apps.items()
        if isinstance(spec, dict) and spec.get("charm") == "hacluster"
    }
    principal_of = {}
    for rel in rels:
        if not (isinstance(rel, list) and len(rel) == 2):
            continue
        if not (isinstance(rel[0], str) and isinstance(rel[1], str)):
            continue
        app0, _, ep0 = rel[0].partition(":")
        app1, _, ep1 = rel[1].partition(":")
        # Try both orientations: (hacluster side, its endpoint, other side).
        for hac, hac_ep, other in ((app0, ep0, app1), (app1, ep1, app0)):
            if hac not in hac_apps or other in hac_apps:
                continue
            if hac_ep not in ("", "ha"):
                continue
            if other and other in apps:
                principal_of[other] = hac
    return hac_apps, principal_of


def check_hacluster(R, apps, rels):
    """Check each hacluster app's cluster_count against the testcloud baseline.

    Per hacluster app:
      FAIL HAC-CC    cluster_count missing or not an integer
      WARN HAC-PRIN  no principal found for it
      FAIL HAC-OVER  cluster_count exceeds the principal's num_units
      WARN HAC-NE1   cluster_count differs from the baseline of 1
    One PASS summarises the apps that had cluster_count == 1.

    Returns:
        The principal -> hacluster mapping from map_hacluster(), on every
        path (callers are free to ignore it).
    """
    sec = "4. BUNDLEFIX-003 (hacluster cluster_count)"
    hac_apps, principal_of = map_hacluster(apps, rels)
    if not hac_apps:
        R.add(sec, "WARN", "HAC", "no hacluster apps found")
        return principal_of
    principal_for_hac = {h: p for p, h in principal_of.items()}
    ok = 0
    for h in sorted(hac_apps):
        opts = apps[h].get("options")
        if not isinstance(opts, dict):
            opts = {}
        cc = opts.get("cluster_count")
        prin = principal_for_hac.get(h)
        nu = (apps.get(prin) or {}).get("num_units") if prin else None
        if cc is None:
            R.add(sec, "FAIL", "HAC-CC", "%s missing cluster_count" % h)
            continue
        # Fail closed on a non-integer value (quoted number, bool, list...)
        # rather than raising TypeError in the comparisons below.
        if isinstance(cc, bool) or not isinstance(cc, int):
            R.add(sec, "FAIL", "HAC-CC", "%s cluster_count is not an integer: %r" % (h, cc))
            continue
        if not prin:
            R.add(sec, "WARN", "HAC-PRIN", "%s has no principal via :ha relation" % h)
        if isinstance(nu, int) and cc > nu:
            R.add(sec, "FAIL", "HAC-OVER",
                  "%s cluster_count=%s > principal %s num_units=%s" % (h, cc, prin, nu))
            continue
        if cc != 1:
            R.add(sec, "WARN", "HAC-NE1",
                  "%s cluster_count=%s (testcloud baseline is 1)" % (h, cc))
        else:
            ok += 1
    if ok:
        R.add(sec, "PASS", "HAC", "%d hacluster app(s) cluster_count=1 and <= principal num_units" % ok)
    return principal_of


def check_memcached(R, apps, rels):
    """Require a 'memcached' application and a 2-element relation joining
    nova-cloud-controller:memcache with memcached:cache (either order).

    Each of the two requirements yields its own PASS or FAIL row.
    """
    sec = "5. BUNDLEFIX-004 (memcached)"
    if "memcached" in apps:
        R.add(sec, "PASS", "MEMCACHE-APP", "memcached application present")
    else:
        R.add(sec, "FAIL", "MEMCACHE-APP", "no 'memcached' application")

    wanted = ("nova-cloud-controller:memcache", "memcached:cache")
    found = any(
        isinstance(rel, list) and len(rel) == 2 and all(end in rel for end in wanted)
        for rel in rels
    )
    if found:
        level, state = "PASS", "present"
    else:
        level, state = "FAIL", "MISSING"
    R.add(sec, level, "MEMCACHE-REL",
          "nova-cloud-controller:memcache <-> memcached:cache relation %s"
          % state)


def check_router_bindings(R, apps):
    """Check that every mysql-router app is bound to the metal space.

    Per router (sorted by name):
      FAIL ROUTER-BIND  the default ("") binding is not "metal"
      WARN ROUTER-BIND  default is metal but some endpoint is bound elsewhere
    A PASS row reports only the routers that are fully metal-bound, so it is
    never emitted with a count that includes a router flagged above.
    """
    sec = "6. BUNDLEFIX-005 (mysql-router metal binding)"
    routers = sorted(
        n for n, s in apps.items()
        if isinstance(s, dict) and s.get("charm") == "mysql-router"
    )
    if not routers:
        R.add(sec, "WARN", "ROUTER", "no mysql-router apps found")
        return
    clean = 0
    for n in routers:
        b = apps[n].get("bindings")
        if not isinstance(b, dict):
            b = {}  # missing/mistyped bindings -> default is None -> FAIL below
        # effective default space is the "" key; anchors already resolved by yaml
        default = b.get("")
        if default != "metal":
            R.add(sec, "FAIL", "ROUTER-BIND",
                  "%s default space binding is %r (expected metal)" % (n, default))
            continue
        non_metal = {k: v for k, v in b.items() if v != "metal"}
        if non_metal:
            R.add(sec, "WARN", "ROUTER-BIND",
                  "%s has non-metal endpoint binding(s): %r" % (n, non_metal))
            continue
        clean += 1
    if clean:
        R.add(sec, "PASS", "ROUTER-BIND",
              "%d mysql-router app(s) bound to metal" % clean)


def check_vips(R, apps, rels):
    """Check that every clustered principal carries a mirrored dual VIP.

    The clustered set comes from map_hacluster(); differences from
    EXPECTED_CLUSTERED are WARNed. Each clustered app's 'vip' option must be
    two whitespace-separated addresses -- provider then metal -- inside
    PROVIDER_NET / METAL_NET, with equal last octets within
    VIP_OCTET_MIN..VIP_OCTET_MAX. A last octet differing from the D-020 map
    is a WARN only. A principal with no usable spec or options is reported as
    having no vip instead of raising.
    """
    sec = "7. BUNDLEFIX-006 / D-020 (dual provider+metal VIPs)"
    _, principal_of = map_hacluster(apps, rels)
    clustered = sorted(principal_of)
    # set comparison vs expected D-020 clustered set
    got = set(clustered)
    exp = set(EXPECTED_CLUSTERED)
    missing, extra = exp - got, got - exp
    if missing:
        R.add(sec, "WARN", "VIP-SET", "expected-clustered apps NOT detected as clustered: %s"
              % ", ".join(sorted(missing)))
    if extra:
        R.add(sec, "WARN", "VIP-SET", "clustered apps beyond the D-020 set: %s"
              % ", ".join(sorted(extra)))
    ok = 0
    for name in clustered:
        # Tolerate a principal that is undefined, null or mistyped in the
        # bundle: it simply has no vip, which is a FAIL below.
        spec = apps.get(name)
        opts = spec.get("options") if isinstance(spec, dict) else None
        if not isinstance(opts, dict):
            opts = {}
        vip = opts.get("vip")
        if not vip:
            R.add(sec, "FAIL", "VIP-MISS", "%s is clustered but has no vip" % name)
            continue
        parts = str(vip).split()
        if len(parts) != 2:
            R.add(sec, "FAIL", "VIP-DUAL", "%s vip is not dual (got %r)" % (name, vip))
            continue
        prov, metal = parts
        if not in_net(prov, PROVIDER_NET):
            R.add(sec, "FAIL", "VIP-PROV", "%s provider vip %s not in %s" % (name, prov, PROVIDER_NET))
            continue
        if not in_net(metal, METAL_NET):
            R.add(sec, "FAIL", "VIP-METAL", "%s metal vip %s not in %s" % (name, metal, METAL_NET))
            continue
        # Both strings were just validated as addresses in IPv4 networks, so
        # the dotted-quad split is safe here.
        po, mo = int(prov.split(".")[-1]), int(metal.split(".")[-1])
        if po != mo:
            R.add(sec, "FAIL", "VIP-MIRROR", "%s octets differ: provider .%d vs metal .%d" % (name, po, mo))
            continue
        if not (VIP_OCTET_MIN <= mo <= VIP_OCTET_MAX):
            R.add(sec, "FAIL", "VIP-RANGE",
                  "%s metal vip octet .%d outside reserved %d-%d" % (name, mo, VIP_OCTET_MIN, VIP_OCTET_MAX))
            continue
        expected_octet = EXPECTED_CLUSTERED.get(name)
        if expected_octet is not None and po != expected_octet:
            R.add(sec, "WARN", "VIP-OCTET",
                  "%s vip octet .%d != D-020 map .%d" % (name, po, expected_octet))
        ok += 1
    if ok:
        R.add(sec, "PASS", "VIP-DUAL",
              "%d clustered API charm(s) have mirrored dual VIPs in the reserved range" % ok)


def check_osd(R, apps):
    """Check that every ceph-osd app names real device paths in osd-devices.

    osd-devices is treated as a whitespace-separated list and every entry must
    start with "/" (previously only the first entry was inspected). A PASS row
    carries a note when any entry is not a /dev/disk/by-* path. Apps are
    visited in name order, like the other per-app checks.
    """
    sec = "8. Anti-pattern: ceph-osd osd-devices"
    osds = sorted(
        n for n, s in apps.items()
        if isinstance(s, dict) and s.get("charm") == "ceph-osd"
    )
    if not osds:
        R.add(sec, "WARN", "OSD", "no ceph-osd app found")
        return
    for n in osds:
        opts = apps[n].get("options")
        dev = opts.get("osd-devices") if isinstance(opts, dict) else None
        devices = dev.split() if isinstance(dev, str) else []
        if not devices or not all(d.startswith("/") for d in devices):
            R.add(sec, "FAIL", "OSD-DEV", "%s osd-devices not a real path: %r" % (n, dev))
            continue
        note = ""
        if not all("/dev/disk/by-" in d for d in devices):
            note = " (kernel-name; by-path/by-id is harder for bare metal -- Roosevelt note)"
        R.add(sec, "PASS", "OSD-DEV", "%s osd-devices=%s%s" % (n, dev.strip(), note))


def check_ovn(R, apps):
    """Review bridge-interface-mappings on every ovn-chassis app.

    No mapping at all is INFO; a mapping in which MAC_RE finds a MAC address
    is PASS; a mapping without one is WARN. No ovn-chassis app is a WARN.
    """
    sec = "9. Anti-pattern: ovn-chassis mappings (MAC over NIC name)"
    chassis_apps = sorted(
        name for name, spec in apps.items()
        if (spec or {}).get("charm") == "ovn-chassis"
    )
    if not chassis_apps:
        R.add(sec, "WARN", "OVN", "no ovn-chassis app found")
        return
    for app in chassis_apps:
        options = apps[app].get("options") or {}
        mappings = options.get("bridge-interface-mappings")
        if not mappings:
            R.add(sec, "INFO", "OVN-BIM", "%s has no bridge-interface-mappings (expected for octavia-side chassis)" % app)
            continue
        if MAC_RE.search(str(mappings)) is None:
            R.add(sec, "WARN", "OVN-BIM",
                  "%s bridge-interface-mappings has no MAC (NIC-name? fragile): %r" % (app, mappings))
        else:
            R.add(sec, "PASS", "OVN-BIM", "%s bridge-interface-mappings is MAC-based" % app)


def check_os_networks(R, apps, rels):
    """WARN about clustered principals that set any os-*-network option.

    Principals come from map_hacluster(). One WARN is added per pinned option;
    a single PASS is added when none is found. A principal with no usable spec
    or options is treated as having no options instead of raising.
    """
    sec = "10. D-020: spaces-native (no os-*-network pinning)"
    _, principal_of = map_hacluster(apps, rels)
    flagged = 0
    for name in sorted(principal_of):
        spec = apps.get(name)
        opts = spec.get("options") if isinstance(spec, dict) else None
        if not isinstance(opts, dict):
            opts = {}
        for k in ("os-internal-network", "os-admin-network", "os-public-network"):
            if k in opts:
                R.add(sec, "WARN", "OS-NET",
                      "%s sets %s (D-020 found spaces-native resolve sufficient; verify intent)" % (name, k))
                flagged += 1
    if not flagged:
        R.add(sec, "PASS", "OS-NET", "no clustered charm pins os-*-network (spaces-native, per D-020)")


def expected_channel(charm):
    """Return the baseline channel for *charm*, or None when it has none.

    An explicit CHANNEL_MATRIX entry wins; otherwise charms listed in
    OPENSTACK_CORE_CHARMS share OPENSTACK_CORE_CHANNEL.
    """
    try:
        return CHANNEL_MATRIX[charm]
    except KeyError:
        pass
    return OPENSTACK_CORE_CHANNEL if charm in OPENSTACK_CORE_CHARMS else None


def check_channels_base(R, apps):
    """Compare each app's channel, base and series with the baseline.

    Every deviation is a WARN. The closing PASS depends on channel mismatches
    only; base/series warnings do not suppress it. Apps without a channel, or
    whose charm has no baseline channel, are not flagged.
    """
    sec = "11. Channels / base (verified Caracal matrix; WARN-only)"
    channels_clean = True
    for name, spec in sorted(apps.items()):
        spec = spec or {}
        charm, channel = spec.get("charm"), spec.get("channel")
        wanted = expected_channel(charm)
        if wanted and channel and channel != wanted:
            R.add(sec, "WARN", "CHANNEL", "%s (%s) channel=%s expected=%s" % (name, charm, channel, wanted))
            channels_clean = False
        base, series = spec.get("base"), spec.get("series")
        if base and base != EXPECTED_BASE:
            R.add(sec, "WARN", "BASE", "%s base=%s expected=%s" % (name, base, EXPECTED_BASE))
        if series and series != "jammy":
            R.add(sec, "WARN", "SERIES", "%s series=%s expected=jammy" % (name, series))
    if channels_clean:
        R.add(sec, "PASS", "CHANNEL", "no charm deviates from the known Caracal channel matrix")


def summary_tables(R, apps, rels):
    """Add INFO rows listing each clustered principal's vip and all
    mysql-router apps.

    Purely informational. A principal or app with no usable spec or options is
    listed with vip=None (or skipped from the router list) instead of raising.
    """
    sec = "12. Inventory (informational)"
    _, principal_of = map_hacluster(apps, rels)
    for name in sorted(principal_of):
        spec = apps.get(name)
        opts = spec.get("options") if isinstance(spec, dict) else None
        vip = opts.get("vip") if isinstance(opts, dict) else None
        R.add(sec, "INFO", "CLUSTERED", "%-26s vip=%s" % (name, vip))
    routers = sorted(
        n for n, s in apps.items()
        if isinstance(s, dict) and s.get("charm") == "mysql-router"
    )
    R.add(sec, "INFO", "ROUTERS", "%d mysql-router apps: %s" % (len(routers), ", ".join(routers)))


# --------------------------------------------------------------------------- #
# Main
# --------------------------------------------------------------------------- #
def main():
    """Parse arguments, load the bundle, run every check and print the report.

    Returns the process exit code:
      0  clean (WARN rows allowed unless --strict)
      1  at least one FAIL, or a WARN under --strict, or a non-mapping document
      2  the bundle could not be read or parsed
    """
    ap = argparse.ArgumentParser(description="Comprehensive Caracal bundle reviewer (read-only).")
    ap.add_argument("bundle", nargs="?", default="bundle.yaml")
    ap.add_argument("--strict", action="store_true", help="treat WARN as failing for exit code")
    ap.add_argument("--quiet", action="store_true", help="show only WARN/FAIL")
    args = ap.parse_args()

    try:
        # errors="replace": undecodable bytes become U+FFFD, which the ASCII
        # check then reports, instead of aborting the read.
        with open(args.bundle, "r", encoding="utf-8", errors="replace") as fh:
            text = fh.read()
    except FileNotFoundError:
        sys.stderr.write("ERROR: bundle not found: %s\n" % args.bundle)
        return 2
    except OSError as e:
        # Permission denied, path is a directory, I/O error, ...
        sys.stderr.write("ERROR: cannot read bundle %s: %s\n" % (args.bundle, e))
        return 2

    # Drop duplicate-key records from any earlier run in this process.
    del _DUP_KEYS[:]
    try:
        # DupKeyLoader derives from yaml.SafeLoader, so this is a safe load.
        doc = yaml.load(text, Loader=DupKeyLoader)
    except yaml.YAMLError as e:
        sys.stderr.write("ERROR: YAML parse failed: %s\n" % e)
        return 2

    R = Reporter(quiet=args.quiet)
    print("================ Caracal v1 bundle review: %s ================" % args.bundle)

    check_ascii(R, text)
    apps, rels = check_structure(R, doc)
    if apps is None:
        # Not a mapping at all: nothing else can be checked.
        R.emit()
        return 1
    check_relations(R, apps, rels)
    check_bindings_phantom(R, apps)
    check_vault(R, apps, rels)
    check_hacluster(R, apps, rels)
    check_memcached(R, apps, rels)
    check_router_bindings(R, apps)
    check_vips(R, apps, rels)
    check_osd(R, apps)
    check_ovn(R, apps)
    check_os_networks(R, apps, rels)
    check_channels_base(R, apps)
    summary_tables(R, apps, rels)

    R.emit()
    fail = R.counts["FAIL"] > 0
    warn = R.counts["WARN"] > 0
    if fail or (args.strict and warn):
        print("\nVERDICT: NOT CLEAN" + (" (--strict: WARN counts)" if (warn and not fail) else ""))
        return 1
    print("\nVERDICT: CLEAN" + (" (with WARN review items)" if warn else ""))
    return 0


if __name__ == "__main__":
    sys.exit(main())
