#!/usr/bin/env python3
"""
review-bundle.py -- comprehensive pre-deploy review of the Charmed OpenStack
Caracal 2024.1 IPv4-only bundle (VR0 / DC0 / Omega test cloud).
READ-ONLY. Encodes every lesson learned from the 2026-05-28/29/30 deploy
sessions as a fail-closed check. Superset of audit-bundle-fixes.py.
Severities:
FAIL deploy-blocker or known regression -> exit 1
WARN review item / possible issue -> exit 1 only under --strict
INFO informational summary -> never affects exit
Dependencies: PyYAML only (already used by the existing fix scripts); rest stdlib.
ASCII-only output by design (non-ASCII has caused silent daemon failures here).
Usage:
python3 review-bundle.py [BUNDLE] [--strict] [--quiet]
BUNDLE path to bundle.yaml (default: ./bundle.yaml)
--strict treat WARN as failing for exit code
--quiet suppress PASS/INFO lines (show only WARN/FAIL)
"""
import argparse
import ipaddress
import re
import sys

try:
    import yaml
except ImportError:
    sys.stderr.write("ERROR: PyYAML not installed (pip install pyyaml --break-system-packages)\n")
    sys.exit(2)
# --------------------------------------------------------------------------- #
# Config -- the known-good baseline. Adjust here if the design changes.
# --------------------------------------------------------------------------- #
# Baseline inventory sizes for the reviewed bundle.
EXPECTED_APPS = 51
EXPECTED_RELATIONS = 98

# Address spaces the dual VIPs are validated against.
PROVIDER_NET = ipaddress.ip_network("10.12.4.0/22")
METAL_NET = ipaddress.ip_network("10.12.8.0/22")
VIP_OCTET_MIN = 224  # MAAS reserved metal VIP range 10.12.8.224-254 (D-020)
VIP_OCTET_MAX = 254

# BUNDLEFIX-001: the 7 per-endpoint binding keys that were phantom and removed.
# Final anchors are {"":metal} and {"":metal, public:provider} -> none of these
# should reappear in any app's effective bindings.
PHANTOM_BINDING_KEYS = {
    "admin", "internal", "shared-db", "amqp", "certificates", "cluster", "ha",
}

# D-020 clustered-API charm -> provider VIP last octet (metal mirrors it).
EXPECTED_CLUSTERED = {
    "barbican": 224, "cinder": 226, "glance": 228, "keystone": 229,
    "magnum": 230, "neutron-api": 231, "nova-cloud-controller": 232,
    "octavia": 233, "openstack-dashboard": 234, "placement": 235,
}

# Verified Caracal channel matrix (from prior charmhub verification).
# WARN-only: channels can be intentionally pinned; flag deviation, do not block.
OPENSTACK_CORE_CHANNEL = "2024.1/stable"
OPENSTACK_CORE_CHARMS = {
    "keystone", "glance", "cinder", "cinder-ceph", "nova-cloud-controller",
    "nova-compute", "neutron-api", "neutron-api-plugin-ovn", "placement",
    "octavia", "barbican", "magnum", "magnum-dashboard", "openstack-dashboard",
    "ceph-radosgw",
}
CHANNEL_MATRIX = {
    "ovn-central": "24.03/stable", "ovn-chassis": "24.03/stable",
    "ceph-mon": "squid/stable", "ceph-osd": "squid/stable", "ceph-fs": "squid/stable",
    "mysql-innodb-cluster": "8.0/stable", "mysql-router": "8.0/stable",
    "rabbitmq-server": "3.9/stable", "vault": "1.8/stable",
}
EXPECTED_BASE = "ubuntu@22.04"  # jammy; Caracal-bundle paradigm (not noble)

# Six colon-separated hex octets; used with .search(), so it matches a MAC
# address anywhere inside a longer mapping string.
MAC_RE = re.compile(r"([0-9a-fA-F]{2}:){5}[0-9a-fA-F]{2}")
# --------------------------------------------------------------------------- #
# Duplicate-key-detecting YAML loader (PyYAML silently keeps the last dup).
# --------------------------------------------------------------------------- #
# (key, 1-based line) pairs appended by DupKeyLoader for every repeated mapping key.
_DUP_KEYS = []


