#!/usr/bin/env python3 """ cert_poll.py Poll NetScaler Console for certificate inventory + expiry dates and generate a report. Goals: - Cross-platform (macOS/Windows/Linux) - No external dependencies beyond requests (and optionally keyring) - Uses NITRO v2 login once -> session token -> Cookie: NITRO_AUTH_TOKEN=... - Produces JSON + Markdown report - File lock to prevent concurrent runs on same machine NOTE: - NetScaler Console NITRO resources vary by build/license. - This script tries a few known endpoints and will fall back gracefully. """ import argparse import datetime as dt import getpass import json import os import platform import re import socket import sys import time from dataclasses import dataclass, asdict from pathlib import Path from typing import Any, Dict, List, Optional, Tuple import requests # ----------------------------- # Utilities # ----------------------------- class NitroError(RuntimeError): def __init__(self, status_code: int, message: str, payload: Optional[dict] = None): super().__init__(f"NITRO error (HTTP {status_code}): {message}") self.status_code = status_code self.message = message self.payload = payload or {} def _now_utc() -> dt.datetime: return dt.datetime.now(dt.timezone.utc) def _ts_local_compact() -> str: # Example: 20260109-231530 return dt.datetime.now().strftime("%Y%m%d-%H%M%S") def _safe_mkdir(p: Path) -> None: p.mkdir(parents=True, exist_ok=True) def _write_text(path: Path, content: str) -> None: path.write_text(content, encoding="utf-8") def _write_bytes(path: Path, content: bytes) -> None: path.write_bytes(content) def _iso(dt_obj: Optional[dt.datetime]) -> Optional[str]: if not dt_obj: return None return dt_obj.astimezone(dt.timezone.utc).isoformat() def _parse_datetime_any(s: str) -> Optional[dt.datetime]: """ Attempts to parse common date formats seen in Console inventory. Examples observed: - "2026-01-09 04:09:55" - "2026-01-09T04:09:55Z" - epoch seconds as string """ if not s: return None s = str(s).strip() if not s: return None # epoch seconds if re.fullmatch(r"\d{9,12}", s): try: return dt.datetime.fromtimestamp(int(s), tz=dt.timezone.utc) except Exception: return None # "YYYY-MM-DD HH:MM:SS" for fmt in ("%Y-%m-%d %H:%M:%S", "%Y/%m/%d %H:%M:%S"): try: return dt.datetime.strptime(s, fmt).replace(tzinfo=dt.timezone.utc) except Exception: pass # ISO-ish try: # normalize trailing Z if s.endswith("Z"): s = s[:-1] + "+00:00" return dt.datetime.fromisoformat(s) except Exception: return None # ----------------------------- # Cross-platform file lock # ----------------------------- class FileLock: """ Simple cross-platform lock using atomic directory creation. This avoids platform-specific fcntl/msvcrt issues and works well on local filesystems. Lock is represented by a directory, e.g.: /tmp/certctl.lock/ We also write metadata inside for debugging. """ def __init__(self, lock_dir: Path, stale_seconds: int = 3600): self.lock_dir = lock_dir self.stale_seconds = stale_seconds def acquire(self, wait_seconds: int = 0) -> None: start = time.time() while True: try: self.lock_dir.mkdir(parents=True, exist_ok=False) # write metadata meta = { "pid": os.getpid(), "host": socket.gethostname(), "user": getpass.getuser(), "platform": platform.platform(), "created": _now_utc().isoformat(), } _write_text(self.lock_dir / "meta.json", json.dumps(meta, indent=2)) return except FileExistsError: # check staleness meta_path = self.lock_dir / "meta.json" if meta_path.exists(): try: m = json.loads(meta_path.read_text(encoding="utf-8")) created = _parse_datetime_any(m.get("created", "")) if created: age = (_now_utc() - created.astimezone(dt.timezone.utc)).total_seconds() if age > self.stale_seconds: # stale lock -> remove self.release(force=True) continue except Exception: pass if wait_seconds <= 0: raise RuntimeError(f"Lock already held: {self.lock_dir}") if time.time() - start > wait_seconds: raise RuntimeError(f"Timed out waiting for lock: {self.lock_dir}") time.sleep(0.25) def release(self, force: bool = False) -> None: if not self.lock_dir.exists(): return # best effort cleanup try: for child in self.lock_dir.iterdir(): try: child.unlink() except Exception: pass self.lock_dir.rmdir() except Exception: if not force: raise # ----------------------------- # NetScaler Console NITRO v2 client # ----------------------------- class ConsoleClient: def __init__(self, console_url: str, insecure: bool = False, timeout: int = 30): self.console_url = console_url.rstrip("/") self.insecure = insecure self.timeout = timeout self.session = requests.Session() self.token: Optional[str] = None def _url(self, path: str) -> str: if not path.startswith("/"): path = "/" + path return self.console_url + path def login(self, username: str, password: str) -> str: """ POST /nitro/v2/config/login Response includes login[0].sessionid, which must be used as NITRO_AUTH_TOKEN cookie. """ payload = {"login": {"username": username, "password": password}} r = self.session.post( self._url("/nitro/v2/config/login"), json=payload, verify=not self.insecure, timeout=self.timeout, ) j = self._json_or_raise(r) try: token = j["login"][0]["sessionid"] except Exception: raise NitroError(r.status_code, "Login response missing sessionid", j) self.token = token return token def _headers(self) -> Dict[str, str]: h: Dict[str, str] = {} if self.token: h["Cookie"] = f"NITRO_AUTH_TOKEN={self.token}" return h def get(self, path: str, params: Optional[dict] = None) -> dict: r = self.session.get( self._url(path), headers=self._headers(), params=params or {}, verify=not self.insecure, timeout=self.timeout, ) return self._json_or_raise(r) def _json_or_raise(self, r: requests.Response) -> dict: try: j = r.json() except Exception: raise NitroError(r.status_code, f"Non-JSON response: {r.text[:200]}") # Console often returns 200 with errorcode != 0 if r.status_code >= 400: msg = j.get("message") or r.reason or "HTTP error" raise NitroError(r.status_code, msg, j) if isinstance(j, dict) and "errorcode" in j and j.get("errorcode") not in (0, "0", None): msg = j.get("message") or "NITRO error" raise NitroError(r.status_code, msg, j) return j # ----------------------------- # Report model # ----------------------------- @dataclass class CertRecord: name: str subject: Optional[str] issuer: Optional[str] not_after: Optional[str] days_remaining: Optional[int] source: str # "console" raw: Dict[str, Any] def _days_remaining(not_after_dt: Optional[dt.datetime]) -> Optional[int]: if not not_after_dt: return None delta = not_after_dt.astimezone(dt.timezone.utc) - _now_utc() return int(delta.total_seconds() // 86400) # ----------------------------- # Inventory fetch logic # ----------------------------- def fetch_console_certs(client: ConsoleClient) -> List[CertRecord]: """ Tries multiple known endpoints and normalizes results. Console may expose: - /nitro/v1/config/ssl_cert - /nitro/v1/config/ns_ssl_cert - /nitro/v2/config/ssl_cert - /nitro/v2/config/ns_ssl_cert - /nitro/v1/config/ssl_certificate (analytics type) [not the same thing] We'll try v2 first, then v1. """ candidates = [ ("/nitro/v2/config/ns_ssl_cert?attrs=*", "ns_ssl_cert"), ("/nitro/v2/config/ssl_cert?attrs=*", "ssl_cert"), ("/nitro/v1/config/ns_ssl_cert?attrs=*", "ns_ssl_cert"), ("/nitro/v1/config/ssl_cert?attrs=*", "ssl_cert"), ] last_err: Optional[Exception] = None for path, key in candidates: try: j = client.get(path) objs = j.get(key, []) if not isinstance(objs, list): continue out: List[CertRecord] = [] for o in objs: if not isinstance(o, dict): continue # Try common fields name = ( o.get("file_name") or o.get("certkey") or o.get("name") or o.get("certificate") or "UNKNOWN" ) subject = o.get("subject") or o.get("cert_subject") or o.get("certsubject") issuer = o.get("issuer") or o.get("cert_issuer") or o.get("certissuer") # expiry fields vary wildly not_after = ( o.get("notafter") or o.get("not_after") or o.get("valid_to") or o.get("expiry_date") or o.get("cert_expiry") or o.get("expiration") ) not_after_dt = _parse_datetime_any(str(not_after)) if not_after else None days = _days_remaining(not_after_dt) out.append( CertRecord( name=str(name), subject=str(subject) if subject else None, issuer=str(issuer) if issuer else None, not_after=_iso(not_after_dt), days_remaining=days, source="console", raw=o, ) ) # If endpoint exists but empty, still valid return out except Exception as e: last_err = e continue raise RuntimeError(f"Unable to fetch cert inventory from Console. Last error: {last_err}") # ----------------------------- # Renderers # ----------------------------- def render_markdown(records: List[CertRecord], window_days: int, console: str) -> str: now = _now_utc() lines: List[str] = [] lines.append(f"# Certificate Expiry Report") lines.append("") lines.append(f"- Generated (UTC): `{now.isoformat()}`") lines.append(f"- Console: `{console}`") lines.append(f"- Renewal window: `{window_days} days`") lines.append(f"- Total certs discovered: `{len(records)}`") lines.append("") expiring = [r for r in records if r.days_remaining is not None and r.days_remaining <= window_days] expiring.sort(key=lambda r: (r.days_remaining if r.days_remaining is not None else 999999, r.name)) lines.append(f"## Expiring within {window_days} days ({len(expiring)})") lines.append("") if not expiring: lines.append("_No certificates found in the renewal window._") lines.append("") return "\n".join(lines) lines.append("| Days | Not After (UTC) | Name | Subject | Issuer |") lines.append("|---:|---|---|---|---|") for r in expiring: lines.append( f"| {r.days_remaining} | `{r.not_after or ''}` | `{r.name}` | `{r.subject or ''}` | `{r.issuer or ''}` |" ) lines.append("") return "\n".join(lines) # ----------------------------- # Main # ----------------------------- def main() -> int: ap = argparse.ArgumentParser( description="Poll NetScaler Console for cert expiry and generate a report." ) ap.add_argument("--console", required=True, help="NetScaler Console base URL (e.g. https://192.168.113.2)") ap.add_argument("--user", required=True, help="Console username") ap.add_argument("--password", help="Console password (if omitted, prompt)") ap.add_argument("--insecure", action="store_true", help="Disable TLS verification (lab use)") ap.add_argument("--timeout", type=int, default=30, help="HTTP timeout seconds (default: 30)") ap.add_argument("--window-days", type=int, default=30, help="Renewal window in days (default: 30)") ap.add_argument("--out-dir", default="./reports", help="Output directory for reports (default: ./reports)") ap.add_argument("--lock-file", default="./.certctl.lock", help="Lock dir path (default: ./.certctl.lock)") ap.add_argument("--lock-wait", type=int, default=0, help="Seconds to wait for lock (default: 0)") ap.add_argument("--lock-stale", type=int, default=3600, help="Stale lock seconds (default: 3600)") args = ap.parse_args() out_dir = Path(args.out_dir).expanduser().resolve() lock_dir = Path(args.lock_file).expanduser().resolve() _safe_mkdir(out_dir) lock = FileLock(lock_dir, stale_seconds=args.lock_stale) lock.acquire(wait_seconds=args.lock_wait) try: password = args.password or getpass.getpass(f"Console password for {args.user}@{args.console}: ") client = ConsoleClient(args.console, insecure=args.insecure, timeout=args.timeout) print("[console] Logging in...") client.login(args.user, password) print("[console] Login OK.") print("[console] Fetching cert inventory...") records = fetch_console_certs(client) print(f"[console] Discovered {len(records)} cert records.") # Filter only those with known expiry for reporting relevance # (we still include unknown expiry in JSON for troubleshooting) window = int(args.window_days) report_ts = _ts_local_compact() json_path = out_dir / f"expiry_{report_ts}.json" md_path = out_dir / f"expiry_{report_ts}.md" payload = { "generated_utc": _now_utc().isoformat(), "console": args.console, "window_days": window, "total_records": len(records), "records": [asdict(r) for r in records], } _write_text(json_path, json.dumps(payload, indent=2)) _write_text(md_path, render_markdown(records, window, args.console)) print(f"[report] Wrote JSON: {json_path}") print(f"[report] Wrote MD: {md_path}") # convenience: latest pointers latest_json = out_dir / "latest_expiry.json" latest_md = out_dir / "latest_expiry.md" _write_text(latest_json, json.dumps(payload, indent=2)) _write_text(latest_md, render_markdown(records, window, args.console)) print(f"[report] Updated latest_expiry.json / latest_expiry.md") # Exit code useful for automation: # 0 = ok # 2 = certs in renewal window found expiring = [r for r in records if r.days_remaining is not None and r.days_remaining <= window] if expiring: print(f"[alert] {len(expiring)} cert(s) within {window} days of expiry.") return 2 return 0 finally: lock.release(force=True) if __name__ == "__main__": raise SystemExit(main())