#!/usr/bin/env python3
"""
NetBox IPv4 prefix import for VR0 DC0 Omega Cloud (v1).
Adds the IPv4 prefixes required for v1 (IPv4-only) deployment:
- Metal 10.12.8.0/22 (charm-to-charm relations)
- Provider 10.12.4.0/22 (ext_net FIPs + API VIPs per Option B)
- LBaaS Mgmt 10.12.32.0/22 (Octavia controller-to-amphora)
- Tenant pool 10.20.0.0/16 (v1 IPv4 hybrid per D-016; configurable)
Within the Provider /22, two IP Ranges are created:
- 10.12.4.10 - 10.12.4.223 (FIP pool — Neutron allocates from here)
- 10.12.4.224 - 10.12.4.254 (API VIPs — exclude from Neutron pools)
Per D-015 (v1/v2 fork), this script is v1-scope only and adds NO IPv6 prefixes.
Existing IPv6 entries (Provider /60 carve-outs, etc.) are handled separately
by netbox/ipv6-mark-reserved.py (sets them to Reservation status).
Per D-016 (IPv4 tenant pool hybrid), the tenant pool default is 10.20.0.0/16.
Override via the TENANT_POOL_CIDR environment variable if a different range
is preferred.
NetBox version: 4.2+ (prefixes use scope_type / scope_id, not the legacy 'site' field).
Usage:
NETBOX_URL=https://netbox.baldurkeep.com NETBOX_TOKEN=<token> \\
python3 ipv4-prefixes-import.py
# Override tenant pool:
TENANT_POOL_CIDR=10.30.0.0/16 \\
NETBOX_URL=... NETBOX_TOKEN=... python3 ipv4-prefixes-import.py
Idempotent: re-running is safe; existing prefixes/ranges are detected and
skipped with a message. To force update, pass --update on the command line.
Verification block at end prints final state.
"""
from __future__ import annotations
import argparse
import os
import sys
try:
import pynetbox
except ImportError:
sys.stderr.write("ERROR: pynetbox not installed. pip install pynetbox\n")
sys.exit(1)
# -----------------------------------------------------------------------------
# Configuration — values traced to NetBox via docs/design-decisions.md (D-003, D-004)
# -----------------------------------------------------------------------------
SITE_SLUG = "vr0-dc0"  # VR0 DC0 site
SITE_NAME = "VR0 DC0"  # human-readable for verification output

# Role slugs as they appear in NetBox. Verify with: nb.ipam.roles.all()
ROLE_PROVIDER = "provider"
ROLE_METAL = "metal"
ROLE_LBAAS_MGMT = "lbaas-management"
ROLE_OPENSTACK_TENANT = "openstack-tenant"

# VLAN: OS-Provider VID 240 in VR0 DC0-VLANs group (already exists from prior import)
PROVIDER_VLAN_VID = 240
PROVIDER_VLAN_GROUP_SLUG = "vr0-dc0-vlans"