class DupKeyLoader(yaml.SafeLoader):
    """SafeLoader variant that records duplicate mapping keys in ``_DUP_KEYS``.

    The mapping itself is still built by the parent class, so the loaded
    document is unchanged; duplicates are only noted on the side.
    """

    def construct_mapping(self, node, deep=False):
        """Note repeated scalar keys of ``node``, then delegate to SafeLoader.

        Only str/int/float/bool/None keys are tracked. Keys whose
        construction raises are skipped here (presumably this covers YAML
        merge keys -- verify against PyYAML) and left to the parent call.
        """
        trackable = (str, int, float, bool)
        keys_so_far = set()
        for key_node, _value_node in node.value:
            try:
                key = self.construct_object(key_node, deep=deep)
            except Exception:
                continue
            if key is not None and not isinstance(key, trackable):
                continue
            if key in keys_so_far:
                # start_mark.line is 0-based; report it 1-based.
                _DUP_KEYS.append((str(key), key_node.start_mark.line + 1))
            keys_so_far.add(key)
        return super().construct_mapping(node, deep)
# --------------------------------------------------------------------------- #
# Reporter
# --------------------------------------------------------------------------- #
class Reporter:
    """Accumulates check results and prints them grouped by section.

    Attributes:
        quiet:  when true, emit() hides PASS and INFO rows.
        rows:   list of (section, level, code, msg) tuples in insertion order.
        counts: per-level tally; PASS/WARN/FAIL/INFO always exist.
    """

    def __init__(self, quiet=False):
        self.quiet = quiet
        self.rows = []  # (section, level, code, msg)
        self.counts = dict.fromkeys(("PASS", "WARN", "FAIL", "INFO"), 0)

    def add(self, section, level, code, msg):
        """Record one result row and bump the tally for its level."""
        self.rows.append((section, level, code, msg))
        self.counts.setdefault(level, 0)
        self.counts[level] += 1

    def emit(self):
        """Print every visible row under its section header, then the summary."""
        current = None
        for sec, level, code, msg in self.rows:
            hidden = self.quiet and level in ("PASS", "INFO")
            if hidden:
                continue
            # A header is printed only when a visible row starts a new section.
            if sec != current:
                current = sec
                print("\n--- %s ---" % sec)
            print(" [%-4s] %-10s %s" % (level, code, msg))
        totals = tuple(self.counts[k] for k in ("PASS", "WARN", "FAIL", "INFO"))
        print("\n==================== SUMMARY ====================")
        print(" PASS=%d WARN=%d FAIL=%d INFO=%d"
              % totals)
# --------------------------------------------------------------------------- #
# Helpers
# --------------------------------------------------------------------------- #
def ep_app(endpoint):
    """Return the application part of 'app:endpoint'; None for non-strings.

    A string without a colon is returned unchanged.
    """
    if isinstance(endpoint, str):
        app, _sep, _rest = endpoint.partition(":")
        return app
    return None
def in_net(addr, net):
    """Return True when ``addr`` parses as an IP address contained in ``net``.

    Anything ipaddress.ip_address() rejects with ValueError yields False.
    """
    try:
        parsed = ipaddress.ip_address(addr)
    except ValueError:
        return False
    else:
        return parsed in net
# --------------------------------------------------------------------------- #
# Checks
# --------------------------------------------------------------------------- #
def check_ascii(R, text):
    """Report every line of ``text`` that contains a non-ASCII character.

    Args:
        R:    reporter exposing add(section, level, code, msg).
        text: full bundle text.

    At most 20 offending lines are listed individually (first offending
    character per line); the remainder is summarized in one extra row.
    """
    sec = "0. Structure / integrity"
    offenders = []
    # keepends=True: str.splitlines() also breaks on NEL / U+2028 / U+2029.
    # Without the separators kept, those non-ASCII characters would be
    # consumed as line breaks and never inspected.
    for lineno, line in enumerate(text.splitlines(keepends=True), 1):
        first_bad = next((ch for ch in line if ord(ch) > 127), None)
        if first_bad is not None:
            # ascii() rather than repr(): repr() would echo the printable
            # non-ASCII character itself, breaking the ASCII-only output rule.
            offenders.append((lineno, ascii(first_bad)))
    if not offenders:
        R.add(sec, "PASS", "ASCII", "file is pure ASCII")
        return
    for lineno, shown in offenders[:20]:
        R.add(sec, "WARN", "NON-ASCII",
              "non-ASCII char %s on line %d (non-ASCII has caused silent daemon failures here)" % (shown, lineno))
    if len(offenders) > 20:
        R.add(sec, "WARN", "NON-ASCII", "...and %d more non-ASCII line(s)" % (len(offenders) - 20))
