Files
nscertkeycreate/certctl/scripts/nsconsole_certpoll_all.py
deamonkai fc94008530 initial
2026-01-23 12:11:21 -06:00

271 lines
9.3 KiB
Python

#!/usr/bin/env python3
"""Poll all configured NetScaler Consoles and write per-profile reports."""
from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Any, Dict
from certctl.scripts import nsconsole_certpoll
def _load_config(path: str) -> Dict[str, Any]:
config_path = Path(path).expanduser()
raw = config_path.read_text(encoding="utf-8")
if config_path.suffix.lower() in (".yaml", ".yml"):
try:
import yaml # type: ignore
except Exception as exc: # pragma: no cover - optional dependency
raise RuntimeError("PyYAML is required for YAML config files.") from exc
data = yaml.safe_load(raw) or {}
else:
data = json.loads(raw or "{}")
if not isinstance(data, dict):
raise RuntimeError("Config file must contain a JSON/YAML object at the root.")
return data
def _ext_for_format(fmt: str) -> str:
return {"json": "json", "csv": "csv"}.get(fmt, "txt")
def _collect_items(format_name: str, inputs: Dict[str, Path]) -> list[Dict[str, Any]]:
items: list[Dict[str, Any]] = []
if format_name == "csv":
import csv
for profile, path in inputs.items():
with path.open("r", encoding="utf-8", newline="") as handle:
reader = csv.DictReader(handle)
for row in reader:
row = dict(row)
row["profile"] = profile
items.append(row)
else:
for profile, path in inputs.items():
payload = json.loads(path.read_text(encoding="utf-8"))
rows = payload.get("items", [])
if isinstance(rows, list):
for row in rows:
if isinstance(row, dict):
row = dict(row)
row["profile"] = profile
items.append(row)
return items
def _merge_reports(format_name: str, inputs: Dict[str, Path], output: Path) -> None:
merged: Dict[str, Any] = {
"generated_at": None,
"profiles": {},
"count": 0,
"items": [],
}
if format_name == "csv":
for profile, path in inputs.items():
rows = _collect_items("csv", {profile: path})
merged["profiles"][profile] = {"generated_at": None, "count": len(rows)}
merged["items"].extend(rows)
merged["count"] += len(rows)
else:
for profile, path in inputs.items():
payload = json.loads(path.read_text(encoding="utf-8"))
merged["profiles"][profile] = {
"generated_at": payload.get("generated_at"),
"count": payload.get("count", 0),
}
if merged["generated_at"] is None:
merged["generated_at"] = payload.get("generated_at")
rows = _collect_items("json", {profile: path})
merged["items"].extend(rows)
merged["count"] += payload.get("count", 0)
output.parent.mkdir(parents=True, exist_ok=True)
if format_name == "csv":
# Build a union of keys to ensure CSV columns cover all profiles.
keys = []
for item in merged["items"]:
for key in item.keys():
if key not in keys:
keys.append(key)
if "profile" not in keys:
keys.insert(0, "profile")
with output.open("w", newline="", encoding="utf-8") as handle:
import csv
writer = csv.DictWriter(handle, fieldnames=keys)
writer.writeheader()
for item in merged["items"]:
writer.writerow({k: item.get(k, "") for k in keys})
else:
output.write_text(json.dumps(merged, indent=2, sort_keys=True), encoding="utf-8")
def _write_rollup(format_name: str, items: list[Dict[str, Any]], output: Path) -> None:
groups: Dict[str, Dict[str, Any]] = {}
for item in items:
subject = (item.get("subject") or "").strip() or "(unknown)"
entry = groups.setdefault(
subject,
{
"subject": subject,
"count": 0,
"profiles": [],
"certkeypair_names": [],
},
)
entry["count"] += 1
profile = item.get("profile")
if profile and profile not in entry["profiles"]:
entry["profiles"].append(profile)
name = item.get("certkeypair_name")
if name and name not in entry["certkeypair_names"]:
entry["certkeypair_names"].append(name)
rollup = {
"count_subjects": len(groups),
"subjects": sorted(groups.values(), key=lambda x: x["subject"].lower()),
}
output.parent.mkdir(parents=True, exist_ok=True)
if format_name == "csv":
import csv
with output.open("w", newline="", encoding="utf-8") as handle:
writer = csv.DictWriter(
handle,
fieldnames=["subject", "count", "profiles", "certkeypair_names"],
)
writer.writeheader()
for entry in rollup["subjects"]:
writer.writerow(
{
"subject": entry["subject"],
"count": entry["count"],
"profiles": ", ".join(entry["profiles"]),
"certkeypair_names": ", ".join(entry["certkeypair_names"]),
}
)
else:
output.write_text(json.dumps(rollup, indent=2, sort_keys=True), encoding="utf-8")
def run(args: argparse.Namespace) -> int:
data = _load_config(args.config)
consoles = data.get("consoles", {})
if not isinstance(consoles, dict) or not consoles:
raise SystemExit("Config must include a non-empty 'consoles' mapping.")
out_dir = Path(args.out_dir).expanduser()
out_dir.mkdir(parents=True, exist_ok=True)
profiles = [args.profile] if args.profile else list(consoles.keys())
report_paths: Dict[str, Path] = {}
for profile in profiles:
if profile not in consoles:
raise SystemExit(f"Profile '{profile}' not found in config.")
ext = _ext_for_format(args.format)
out_path = out_dir / f"{profile}.{ext}"
poll_args = argparse.Namespace(
console=None,
user=None,
password=None,
config=args.config,
profile=profile,
insecure=None,
ca_bundle=None,
timeout=None,
format=args.format,
out=str(out_path),
all=args.all,
inventory=args.inventory,
include_mappings=args.include_mappings,
expires_within=args.expires_within,
)
nsconsole_certpoll.run(poll_args)
report_paths[profile] = out_path
print(f"[{profile}] wrote {args.format} report to {out_path}")
if args.merge:
if args.format not in ("json", "csv"):
raise SystemExit("--merge is only supported with --format json or csv.")
merged_path = out_dir / f"all.{_ext_for_format(args.format)}"
_merge_reports(args.format, report_paths, merged_path)
print(f"[all] wrote merged {args.format} report to {merged_path}")
if args.rollup:
if args.format not in ("json", "csv"):
raise SystemExit("--rollup is only supported with --format json or csv.")
items = _collect_items(args.format, report_paths)
rollup_path = out_dir / f"rollup_subjects.{_ext_for_format(args.format)}"
_write_rollup(args.format, items, rollup_path)
print(f"[rollup] wrote subject rollup to {rollup_path}")
return 0
def build_arg_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="Poll all configured NetScaler Consoles and write per-profile reports."
)
parser.add_argument("--config", required=True, help="Path to JSON/YAML config file")
parser.add_argument("--out-dir", default="./reports", help="Output directory for reports")
parser.add_argument(
"--format",
choices=["table", "csv", "json"],
default="json",
help="Report format",
)
parser.add_argument("--profile", help="Only run a single profile")
parser.add_argument(
"--all",
action="store_true",
default=None,
help="Include unbound or inactive certs (default is in-use only)",
)
parser.add_argument(
"--inventory",
action="store_true",
default=None,
help="Trigger inventory refresh before fetching certs",
)
parser.add_argument(
"--include-mappings",
action="store_true",
default=None,
help="Include cert_store_mapping data in the report",
)
parser.add_argument(
"--merge",
action="store_true",
default=False,
help="Write a combined report across all profiles (json/csv only)",
)
parser.add_argument(
"--rollup",
action="store_true",
default=False,
help="Write a subject rollup across all profiles (json/csv only)",
)
parser.add_argument(
"--expires-within",
type=int,
default=None,
help="Only include certs expiring within N days",
)
return parser
def main() -> None:
parser = build_arg_parser()
args = parser.parse_args()
raise SystemExit(run(args))
if __name__ == "__main__":
main()