# Default IPv4 tenant pool (per D-016). Configurable via TENANT_POOL_CIDR env var.
# Default 10.20.0.0/16 — 256 /24s available for per-project Neutron-managed tenant subnets.
DEFAULT_TENANT_POOL_CIDR = "10.20.0.0/16"
# A variable that is exported but empty (or only whitespace) is treated as
# unset, so an empty CIDR is never used for the NetBox lookup or create calls.
TENANT_POOL_CIDR = (
    os.environ.get("TENANT_POOL_CIDR", "").strip() or DEFAULT_TENANT_POOL_CIDR
)
# Prefixes to create: a list with one dict per prefix, carrying the keys
# prefix, role_slug, description, vlan_vid and vlan_group_slug (the two VLAN
# keys are None when the prefix is not tied to a VLAN).
# All entries are v1-scope IPv4. v2 IPv6 entries are NOT created here;
# see netbox/ipv6-mark-reserved.py for handling of existing IPv6 entries.
IPV4_PREFIXES = [
    {
        "prefix": cidr,
        "role_slug": role_slug,
        "description": description,
        "vlan_vid": vlan_vid,
        "vlan_group_slug": vlan_group_slug,
    }
    # Row layout: (cidr, role slug, description, VLAN VID, VLAN group slug)
    for cidr, role_slug, description, vlan_vid, vlan_group_slug in (
        (
            "10.12.4.0/22",
            ROLE_PROVIDER,
            "VR0 DC0 Provider (ext_net FIPs + API VIPs per Option B / D-003)",
            PROVIDER_VLAN_VID,
            PROVIDER_VLAN_GROUP_SLUG,
        ),
        (
            "10.12.8.0/22",
            ROLE_METAL,
            "VR0 DC0 Metal (charm-to-charm relations)",
            None,
            None,
        ),
        (
            "10.12.32.0/22",
            ROLE_LBAAS_MGMT,
            "VR0 DC0 LBaaS Management (Octavia controller to amphora)",
            None,
            None,
        ),
        (
            TENANT_POOL_CIDR,
            ROLE_OPENSTACK_TENANT,
            (
                "VR0 DC0 OpenStack Tenant pool (v1 IPv4 hybrid per D-016) — "
                "Neutron-managed per-project /24 carve-outs within this pool"
            ),
            None,
            None,
        ),
    )
]
# IP Ranges to create within the Provider /22: a list with one dict per range,
# carrying the keys start, end, role_slug and description. Every range uses
# the Provider role.
PROVIDER_IP_RANGES = [
    {
        "start": start,
        "end": end,
        "role_slug": ROLE_PROVIDER,
        "description": description,
    }
    # Row layout: (first address, last address, description)
    for start, end, description in (
        (
            "10.12.4.10/22",
            "10.12.4.223/22",
            "FIP pool — Neutron allocates floating IPs from this range",
        ),
        (
            "10.12.4.224/22",
            "10.12.4.254/22",
            "API VIPs — charm hacluster VIPs (exclude from Neutron allocation_pools)",
        ),
    )
]
# -----------------------------------------------------------------------------
# Helpers
# -----------------------------------------------------------------------------
def die(msg: str, code: int = 1) -> None:
    """Report a fatal error on stderr and terminate the process.

    Args:
        msg: Text written after the ``ERROR: `` prefix.
        code: Exit status passed to ``sys.exit`` (default 1).

    Raises:
        SystemExit: Always; this function does not return normally.
    """
    print(f"ERROR: {msg}", file=sys.stderr)
    sys.exit(code)
def get_nb() -> "pynetbox.api":
    """Build a pynetbox client from the environment and check that NetBox answers.

    Reads ``NETBOX_URL`` and ``NETBOX_TOKEN`` from the environment. If either
    is missing or empty, or if the ``status()`` call on the new client raises,
    the script terminates through ``die``.

    Returns:
        The ``pynetbox.api`` client created from the URL and token.
    """
    url = os.getenv("NETBOX_URL")
    token = os.getenv("NETBOX_TOKEN")

    # Both settings are mandatory; the URL is checked first, then the token.
    if not url:
        die("NETBOX_URL environment variable not set")
    if not token:
        die("NETBOX_TOKEN environment variable not set")

    client = pynetbox.api(url, token=token)

    # Sanity check: any failure of the status call is reported as a fatal error.
    try:
        client.status()
    except Exception as exc:  # noqa: BLE001
        die(f"Could not reach NetBox at {url}: {exc}")
    return client
def find_site(nb, slug: str):
    """Return the NetBox site whose slug is *slug*, or abort the script.

    Args:
        nb: pynetbox API client.
        slug: Site slug passed to ``nb.dcim.sites.get``.

    Returns:
        The site record returned by NetBox. When the lookup yields ``None``
        the script terminates through ``die``.
    """
    match = nb.dcim.sites.get(slug=slug)
    if match is not None:
        return match
    die(f"Site with slug '{slug}' not found in NetBox")
    return match  # only reached if die() were to return
def find_role(nb, slug: str):
    """Return the IPAM role whose slug is *slug*, or abort the script.

    Args:
        nb: pynetbox API client.
        slug: Role slug passed to ``nb.ipam.roles.get``.

    Returns:
        The role record returned by NetBox. When the lookup yields ``None``
        the script terminates through ``die``.
    """
    match = nb.ipam.roles.get(slug=slug)
    if match is not None:
        return match
    die(f"IPAM role with slug '{slug}' not found in NetBox")
    return match  # only reached if die() were to return