def check_structure(R, doc):
    """Validate the top-level bundle shape and report duplicate keys/counts.

    Returns:
        (apps, rels): the 'applications' mapping and 'relations' list, each
        replaced by an empty container when missing or of the wrong type;
        (None, None) when the document itself is not a mapping.
    """
    sec = "0. Structure / integrity"
    if not isinstance(doc, dict):
        R.add(sec, "FAIL", "STRUCT-00", "top-level YAML is not a mapping")
        return None, None

    # Duplicates were collected as a side effect of loading the YAML.
    if not _DUP_KEYS:
        R.add(sec, "PASS", "DUPKEY", "no duplicate keys")
    for dup_key, line_no in _DUP_KEYS:
        R.add(sec, "FAIL", "DUPKEY", "duplicate key '%s' near line %d" % (dup_key, line_no))

    apps = doc.get("applications")
    if not isinstance(apps, dict):
        R.add(sec, "FAIL", "STRUCT-APPS", "no 'applications' mapping")
        apps = {}

    rels = doc.get("relations")
    if not isinstance(rels, list):
        R.add(sec, "FAIL", "STRUCT-RELS", "no 'relations' list")
        rels = []

    # Count drift from the baseline is a review item, never a blocker.
    app_total = len(apps)
    app_level = "INFO" if app_total == EXPECTED_APPS else "WARN"
    R.add(sec, app_level, "APP-COUNT",
          "applications=%d (baseline %d)" % (app_total, EXPECTED_APPS))

    rel_total = len(rels)
    rel_level = "INFO" if rel_total == EXPECTED_RELATIONS else "WARN"
    R.add(sec, rel_level, "REL-COUNT",
          "relations=%d (baseline %d)" % (rel_total, EXPECTED_RELATIONS))
    return apps, rels
def check_relations(R, apps, rels):
    """Check that every relation is a pair of 'app:endpoint' strings whose
    application part names a known application.

    One FAIL row is added per problem; a single PASS row when none is found.
    """
    sec = "1. Relation integrity"
    problems = 0
    for rel in rels:
        if not isinstance(rel, list) or len(rel) != 2:
            R.add(sec, "FAIL", "REL-SHAPE", "relation not a 2-element list: %r" % (rel,))
            problems += 1
            continue
        for endpoint in rel:
            if isinstance(endpoint, str) and ":" in endpoint:
                app = ep_app(endpoint)
                if app in apps:
                    continue
                R.add(sec, "FAIL", "REL-DANGLE",
                      "endpoint references unknown app '%s' in %r" % (app, rel))
            else:
                R.add(sec, "FAIL", "REL-COLON", "endpoint missing colon: %r in %r" % (endpoint, rel))
            problems += 1
    if problems == 0:
        R.add(sec, "PASS", "REL-INT",
              "all relations well-formed, colon-explicit, both ends resolve to apps")
def check_bindings_phantom(R, apps):
    """FAIL every application whose bindings contain a key listed in
    PHANTOM_BINDING_KEYS; PASS once when no application does.

    Applications without a bindings mapping are ignored.
    """
    sec = "2. BUNDLEFIX-001 (phantom binding keys)"
    clean = True
    for name, spec in apps.items():
        bindings = (spec or {}).get("bindings")
        if not isinstance(bindings, dict):
            continue
        phantom = sorted(PHANTOM_BINDING_KEYS.intersection(bindings))
        if not phantom:
            continue
        clean = False
        R.add(sec, "FAIL", "PHANTOM",
              "%s has phantom per-endpoint binding key(s): %s" % (name, ", ".join(phantom)))
    if clean:
        R.add(sec, "PASS", "PHANTOM",
              "no app reintroduces a removed phantom binding key (%s)"
              % ", ".join(sorted(PHANTOM_BINDING_KEYS)))
