Newer
Older
openstack-caracal-ipv4 / netbox / ipv4-prefixes-import.py
#!/usr/bin/env python3
"""
NetBox IPv4 prefix import for VR0 DC0 Omega Cloud (v1).

Adds the IPv4 prefixes required for v1 (IPv4-only) deployment:
  - Metal           10.12.8.0/22       (charm-to-charm relations)
  - Provider        10.12.4.0/22       (ext_net FIPs + API VIPs per Option B)
  - LBaaS Mgmt      10.12.32.0/22      (Octavia controller-to-amphora)
  - Tenant pool     10.20.0.0/16       (v1 IPv4 hybrid per D-016; configurable)

Within the Provider /22, two IP Ranges are created:
  - 10.12.4.10  - 10.12.4.223          (FIP pool — Neutron allocates from here)
  - 10.12.4.224 - 10.12.4.254          (API VIPs — exclude from Neutron pools)

Per D-015 (v1/v2 fork), this script is v1-scope only and adds NO IPv6 prefixes.
Existing IPv6 entries (Provider /60 carve-outs, etc.) are handled separately
by netbox/ipv6-mark-reserved.py (sets them to Reservation status).

Per D-016 (IPv4 tenant pool hybrid), the tenant pool default is 10.20.0.0/16.
Override via the TENANT_POOL_CIDR environment variable if a different range
is preferred.

NetBox version: 4.x (uses scope_type / scope_id, not legacy 'site' field).

Usage:
    NETBOX_URL=https://netbox.baldurkeep.com NETBOX_TOKEN=<token> \\
        python3 ipv4-prefixes-import.py

    # Override tenant pool:
    TENANT_POOL_CIDR=10.30.0.0/16 \\
        NETBOX_URL=... NETBOX_TOKEN=... python3 ipv4-prefixes-import.py

Idempotent: re-running is safe; existing prefixes/ranges are detected and
skipped with a message. To force update, pass --update on the command line.

Verification block at end prints final state.
"""

from __future__ import annotations

import argparse
import os
import sys

try:
    import pynetbox
except ImportError:
    sys.stderr.write("ERROR: pynetbox not installed. pip install pynetbox\n")
    sys.exit(1)

# -----------------------------------------------------------------------------
# Configuration — values traced to NetBox via docs/design-decisions.md (D-003, D-004)
# -----------------------------------------------------------------------------

SITE_SLUG = "vr0-dc0"  # VR0 DC0 site
SITE_NAME = "VR0 DC0"  # human-readable for verification output

# Role slugs as they appear in NetBox. Verify with: nb.ipam.roles.all()
ROLE_PROVIDER = "provider"
ROLE_METAL = "metal"
ROLE_LBAAS_MGMT = "lbaas-management"
ROLE_OPENSTACK_TENANT = "openstack-tenant"

# VLAN: OS-Provider VID 240 in VR0 DC0-VLANs group (already exists from prior import)
PROVIDER_VLAN_VID = 240
PROVIDER_VLAN_GROUP_SLUG = "vr0-dc0-vlans"

# Default IPv4 tenant pool (per D-016). Configurable via TENANT_POOL_CIDR env var.
# Default 10.20.0.0/16 — 256 /24s available for per-project Neutron-managed tenant subnets.
DEFAULT_TENANT_POOL_CIDR = "10.20.0.0/16"
TENANT_POOL_CIDR = os.environ.get("TENANT_POOL_CIDR", DEFAULT_TENANT_POOL_CIDR)