def find_vlan(nb, vid: int, group_slug: str):
    """Return the VLAN with VID *vid* inside the VLAN group *group_slug*.

    The group is resolved first, and its id is then used to narrow the VLAN
    lookup. A missing group or a missing VLAN terminates the script through
    ``die``.

    Args:
        nb: pynetbox API client.
        vid: VLAN ID to look up.
        group_slug: Slug of the VLAN group the VLAN must belong to.

    Returns:
        The VLAN record returned by NetBox.
    """
    vlan_group = nb.ipam.vlan_groups.get(slug=group_slug)
    if vlan_group is None:
        die(f"VLAN group with slug '{group_slug}' not found")

    found = nb.ipam.vlans.get(vid=vid, group_id=vlan_group.id)
    if found is None:
        die(f"VLAN VID {vid} not found in group '{group_slug}'")
    return found
def create_or_report_prefix(nb, cfg: dict, site, update: bool = False) -> None:
    """Create one prefix in NetBox, or report (and optionally update) an existing one.

    The role, and the VLAN when ``cfg`` names one, are resolved before the
    prefix is looked up by its CIDR. The prefix is scoped to *site* through
    ``scope_type`` / ``scope_id``. One status line is printed per call:
    CREATED, UPDATED or EXISTS.

    Args:
        nb: pynetbox API client.
        cfg: Prefix definition with the keys ``prefix``, ``role_slug`` and
            ``description``, plus optional ``vlan_vid`` / ``vlan_group_slug``.
        site: Site record whose ``id`` becomes the prefix ``scope_id``.
        update: When true, an existing prefix is updated with the payload;
            otherwise it is left untouched.
    """
    cidr = cfg["prefix"]
    role = find_role(nb, cfg["role_slug"])

    # The VLAN is optional; it is resolved only when the config carries a VID.
    wanted_vid = cfg.get("vlan_vid")
    vlan = (
        find_vlan(nb, wanted_vid, cfg["vlan_group_slug"])
        if wanted_vid is not None
        else None
    )

    existing = nb.ipam.prefixes.get(prefix=cidr)

    fields = {
        "prefix": cidr,
        "role": role.id,
        "description": cfg["description"],
        "scope_type": "dcim.site",
        "scope_id": site.id,
    }
    if vlan is not None:
        fields["vlan"] = vlan.id

    if existing is None:
        created = nb.ipam.prefixes.create(**fields)
        print(f" CREATED prefix {cidr} (id={created.id}) role={cfg['role_slug']}")
        return

    if not update:
        print(f" EXISTS prefix {cidr} (id={existing.id}) — skipped (use --update to overwrite)")
        return

    existing.update(fields)
    print(f" UPDATED prefix {cidr} (id={existing.id}) role={cfg['role_slug']}")
def create_or_report_iprange(nb, cfg: dict, update: bool = False) -> None:
    """Create one IP range in NetBox, or report (and optionally update) an existing one.

    An existing range is detected by filtering on both endpoints. When several
    ranges match, the first one returned is used. One status line is printed
    per call: CREATED, UPDATED or EXISTS.

    Args:
        nb: pynetbox API client.
        cfg: Range definition with the keys ``start``, ``end``, ``role_slug``
            and ``description``.
        update: When true, an existing range is updated with the payload;
            otherwise it is left untouched.
    """
    start, end = cfg["start"], cfg["end"]
    role = find_role(nb, cfg["role_slug"])

    # Look for an existing range with these endpoints
    matches = list(nb.ipam.ip_ranges.filter(start_address=start, end_address=end))

    fields = {
        "start_address": start,
        "end_address": end,
        "role": role.id,
        "description": cfg["description"],
    }

    if not matches:
        created = nb.ipam.ip_ranges.create(**fields)
        print(f" CREATED IP Range {start} - {end} (id={created.id})")
        return

    existing = matches[0]
    if not update:
        print(f" EXISTS IP Range {start} - {end} (id={existing.id}) — skipped")
        return

    existing.update(fields)
    print(f" UPDATED IP Range {start} - {end} (id={existing.id})")