def check_vault(R, apps, rels):
    """Verify vault is deployed without HA.

    Checks: no 'vip' option, no 'os-public-hostname' option (WARN only),
    no 'vault-hacluster' application, and no relation with an endpoint
    starting with 'vault:ha'. A missing vault application is a WARN.
    """
    sec = "3. BUNDLEFIX-002 (vault de-HA)"
    vault = apps.get("vault")
    if vault is None:
        R.add(sec, "WARN", "VAULT", "no 'vault' app found")
        return

    options = (vault or {}).get("options") or {}
    if "vip" not in options:
        R.add(sec, "PASS", "VAULT-VIP", "vault has no vip")
    else:
        R.add(sec, "FAIL", "VAULT-VIP", "vault has a 'vip' option (must be de-HA'd): %r" % options["vip"])
    if "os-public-hostname" in options:
        R.add(sec, "WARN", "VAULT-HOST", "vault has os-public-hostname (expected removed)")

    if "vault-hacluster" not in apps:
        R.add(sec, "PASS", "VAULT-HA", "no vault-hacluster application")
    else:
        R.add(sec, "FAIL", "VAULT-HA", "vault-hacluster application is present (must be removed)")

    for rel in rels:
        if not isinstance(rel, list):
            continue
        touches_vault_ha = any(
            isinstance(endpoint, str) and endpoint.startswith("vault:ha")
            for endpoint in rel
        )
        if touches_vault_ha:
            R.add(sec, "FAIL", "VAULT-HAREL", "vault:ha relation present: %r" % (rel,))
def _split_endpoint(endpoint):
    """Split 'app:relation' into (app, relation); non-str -> (None, None)."""
    if not isinstance(endpoint, str):
        return None, None
    app, _sep, relation = endpoint.partition(":")
    return app, relation


def map_hacluster(apps, rels):
    """Map each clustered principal to its hacluster application.

    Args:
        apps: application name -> spec mapping from the bundle.
        rels: list of relations (2-element lists of 'app:endpoint').

    Returns:
        (hac_apps, principal_of): the set of application names whose charm
        is 'hacluster', and a dict principal -> hacluster application name.

    Only relations where at least one side is the 'ha' endpoint are used.
    Other relations of a hacluster application (for example monitoring)
    must not turn the peer into a "clustered" principal, otherwise the
    VIP checks would report spurious failures for it.
    """
    hac_apps = {n for n, s in apps.items() if (s or {}).get("charm") == "hacluster"}
    principal_of = {}
    for r in rels:
        if not (isinstance(r, list) and len(r) == 2):
            continue
        (a0, e0), (a1, e1) = _split_endpoint(r[0]), _split_endpoint(r[1])
        if "ha" not in (e0, e1):
            continue
        if a0 in hac_apps and a1 and a1 not in hac_apps:
            principal_of[a1] = a0
        elif a1 in hac_apps and a0 and a0 not in hac_apps:
            principal_of[a0] = a1
    return hac_apps, principal_of
