186 lines
5.4 KiB
Python
186 lines
5.4 KiB
Python
from __future__ import annotations
|
|
|
|
import csv
|
|
import json
|
|
import os
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timezone
|
|
from typing import Any, Dict, List
|
|
|
|
from .console import ConsoleSession
|
|
|
|
|
|
def _parse_valid_to(s: str) -> datetime | None:
|
|
"""Parse a `valid_to` string returned by Console.
|
|
|
|
Observed formats include:
|
|
- "Jan 09 2026 04:09:55"
|
|
- "2026-01-09 04:09:55"
|
|
|
|
Returns a timezone-aware datetime (UTC) if parsing succeeds.
|
|
"""
|
|
|
|
if not s:
|
|
return None
|
|
|
|
s = s.strip()
|
|
fmts = [
|
|
"%b %d %Y %H:%M:%S",
|
|
"%Y-%m-%d %H:%M:%S",
|
|
"%b %d %Y",
|
|
"%Y-%m-%d",
|
|
]
|
|
for fmt in fmts:
|
|
try:
|
|
dt = datetime.strptime(s, fmt)
|
|
return dt.replace(tzinfo=timezone.utc)
|
|
except ValueError:
|
|
pass
|
|
return None
|
|
|
|
|
|
@dataclass
|
|
class CertKeyRow:
|
|
file_name: str
|
|
display_name: str
|
|
days_to_expiry: int | None
|
|
valid_to: str
|
|
no_of_bound_entities: int | None
|
|
|
|
|
|
def poll_console_certkeys(
|
|
session: ConsoleSession,
|
|
*,
|
|
window_days: int = 30,
|
|
) -> Dict[str, Any]:
|
|
"""Fetch cert inventory and produce filtered views.
|
|
|
|
Returns a dict with:
|
|
- all: list[CertKeyRow]
|
|
- expired: list[CertKeyRow]
|
|
- expiring: list[CertKeyRow] (0 < days <= window_days)
|
|
- raw: raw NITRO JSON response
|
|
"""
|
|
|
|
attrs = ",".join(
|
|
[
|
|
"file_name",
|
|
"display_name",
|
|
"valid_to",
|
|
"days_to_expiry",
|
|
"no_of_bound_entities",
|
|
]
|
|
)
|
|
raw = session.list_ssl_certkeys(attrs=attrs)
|
|
items = raw.get("ns_ssl_certkey") or raw.get("ns_ssl_certkey_response") or []
|
|
|
|
rows: List[CertKeyRow] = []
|
|
for it in items:
|
|
file_name = str(it.get("file_name", "") or "")
|
|
display_name = str(it.get("display_name", "") or "")
|
|
valid_to = str(it.get("valid_to", "") or "")
|
|
|
|
dte_raw = it.get("days_to_expiry")
|
|
days_to_expiry: int | None = None
|
|
if dte_raw is not None and str(dte_raw).strip() != "":
|
|
try:
|
|
days_to_expiry = int(str(dte_raw))
|
|
except ValueError:
|
|
days_to_expiry = None
|
|
|
|
bound_raw = it.get("no_of_bound_entities")
|
|
bound: int | None = None
|
|
if bound_raw is not None and str(bound_raw).strip() != "":
|
|
try:
|
|
bound = int(str(bound_raw))
|
|
except ValueError:
|
|
bound = None
|
|
|
|
rows.append(
|
|
CertKeyRow(
|
|
file_name=file_name,
|
|
display_name=display_name,
|
|
days_to_expiry=days_to_expiry,
|
|
valid_to=valid_to,
|
|
no_of_bound_entities=bound,
|
|
)
|
|
)
|
|
|
|
def key_sort(r: CertKeyRow):
|
|
if r.days_to_expiry is None:
|
|
dt = _parse_valid_to(r.valid_to)
|
|
if dt:
|
|
delta = dt - datetime.now(timezone.utc)
|
|
return int(delta.total_seconds())
|
|
return 10**9
|
|
return r.days_to_expiry * 24 * 3600
|
|
|
|
rows_sorted = sorted(rows, key=key_sort)
|
|
|
|
expired = [r for r in rows_sorted if r.days_to_expiry is not None and r.days_to_expiry <= 0]
|
|
expiring = [
|
|
r
|
|
for r in rows_sorted
|
|
if r.days_to_expiry is not None and 0 < r.days_to_expiry <= window_days
|
|
]
|
|
|
|
return {"all": rows_sorted, "expired": expired, "expiring": expiring, "raw": raw}
|
|
|
|
|
|
def write_report(
|
|
report: Dict[str, Any],
|
|
*,
|
|
fmt: str = "table",
|
|
out_path: str | None = None,
|
|
include_all: bool = False,
|
|
) -> str:
|
|
"""Write report to a file (or return as a string for stdout)."""
|
|
|
|
rows: List[CertKeyRow] = report["all"] if include_all else (report["expired"] + report["expiring"])
|
|
|
|
if fmt == "json":
|
|
payload = {
|
|
"generated_at": datetime.now(timezone.utc).isoformat(),
|
|
"expired": [r.__dict__ for r in report["expired"]],
|
|
"expiring": [r.__dict__ for r in report["expiring"]],
|
|
}
|
|
s = json.dumps(payload, indent=2)
|
|
elif fmt == "csv":
|
|
import io
|
|
|
|
buf = io.StringIO()
|
|
w = csv.writer(buf)
|
|
w.writerow(["file_name", "display_name", "days_to_expiry", "valid_to", "no_of_bound_entities"])
|
|
for r in rows:
|
|
w.writerow([r.file_name, r.display_name, r.days_to_expiry, r.valid_to, r.no_of_bound_entities])
|
|
s = buf.getvalue()
|
|
else:
|
|
# simple fixed-width table
|
|
headers = ["File", "Display", "Days", "Valid To", "Bound"]
|
|
data = [
|
|
[
|
|
r.file_name,
|
|
r.display_name,
|
|
"" if r.days_to_expiry is None else str(r.days_to_expiry),
|
|
r.valid_to,
|
|
"" if r.no_of_bound_entities is None else str(r.no_of_bound_entities),
|
|
]
|
|
for r in rows
|
|
]
|
|
cols = list(zip(headers, *data)) if data else [(h,) for h in headers]
|
|
widths = [min(60, max(len(str(x)) for x in col)) for col in cols]
|
|
|
|
def fmt_row(vals: List[str]) -> str:
|
|
return " ".join(str(v)[:w].ljust(w) for v, w in zip(vals, widths))
|
|
|
|
lines = [fmt_row(headers), fmt_row(["-" * w for w in widths])]
|
|
for row in data:
|
|
lines.append(fmt_row(row))
|
|
s = "\n".join(lines)
|
|
|
|
if out_path:
|
|
os.makedirs(os.path.dirname(os.path.abspath(out_path)) or ".", exist_ok=True)
|
|
with open(out_path, "w", encoding="utf-8") as f:
|
|
f.write(s)
|
|
return s
|