def verify(nb, site) -> None:
    """Print the NetBox state of every prefix and IP range this script manages.

    Only lookups are issued (``prefixes.get`` and ``ip_ranges.filter``). Each
    entry of ``IPV4_PREFIXES`` and ``PROVIDER_IP_RANGES`` is reported either
    with its current role/VLAN/scope or as ``MISSING``.

    A prefix is reported ``OK`` only when it is scoped to *site*: its
    ``scope_type`` must be ``dcim.site`` and its ``scope_id`` must equal
    ``site.id``. Comparing the id alone is not enough, because a prefix scoped
    to another object type with the same numeric id would pass the check.

    Args:
        nb: pynetbox API client.
        site: Site record the prefixes are expected to be scoped to.
    """
    banner = "=" * 72
    print()
    print(banner)
    print(f"Verification — final state for site {SITE_NAME} (id={site.id})")
    print(banner)

    print("\nIPv4 Prefixes:")
    for cfg in IPV4_PREFIXES:
        cidr = cfg["prefix"]
        p = nb.ipam.prefixes.get(prefix=cidr)
        if p is None:
            print(f" MISSING {cidr}")
            continue
        # getattr with a default: the scope attributes may be absent on the record.
        scope_type = getattr(p, "scope_type", None)
        scope_id = getattr(p, "scope_id", None)
        role_slug = p.role.slug if p.role else "(none)"
        vlan_str = f"vlan={p.vlan.vid}" if p.vlan else "vlan=none"
        if scope_type == "dcim.site" and scope_id == site.id:
            site_ok = "OK"
        else:
            site_ok = f"SCOPE-MISMATCH(type={scope_type}, id={scope_id})"
        print(f" {cidr.ljust(18)} role={role_slug.ljust(20)} {vlan_str.ljust(12)} {site_ok}")

    print("\nIP Ranges within Provider /22:")
    for cfg in PROVIDER_IP_RANGES:
        ranges = list(
            nb.ipam.ip_ranges.filter(start_address=cfg["start"], end_address=cfg["end"])
        )
        if not ranges:
            print(f" MISSING {cfg['start']} - {cfg['end']}")
            continue
        # Several ranges may match the endpoints; only the first is reported.
        r = ranges[0]
        role_slug = r.role.slug if r.role else "(none)"
        print(f" {cfg['start']} - {cfg['end']} role={role_slug} (id={r.id})")
# -----------------------------------------------------------------------------
# Main
# -----------------------------------------------------------------------------
def main() -> int:
    """Parse the command line, apply the import and print the verification block.

    Flags:
        --update: update prefixes/ranges that already exist instead of
            skipping them.
        --verify-only: perform no writes; only the verification block is
            printed (``--update`` then has no effect).

    Returns:
        0 on completion. Fatal errors terminate the process through ``die``
        before this function returns.
    """
    # The module docstring is None when Python runs with -OO, so guard the
    # split; an empty summary is passed to argparse as "no description".
    summary = (__doc__ or "").split("\n\n", 1)[0] or None
    parser = argparse.ArgumentParser(description=summary)
    parser.add_argument(
        "--update",
        action="store_true",
        help="Update existing prefixes/ranges in place (default: skip if exists)",
    )
    parser.add_argument(
        "--verify-only",
        action="store_true",
        help="Skip writes; only print verification block",
    )
    args = parser.parse_args()

    nb = get_nb()
    site = find_site(nb, SITE_SLUG)
    print(f"Connected. Site '{SITE_NAME}' (id={site.id}).")

    if not args.verify_only:
        print("\nIPv4 Prefixes:")
        for cfg in IPV4_PREFIXES:
            create_or_report_prefix(nb, cfg, site, update=args.update)
        print("\nProvider IP Ranges:")
        for cfg in PROVIDER_IP_RANGES:
            create_or_report_iprange(nb, cfg, update=args.update)

    # The verification block is printed in both modes.
    verify(nb, site)
    print("\nDone.")
    return 0
# Run the import only when executed as a script; main()'s return value
# becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())