def check_hacluster(R, apps, rels):
    """Validate cluster_count on every hacluster application.

    For each application whose charm is 'hacluster':
      * FAIL when cluster_count is missing or not an integer,
      * WARN when no principal is related to it,
      * FAIL when cluster_count exceeds the principal's num_units,
      * WARN when cluster_count is not the testcloud baseline of 1.

    Returns:
        The principal -> hacluster mapping from map_hacluster(), on every
        path (previously only the early exit returned it).
    """
    sec = "4. BUNDLEFIX-003 (hacluster cluster_count)"
    hac_apps, principal_of = map_hacluster(apps, rels)
    if not hac_apps:
        R.add(sec, "WARN", "HAC", "no hacluster apps found")
        return principal_of
    principal_for_hac = {h: p for p, h in principal_of.items()}
    ok = 0
    for h in sorted(hac_apps):
        opts = (apps[h].get("options") or {})
        cc = opts.get("cluster_count")
        prin = principal_for_hac.get(h)
        nu = (apps.get(prin) or {}).get("num_units") if prin else None
        if cc is None:
            R.add(sec, "FAIL", "HAC-CC", "%s missing cluster_count" % h)
            continue
        # A quoted or otherwise non-integer value would raise TypeError in
        # the comparison below and abort the run before the report is shown.
        if isinstance(cc, bool) or not isinstance(cc, int):
            R.add(sec, "FAIL", "HAC-CC", "%s cluster_count is not an integer: %r" % (h, cc))
            continue
        if not prin:
            R.add(sec, "WARN", "HAC-PRIN", "%s has no principal via :ha relation" % h)
        if isinstance(nu, int) and cc > nu:
            R.add(sec, "FAIL", "HAC-OVER",
                  "%s cluster_count=%s > principal %s num_units=%s" % (h, cc, prin, nu))
            continue
        if cc != 1:
            R.add(sec, "WARN", "HAC-NE1",
                  "%s cluster_count=%s (testcloud baseline is 1)" % (h, cc))
        else:
            ok += 1
    if ok:
        R.add(sec, "PASS", "HAC", "%d hacluster app(s) cluster_count=1 and <= principal num_units" % ok)
    return principal_of
def check_memcached(R, apps, rels):
    """Require a 'memcached' application and the relation
    nova-cloud-controller:memcache <-> memcached:cache (either order).

    Adds exactly two rows: one for the application, one for the relation.
    """
    sec = "5. BUNDLEFIX-004 (memcached)"
    if "memcached" in apps:
        R.add(sec, "PASS", "MEMCACHE-APP", "memcached application present")
    else:
        R.add(sec, "FAIL", "MEMCACHE-APP", "no 'memcached' application")

    wanted = {"nova-cloud-controller:memcache", "memcached:cache"}
    found = any(
        isinstance(rel, list)
        and len(rel) == 2
        and wanted.issubset(e for e in rel if isinstance(e, str))
        for rel in rels
    )
    if found:
        level, state = "PASS", "present"
    else:
        level, state = "FAIL", "MISSING"
    R.add(sec, level, "MEMCACHE-REL",
          "nova-cloud-controller:memcache <-> memcached:cache relation %s"
          % state)
def check_router_bindings(R, apps):
    """Check that every mysql-router application is bound to the metal space.

    FAIL when the default ("") binding is not 'metal'; WARN when the
    default is 'metal' but some other binding points elsewhere. A PASS row
    is added when no router failed (warnings do not suppress it).
    """
    sec = "6. BUNDLEFIX-005 (mysql-router metal binding)"
    routers = sorted(
        name for name, spec in apps.items()
        if (spec or {}).get("charm") == "mysql-router"
    )
    if not routers:
        R.add(sec, "WARN", "ROUTER", "no mysql-router apps found")
        return

    failures = 0
    for name in routers:
        bindings = apps[name].get("bindings") or {}
        # The "" key is the effective default space; YAML anchors are
        # already resolved by the loader at this point.
        default_space = bindings.get("")
        if default_space != "metal":
            R.add(sec, "FAIL", "ROUTER-BIND",
                  "%s default space binding is %r (expected metal)" % (name, default_space))
            failures += 1
            continue
        off_metal = {k: v for k, v in bindings.items() if v != "metal"}
        if off_metal:
            R.add(sec, "WARN", "ROUTER-BIND",
                  "%s has non-metal endpoint binding(s): %r" % (name, off_metal))
    if failures == 0:
        R.add(sec, "PASS", "ROUTER-BIND",
              "%d mysql-router app(s) bound to metal" % len(routers))