# Prefixes to create (CIDR -> {role_slug, description, optional vlan})
# All entries are v1-scope IPv4. v2 IPv6 entries are NOT created here;
# see netbox/ipv6-mark-reserved.py for handling of existing IPv6 entries.
IPV4_PREFIXES = [
    {
        "prefix": "10.12.4.0/22",
        "role_slug": ROLE_PROVIDER,
        "description": "VR0 DC0 Provider (ext_net FIPs + API VIPs per Option B / D-003)",
        "vlan_vid": PROVIDER_VLAN_VID,
        "vlan_group_slug": PROVIDER_VLAN_GROUP_SLUG,
    },
    {
        "prefix": "10.12.8.0/22",
        "role_slug": ROLE_METAL,
        "description": "VR0 DC0 Metal (charm-to-charm relations)",
        "vlan_vid": None,
        "vlan_group_slug": None,
    },
    {
        "prefix": "10.12.32.0/22",
        "role_slug": ROLE_LBAAS_MGMT,
        "description": "VR0 DC0 LBaaS Management (Octavia controller to amphora)",
        "vlan_vid": None,
        "vlan_group_slug": None,
    },
    {
        "prefix": TENANT_POOL_CIDR,
        "role_slug": ROLE_OPENSTACK_TENANT,
        "description": (
            "VR0 DC0 OpenStack Tenant pool (v1 IPv4 hybrid per D-016) — "
            "Neutron-managed per-project /24 carve-outs within this pool"
        ),
        "vlan_vid": None,
        "vlan_group_slug": None,
    },
]

# IP Ranges to create within Provider /22 (start, end -> description)
PROVIDER_IP_RANGES = [
    {
        "start": "10.12.4.10/22",
        "end": "10.12.4.223/22",
        "role_slug": ROLE_PROVIDER,
        "description": "FIP pool — Neutron allocates floating IPs from this range",
    },
    {
        "start": "10.12.4.224/22",
        "end": "10.12.4.254/22",
        "role_slug": ROLE_PROVIDER,
        "description": "API VIPs — charm hacluster VIPs (exclude from Neutron allocation_pools)",
    },
]

# -----------------------------------------------------------------------------
# Helpers
# -----------------------------------------------------------------------------


def die(msg: str, code: int = 1) -> None:
    sys.stderr.write(f"ERROR: {msg}\n")
    sys.exit(code)


def get_nb() -> "pynetbox.api":
    url = os.environ.get("NETBOX_URL")
    token = os.environ.get("NETBOX_TOKEN")
    if not url:
        die("NETBOX_URL environment variable not set")
    if not token:
        die("NETBOX_TOKEN environment variable not set")
    nb = pynetbox.api(url, token=token)
    # Sanity check
    try:
        _ = nb.status()
    except Exception as exc:  # noqa: BLE001
        die(f"Could not reach NetBox at {url}: {exc}")
    return nb


def find_site(nb, slug: str):
    site = nb.dcim.sites.get(slug=slug)
    if site is None:
        die(f"Site with slug '{slug}' not found in NetBox")
    return site


def find_role(nb, slug: str):
    role = nb.ipam.roles.get(slug=slug)
    if role is None:
        die(f"IPAM role with slug '{slug}' not found in NetBox")
    return role


def find_vlan(nb, vid: int, group_slug: str):
    group = nb.ipam.vlan_groups.get(slug=group_slug)
    if group is None:
        die(f"VLAN group with slug '{group_slug}' not found")
    vlan = nb.ipam.vlans.get(vid=vid, group_id=group.id)
    if vlan is None:
        die(f"VLAN VID {vid} not found in group '{group_slug}'")
    return vlan


def create_or_report_prefix(nb, cfg: dict, site, update: bool = False) -> None:
    cidr = cfg["prefix"]
    role = find_role(nb, cfg["role_slug"])
    vlan = None
    if cfg.get("vlan_vid") is not None:
        vlan = find_vlan(nb, cfg["vlan_vid"], cfg["vlan_group_slug"])

    existing = nb.ipam.prefixes.get(prefix=cidr)
    payload = {
        "prefix": cidr,
        "role": role.id,
        "description": cfg["description"],
        "scope_type": "dcim.site",
        "scope_id": site.id,
    }
    if vlan is not None:
        payload["vlan"] = vlan.id

    if existing is None:
        created = nb.ipam.prefixes.create(**payload)
        print(f"  CREATED prefix {cidr} (id={created.id}) role={cfg['role_slug']}")
    else:
        if update:
            existing.update(payload)
            print(f"  UPDATED prefix {cidr} (id={existing.id}) role={cfg['role_slug']}")
        else:
            print(f"  EXISTS  prefix {cidr} (id={existing.id}) — skipped (use --update to overwrite)")


