271 lines
9.3 KiB
Python
271 lines
9.3 KiB
Python
#!/usr/bin/env python3
|
|
"""Poll all configured NetScaler Consoles and write per-profile reports."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
from pathlib import Path
|
|
from typing import Any, Dict
|
|
|
|
from certctl.scripts import nsconsole_certpoll
|
|
|
|
|
|
def _load_config(path: str) -> Dict[str, Any]:
    """Read a JSON or YAML config file and return its root mapping.

    Files whose suffix is ``.yaml`` or ``.yml`` (case-insensitive) are parsed
    with PyYAML's ``safe_load``; every other suffix is parsed as JSON. An
    empty or whitespace-only file yields an empty mapping in both formats.

    Args:
        path: Location of the config file; a leading ``~`` is expanded.

    Returns:
        The parsed root mapping of the document.

    Raises:
        RuntimeError: If a YAML file is given but PyYAML cannot be imported,
            or if the document root is not a mapping.
    """
    config_path = Path(path).expanduser()
    raw = config_path.read_text(encoding="utf-8")

    if config_path.suffix.lower() in (".yaml", ".yml"):
        try:
            import yaml  # type: ignore
        except ImportError as exc:  # pragma: no cover - optional dependency
            raise RuntimeError("PyYAML is required for YAML config files.") from exc
        data = yaml.safe_load(raw)
        # Only an empty document (None) counts as "no config". Other falsy
        # roots such as [] or 0 must reach the type check below instead of
        # being silently replaced by {}.
        if data is None:
            data = {}
    else:
        # A file holding only whitespace (e.g. a lone newline) is as empty as
        # a zero-byte file, but json.loads would reject it.
        data = json.loads(raw) if raw.strip() else {}

    if not isinstance(data, dict):
        raise RuntimeError("Config file must contain a JSON/YAML object at the root.")
    return data
|
|
|
|
|
|
def _ext_for_format(fmt: str) -> str:
    """Map a report format name to the file extension used for its output.

    ``"json"`` and ``"csv"`` map to themselves; any other format name falls
    back to ``"txt"``.
    """
    extensions = {"json": "json", "csv": "csv"}
    if fmt in extensions:
        return extensions[fmt]
    return "txt"
|
|
|
|
|
|
def _collect_items(format_name: str, inputs: Dict[str, Path]) -> list[Dict[str, Any]]:
    """Load the rows of per-profile reports and tag each with its profile.

    Args:
        format_name: ``"csv"`` reads every file with ``csv.DictReader``; any
            other value reads every file as a JSON document whose rows live
            under the ``"items"`` key.
        inputs: Mapping of profile name to the report file for that profile.

    Returns:
        A flat list of row dicts in input order. Each row is a copy of the
        source row with its ``"profile"`` key set to the owning profile name
        (overwriting any ``"profile"`` value the row already had).

    JSON reading is best-effort: a document whose root is not an object, an
    ``"items"`` value that is not a list, and list entries that are not
    objects are all skipped rather than raising.
    """
    items: list[Dict[str, Any]] = []

    if format_name == "csv":
        import csv

        for profile, path in inputs.items():
            with path.open("r", encoding="utf-8", newline="") as handle:
                items.extend(
                    {**row, "profile": profile} for row in csv.DictReader(handle)
                )
        return items

    for profile, path in inputs.items():
        payload = json.loads(path.read_text(encoding="utf-8"))
        # A non-object root has no "items" to offer; treat it like a report
        # with no rows instead of failing on the missing .get().
        rows = payload.get("items", []) if isinstance(payload, dict) else None
        if not isinstance(rows, list):
            continue
        items.extend(
            {**row, "profile": profile} for row in rows if isinstance(row, dict)
        )
    return items
|
|
|
|
|
|
def _merge_reports(format_name: str, inputs: Dict[str, Path], output: Path) -> None:
    """Combine per-profile reports into a single file at *output*.

    Args:
        format_name: ``"csv"`` merges CSV reports into one CSV; any other
            value treats the inputs as JSON reports and writes one JSON file.
        inputs: Mapping of profile name to that profile's report file.
        output: Destination path; missing parent directories are created.

    For JSON, the written document has ``generated_at`` (the first non-null
    value found among the inputs), ``profiles`` (per-profile ``generated_at``
    and ``count``), ``items`` (all rows, each tagged with its profile) and
    ``count``. For CSV only the rows are written; the columns are the union
    of all row keys in first-seen order.

    The top-level ``count`` is always the number of rows actually merged, so
    it cannot drift from ``items`` when an input omits or misreports its own
    ``count``. The per-profile ``count`` keeps the value reported by the
    input when there is one, and otherwise falls back to the row count.
    """
    merged: Dict[str, Any] = {
        "generated_at": None,
        "profiles": {},
        "count": 0,
        "items": [],
    }
    is_csv = format_name == "csv"

    for profile, path in inputs.items():
        rows = _collect_items("csv" if is_csv else "json", {profile: path})
        generated_at = None
        reported_count = len(rows)
        if not is_csv:
            payload = json.loads(path.read_text(encoding="utf-8"))
            if isinstance(payload, dict):
                generated_at = payload.get("generated_at")
                reported_count = payload.get("count", reported_count)
            if merged["generated_at"] is None:
                merged["generated_at"] = generated_at
        merged["profiles"][profile] = {
            "generated_at": generated_at,
            "count": reported_count,
        }
        merged["items"].extend(rows)
    merged["count"] = len(merged["items"])

    output.parent.mkdir(parents=True, exist_ok=True)
    if not is_csv:
        output.write_text(json.dumps(merged, indent=2, sort_keys=True), encoding="utf-8")
        return

    import csv

    # Union of keys across all rows so the CSV columns cover every profile;
    # dict.fromkeys keeps first-seen order without a quadratic list scan.
    columns = list(dict.fromkeys(key for item in merged["items"] for key in item))
    if "profile" not in columns:
        columns.insert(0, "profile")
    with output.open("w", newline="", encoding="utf-8") as handle:
        # restval fills columns a given row does not have.
        writer = csv.DictWriter(handle, fieldnames=columns, restval="")
        writer.writeheader()
        writer.writerows(merged["items"])
|
|
|
|
|
|
def _write_rollup(format_name: str, items: list[Dict[str, Any]], output: Path) -> None:
    """Group *items* by certificate subject and write the summary to *output*.

    Each group records the subject, how many items carried it, and the
    distinct ``profile`` and ``certkeypair_name`` values seen, in first-seen
    order. Items with a missing or blank subject are grouped under
    ``"(unknown)"``. Groups are ordered by subject, case-insensitively.

    Args:
        format_name: ``"csv"`` writes one CSV row per subject with the list
            fields joined by ``", "``; any other value writes JSON.
        items: Row dicts to summarise.
        output: Destination path; missing parent directories are created.
    """
    by_subject: Dict[str, Dict[str, Any]] = {}
    list_fields = (("profile", "profiles"), ("certkeypair_name", "certkeypair_names"))

    for record in items:
        subject = (record.get("subject") or "").strip() or "(unknown)"
        if subject not in by_subject:
            by_subject[subject] = {
                "subject": subject,
                "count": 0,
                "profiles": [],
                "certkeypair_names": [],
            }
        bucket = by_subject[subject]
        bucket["count"] += 1
        for source_key, target_key in list_fields:
            value = record.get(source_key)
            if value and value not in bucket[target_key]:
                bucket[target_key].append(value)

    ordered = sorted(by_subject.values(), key=lambda group: group["subject"].lower())
    rollup = {"count_subjects": len(by_subject), "subjects": ordered}

    output.parent.mkdir(parents=True, exist_ok=True)
    if format_name != "csv":
        output.write_text(json.dumps(rollup, indent=2, sort_keys=True), encoding="utf-8")
        return

    import csv

    with output.open("w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(
            handle,
            fieldnames=["subject", "count", "profiles", "certkeypair_names"],
        )
        writer.writeheader()
        for group in ordered:
            flat_row = {
                "subject": group["subject"],
                "count": group["count"],
                "profiles": ", ".join(group["profiles"]),
                "certkeypair_names": ", ".join(group["certkeypair_names"]),
            }
            writer.writerow(flat_row)
|
|
|
|
|
|
def run(args: argparse.Namespace) -> int:
    """Poll every selected console profile and write the requested reports.

    One report per profile is written to ``<out_dir>/<profile>.<ext>`` by
    delegating to ``nsconsole_certpoll.run``. With ``--merge`` a combined
    ``all.<ext>`` is added, and with ``--rollup`` a per-subject summary
    ``rollup_subjects.<ext>``.

    Args:
        args: Parsed command-line options as produced by the parser from
            ``build_arg_parser``.

    Returns:
        ``0`` once all requested reports have been written.

    Raises:
        SystemExit: If ``--merge`` or ``--rollup`` is combined with a format
            other than json/csv, if the config has no non-empty ``consoles``
            mapping, or if the requested profile is not in the config.
    """
    # Reject unsupported option combinations up front, so no console is
    # polled (and no file written) for a run that is going to fail anyway.
    if args.format not in ("json", "csv"):
        if args.merge:
            raise SystemExit("--merge is only supported with --format json or csv.")
        if args.rollup:
            raise SystemExit("--rollup is only supported with --format json or csv.")

    data = _load_config(args.config)
    consoles = data.get("consoles", {})
    if not isinstance(consoles, dict) or not consoles:
        raise SystemExit("Config must include a non-empty 'consoles' mapping.")

    # Only an explicitly requested profile can be missing; the default list
    # is taken from the config itself.
    if args.profile:
        if args.profile not in consoles:
            raise SystemExit(f"Profile '{args.profile}' not found in config.")
        profiles = [args.profile]
    else:
        profiles = list(consoles)

    out_dir = Path(args.out_dir).expanduser()
    out_dir.mkdir(parents=True, exist_ok=True)

    ext = _ext_for_format(args.format)
    report_paths: Dict[str, Path] = {}
    for profile in profiles:
        out_path = out_dir / f"{profile}.{ext}"
        # Connection settings are passed as None and only config + profile
        # are supplied; presumably the poller resolves them from the profile
        # entry -- confirm against nsconsole_certpoll.
        poll_args = argparse.Namespace(
            console=None,
            user=None,
            password=None,
            config=args.config,
            profile=profile,
            insecure=None,
            ca_bundle=None,
            timeout=None,
            format=args.format,
            out=str(out_path),
            all=args.all,
            inventory=args.inventory,
            include_mappings=args.include_mappings,
            expires_within=args.expires_within,
        )
        # NOTE(review): the poller's return value is discarded; confirm that
        # it reports failures by raising rather than by exit status.
        nsconsole_certpoll.run(poll_args)
        report_paths[profile] = out_path
        print(f"[{profile}] wrote {args.format} report to {out_path}")

    if args.merge:
        merged_path = out_dir / f"all.{ext}"
        _merge_reports(args.format, report_paths, merged_path)
        print(f"[all] wrote merged {args.format} report to {merged_path}")

    if args.rollup:
        items = _collect_items(args.format, report_paths)
        rollup_path = out_dir / f"rollup_subjects.{ext}"
        _write_rollup(args.format, items, rollup_path)
        print(f"[rollup] wrote subject rollup to {rollup_path}")

    return 0
|
|
|
|
|
|
def build_arg_parser() -> argparse.ArgumentParser:
    """Construct the command-line parser for this script.

    Returns:
        A parser accepting ``--config`` (required), ``--out-dir``,
        ``--format``, ``--profile``, ``--expires-within`` and the boolean
        switches ``--all``, ``--inventory``, ``--include-mappings``,
        ``--merge`` and ``--rollup``.
    """
    cli = argparse.ArgumentParser(
        description="Poll all configured NetScaler Consoles and write per-profile reports."
    )
    cli.add_argument("--config", required=True, help="Path to JSON/YAML config file")
    cli.add_argument("--out-dir", default="./reports", help="Output directory for reports")
    cli.add_argument(
        "--format",
        choices=["table", "csv", "json"],
        default="json",
        help="Report format",
    )
    cli.add_argument("--profile", help="Only run a single profile")

    # Boolean switches as (flag, default when absent, help text). The first
    # three default to None rather than False; presumably so the poller can
    # tell "not given" from an explicit choice -- confirm against
    # nsconsole_certpoll.
    switches = (
        ("--all", None, "Include unbound or inactive certs (default is in-use only)"),
        ("--inventory", None, "Trigger inventory refresh before fetching certs"),
        ("--include-mappings", None, "Include cert_store_mapping data in the report"),
        ("--merge", False, "Write a combined report across all profiles (json/csv only)"),
        ("--rollup", False, "Write a subject rollup across all profiles (json/csv only)"),
    )
    for flag, fallback, description in switches:
        cli.add_argument(flag, action="store_true", default=fallback, help=description)

    cli.add_argument(
        "--expires-within",
        type=int,
        default=None,
        help="Only include certs expiring within N days",
    )
    return cli
|
|
|
|
|
|
def main() -> None:
    """Parse the command line, run the poll, and exit with its status code."""
    cli_args = build_arg_parser().parse_args()
    status = run(cli_args)
    raise SystemExit(status)
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|