def check_vips(R, apps, rels):
    """Validate the dual (provider + metal) VIP of every clustered principal.

    The clustered set is whatever map_hacluster() reports; deviations from
    EXPECTED_CLUSTERED are WARN rows. For each clustered application the
    'vip' option must hold exactly two whitespace-separated addresses: the
    first in PROVIDER_NET, the second in METAL_NET, both with the same last
    octet, that octet inside VIP_OCTET_MIN..VIP_OCTET_MAX. The first
    violated rule produces a FAIL and ends the checks for that application.
    A last octet differing from the D-020 map is only a WARN.
    """
    sec = "7. BUNDLEFIX-006 / D-020 (dual provider+metal VIPs)"
    _, principal_of = map_hacluster(apps, rels)
    clustered = sorted(principal_of.keys())
    # set comparison vs expected D-020 clustered set
    got = set(clustered)
    exp = set(EXPECTED_CLUSTERED)
    if got != exp:
        if exp - got:
            R.add(sec, "WARN", "VIP-SET", "expected-clustered apps NOT detected as clustered: %s"
                  % ", ".join(sorted(exp - got)))
        if got - exp:
            R.add(sec, "WARN", "VIP-SET", "clustered apps beyond the D-020 set: %s"
                  % ", ".join(sorted(got - exp)))
    ok = 0
    for name in clustered:
        # A principal can appear in a relation without being a usable entry
        # in 'applications' (dangling reference or null spec). Treat it as
        # having no options so it fails closed as VIP-MISS instead of
        # raising and losing the whole report.
        opts = ((apps.get(name) or {}).get("options") or {})
        vip = opts.get("vip")
        if not vip:
            R.add(sec, "FAIL", "VIP-MISS", "%s is clustered but has no vip" % name)
            continue
        parts = str(vip).split()
        if len(parts) != 2:
            R.add(sec, "FAIL", "VIP-DUAL", "%s vip is not dual (got %r)" % (name, vip))
            continue
        prov, metal = parts
        if not in_net(prov, PROVIDER_NET):
            R.add(sec, "FAIL", "VIP-PROV", "%s provider vip %s not in %s" % (name, prov, PROVIDER_NET))
            continue
        if not in_net(metal, METAL_NET):
            R.add(sec, "FAIL", "VIP-METAL", "%s metal vip %s not in %s" % (name, metal, METAL_NET))
            continue
        # NOTE(review): only the last octet is compared and range-checked;
        # with /22 networks an address in another /24 of the same network
        # passes too -- confirm whether the reserved range should pin the
        # third octet as well.
        po, mo = int(prov.split(".")[-1]), int(metal.split(".")[-1])
        if po != mo:
            R.add(sec, "FAIL", "VIP-MIRROR", "%s octets differ: provider .%d vs metal .%d" % (name, po, mo))
            continue
        if not (VIP_OCTET_MIN <= mo <= VIP_OCTET_MAX):
            R.add(sec, "FAIL", "VIP-RANGE",
                  "%s metal vip octet .%d outside reserved %d-%d" % (name, mo, VIP_OCTET_MIN, VIP_OCTET_MAX))
            continue
        expected_octet = EXPECTED_CLUSTERED.get(name)
        if expected_octet is not None and po != expected_octet:
            R.add(sec, "WARN", "VIP-OCTET",
                  "%s vip octet .%d != D-020 map .%d" % (name, po, expected_octet))
        ok += 1
    if ok:
        R.add(sec, "PASS", "VIP-DUAL",
              "%d clustered API charm(s) have mirrored dual VIPs in the reserved range" % ok)
def check_osd(R, apps):
    """Check that each ceph-osd application sets osd-devices to a path.

    The value must be a non-empty string that starts with '/' once
    stripped; otherwise FAIL. A passing value that does not contain
    '/dev/disk/by-' gets an explanatory suffix in its PASS message.
    """
    sec = "8. Anti-pattern: ceph-osd osd-devices"
    osd_apps = [name for name, spec in apps.items() if (spec or {}).get("charm") == "ceph-osd"]
    if not osd_apps:
        R.add(sec, "WARN", "OSD", "no ceph-osd app found")
        return
    for name in osd_apps:
        devices = (apps[name].get("options") or {}).get("osd-devices")
        looks_like_path = (
            bool(devices)
            and isinstance(devices, str)
            and devices.strip().startswith("/")
        )
        if not looks_like_path:
            R.add(sec, "FAIL", "OSD-DEV", "%s osd-devices not a real path: %r" % (name, devices))
            continue
        if "/dev/disk/by-" in devices:
            suffix = ""
        else:
            suffix = " (kernel-name; by-path/by-id is harder for bare metal -- Roosevelt note)"
        R.add(sec, "PASS", "OSD-DEV", "%s osd-devices=%s%s" % (name, devices.strip(), suffix))