def create_or_report_iprange(nb, cfg: dict, update: bool = False) -> None:
    start = cfg["start"]
    end = cfg["end"]
    role = find_role(nb, cfg["role_slug"])

    # Look for an existing range with these endpoints
    existing_list = list(nb.ipam.ip_ranges.filter(start_address=start, end_address=end))
    payload = {
        "start_address": start,
        "end_address": end,
        "role": role.id,
        "description": cfg["description"],
    }

    if not existing_list:
        created = nb.ipam.ip_ranges.create(**payload)
        print(f"  CREATED IP Range {start} - {end} (id={created.id})")
    else:
        existing = existing_list[0]
        if update:
            existing.update(payload)
            print(f"  UPDATED IP Range {start} - {end} (id={existing.id})")
        else:
            print(f"  EXISTS  IP Range {start} - {end} (id={existing.id}) — skipped")


def verify(nb, site) -> None:
    print()
    print("=" * 72)
    print(f"Verification — final state for site {SITE_NAME} (id={site.id})")
    print("=" * 72)

    print("\nIPv4 Prefixes:")
    for cfg in IPV4_PREFIXES:
        cidr = cfg["prefix"]
        p = nb.ipam.prefixes.get(prefix=cidr)
        if p is None:
            print(f"  MISSING {cidr}")
            continue
        scope_id = getattr(p, "scope_id", None)
        role_slug = p.role.slug if p.role else "(none)"
        vlan_str = f"vlan={p.vlan.vid}" if p.vlan else "vlan=none"
        site_ok = "OK" if scope_id == site.id else f"SCOPE-MISMATCH(id={scope_id})"
        print(f"  {cidr.ljust(18)} role={role_slug.ljust(20)} {vlan_str.ljust(12)} {site_ok}")

    print("\nIP Ranges within Provider /22:")
    for cfg in PROVIDER_IP_RANGES:
        ranges = list(nb.ipam.ip_ranges.filter(start_address=cfg["start"], end_address=cfg["end"]))
        if not ranges:
            print(f"  MISSING {cfg['start']} - {cfg['end']}")
            continue
        r = ranges[0]
        role_slug = r.role.slug if r.role else "(none)"
        print(f"  {cfg['start']} - {cfg['end']} role={role_slug} (id={r.id})")


# -----------------------------------------------------------------------------
# Main
# -----------------------------------------------------------------------------


def main() -> int:
    parser = argparse.ArgumentParser(description=__doc__.split("\n\n", 1)[0])
    parser.add_argument(
        "--update",
        action="store_true",
        help="Update existing prefixes/ranges in place (default: skip if exists)",
    )
    parser.add_argument(
        "--verify-only",
        action="store_true",
        help="Skip writes; only print verification block",
    )
    args = parser.parse_args()

    nb = get_nb()
    site = find_site(nb, SITE_SLUG)
    print(f"Connected. Site '{SITE_NAME}' (id={site.id}).")

    if not args.verify_only:
        print("\nIPv4 Prefixes:")
        for cfg in IPV4_PREFIXES:
            create_or_report_prefix(nb, cfg, site, update=args.update)

        print("\nProvider IP Ranges:")
        for cfg in PROVIDER_IP_RANGES:
            create_or_report_iprange(nb, cfg, update=args.update)

    verify(nb, site)
    print("\nDone.")
    return 0


if __name__ == "__main__":
    sys.exit(main())