def check_ovn(R, apps):
    """Review bridge-interface-mappings on every ovn-chassis application.

    No mapping -> INFO; a mapping containing a MAC address (per MAC_RE)
    -> PASS; any other mapping -> WARN. No ovn-chassis application at all
    is a WARN.
    """
    sec = "9. Anti-pattern: ovn-chassis mappings (MAC over NIC name)"
    chassis_apps = sorted(
        name for name, spec in apps.items()
        if (spec or {}).get("charm") == "ovn-chassis"
    )
    if not chassis_apps:
        R.add(sec, "WARN", "OVN", "no ovn-chassis app found")
        return
    for name in chassis_apps:
        mappings = (apps[name].get("options") or {}).get("bridge-interface-mappings")
        if not mappings:
            R.add(sec, "INFO", "OVN-BIM", "%s has no bridge-interface-mappings (expected for octavia-side chassis)" % name)
            continue
        if MAC_RE.search(str(mappings)) is None:
            R.add(sec, "WARN", "OVN-BIM",
                  "%s bridge-interface-mappings has no MAC (NIC-name? fragile): %r" % (name, mappings))
        else:
            R.add(sec, "PASS", "OVN-BIM", "%s bridge-interface-mappings is MAC-based" % name)
def check_os_networks(R, apps, rels):
    """WARN for every clustered principal that sets an os-*-network option.

    The clustered set comes from map_hacluster(). One WARN row is added per
    (application, option) pair found; a single PASS row when none is.
    """
    sec = "10. D-020: spaces-native (no os-*-network pinning)"
    _, principal_of = map_hacluster(apps, rels)
    flagged = 0
    for name in sorted(principal_of):
        # The principal may be named only in a relation (dangling) or have a
        # null spec; treat both as "no options" instead of raising and
        # aborting the run before the report is printed.
        opts = ((apps.get(name) or {}).get("options") or {})
        for k in ("os-internal-network", "os-admin-network", "os-public-network"):
            if k in opts:
                R.add(sec, "WARN", "OS-NET",
                      "%s sets %s (D-020 found spaces-native resolve sufficient; verify intent)" % (name, k))
                flagged += 1
    if not flagged:
        R.add(sec, "PASS", "OS-NET", "no clustered charm pins os-*-network (spaces-native, per D-020)")
def expected_channel(charm):
    """Return the baseline channel for ``charm``, or None when unknown.

    CHANNEL_MATRIX takes precedence; charms listed in
    OPENSTACK_CORE_CHARMS fall back to OPENSTACK_CORE_CHANNEL.
    """
    try:
        return CHANNEL_MATRIX[charm]
    except KeyError:
        pass
    return OPENSTACK_CORE_CHANNEL if charm in OPENSTACK_CORE_CHARMS else None
def check_channels_base(R, apps):
    """Compare each application's channel, base and series with the baseline.

    All findings are WARN rows. A channel is compared only when both the
    application sets one and expected_channel() knows the charm. The
    closing PASS row depends on channel mismatches only.
    """
    sec = "11. Channels / base (verified Caracal matrix; WARN-only)"
    channel_mismatches = 0
    for name in sorted(apps):
        spec = apps[name] or {}
        charm = spec.get("charm")
        channel = spec.get("channel")
        wanted = expected_channel(charm)
        if wanted and channel and channel != wanted:
            R.add(sec, "WARN", "CHANNEL", "%s (%s) channel=%s expected=%s" % (name, charm, channel, wanted))
            channel_mismatches += 1

        base = spec.get("base")
        if base and base != EXPECTED_BASE:
            R.add(sec, "WARN", "BASE", "%s base=%s expected=%s" % (name, base, EXPECTED_BASE))

        series = spec.get("series")
        if series and series != "jammy":
            R.add(sec, "WARN", "SERIES", "%s series=%s expected=jammy" % (name, series))
    if channel_mismatches == 0:
        R.add(sec, "PASS", "CHANNEL", "no charm deviates from the known Caracal channel matrix")
def summary_tables(R, apps, rels):
    """Add INFO rows listing clustered applications (with their vip option)
    and all mysql-router applications.
    """
    sec = "12. Inventory (informational)"
    _, principal_of = map_hacluster(apps, rels)
    for name in sorted(principal_of):
        # Tolerate principals that are missing from 'applications' or have a
        # null spec; an informational table must never abort the report.
        vip = ((apps.get(name) or {}).get("options") or {}).get("vip")
        R.add(sec, "INFO", "CLUSTERED", "%-26s vip=%s" % (name, vip))
    routers = sorted(n for n, s in apps.items() if (s or {}).get("charm") == "mysql-router")
    R.add(sec, "INFO", "ROUTERS", "%d mysql-router apps: %s" % (len(routers), ", ".join(routers)))
# --------------------------------------------------------------------------- #
# Main
# --------------------------------------------------------------------------- #
def main():
    """Parse arguments, load the bundle, run every check and print the report.

    Returns:
        0 when clean (WARN rows allowed unless --strict),
        1 when any FAIL exists, or any WARN under --strict, or the document
          is not a mapping,
        2 when the bundle cannot be read or parsed.
    """
    ap = argparse.ArgumentParser(description="Comprehensive Caracal bundle reviewer (read-only).")
    ap.add_argument("bundle", nargs="?", default="bundle.yaml")
    ap.add_argument("--strict", action="store_true", help="treat WARN as failing for exit code")
    ap.add_argument("--quiet", action="store_true", help="show only WARN/FAIL")
    args = ap.parse_args()
    try:
        # errors="replace": undecodable bytes become U+FFFD and are then
        # reported by the ASCII check instead of aborting the read.
        with open(args.bundle, "r", encoding="utf-8", errors="replace") as fh:
            text = fh.read()
    except FileNotFoundError:
        sys.stderr.write("ERROR: bundle not found: %s\n" % args.bundle)
        return 2
    except OSError as e:
        # Directory, permission problem, I/O error...: same exit code as a
        # missing file rather than an unhandled traceback.
        sys.stderr.write("ERROR: cannot read bundle %s: %s\n" % (args.bundle, e.strerror or e))
        return 2
    try:
        # DupKeyLoader derives from yaml.SafeLoader, so this is a safe load.
        doc = yaml.load(text, Loader=DupKeyLoader)
    except yaml.YAMLError as e:
        sys.stderr.write("ERROR: YAML parse failed: %s\n" % e)
        return 2
    R = Reporter(quiet=args.quiet)
    print("================ Caracal v1 bundle review: %s ================" % args.bundle)
    check_ascii(R, text)
    apps, rels = check_structure(R, doc)
    if apps is None:
        # Not a mapping: nothing else can be checked.
        R.emit()
        return 1
    check_relations(R, apps, rels)
    check_bindings_phantom(R, apps)
    check_vault(R, apps, rels)
    check_hacluster(R, apps, rels)
    check_memcached(R, apps, rels)
    check_router_bindings(R, apps)
    check_vips(R, apps, rels)
    check_osd(R, apps)
    check_ovn(R, apps)
    check_os_networks(R, apps, rels)
    check_channels_base(R, apps)
    summary_tables(R, apps, rels)
    R.emit()
    fail = R.counts["FAIL"] > 0
    warn = R.counts["WARN"] > 0
    if fail or (args.strict and warn):
        print("\nVERDICT: NOT CLEAN" + (" (--strict: WARN counts)" if (warn and not fail) else ""))
        return 1
    print("\nVERDICT: CLEAN" + (" (with WARN review items)" if warn else ""))
    return 0


if __name__ == "__main__":
    sys.exit(main())