This commit is contained in:
deamonkai
2026-01-23 12:11:21 -06:00
commit fc94008530
16494 changed files with 2974672 additions and 0 deletions

3
legacy/bin/certctl Normal file
View File

@@ -0,0 +1,3 @@
#!/usr/bin/env bash
set -euo pipefail
python3 -m certctl "$@"

4
legacy/bin/nsctl Normal file
View File

@@ -0,0 +1,4 @@
#!/usr/bin/env bash
set -euo pipefail
echo "[deprecated] nsctl -> certctl" >&2
python3 -m certctl "$@"

View File

@@ -0,0 +1,6 @@
"""certctl package
Lightweight package scaffold for Netscaler certificate tooling.
"""
__all__ = ["utils", "netscaler"]

View File

@@ -0,0 +1,42 @@
"""certctl - orchestration entrypoint.
Today:
certctl keycsr-console ... (create key on Console, download key, generate CSR)
This file exists so you can run `python -m certctl ...` and so `bin/certctl`
works without packaging.
"""
from __future__ import annotations
import argparse
import sys
def main(argv: list[str] | None = None) -> int:
argv = sys.argv[1:] if argv is None else argv
p = argparse.ArgumentParser(prog="certctl", add_help=True)
sub = p.add_subparsers(dest="cmd", required=True)
sub.add_parser("keycsr-console", help="Create key on Console, download it, then generate CSR locally")
sub.add_parser("poll", help="(placeholder) Poll cert expiry across Console/ADCs/WAF")
sub.add_parser("renew", help="(placeholder) Renew certs in renewal window")
ns, rest = p.parse_known_args(argv)
if ns.cmd == "keycsr-console":
from certctl.scripts.keycsr_console import main as _m
return _m(rest)
if ns.cmd in {"poll", "renew"}:
print(f"{ns.cmd}: not implemented yet (repo scaffold only)")
return 2
p.print_help()
return 2
if __name__ == "__main__":
raise SystemExit(main())

13
legacy/certctl/ca/base.py Normal file
View File

@@ -0,0 +1,13 @@
"""Base CA adapter interface."""
from typing import Protocol, Optional
class CAAdapter(Protocol):
def submit_csr(self, csr_pem: str, name: str, options: Optional[dict] = None) -> str:
"""Submit a CSR and return a request id or token."""
def poll_status(self, request_id: str, timeout: int = 60) -> str:
"""Poll request status and return a final state string (e.g., 'issued' / 'pending' / 'failed')."""
def download_certificate(self, request_id: str) -> str:
"""Return the issued certificate PEM for the request_id."""

27
legacy/certctl/ca/mock.py Normal file
View File

@@ -0,0 +1,27 @@
"""A simple mock CA adapter for tests and local runs."""
from typing import Optional
from .base import CAAdapter
class MockCA(CAAdapter):
def __init__(self):
self._store = {}
self._counter = 0
def submit_csr(self, csr_pem: str, name: str, options: Optional[dict] = None) -> str:
self._counter += 1
rid = f"mock-{self._counter}"
# store and pretend it's issued immediately for simplicity
self._store[rid] = {
"csr": csr_pem,
"name": name,
"status": "issued",
"cert": f"-----BEGIN CERTIFICATE-----\nMockCertFor:{name}\n-----END CERTIFICATE-----\n",
}
return rid
def poll_status(self, request_id: str, timeout: int = 60) -> str:
return self._store.get(request_id, {}).get("status", "unknown")
def download_certificate(self, request_id: str) -> str:
return self._store.get(request_id, {}).get("cert")

View File

@@ -0,0 +1,35 @@
"""Sectigo CA adapter skeleton.
This file provides a class with the expected interface. Implementing a full
Sectigo integration requires API credentials and network access; this is a
skeleton with TODOs and a clear place to add HTTP calls.
"""
from typing import Optional
from .base import CAAdapter
class SectigoCA(CAAdapter):
def __init__(self, api_base: str = "https://api.sectigo.com", api_key: Optional[str] = None):
self.api_base = api_base
self.api_key = api_key
def submit_csr(self, csr_pem: str, name: str, options: Optional[dict] = None) -> str:
"""Submit CSR to Sectigo and return request id.
TODO: Implement actual HTTP POSTs with authentication and error handling.
"""
raise NotImplementedError("Sectigo submission not implemented yet")
def poll_status(self, request_id: str, timeout: int = 60) -> str:
"""Poll Sectigo for request status.
TODO: implement polling logic using Sectigo APIs.
"""
raise NotImplementedError("Sectigo polling not implemented yet")
def download_certificate(self, request_id: str) -> str:
"""Download issued certificate PEM.
TODO: fetch the issued certificate from Sectigo.
"""
raise NotImplementedError("Sectigo download not implemented yet")

231
legacy/certctl/cli.py Normal file
View File

@@ -0,0 +1,231 @@
"""Minimal CLI for certctl tooling.
Provides a `keygen` subcommand which wraps `certctl.keygen.generate_private_key`.
This is intentionally small and testable: `main(argv)` accepts an argv list.
"""
from __future__ import annotations
import argparse
import sys
from typing import List, Optional
from . import keygen
def _build_subj_from_dict(subject: dict) -> str:
parts = []
for k in ["C", "ST", "L", "O", "OU", "CN"]:
v = subject.get(k)
if v:
parts.append(f"/{k}={v}")
return "".join(parts)
def build_parser() -> argparse.ArgumentParser:
p = argparse.ArgumentParser(prog="certctl", description="certctl control utility")
sub = p.add_subparsers(dest="command")
keygen_p = sub.add_parser("keygen", help="Generate a private key")
keygen_p.add_argument("--kind", choices=["rsa", "ec"], default="rsa")
keygen_p.add_argument("--bits", type=int, default=4096, help="RSA key bits")
keygen_p.add_argument("--curve", default="prime256v1", help="EC curve")
keygen_p.add_argument("--passphrase", help="Passphrase to encrypt key (insecure on CLI)")
keygen_p.add_argument("--keychain-service", help="Keychain service name to read/write passphrase")
keygen_p.add_argument("--save-passphrase", action="store_true", help="Save provided passphrase into Keychain")
keygen_p.add_argument("--keychain-account", default="certctl", help="Keychain account name when saving passphrase")
keygen_p.add_argument("--out", help="Write key to file instead of stdout")
# csr subcommands
csr_p = sub.add_parser("csr", help="Create and inspect CSRs")
csr_sub = csr_p.add_subparsers(dest="csr_cmd")
csr_create = csr_sub.add_parser("create", help="Create a CSR from a private key file")
csr_create.add_argument("--key-file", required=True, help="Path to the private key PEM")
csr_create.add_argument("--subject", help="Subject string in /C=.../ST=.../CN=... format or as key=value pairs like C=US,ST=CA,CN=example.com")
csr_create.add_argument("--san", action="append", help="SubjectAltName entry (DNS or IP); can be given multiple times")
csr_create.add_argument("--allow-wildcard", action="store_true", help="Allow wildcard SANs without interactive confirmation. Use with caution: wildcards broaden certificate scope; in non-interactive contexts this flag bypasses interactive confirmation.")
csr_create.add_argument("--from-cert", help="Path to an existing certificate to populate subject and SANs from")
csr_create.add_argument("--passphrase", help="Passphrase for encrypted key (insecure on CLI)")
csr_create.add_argument("--out", help="Write CSR to file instead of stdout")
csr_show = csr_sub.add_parser("show", help="Show CSR details (like SANs)")
csr_show.add_argument("--csr-file", required=True, help="Path to CSR PEM file")
csr_submit = csr_sub.add_parser("submit", help="Submit a CSR to a CA adapter")
csr_submit.add_argument("--csr-file", required=True, help="CSR PEM file to submit")
csr_submit.add_argument("--ca", choices=["mock", "sectigo"], default="mock", help="CA adapter to use")
csr_submit.add_argument("--name", required=True, help="Name for the cert request")
csr_submit.add_argument("--wait", action="store_true", help="Wait for issuance and download cert")
csr_submit.add_argument("--out", help="Write downloaded cert to file")
csr_submit.add_argument("--allow-wildcard", action="store_true", help="Allow submission of CSRs containing wildcard SANs. Use with caution: wildcards broaden certificate scope and may be unsafe.")
return p
def main(argv: Optional[List[str]] = None) -> int:
parser = build_parser()
args = parser.parse_args(argv)
if args.command == "keygen":
if args.kind == "rsa":
pem = keygen.generate_rsa_key(bits=args.bits, passphrase=args.passphrase, keychain_service=args.keychain_service, save_to_keychain=args.save_passphrase, keychain_account=args.keychain_account)
else:
pem = keygen.generate_ec_key(curve=args.curve, passphrase=args.passphrase, keychain_service=args.keychain_service, save_to_keychain=args.save_passphrase, keychain_account=args.keychain_account)
if args.out:
with open(args.out, "w", encoding="utf-8") as f:
f.write(pem)
else:
sys.stdout.write(pem)
return 0
if args.command == "csr":
if args.csr_cmd == "create":
# Parse subject: accept both /C=... style or comma separated key=value
subject = {}
subj = args.subject or ""
if subj.startswith("/"):
# convert /C=US/ST=... into dict
for part in subj.split("/"):
if not part:
continue
if "=" in part:
k, v = part.split("=", 1)
subject[k] = v
elif subj:
for part in subj.split(","):
if "=" in part:
k, v = part.split("=", 1)
subject[k.strip()] = v.strip()
csr_pem = None
from . import csr as csrmod
if args.from_cert:
# Populate subject and SANs from an existing cert
csr_pem = csrmod.create_csr_from_cert(args.from_cert, args.key_file, passphrase=args.passphrase)
else:
subj_str = subj if subj and subj.startswith("/") else _build_subj_from_dict(subject) if subject else None
# If no subject and no SANs provided, prompt interactively
if not subj_str and not args.san:
subj_dict, san_list = csrmod.prompt_for_subject_and_sans()
subj_str = _build_subj_from_dict(subj_dict) if subj_dict else None
csr_pem = csrmod.create_csr_from_key(args.key_file, subj_str, sans=san_list, passphrase=args.passphrase)
else:
# Validate SANs supplied on CLI
norm_sans = []
if args.san:
wildcard_sans = []
try:
for s in args.san:
ns = csrmod.normalize_and_validate_san(s)
norm_sans.append(ns)
if ns.startswith("DNS:*.") or "*" in ns:
wildcard_sans.append(ns)
except ValueError as e:
from .term import print_error
print_error(f"Invalid SAN provided: {e}")
return 1
# Handle wildcard SANs: interactive confirmation or explicit allow
if wildcard_sans:
if args.allow_wildcard:
# Use formatted warning helper for reusable behavior
from .term import print_warning
print_warning("WARNING: --allow-wildcard used. Wildcard SANs broaden certificate scope and can be risky.")
else:
# If interactive terminal, confirm each wildcard SAN
try:
is_tty = sys.stdin.isatty()
except Exception:
is_tty = False
if not is_tty:
from .term import print_error
print_error("Wildcard SAN detected in CLI arguments; rerun with --allow-wildcard to allow it in non-interactive mode.")
return 1
# interactive confirm
for w in list(wildcard_sans):
ans = input(f"Wildcard SAN detected: {w}. Type 'yes' to include it, or anything else to exclude: ").strip().lower()
if ans != "yes":
# remove from normalized list
norm_sans = [x for x in norm_sans if x != w]
from .term import print_info
print_info(f"Excluded {w}")
# it as None so OpenSSL will not add subjectAltName.
san_to_use = norm_sans if args.san else None
csr_pem = csrmod.create_csr_from_key(args.key_file, subj_str, sans=san_to_use, passphrase=args.passphrase)
if args.out:
with open(args.out, "w", encoding="utf-8") as f:
f.write(csr_pem)
else:
sys.stdout.write(csr_pem)
return 0
if args.csr_cmd == "show":
with open(args.csr_file, "r", encoding="utf-8") as f:
csr_pem = f.read()
from . import csr as csrmod
# Rudimentary output: show the CSR text parsed by openssl
import subprocess
openssl = csrmod._openssl_bin()
p = subprocess.run([openssl, "req", "-in", "/dev/stdin", "-noout", "-text"], input=csr_pem.encode("utf-8"), check=True, capture_output=True)
sys.stdout.write(p.stdout.decode("utf-8"))
return 0
if args.csr_cmd == "submit":
with open(args.csr_file, "r", encoding="utf-8") as f:
csr_pem = f.read()
# validate SANs if present and wildcard rules
from . import csr as csrmod
# parse SANs in the CSR via openssl text
import subprocess
openssl = csrmod._openssl_bin()
p = subprocess.run([openssl, "req", "-in", "/dev/stdin", "-noout", "-text"], input=csr_pem.encode("utf-8"), check=True, capture_output=True)
txt = p.stdout.decode("utf-8")
import re
sans = [f"DNS:{m.group(1)}" for m in re.finditer(r"DNS:([^,\s]+)", txt)]
for m in re.finditer(r"IP:?\s*Address:?\s*([^,\s]+)", txt, flags=re.IGNORECASE):
sans.append(f"IP:{m.group(1)}")
# wildcard checks
if any(s.startswith("DNS:*." ) or "*" in s for s in sans) and not args.allow_wildcard:
from .term import print_error
print_error("Wildcard SAN detected in CSR; pass --allow-wildcard to confirm you want to submit it.")
return 1
# dispatch to adapter
from .ca.mock import MockCA
from .ca.sectigo import SectigoCA
if args.ca == "mock":
adapter = MockCA()
else:
adapter = SectigoCA()
req_id = adapter.submit_csr(csr_pem, args.name)
from .term import print_info
print_info(f"Submitted CSR, request id: {req_id}")
if args.wait:
status = adapter.poll_status(req_id, timeout=300)
from .term import print_info
print_info(f"Final status: {status}")
if status == "issued":
cert = adapter.download_certificate(req_id)
if args.out:
with open(args.out, "w", encoding="utf-8") as f:
f.write(cert)
else:
sys.stdout.write(cert)
else:
from .term import print_error
print_error(f"Request ended with status: {status}")
return 0
parser.print_help()
return 2
if __name__ == "__main__":
raise SystemExit(main())

200
legacy/certctl/console.py Normal file
View File

@@ -0,0 +1,200 @@
"""NetScaler Console (ADM/Console) NITRO helpers.
Design goals:
- Log in ONCE to obtain a session token ("sessionid" in the login response).
- Use that token on subsequent requests (Cookie: NITRO_AUTH_TOKEN=...).
- Keep credentials out of git: caller can provide password via keychain/env.
Notes:
- In some Console builds, download endpoints authorize *only* via the cookie,
even if Basic auth / X-NITRO headers are supplied.
- Requests' cookie jar can contain multiple cookies with the same name but
different paths, leading to CookieConflictError. To avoid that entire class
of issues, we do not rely on the cookie jar; we store the token ourselves
and set the Cookie header explicitly.
"""
from __future__ import annotations
import json
from dataclasses import dataclass
from typing import Any, Dict, Optional
import requests
class NitroError(RuntimeError):
def __init__(self, status: int, message: str, payload: Any | None = None, headers: Dict[str, str] | None = None):
super().__init__(f"NITRO error (HTTP {status}): {message}")
self.status = status
self.message = message
self.payload = payload
self.headers = headers or {}
@dataclass
class ConsoleSession:
base_url: str
username: str
password: str
insecure: bool = False
timeout: int = 30
_token: Optional[str] = None
_http: Optional[requests.Session] = None
def _sess(self) -> requests.Session:
if self._http is None:
self._http = requests.Session()
return self._http
@property
def token(self) -> str:
if not self._token:
raise RuntimeError("Not logged in yet")
return self._token
def login(self) -> str:
"""Login and return the session token (NITRO_AUTH_TOKEN value)."""
url = f"{self.base_url.rstrip('/')}/nitro/v2/config/login"
payload = {"login": {"username": self.username, "password": self.password}}
r = requests.post(
url,
json=payload,
auth=(self.username, self.password),
verify=not self.insecure,
timeout=self.timeout,
)
try:
j = r.json()
except Exception:
raise NitroError(r.status_code, r.text)
if r.status_code >= 400 or (isinstance(j, dict) and j.get("errorcode") not in (None, 0)):
msg = j.get("message") if isinstance(j, dict) else str(j)
raise NitroError(r.status_code, msg or "Login failed", j)
try:
token = j["login"][0]["sessionid"]
except Exception:
raise NitroError(r.status_code, "Login response missing sessionid", j)
self._token = token
return token
# ---------------------- low-level HTTP helpers ----------------------
def _headers(self, extra: Optional[Dict[str, str]] = None, include_auth_headers: bool = True) -> Dict[str, str]:
h: Dict[str, str] = {}
if self._token:
h["Cookie"] = f"NITRO_AUTH_TOKEN={self._token}"
if include_auth_headers:
h["X-NITRO-USER"] = self.username
h["X-NITRO-PASS"] = self.password
if extra:
h.update(extra)
return h
def request(self, method: str, path: str, *, params: Dict[str, str] | None = None,
json_body: Any | None = None, data: Any | None = None,
headers: Dict[str, str] | None = None,
include_auth_headers: bool = True,
stream: bool = False) -> requests.Response:
if not self._token:
self.login()
url = f"{self.base_url.rstrip('/')}{path}"
r = self._sess().request(
method,
url,
params=params,
json=json_body,
data=data,
headers=self._headers(headers, include_auth_headers=include_auth_headers),
verify=not self.insecure,
timeout=self.timeout,
stream=stream,
)
return r
def _raise_for_nitro(self, r: requests.Response) -> None:
if r.status_code < 400:
return
# try to decode JSON error payload
msg = r.text
payload: Any = None
try:
payload = r.json()
if isinstance(payload, dict) and "message" in payload:
msg = payload.get("message") or msg
except Exception:
payload = None
raise NitroError(r.status_code, msg, payload, headers=dict(r.headers))
# ---------------------- Console operations we need ----------------------
def create_ssl_key(self, *, file_name: str, algo: str, keysize: int | None = None,
ec_curve: str | None = None, keyform: str = "PEM",
encrypt: bool = False, passphrase: str | None = None,
des_type: str = "AES256") -> Dict[str, Any]:
"""Create a private key file stored on Console.
This maps to the ns_ssl_key resource in Console NITRO v2.
We send form-urlencoded with an `object=` payload because Console
rejects raw JSON for some POSTs.
"""
obj: Dict[str, Any] = {
"file_name": file_name,
"keyform": keyform,
"algo": algo,
}
if keysize is not None:
obj["keysize"] = str(keysize)
if ec_curve:
obj["ec_curve"] = ec_curve
if encrypt:
# Console uses des_type naming even for AES in some builds
obj["des_type"] = des_type
if passphrase:
obj["passphrase"] = passphrase
payload = {"ns_ssl_key": obj}
data = {"object": json.dumps(payload)}
r = self.request(
"POST",
"/nitro/v2/config/ns_ssl_key",
params={"action": "create"},
data=data,
headers={"Content-Type": "application/x-www-form-urlencoded"},
)
self._raise_for_nitro(r)
return r.json()
def download_file(self, resource: str, file_name: str) -> bytes:
"""Download a stored file via /nitro/v2/download/<resource>/<file_name>."""
# For download endpoints, some builds want ONLY the cookie.
r = self.request(
"GET",
f"/nitro/v2/download/{resource}/{file_name}",
include_auth_headers=False,
stream=True,
)
self._raise_for_nitro(r)
return r.content
def list_ssl_certkeys(self, *, attrs: str | None = None) -> Dict[str, Any]:
"""List certificate-key objects stored on Console.
This uses the Console NITRO resource `ns_ssl_certkey`.
The response includes expiry-related fields such as `valid_to`
and `days_to_expiry` (stringified).
"""
params: Dict[str, str] = {}
if attrs:
params["attrs"] = attrs
r = self.request("GET", "/nitro/v2/config/ns_ssl_certkey", params=params)
self._raise_for_nitro(r)
return r.json()

116
legacy/certctl/csr.py Normal file
View File

@@ -0,0 +1,116 @@
"""CSR generation helpers.
We intentionally generate CSRs *locally* (OpenSSL) rather than via Console
because "ns_ssl_csr" is not supported in some Console deployments.
All outputs are PEM.
"""
from __future__ import annotations
import ipaddress
import re
import subprocess
from dataclasses import dataclass
from typing import Iterable, List, Optional
@dataclass
class Subject:
cn: str
o: str = ""
ou: str = ""
l: str = ""
st: str = ""
c: str = ""
def to_openssl_subj(self) -> str:
# Keep ordering stable; omit empty parts.
parts = [("CN", self.cn), ("O", self.o), ("OU", self.ou), ("L", self.l), ("ST", self.st), ("C", self.c)]
s = ""
for k, v in parts:
if v:
# OpenSSL expects slashes; escape slashes in values.
v = v.replace("/", "\\/")
s += f"/{k}={v}"
return s or f"/CN={self.cn}"
_SAN_SPLIT = re.compile(r"\s*,\s*|")
def parse_san_entries(raw: str) -> List[str]:
"""Parse a SAN string into OpenSSL-compatible entries.
Accepts input like:
- "DNS:example.com,DNS:www.example.com,IP:10.0.0.1"
- "example.com, www.example.com" (bare names become DNS:...)
Returns entries like ["DNS:example.com", "DNS:www.example.com", "IP:10.0.0.1"].
Raises ValueError if any entry is malformed.
"""
if not raw:
return []
# Split on commas; tolerate accidental whitespace.
items = [x.strip() for x in raw.split(",") if x.strip()]
out: List[str] = []
for item in items:
if ":" not in item:
# Bare value: infer DNS unless it's an IP.
try:
ipaddress.ip_address(item)
out.append(f"IP:{item}")
continue
except ValueError:
out.append(f"DNS:{item}")
continue
prefix, value = item.split(":", 1)
prefix_u = prefix.strip().upper()
value = value.strip()
if prefix_u in {"DNS", "IP"}:
if not value:
raise ValueError(f"Empty SAN value in '{item}'")
if prefix_u == "IP":
try:
ipaddress.ip_address(value)
except ValueError:
raise ValueError(f"Invalid IP SAN: '{item}'")
out.append(f"{prefix_u}:{value}")
else:
raise ValueError(
f"Unsupported SAN type '{prefix}' in '{item}'. Use DNS: or IP: (or bare hostnames)."
)
return out
def make_csr_openssl(
*,
key_pem_path: str,
subject: Subject,
san_entries: Iterable[str] | None = None,
key_passphrase: Optional[str] = None,
) -> str:
"""Generate a PEM CSR using openssl.
We prefer OpenSSL's `-addext` to avoid version-sensitive config quirks.
"""
cmd = ["openssl", "req", "-new", "-key", key_pem_path, "-subj", subject.to_openssl_subj()]
if key_passphrase:
cmd += ["-passin", f"pass:{key_passphrase}"]
san_entries = list(san_entries or [])
if san_entries:
san_value = ",".join(san_entries)
cmd += ["-addext", f"subjectAltName={san_value}"]
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = p.communicate()
if p.returncode != 0:
raise RuntimeError(
"OpenSSL CSR generation failed:\n" + err.decode(errors="ignore")
)
return out.decode("utf-8", errors="ignore")

View File

@@ -0,0 +1,67 @@
from __future__ import annotations
import os
import time
from dataclasses import dataclass
@dataclass
class FileLock:
"""Cross-platform advisory file lock.
- POSIX: fcntl.flock
- Windows: msvcrt.locking
Best-effort: intended for a single machine on a local filesystem.
"""
path: str
timeout_seconds: float = 0.0
poll_interval_seconds: float = 0.1
def __post_init__(self) -> None:
self._fh = None
def acquire(self) -> None:
os.makedirs(os.path.dirname(os.path.abspath(self.path)) or '.', exist_ok=True)
fh = open(self.path, 'a+b')
start = time.time()
while True:
try:
if os.name == 'nt':
import msvcrt
# lock 1 byte; non-blocking attempt
msvcrt.locking(fh.fileno(), msvcrt.LK_NBLCK, 1)
else:
import fcntl
fcntl.flock(fh.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
self._fh = fh
return
except Exception:
if self.timeout_seconds and (time.time() - start) >= self.timeout_seconds:
fh.close()
raise TimeoutError(f"Timed out waiting for lock: {self.path}")
time.sleep(self.poll_interval_seconds)
def release(self) -> None:
if not self._fh:
return
try:
if os.name == 'nt':
import msvcrt
self._fh.seek(0)
msvcrt.locking(self._fh.fileno(), msvcrt.LK_UNLCK, 1)
else:
import fcntl
fcntl.flock(self._fh.fileno(), fcntl.LOCK_UN)
finally:
self._fh.close()
self._fh = None
def __enter__(self) -> "FileLock":
self.acquire()
return self
def __exit__(self, exc_type, exc, tb) -> None:
self.release()

120
legacy/certctl/keygen.py Normal file
View File

@@ -0,0 +1,120 @@
"""Key generation helpers using the OpenSSL CLI.
This module intentionally uses the system `openssl` command for portability on
macOS systems where the `cryptography` library may not be available by
default.
Functions return the PEM text of the generated private key.
"""
from typing import Optional
import shutil
import subprocess
class OpenSSLNotFound(Exception):
pass
def _openssl_bin() -> str:
path = shutil.which("openssl")
if not path:
raise OpenSSLNotFound("`openssl` not found in PATH")
return path
def generate_rsa_key(bits: int = 4096, passphrase: Optional[str] = None, cipher: Optional[str] = "des3", keychain_service: Optional[str] = None, save_to_keychain: bool = False, keychain_account: str = "certctl") -> str:
"""Generate an RSA private key in PEM format.
If `passphrase` is provided or available in `keychain_service`, the PEM
will be encrypted with the provided `cipher` (defaults to 3DES via
OpenSSL's `des3` v2 option).
If `save_to_keychain` is True and `keychain_service` is provided and a
passphrase was supplied by the caller, the passphrase will be saved into
the macOS Keychain using `certctl.storage.keychain_set`.
Returns PEM text.
"""
# Optionally save provided passphrase to keychain
if passphrase and keychain_service and save_to_keychain:
try:
from . import storage
storage.keychain_set(keychain_service, passphrase, account=keychain_account)
except Exception:
# Best-effort only; do not fail the key generation on keychain errors
pass
# Resolve passphrase via keychain if requested and not provided
if not passphrase and keychain_service:
try:
from . import storage
passphrase = storage.keychain_get(keychain_service)
except Exception:
pass
openssl = _openssl_bin()
# Generate an unencrypted private key into stdout using genpkey
gen_cmd = [openssl, "genpkey", "-algorithm", "RSA", "-pkeyopt", f"rsa_keygen_bits:{bits}", "-outform", "PEM"]
gen = subprocess.run(gen_cmd, check=True, capture_output=True)
private_pem = gen.stdout
if passphrase:
# Convert to PKCS#8 and encrypt
# Use -v2 <cipher> for modern encryption (des3 corresponds to 3DES)
topk8_cmd = [openssl, "pkcs8", "-topk8", "-v2", cipher, "-passout", f"pass:{passphrase}", "-outform", "PEM"]
topk8 = subprocess.run(topk8_cmd, input=private_pem, check=True, capture_output=True)
return topk8.stdout.decode("utf-8")
return private_pem.decode("utf-8")
def generate_ec_key(curve: str = "prime256v1", passphrase: Optional[str] = None, cipher: Optional[str] = "des3", keychain_service: Optional[str] = None, save_to_keychain: bool = False, keychain_account: str = "certctl") -> str:
"""Generate an EC private key (PEM) for given curve name (e.g., prime256v1 / secp384r1).
If `passphrase` is provided or available in `keychain_service`, the PEM
will be encrypted using PKCS#8 v2.
If `save_to_keychain` is True and `keychain_service` is provided and a
passphrase was supplied by the caller, the passphrase will be saved into
the macOS Keychain using `certctl.storage.keychain_set`.
"""
# Optionally save provided passphrase to keychain
if passphrase and keychain_service and save_to_keychain:
try:
from . import storage
storage.keychain_set(keychain_service, passphrase, account=keychain_account)
except Exception:
pass
# Resolve passphrase via keychain if requested and not provided
if not passphrase and keychain_service:
try:
from . import storage
passphrase = storage.keychain_get(keychain_service)
except Exception:
pass
openssl = _openssl_bin()
gen_cmd = [openssl, "ecparam", "-name", curve, "-genkey", "-noout", "-outform", "PEM"]
gen = subprocess.run(gen_cmd, check=True, capture_output=True)
private_pem = gen.stdout
if passphrase:
topk8_cmd = [openssl, "pkcs8", "-topk8", "-v2", cipher, "-passout", f"pass:{passphrase}", "-outform", "PEM"]
topk8 = subprocess.run(topk8_cmd, input=private_pem, check=True, capture_output=True)
return topk8.stdout.decode("utf-8")
return private_pem.decode("utf-8")
def generate_private_key(kind: str = "rsa", **kwargs) -> str:
"""Generic helper to generate a private key of a given `kind` ('rsa' or 'ec')."""
kind = kind.lower()
if kind == "rsa":
return generate_rsa_key(**kwargs)
if kind == "ec":
return generate_ec_key(**kwargs)
raise ValueError("Unsupported key kind: use 'rsa' or 'ec'")

View File

@@ -0,0 +1,32 @@
"""Minimal Netscaler Console helpers.
This module provides a thin wrapper around the extractor and a clear place to
add Console-specific parsing and upload helpers later.
"""
from typing import Optional
from .utils import extract_pem_from_json
def extract_csr_text(response_json: dict) -> Optional[str]:
"""Return CSR PEM text from a Console/device response JSON.
Prioritize the Console `ns_ssl_csr` response shape, then fall back to a
recursive search for the PEM block.
"""
if not isinstance(response_json, dict):
return extract_pem_from_json(response_json)
# Prefer well-known ns_ssl_csr array
ns_ssl_csr = response_json.get("ns_ssl_csr")
if isinstance(ns_ssl_csr, list):
for entry in ns_ssl_csr:
if isinstance(entry, dict):
csr = entry.get("csr")
if csr:
found = extract_pem_from_json(csr)
if found:
return found
# Fallback general search
return extract_pem_from_json(response_json)

185
legacy/certctl/poll.py Normal file
View File

@@ -0,0 +1,185 @@
from __future__ import annotations
import csv
import json
import os
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Dict, List
from .console import ConsoleSession
def _parse_valid_to(s: str) -> datetime | None:
"""Parse a `valid_to` string returned by Console.
Observed formats include:
- "Jan 09 2026 04:09:55"
- "2026-01-09 04:09:55"
Returns a timezone-aware datetime (UTC) if parsing succeeds.
"""
if not s:
return None
s = s.strip()
fmts = [
"%b %d %Y %H:%M:%S",
"%Y-%m-%d %H:%M:%S",
"%b %d %Y",
"%Y-%m-%d",
]
for fmt in fmts:
try:
dt = datetime.strptime(s, fmt)
return dt.replace(tzinfo=timezone.utc)
except ValueError:
pass
return None
@dataclass
class CertKeyRow:
file_name: str
display_name: str
days_to_expiry: int | None
valid_to: str
no_of_bound_entities: int | None
def poll_console_certkeys(
session: ConsoleSession,
*,
window_days: int = 30,
) -> Dict[str, Any]:
"""Fetch cert inventory and produce filtered views.
Returns a dict with:
- all: list[CertKeyRow]
- expired: list[CertKeyRow]
- expiring: list[CertKeyRow] (0 < days <= window_days)
- raw: raw NITRO JSON response
"""
attrs = ",".join(
[
"file_name",
"display_name",
"valid_to",
"days_to_expiry",
"no_of_bound_entities",
]
)
raw = session.list_ssl_certkeys(attrs=attrs)
items = raw.get("ns_ssl_certkey") or raw.get("ns_ssl_certkey_response") or []
rows: List[CertKeyRow] = []
for it in items:
file_name = str(it.get("file_name", "") or "")
display_name = str(it.get("display_name", "") or "")
valid_to = str(it.get("valid_to", "") or "")
dte_raw = it.get("days_to_expiry")
days_to_expiry: int | None = None
if dte_raw is not None and str(dte_raw).strip() != "":
try:
days_to_expiry = int(str(dte_raw))
except ValueError:
days_to_expiry = None
bound_raw = it.get("no_of_bound_entities")
bound: int | None = None
if bound_raw is not None and str(bound_raw).strip() != "":
try:
bound = int(str(bound_raw))
except ValueError:
bound = None
rows.append(
CertKeyRow(
file_name=file_name,
display_name=display_name,
days_to_expiry=days_to_expiry,
valid_to=valid_to,
no_of_bound_entities=bound,
)
)
def key_sort(r: CertKeyRow):
if r.days_to_expiry is None:
dt = _parse_valid_to(r.valid_to)
if dt:
delta = dt - datetime.now(timezone.utc)
return int(delta.total_seconds())
return 10**9
return r.days_to_expiry * 24 * 3600
rows_sorted = sorted(rows, key=key_sort)
expired = [r for r in rows_sorted if r.days_to_expiry is not None and r.days_to_expiry <= 0]
expiring = [
r
for r in rows_sorted
if r.days_to_expiry is not None and 0 < r.days_to_expiry <= window_days
]
return {"all": rows_sorted, "expired": expired, "expiring": expiring, "raw": raw}
def write_report(
report: Dict[str, Any],
*,
fmt: str = "table",
out_path: str | None = None,
include_all: bool = False,
) -> str:
"""Write report to a file (or return as a string for stdout)."""
rows: List[CertKeyRow] = report["all"] if include_all else (report["expired"] + report["expiring"])
if fmt == "json":
payload = {
"generated_at": datetime.now(timezone.utc).isoformat(),
"expired": [r.__dict__ for r in report["expired"]],
"expiring": [r.__dict__ for r in report["expiring"]],
}
s = json.dumps(payload, indent=2)
elif fmt == "csv":
import io
buf = io.StringIO()
w = csv.writer(buf)
w.writerow(["file_name", "display_name", "days_to_expiry", "valid_to", "no_of_bound_entities"])
for r in rows:
w.writerow([r.file_name, r.display_name, r.days_to_expiry, r.valid_to, r.no_of_bound_entities])
s = buf.getvalue()
else:
# simple fixed-width table
headers = ["File", "Display", "Days", "Valid To", "Bound"]
data = [
[
r.file_name,
r.display_name,
"" if r.days_to_expiry is None else str(r.days_to_expiry),
r.valid_to,
"" if r.no_of_bound_entities is None else str(r.no_of_bound_entities),
]
for r in rows
]
cols = list(zip(headers, *data)) if data else [(h,) for h in headers]
widths = [min(60, max(len(str(x)) for x in col)) for col in cols]
def fmt_row(vals: List[str]) -> str:
return " ".join(str(v)[:w].ljust(w) for v, w in zip(vals, widths))
lines = [fmt_row(headers), fmt_row(["-" * w for w in widths])]
for row in data:
lines.append(fmt_row(row))
s = "\n".join(lines)
if out_path:
os.makedirs(os.path.dirname(os.path.abspath(out_path)) or ".", exist_ok=True)
with open(out_path, "w", encoding="utf-8") as f:
f.write(s)
return s

52
legacy/certctl/san.py Normal file
View File

@@ -0,0 +1,52 @@
"""SAN parsing/normalization.
Accepts inputs like:
- "DNS:example.com,DNS:www.example.com"
- "example.com, www.example.com" (bare hostnames treated as DNS)
- "IP:10.0.0.1, 10.0.0.2" (bare IPs treated as IP)
Returns a list suitable for OpenSSL: ["DNS:...", "IP:...", ...].
"""
from __future__ import annotations
import ipaddress
from typing import List
def normalize_san_list(san_raw: str) -> List[str]:
if not san_raw:
return []
items = [x.strip() for x in san_raw.split(",") if x.strip()]
out: List[str] = []
for item in items:
if ":" in item:
k, v = item.split(":", 1)
k = k.strip().upper()
v = v.strip()
if not v:
continue
if k in {"DNS", "IP", "URI", "EMAIL", "RID"}:
out.append(f"{k}:{v}")
continue
# Unknown prefix: treat whole thing as DNS if it isn't an IP
item = v
# Bare value: detect IP vs DNS
try:
ipaddress.ip_address(item)
out.append(f"IP:{item}")
except ValueError:
out.append(f"DNS:{item}")
# de-dupe but preserve order
seen = set()
deduped: List[str] = []
for x in out:
k = x.upper()
if k not in seen:
seen.add(k)
deduped.append(x)
return deduped

View File

View File

@@ -0,0 +1,481 @@
#!/usr/bin/env python3
"""
cert_poll.py
Poll NetScaler Console for certificate inventory + expiry dates and generate a report.
Goals:
- Cross-platform (macOS/Windows/Linux)
- No external dependencies beyond requests (and optionally keyring)
- Uses NITRO v2 login once -> session token -> Cookie: NITRO_AUTH_TOKEN=...
- Produces JSON + Markdown report
- File lock to prevent concurrent runs on same machine
NOTE:
- NetScaler Console NITRO resources vary by build/license.
- This script tries a few known endpoints and will fall back gracefully.
"""
import argparse
import datetime as dt
import getpass
import json
import os
import platform
import re
import socket
import sys
import time
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import requests
# -----------------------------
# Utilities
# -----------------------------
class NitroError(RuntimeError):
def __init__(self, status_code: int, message: str, payload: Optional[dict] = None):
super().__init__(f"NITRO error (HTTP {status_code}): {message}")
self.status_code = status_code
self.message = message
self.payload = payload or {}
def _now_utc() -> dt.datetime:
return dt.datetime.now(dt.timezone.utc)
def _ts_local_compact() -> str:
# Example: 20260109-231530
return dt.datetime.now().strftime("%Y%m%d-%H%M%S")
def _safe_mkdir(p: Path) -> None:
p.mkdir(parents=True, exist_ok=True)
def _write_text(path: Path, content: str) -> None:
path.write_text(content, encoding="utf-8")
def _write_bytes(path: Path, content: bytes) -> None:
path.write_bytes(content)
def _iso(dt_obj: Optional[dt.datetime]) -> Optional[str]:
if not dt_obj:
return None
return dt_obj.astimezone(dt.timezone.utc).isoformat()
def _parse_datetime_any(s: str) -> Optional[dt.datetime]:
"""
Attempts to parse common date formats seen in Console inventory.
Examples observed:
- "2026-01-09 04:09:55"
- "2026-01-09T04:09:55Z"
- epoch seconds as string
"""
if not s:
return None
s = str(s).strip()
if not s:
return None
# epoch seconds
if re.fullmatch(r"\d{9,12}", s):
try:
return dt.datetime.fromtimestamp(int(s), tz=dt.timezone.utc)
except Exception:
return None
# "YYYY-MM-DD HH:MM:SS"
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y/%m/%d %H:%M:%S"):
try:
return dt.datetime.strptime(s, fmt).replace(tzinfo=dt.timezone.utc)
except Exception:
pass
# ISO-ish
try:
# normalize trailing Z
if s.endswith("Z"):
s = s[:-1] + "+00:00"
return dt.datetime.fromisoformat(s)
except Exception:
return None
# -----------------------------
# Cross-platform file lock
# -----------------------------
class FileLock:
"""
Simple cross-platform lock using atomic directory creation.
This avoids platform-specific fcntl/msvcrt issues and works well on local filesystems.
Lock is represented by a directory, e.g.:
/tmp/certctl.lock/
We also write metadata inside for debugging.
"""
def __init__(self, lock_dir: Path, stale_seconds: int = 3600):
self.lock_dir = lock_dir
self.stale_seconds = stale_seconds
def acquire(self, wait_seconds: int = 0) -> None:
start = time.time()
while True:
try:
self.lock_dir.mkdir(parents=True, exist_ok=False)
# write metadata
meta = {
"pid": os.getpid(),
"host": socket.gethostname(),
"user": getpass.getuser(),
"platform": platform.platform(),
"created": _now_utc().isoformat(),
}
_write_text(self.lock_dir / "meta.json", json.dumps(meta, indent=2))
return
except FileExistsError:
# check staleness
meta_path = self.lock_dir / "meta.json"
if meta_path.exists():
try:
m = json.loads(meta_path.read_text(encoding="utf-8"))
created = _parse_datetime_any(m.get("created", ""))
if created:
age = (_now_utc() - created.astimezone(dt.timezone.utc)).total_seconds()
if age > self.stale_seconds:
# stale lock -> remove
self.release(force=True)
continue
except Exception:
pass
if wait_seconds <= 0:
raise RuntimeError(f"Lock already held: {self.lock_dir}")
if time.time() - start > wait_seconds:
raise RuntimeError(f"Timed out waiting for lock: {self.lock_dir}")
time.sleep(0.25)
def release(self, force: bool = False) -> None:
if not self.lock_dir.exists():
return
# best effort cleanup
try:
for child in self.lock_dir.iterdir():
try:
child.unlink()
except Exception:
pass
self.lock_dir.rmdir()
except Exception:
if not force:
raise
# -----------------------------
# NetScaler Console NITRO v2 client
# -----------------------------
class ConsoleClient:
def __init__(self, console_url: str, insecure: bool = False, timeout: int = 30):
self.console_url = console_url.rstrip("/")
self.insecure = insecure
self.timeout = timeout
self.session = requests.Session()
self.token: Optional[str] = None
def _url(self, path: str) -> str:
if not path.startswith("/"):
path = "/" + path
return self.console_url + path
def login(self, username: str, password: str) -> str:
"""
POST /nitro/v2/config/login
Response includes login[0].sessionid, which must be used as NITRO_AUTH_TOKEN cookie.
"""
payload = {"login": {"username": username, "password": password}}
r = self.session.post(
self._url("/nitro/v2/config/login"),
json=payload,
verify=not self.insecure,
timeout=self.timeout,
)
j = self._json_or_raise(r)
try:
token = j["login"][0]["sessionid"]
except Exception:
raise NitroError(r.status_code, "Login response missing sessionid", j)
self.token = token
return token
def _headers(self) -> Dict[str, str]:
h: Dict[str, str] = {}
if self.token:
h["Cookie"] = f"NITRO_AUTH_TOKEN={self.token}"
return h
def get(self, path: str, params: Optional[dict] = None) -> dict:
r = self.session.get(
self._url(path),
headers=self._headers(),
params=params or {},
verify=not self.insecure,
timeout=self.timeout,
)
return self._json_or_raise(r)
def _json_or_raise(self, r: requests.Response) -> dict:
try:
j = r.json()
except Exception:
raise NitroError(r.status_code, f"Non-JSON response: {r.text[:200]}")
# Console often returns 200 with errorcode != 0
if r.status_code >= 400:
msg = j.get("message") or r.reason or "HTTP error"
raise NitroError(r.status_code, msg, j)
if isinstance(j, dict) and "errorcode" in j and j.get("errorcode") not in (0, "0", None):
msg = j.get("message") or "NITRO error"
raise NitroError(r.status_code, msg, j)
return j
# -----------------------------
# Report model
# -----------------------------
@dataclass
class CertRecord:
name: str
subject: Optional[str]
issuer: Optional[str]
not_after: Optional[str]
days_remaining: Optional[int]
source: str # "console"
raw: Dict[str, Any]
def _days_remaining(not_after_dt: Optional[dt.datetime]) -> Optional[int]:
if not not_after_dt:
return None
delta = not_after_dt.astimezone(dt.timezone.utc) - _now_utc()
return int(delta.total_seconds() // 86400)
# -----------------------------
# Inventory fetch logic
# -----------------------------
def fetch_console_certs(client: ConsoleClient) -> List[CertRecord]:
"""
Tries multiple known endpoints and normalizes results.
Console may expose:
- /nitro/v1/config/ssl_cert
- /nitro/v1/config/ns_ssl_cert
- /nitro/v2/config/ssl_cert
- /nitro/v2/config/ns_ssl_cert
- /nitro/v1/config/ssl_certificate (analytics type) [not the same thing]
We'll try v2 first, then v1.
"""
candidates = [
("/nitro/v2/config/ns_ssl_cert?attrs=*", "ns_ssl_cert"),
("/nitro/v2/config/ssl_cert?attrs=*", "ssl_cert"),
("/nitro/v1/config/ns_ssl_cert?attrs=*", "ns_ssl_cert"),
("/nitro/v1/config/ssl_cert?attrs=*", "ssl_cert"),
]
last_err: Optional[Exception] = None
for path, key in candidates:
try:
j = client.get(path)
objs = j.get(key, [])
if not isinstance(objs, list):
continue
out: List[CertRecord] = []
for o in objs:
if not isinstance(o, dict):
continue
# Try common fields
name = (
o.get("file_name")
or o.get("certkey")
or o.get("name")
or o.get("certificate")
or "UNKNOWN"
)
subject = o.get("subject") or o.get("cert_subject") or o.get("certsubject")
issuer = o.get("issuer") or o.get("cert_issuer") or o.get("certissuer")
# expiry fields vary wildly
not_after = (
o.get("notafter")
or o.get("not_after")
or o.get("valid_to")
or o.get("expiry_date")
or o.get("cert_expiry")
or o.get("expiration")
)
not_after_dt = _parse_datetime_any(str(not_after)) if not_after else None
days = _days_remaining(not_after_dt)
out.append(
CertRecord(
name=str(name),
subject=str(subject) if subject else None,
issuer=str(issuer) if issuer else None,
not_after=_iso(not_after_dt),
days_remaining=days,
source="console",
raw=o,
)
)
# If endpoint exists but empty, still valid
return out
except Exception as e:
last_err = e
continue
raise RuntimeError(f"Unable to fetch cert inventory from Console. Last error: {last_err}")
# -----------------------------
# Renderers
# -----------------------------
def render_markdown(records: List[CertRecord], window_days: int, console: str) -> str:
now = _now_utc()
lines: List[str] = []
lines.append(f"# Certificate Expiry Report")
lines.append("")
lines.append(f"- Generated (UTC): `{now.isoformat()}`")
lines.append(f"- Console: `{console}`")
lines.append(f"- Renewal window: `{window_days} days`")
lines.append(f"- Total certs discovered: `{len(records)}`")
lines.append("")
expiring = [r for r in records if r.days_remaining is not None and r.days_remaining <= window_days]
expiring.sort(key=lambda r: (r.days_remaining if r.days_remaining is not None else 999999, r.name))
lines.append(f"## Expiring within {window_days} days ({len(expiring)})")
lines.append("")
if not expiring:
lines.append("_No certificates found in the renewal window._")
lines.append("")
return "\n".join(lines)
lines.append("| Days | Not After (UTC) | Name | Subject | Issuer |")
lines.append("|---:|---|---|---|---|")
for r in expiring:
lines.append(
f"| {r.days_remaining} | `{r.not_after or ''}` | `{r.name}` | `{r.subject or ''}` | `{r.issuer or ''}` |"
)
lines.append("")
return "\n".join(lines)
# -----------------------------
# Main
# -----------------------------
def main() -> int:
ap = argparse.ArgumentParser(
description="Poll NetScaler Console for cert expiry and generate a report."
)
ap.add_argument("--console", required=True, help="NetScaler Console base URL (e.g. https://192.168.113.2)")
ap.add_argument("--user", required=True, help="Console username")
ap.add_argument("--password", help="Console password (if omitted, prompt)")
ap.add_argument("--insecure", action="store_true", help="Disable TLS verification (lab use)")
ap.add_argument("--timeout", type=int, default=30, help="HTTP timeout seconds (default: 30)")
ap.add_argument("--window-days", type=int, default=30, help="Renewal window in days (default: 30)")
ap.add_argument("--out-dir", default="./reports", help="Output directory for reports (default: ./reports)")
ap.add_argument("--lock-file", default="./.certctl.lock", help="Lock dir path (default: ./.certctl.lock)")
ap.add_argument("--lock-wait", type=int, default=0, help="Seconds to wait for lock (default: 0)")
ap.add_argument("--lock-stale", type=int, default=3600, help="Stale lock seconds (default: 3600)")
args = ap.parse_args()
out_dir = Path(args.out_dir).expanduser().resolve()
lock_dir = Path(args.lock_file).expanduser().resolve()
_safe_mkdir(out_dir)
lock = FileLock(lock_dir, stale_seconds=args.lock_stale)
lock.acquire(wait_seconds=args.lock_wait)
try:
password = args.password or getpass.getpass(f"Console password for {args.user}@{args.console}: ")
client = ConsoleClient(args.console, insecure=args.insecure, timeout=args.timeout)
print("[console] Logging in...")
client.login(args.user, password)
print("[console] Login OK.")
print("[console] Fetching cert inventory...")
records = fetch_console_certs(client)
print(f"[console] Discovered {len(records)} cert records.")
# Filter only those with known expiry for reporting relevance
# (we still include unknown expiry in JSON for troubleshooting)
window = int(args.window_days)
report_ts = _ts_local_compact()
json_path = out_dir / f"expiry_{report_ts}.json"
md_path = out_dir / f"expiry_{report_ts}.md"
payload = {
"generated_utc": _now_utc().isoformat(),
"console": args.console,
"window_days": window,
"total_records": len(records),
"records": [asdict(r) for r in records],
}
_write_text(json_path, json.dumps(payload, indent=2))
_write_text(md_path, render_markdown(records, window, args.console))
print(f"[report] Wrote JSON: {json_path}")
print(f"[report] Wrote MD: {md_path}")
# convenience: latest pointers
latest_json = out_dir / "latest_expiry.json"
latest_md = out_dir / "latest_expiry.md"
_write_text(latest_json, json.dumps(payload, indent=2))
_write_text(latest_md, render_markdown(records, window, args.console))
print(f"[report] Updated latest_expiry.json / latest_expiry.md")
# Exit code useful for automation:
# 0 = ok
# 2 = certs in renewal window found
expiring = [r for r in records if r.days_remaining is not None and r.days_remaining <= window]
if expiring:
print(f"[alert] {len(expiring)} cert(s) within {window} days of expiry.")
return 2
return 0
finally:
lock.release(force=True)
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,246 @@
#!/usr/bin/env python3
"""Create a private key on NetScaler Console, download it, and generate a CSR locally.
This is intentionally narrow-scope ("script 3" + "script 4" from your plan),
so it can be composed by a higher-level orchestrator later.
Outputs (PEM):
- <out_dir>/<key_name>
- <out_dir>/<csr_name>
- (optional) <out_dir>/latest.key and latest.csr symlinks/copies
Security:
- Console password and optional key passphrase can be pulled from the OS keychain
(macOS Keychain today; enterprise vault adapters can be added later).
NetScaler Console behavior:
- We log in once via /nitro/v2/config/login (POST) to obtain a session token.
- We include Cookie: NITRO_AUTH_TOKEN=<token> on subsequent calls (especially downloads).
Usage example:
python3 scripts/keycsr_console.py --console https://192.168.113.2 --user nsroot \
--app-name example.com --out-dir ./out --insecure
"""
from __future__ import annotations
import argparse
import os
import sys
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from certctl.console import ConsoleClient, NitroError
from certctl.csr import Subject, make_csr_openssl
from certctl.secretstore import get_password, set_password
from certctl.san import normalize_san_list
@dataclass(frozen=True)
class DerivedNames:
key_name: str
csr_name: str
def _ts() -> str:
# Stable + filename safe
return datetime.now().strftime("%Y%m%d-%H%M%S")
def derive_names(app_name: str, rotate: bool) -> DerivedNames:
if rotate:
stamp = _ts()
base = f"{app_name}_{stamp}"
else:
base = app_name
return DerivedNames(key_name=f"{base}.key", csr_name=f"{base}.csr")
def prompt_choice(prompt: str, choices: list[str], default_index: int = 0) -> int:
while True:
print(prompt)
for i, c in enumerate(choices, 1):
default = " (default)" if i - 1 == default_index else ""
print(f" {i}) {c}{default}")
raw = input(f"Select [1-{len(choices)}]: ").strip()
if raw == "":
return default_index
if raw.isdigit() and 1 <= int(raw) <= len(choices):
return int(raw) - 1
print("Invalid selection. Try again.\n")
def prompt_yesno(prompt: str, default_yes: bool = True) -> bool:
suf = "[Y/n]" if default_yes else "[y/N]"
raw = input(f"{prompt} {suf} ").strip().lower()
if raw == "":
return default_yes
return raw in ("y", "yes")
def write_latest(out_dir: Path, latest_name: str, target_name: str) -> None:
latest = out_dir / latest_name
target = out_dir / target_name
try:
if latest.exists() or latest.is_symlink():
latest.unlink()
latest.symlink_to(target.name)
except Exception:
# Windows / restrictive FS: fall back to copy
latest.write_bytes(target.read_bytes())
def main(argv: list[str] | None = None) -> int:
p = argparse.ArgumentParser(
prog="keycsr_console.py",
description="Create key on NetScaler Console, download it, and generate CSR locally.",
)
p.add_argument("--console", required=True, help="Console base URL, e.g. https://192.168.113.2")
p.add_argument("--user", required=True, help="Console username")
p.add_argument("--app-name", required=True, help="App name / base CN (used for derived filenames)")
p.add_argument("--out-dir", default="./out", help="Output directory")
p.add_argument("--rotate", action="store_true", default=True, help="Always timestamp filenames (default: on)")
p.add_argument(
"--no-rotate", dest="rotate", action="store_false", help="Do not timestamp filenames (NOT recommended)"
)
p.add_argument(
"--write-latest",
action="store_true",
default=True,
help="Write latest.key/latest.csr pointers (default: on)",
)
p.add_argument(
"--no-write-latest",
dest="write_latest",
action="store_false",
help="Disable writing latest.key/latest.csr",
)
p.add_argument("--insecure", action="store_true", help="Disable TLS verification")
args = p.parse_args(argv)
out_dir = Path(args.out_dir).expanduser().resolve()
out_dir.mkdir(parents=True, exist_ok=True)
names = derive_names(args.app_name, rotate=args.rotate)
print("\n[info] Derived names:")
print(f" Key: {names.key_name}")
print(f" CSR: {names.csr_name}")
print(f" Rotate: {'ON' if args.rotate else 'OFF'}")
print(f" Write latest: {'ON' if args.write_latest else 'OFF'}")
if not prompt_yesno("Proceed with these names?", default_yes=True):
print("Aborted.")
return 2
# ---- key type / encryption choices
key_choice = prompt_choice(
"Select key type:",
["RSA 4096 (common/compatible)", "ECDSA (best-practice default curve)"],
default_index=1,
)
key_algo = "RSA" if key_choice == 0 else "EC"
keysize = 4096 if key_algo == "RSA" else None
ec_curve = "P_256" if key_algo == "EC" else None
encrypt_key = prompt_yesno("Encrypt the private key with a passphrase?", default_yes=True)
# ---- credentials (keychain)
password = get_password(
service="netscaler-console",
account=f"{args.user}@{args.console}",
prompt=f"Console password for {args.user}@{args.console}: ",
store_prompt=True,
)
key_passphrase: str | None = None
if encrypt_key:
key_passphrase = get_password(
service="cert-private-key",
account=f"{args.app_name}",
prompt="Key passphrase (will be stored in keychain if you choose): ",
store_prompt=True,
)
# ---- CSR subject prompts
cn = input(f"CSR Common Name (CN): ").strip() or args.app_name
o = input("Organization (O) [optional]: ").strip()
ou = input("Org Unit (OU) [optional]: ").strip()
l = input("City/Locality (L) [optional]: ").strip()
st = input("State/Province (ST) [optional, spell out like 'Alabama' (not 'AL')]: ").strip()
c = input("Country (C) 2-letter [optional]: ").strip()
san_raw = input("SubjectAltName string (e.g. DNS:example.com,DNS:www.example.com) [optional]: ").strip()
san_list = normalize_san_list(san_raw)
subj = Subject(cn=cn, o=o or None, ou=ou or None, l=l or None, st=st or None, c=c or None)
# ---- Console operations
print("\n[console] Logging in to establish session (for download/upload authorization)...")
client = ConsoleClient(base_url=args.console, username=args.user, password=password, verify_tls=not args.insecure)
client.login()
print("[console] Login OK (token acquired).")
print("\n[console] Creating key on Console (never reuse keys)...")
try:
client.create_ssl_key(
file_name=names.key_name,
algo=key_algo,
keyform="PEM",
keysize=keysize,
ec_curve=ec_curve,
password=key_passphrase,
# prefer AES if supported; Console may fall back internally
cipher="AES256" if encrypt_key else None,
)
except NitroError as e:
# Common: name collision if rotate is off.
raise
print(f"[console] Key create accepted: {names.key_name}")
print("\n[console] Downloading KEY via /nitro/v2/download ...")
key_bytes = client.download_key(names.key_name)
key_path = out_dir / names.key_name
key_path.write_bytes(key_bytes)
os.chmod(key_path, 0o600)
print(f"[local] Wrote key: {key_path}")
if args.write_latest:
write_latest(out_dir, "latest.key", names.key_name)
print(f"[local] Wrote latest.key -> {names.key_name}")
# ---- CSR generation locally
print("\n[local] Generating CSR with OpenSSL...")
csr_pem = make_csr_openssl(
key_pem_path=str(key_path),
subject=subj,
san_entries=san_list,
key_passphrase=key_passphrase,
)
csr_path = out_dir / names.csr_name
csr_path.write_text(csr_pem, encoding="utf-8")
os.chmod(csr_path, 0o644)
print(f"[local] Wrote CSR: {csr_path}")
if args.write_latest:
write_latest(out_dir, "latest.csr", names.csr_name)
print(f"[local] Wrote latest.csr -> {names.csr_name}")
print("\nDone.")
return 0
if __name__ == "__main__":
try:
raise SystemExit(main())
except NitroError as e:
print(str(e), file=sys.stderr)
return_code = 1
raise SystemExit(return_code)

View File

@@ -0,0 +1,148 @@
#!/usr/bin/env python3
"""Poll NetScaler Console for certificate expiry and generate a report.
This is a standalone script meant to be called from cron/CI or by a future
overlay orchestrator.
Data source
---------
Uses NITRO resource `ns_ssl_certkey` on NetScaler Console, which includes
fields like `valid_to` and `days_to_expiry`.
Exit codes
---------
0: No expired certs and none within the renewal window
2: At least one cert is within the renewal window
3: At least one cert is expired
Security
---------
- Console password can be pulled from macOS Keychain via the existing
secretstore helper.
- Use `--insecure` for self-signed lab certs, or provide `--ca-bundle`.
"""
from __future__ import annotations
# Allow running this script directly from the repo without installation.
import pathlib
import sys
_REPO_ROOT = pathlib.Path(__file__).resolve().parents[2]
if str(_REPO_ROOT) not in sys.path:
sys.path.insert(0, str(_REPO_ROOT))
import argparse
import os
import sys
from certctl.console import ConsoleSession
from certctl.filelock import FileLock
from certctl.poll import poll_console_certkeys, write_report
from certctl import secretstore
from certctl.term import print_error, print_info
def build_parser() -> argparse.ArgumentParser:
p = argparse.ArgumentParser(
prog="nsconsole_certpoll",
description="Poll NetScaler Console for expired / expiring certificates.",
)
p.add_argument("--console", required=True, help="Console base URL, e.g. https://192.168.113.2")
p.add_argument("--user", required=True, help="Console username")
p.add_argument("--insecure", action="store_true", help="Disable TLS verification")
p.add_argument("--ca-bundle", default=None, help="Path to CA bundle for TLS verify")
p.add_argument("--window-days", type=int, default=30, help="Renewal window in days (default: 30)")
p.add_argument("--format", choices=["table", "csv", "json"], default="table", help="Output format")
p.add_argument(
"--out",
default=None,
help="Write report to this file instead of stdout (csv/json only)",
)
p.add_argument(
"--lock-file",
default=None,
help="Optional lock file path (default: <out-dir>/.certctl.lock or ./ .certctl.lock)",
)
p.add_argument(
"--out-dir",
default=".",
help="Used only to place default lock file if --lock-file isn't set.",
)
return p
def main(argv: list[str] | None = None) -> int:
args = build_parser().parse_args(argv)
# File lock keeps cron from overlapping runs.
lock_path = args.lock_file or os.path.join(os.path.abspath(args.out_dir), ".certctl.lock")
os.makedirs(os.path.dirname(lock_path), exist_ok=True)
# Our lightweight FileLock takes `timeout_seconds` (not `timeout`).
# timeout_seconds=0 means: fail immediately if another run holds the lock.
with FileLock(lock_path, timeout_seconds=0.0):
# Password lookup order:
# 1) CERTCTL_NSCONSOLE_PASS
# 2) macOS Keychain (service=certctl:nsconsole, account=<user>@<console_url>)
# 3) Prompt (optionally store to Keychain)
password = secretstore.env_or_keychain(
env_var="CERTCTL_NSCONSOLE_PASS",
service="certctl:nsconsole",
account=f"{args.user}@{args.console}",
)
if not password:
import getpass
password = getpass.getpass(f"Console password for {args.user}@{args.console}: ")
if password and secretstore.is_macos_keychain_available():
yn = input("Store Console password in keychain? [Y/n] ").strip().lower()
if yn in ("", "y", "yes"):
secretstore.set_in_keychain(
service="certctl:nsconsole",
account=f"{args.user}@{args.console}",
secret=password,
)
if not password:
# We intentionally avoid a mandatory interactive prompt here so the script
# stays automation-friendly. If you want to run interactively, just export
# CERTCTL_NSCONSOLE_PASS or store it in keychain.
print_error(
"No Console password found. Set CERTCTL_NSCONSOLE_PASS, "
"or store it in keychain under service 'certctl:nsconsole' and account '<user>@<console_url>'."
)
return 1
if args.ca_bundle:
print_error(
"--ca-bundle is not wired up yet in this script. "
"Use --insecure or rely on the system trust store."
)
return 1
sess = ConsoleSession(
base_url=args.console,
username=args.user,
password=password,
insecure=args.insecure,
)
sess.login()
report = poll_console_certkeys(sess, window_days=args.window_days)
write_report(report, fmt=args.format, out_path=args.out)
if args.out:
print_info(f"Wrote {args.format} report: {args.out}")
# Exit codes suitable for CI
expired = report.get("expired", [])
expiring = report.get("expiring", [])
if expired:
return 3
if expiring:
return 2
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,71 @@
{
"generated_utc": "2026-01-14T20:17:20.209722+00:00",
"console": "https://192.168.113.2",
"window_days": 30,
"total_records": 7,
"records": [
{
"name": "UNKNOWN",
"subject": null,
"issuer": null,
"not_after": null,
"days_remaining": null,
"source": "console",
"raw": {}
},
{
"name": "UNKNOWN",
"subject": null,
"issuer": null,
"not_after": null,
"days_remaining": null,
"source": "console",
"raw": {}
},
{
"name": "UNKNOWN",
"subject": null,
"issuer": null,
"not_after": null,
"days_remaining": null,
"source": "console",
"raw": {}
},
{
"name": "UNKNOWN",
"subject": null,
"issuer": null,
"not_after": null,
"days_remaining": null,
"source": "console",
"raw": {}
},
{
"name": "UNKNOWN",
"subject": null,
"issuer": null,
"not_after": null,
"days_remaining": null,
"source": "console",
"raw": {}
},
{
"name": "UNKNOWN",
"subject": null,
"issuer": null,
"not_after": null,
"days_remaining": null,
"source": "console",
"raw": {}
},
{
"name": "UNKNOWN",
"subject": null,
"issuer": null,
"not_after": null,
"days_remaining": null,
"source": "console",
"raw": {}
}
]
}

View File

@@ -0,0 +1,10 @@
# Certificate Expiry Report
- Generated (UTC): `2026-01-14T20:17:20.210081+00:00`
- Console: `https://192.168.113.2`
- Renewal window: `30 days`
- Total certs discovered: `7`
## Expiring within 30 days (0)
_No certificates found in the renewal window._

View File

@@ -0,0 +1,71 @@
{
"generated_utc": "2026-01-14T20:17:36.678328+00:00",
"console": "https://192.168.113.2",
"window_days": 60,
"total_records": 7,
"records": [
{
"name": "UNKNOWN",
"subject": null,
"issuer": null,
"not_after": null,
"days_remaining": null,
"source": "console",
"raw": {}
},
{
"name": "UNKNOWN",
"subject": null,
"issuer": null,
"not_after": null,
"days_remaining": null,
"source": "console",
"raw": {}
},
{
"name": "UNKNOWN",
"subject": null,
"issuer": null,
"not_after": null,
"days_remaining": null,
"source": "console",
"raw": {}
},
{
"name": "UNKNOWN",
"subject": null,
"issuer": null,
"not_after": null,
"days_remaining": null,
"source": "console",
"raw": {}
},
{
"name": "UNKNOWN",
"subject": null,
"issuer": null,
"not_after": null,
"days_remaining": null,
"source": "console",
"raw": {}
},
{
"name": "UNKNOWN",
"subject": null,
"issuer": null,
"not_after": null,
"days_remaining": null,
"source": "console",
"raw": {}
},
{
"name": "UNKNOWN",
"subject": null,
"issuer": null,
"not_after": null,
"days_remaining": null,
"source": "console",
"raw": {}
}
]
}

View File

@@ -0,0 +1,10 @@
# Certificate Expiry Report
- Generated (UTC): `2026-01-14T20:17:36.678687+00:00`
- Console: `https://192.168.113.2`
- Renewal window: `60 days`
- Total certs discovered: `7`
## Expiring within 60 days (0)
_No certificates found in the renewal window._

View File

@@ -0,0 +1,71 @@
{
"generated_utc": "2026-01-14T20:17:36.678328+00:00",
"console": "https://192.168.113.2",
"window_days": 60,
"total_records": 7,
"records": [
{
"name": "UNKNOWN",
"subject": null,
"issuer": null,
"not_after": null,
"days_remaining": null,
"source": "console",
"raw": {}
},
{
"name": "UNKNOWN",
"subject": null,
"issuer": null,
"not_after": null,
"days_remaining": null,
"source": "console",
"raw": {}
},
{
"name": "UNKNOWN",
"subject": null,
"issuer": null,
"not_after": null,
"days_remaining": null,
"source": "console",
"raw": {}
},
{
"name": "UNKNOWN",
"subject": null,
"issuer": null,
"not_after": null,
"days_remaining": null,
"source": "console",
"raw": {}
},
{
"name": "UNKNOWN",
"subject": null,
"issuer": null,
"not_after": null,
"days_remaining": null,
"source": "console",
"raw": {}
},
{
"name": "UNKNOWN",
"subject": null,
"issuer": null,
"not_after": null,
"days_remaining": null,
"source": "console",
"raw": {}
},
{
"name": "UNKNOWN",
"subject": null,
"issuer": null,
"not_after": null,
"days_remaining": null,
"source": "console",
"raw": {}
}
]
}

View File

@@ -0,0 +1,10 @@
# Certificate Expiry Report
- Generated (UTC): `2026-01-14T20:17:36.678957+00:00`
- Console: `https://192.168.113.2`
- Renewal window: `60 days`
- Total certs discovered: `7`
## Expiring within 60 days (0)
_No certificates found in the renewal window._

View File

@@ -0,0 +1,80 @@
"""Small secret storage abstraction.
For now:
- macOS: uses the built-in Keychain via the `security` CLI.
- other OSes: prompts only (no persistent storage) unless CERTCTL_* env vars used.
This keeps dependencies minimal and avoids coupling to a specific enterprise vault.
"""
from __future__ import annotations
import os
import platform
import subprocess
from typing import Optional
def _is_macos() -> bool:
return platform.system().lower() == "darwin"
def get_from_keychain(service: str, account: str) -> Optional[str]:
if not _is_macos():
return None
try:
# -w prints password only
out = subprocess.check_output(
["security", "find-generic-password", "-s", service, "-a", account, "-w"],
stderr=subprocess.DEVNULL,
)
return out.decode().strip()
except subprocess.CalledProcessError:
return None
def set_in_keychain(service: str, account: str, secret: str) -> None:
if not _is_macos():
return
# Add or update
subprocess.check_call(
[
"security",
"add-generic-password",
"-U",
"-s",
service,
"-a",
account,
"-w",
secret,
],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
def env_or_keychain(env_var: str, service: str, account: str) -> Optional[str]:
v = os.environ.get(env_var)
if v:
return v
return get_from_keychain(service, account)
# ---------------------------------------------------------------------------
# Backwards-compatible aliases
# ---------------------------------------------------------------------------
def is_macos_keychain_available() -> bool:
"""Return True if this host is macOS (Keychain supported via `security`)."""
return _is_macos()
def get_secret(service: str, account: str) -> Optional[str]:
"""Alias for get_from_keychain(service, account)."""
return get_from_keychain(service, account)
def set_secret(service: str, account: str, secret: str) -> None:
"""Alias for set_in_keychain(service, account, secret)."""
set_in_keychain(service, account, secret)

62
legacy/certctl/storage.py Normal file
View File

@@ -0,0 +1,62 @@
"""Credential storage abstraction for macOS Keychain.
Provides small helpers to get/set generic passwords using the `security`
command on macOS. Functions are intentionally simple and easily testable via
stubbing subprocess calls.
"""
from typing import Optional
import subprocess
import shlex
import getpass
def keychain_get(service: str) -> Optional[str]:
"""Return the password for `service` from macOS Keychain, or None if not found."""
try:
# 'security find-generic-password -s <service> -w' prints the password
completed = subprocess.run(["security", "find-generic-password", "-s", service, "-w"], check=True, capture_output=True)
return completed.stdout.decode("utf-8").rstrip("\n")
except subprocess.CalledProcessError:
return None
def keychain_set(service: str, password: str, account: str = "certctl") -> bool:
"""Set a generic password in the Keychain for the given service."""
# Use -U to update if exists
try:
subprocess.run(["security", "add-generic-password", "-a", account, "-s", service, "-w", password, "-U"], check=True, capture_output=True)
return True
except subprocess.CalledProcessError:
return False
def keychain_delete(service: str) -> bool:
try:
subprocess.run(["security", "delete-generic-password", "-s", service], check=True, capture_output=True)
return True
except subprocess.CalledProcessError:
return False
def get_or_prompt_passphrase(service: Optional[str], prompt: Optional[str] = None) -> str:
"""Get passphrase from keychain or prompt the user interactively.
If `service` is provided, on a user-entered passphrase we attempt to store
it back into the Keychain for future use.
"""
if service:
existing = keychain_get(service)
if existing:
return existing
# Fallback to interactive prompt
text = getpass.getpass(prompt or "Enter passphrase for private key: ")
if service and text:
# best effort: set into keychain but ignore failures
try:
keychain_set(service, text)
except Exception:
pass
return text

73
legacy/certctl/term.py Normal file
View File

@@ -0,0 +1,73 @@
"""Terminal/TTY helpers for CLI formatting and safe output.
Provides small helpers to format and print warnings with optional ANSI styling
when the target stream is a TTY.
"""
from typing import Optional, TextIO
import sys
def format_warning(msg: str, stream: Optional[TextIO] = None) -> str:
"""Return a formatted warning string.
If ``stream`` is a tty (supports ``.isatty()``) the message will be wrapped
with ANSI bold yellow codes for prominence; otherwise the plain message is
returned.
"""
if stream is None:
stream = sys.stderr
try:
is_tty = stream.isatty()
except Exception:
is_tty = False
if is_tty:
return f"\033[1;33m{msg}\033[0m"
return msg
def print_warning(msg: str, stream: Optional[TextIO] = None) -> None:
"""Write a warning message followed by a newline to ``stream`` (defaults to stderr)."""
if stream is None:
stream = sys.stderr
stream.write(format_warning(msg, stream) + "\n")
# Additional semantic helpers
def format_error(msg: str, stream: Optional[TextIO] = None) -> str:
"""Return a formatted error string (bold red on TTY)."""
if stream is None:
stream = sys.stderr
try:
is_tty = stream.isatty()
except Exception:
is_tty = False
if is_tty:
return f"\033[1;31m{msg}\033[0m"
return msg
def print_error(msg: str, stream: Optional[TextIO] = None) -> None:
"""Write an error message followed by a newline to ``stream`` (defaults to stderr)."""
if stream is None:
stream = sys.stderr
stream.write(format_error(msg, stream) + "\n")
def format_info(msg: str, stream: Optional[TextIO] = None) -> str:
"""Return a formatted info string (bold green on TTY)."""
if stream is None:
stream = sys.stderr
try:
is_tty = stream.isatty()
except Exception:
is_tty = False
if is_tty:
return f"\033[1;32m{msg}\033[0m"
return msg
def print_info(msg: str, stream: Optional[TextIO] = None) -> None:
"""Write an info message followed by a newline to ``stream`` (defaults to stderr)."""
if stream is None:
stream = sys.stderr
stream.write(format_info(msg, stream) + "\n")

62
legacy/certctl/utils.py Normal file
View File

@@ -0,0 +1,62 @@
"""Utility helpers for certctl package.
Includes a robust PEM extractor that searches arbitrary Console JSON payloads
for a PEM CSR block (-----BEGIN CERTIFICATE REQUEST----- ...
-----END CERTIFICATE REQUEST-----).
"""
from typing import Optional, Any, TextIO
import re
import sys
PEM_RE = re.compile(r"-----BEGIN CERTIFICATE REQUEST-----(?:.|\n)+?-----END CERTIFICATE REQUEST-----", re.DOTALL)
# Compatibility exports: delegate terminal formatting to certctl.term
from .term import format_warning, print_warning, format_error, print_error, format_info, print_info # re-exported for backwards compatibility
def extract_pem_from_json(obj: Any) -> Optional[str]:
"""Recursively search `obj` (dict/list/str) for the first PEM CSR block.
Returns the matched PEM string or None if not found.
"""
if isinstance(obj, str):
m = PEM_RE.search(obj)
if m:
return m.group(0)
return None
if isinstance(obj, dict):
# check common CSR locations quickly
# e.g., {"ns_ssl_csr": [{"csr": "-----BEGIN..."}]}
if "ns_ssl_csr" in obj:
val = obj.get("ns_ssl_csr")
if isinstance(val, list):
for item in val:
if isinstance(item, dict):
csr = item.get("csr")
if csr:
m = PEM_RE.search(csr)
if m:
return m.group(0)
# generic recursive walk
for v in obj.values():
res = extract_pem_from_json(v)
if res:
return res
if isinstance(obj, list):
# If it's a list of strings (rows), join them and search the whole block
if all(isinstance(i, str) for i in obj):
joined = "\n".join(obj)
m = PEM_RE.search(joined)
if m:
return m.group(0)
for item in obj:
res = extract_pem_from_json(item)
if res:
return res
return None

View File

@@ -0,0 +1,6 @@
"""nscert package
Lightweight package scaffold for Netscaler certificate tooling.
"""
__all__ = ["utils", "netscaler"]

13
legacy/nscert/ca/base.py Normal file
View File

@@ -0,0 +1,13 @@
"""Base CA adapter interface."""
from typing import Protocol, Optional
class CAAdapter(Protocol):
def submit_csr(self, csr_pem: str, name: str, options: Optional[dict] = None) -> str:
"""Submit a CSR and return a request id or token."""
def poll_status(self, request_id: str, timeout: int = 60) -> str:
"""Poll request status and return a final state string (e.g., 'issued' / 'pending' / 'failed')."""
def download_certificate(self, request_id: str) -> str:
"""Return the issued certificate PEM for the request_id."""

27
legacy/nscert/ca/mock.py Normal file
View File

@@ -0,0 +1,27 @@
"""A simple mock CA adapter for tests and local runs."""
from typing import Optional
from .base import CAAdapter
class MockCA(CAAdapter):
def __init__(self):
self._store = {}
self._counter = 0
def submit_csr(self, csr_pem: str, name: str, options: Optional[dict] = None) -> str:
self._counter += 1
rid = f"mock-{self._counter}"
# store and pretend it's issued immediately for simplicity
self._store[rid] = {
"csr": csr_pem,
"name": name,
"status": "issued",
"cert": f"-----BEGIN CERTIFICATE-----\nMockCertFor:{name}\n-----END CERTIFICATE-----\n",
}
return rid
def poll_status(self, request_id: str, timeout: int = 60) -> str:
return self._store.get(request_id, {}).get("status", "unknown")
def download_certificate(self, request_id: str) -> str:
return self._store.get(request_id, {}).get("cert")

View File

@@ -0,0 +1,35 @@
"""Sectigo CA adapter skeleton.
This file provides a class with the expected interface. Implementing a full
Sectigo integration requires API credentials and network access; this is a
skeleton with TODOs and a clear place to add HTTP calls.
"""
from typing import Optional
from .base import CAAdapter
class SectigoCA(CAAdapter):
def __init__(self, api_base: str = "https://api.sectigo.com", api_key: Optional[str] = None):
self.api_base = api_base
self.api_key = api_key
def submit_csr(self, csr_pem: str, name: str, options: Optional[dict] = None) -> str:
"""Submit CSR to Sectigo and return request id.
TODO: Implement actual HTTP POSTs with authentication and error handling.
"""
raise NotImplementedError("Sectigo submission not implemented yet")
def poll_status(self, request_id: str, timeout: int = 60) -> str:
"""Poll Sectigo for request status.
TODO: implement polling logic using Sectigo APIs.
"""
raise NotImplementedError("Sectigo polling not implemented yet")
def download_certificate(self, request_id: str) -> str:
"""Download issued certificate PEM.
TODO: fetch the issued certificate from Sectigo.
"""
raise NotImplementedError("Sectigo download not implemented yet")

231
legacy/nscert/cli.py Normal file
View File

@@ -0,0 +1,231 @@
"""Minimal CLI for nscert tooling.
Provides a `keygen` subcommand which wraps `nscert.keygen.generate_private_key`.
This is intentionally small and testable: `main(argv)` accepts an argv list.
"""
from __future__ import annotations
import argparse
import sys
from typing import List, Optional
from . import keygen
def _build_subj_from_dict(subject: dict) -> str:
parts = []
for k in ["C", "ST", "L", "O", "OU", "CN"]:
v = subject.get(k)
if v:
parts.append(f"/{k}={v}")
return "".join(parts)
def build_parser() -> argparse.ArgumentParser:
p = argparse.ArgumentParser(prog="nsctl", description="nscert control utility")
sub = p.add_subparsers(dest="command")
keygen_p = sub.add_parser("keygen", help="Generate a private key")
keygen_p.add_argument("--kind", choices=["rsa", "ec"], default="rsa")
keygen_p.add_argument("--bits", type=int, default=4096, help="RSA key bits")
keygen_p.add_argument("--curve", default="prime256v1", help="EC curve")
keygen_p.add_argument("--passphrase", help="Passphrase to encrypt key (insecure on CLI)")
keygen_p.add_argument("--keychain-service", help="Keychain service name to read/write passphrase")
keygen_p.add_argument("--save-passphrase", action="store_true", help="Save provided passphrase into Keychain")
keygen_p.add_argument("--keychain-account", default="nscert", help="Keychain account name when saving passphrase")
keygen_p.add_argument("--out", help="Write key to file instead of stdout")
# csr subcommands
csr_p = sub.add_parser("csr", help="Create and inspect CSRs")
csr_sub = csr_p.add_subparsers(dest="csr_cmd")
csr_create = csr_sub.add_parser("create", help="Create a CSR from a private key file")
csr_create.add_argument("--key-file", required=True, help="Path to the private key PEM")
csr_create.add_argument("--subject", help="Subject string in /C=.../ST=.../CN=... format or as key=value pairs like C=US,ST=CA,CN=example.com")
csr_create.add_argument("--san", action="append", help="SubjectAltName entry (DNS or IP); can be given multiple times")
csr_create.add_argument("--allow-wildcard", action="store_true", help="Allow wildcard SANs without interactive confirmation. Use with caution: wildcards broaden certificate scope; in non-interactive contexts this flag bypasses interactive confirmation.")
csr_create.add_argument("--from-cert", help="Path to an existing certificate to populate subject and SANs from")
csr_create.add_argument("--passphrase", help="Passphrase for encrypted key (insecure on CLI)")
csr_create.add_argument("--out", help="Write CSR to file instead of stdout")
csr_show = csr_sub.add_parser("show", help="Show CSR details (like SANs)")
csr_show.add_argument("--csr-file", required=True, help="Path to CSR PEM file")
csr_submit = csr_sub.add_parser("submit", help="Submit a CSR to a CA adapter")
csr_submit.add_argument("--csr-file", required=True, help="CSR PEM file to submit")
csr_submit.add_argument("--ca", choices=["mock", "sectigo"], default="mock", help="CA adapter to use")
csr_submit.add_argument("--name", required=True, help="Name for the cert request")
csr_submit.add_argument("--wait", action="store_true", help="Wait for issuance and download cert")
csr_submit.add_argument("--out", help="Write downloaded cert to file")
csr_submit.add_argument("--allow-wildcard", action="store_true", help="Allow submission of CSRs containing wildcard SANs. Use with caution: wildcards broaden certificate scope and may be unsafe.")
return p
def main(argv: Optional[List[str]] = None) -> int:
parser = build_parser()
args = parser.parse_args(argv)
if args.command == "keygen":
if args.kind == "rsa":
pem = keygen.generate_rsa_key(bits=args.bits, passphrase=args.passphrase, keychain_service=args.keychain_service, save_to_keychain=args.save_passphrase, keychain_account=args.keychain_account)
else:
pem = keygen.generate_ec_key(curve=args.curve, passphrase=args.passphrase, keychain_service=args.keychain_service, save_to_keychain=args.save_passphrase, keychain_account=args.keychain_account)
if args.out:
with open(args.out, "w", encoding="utf-8") as f:
f.write(pem)
else:
sys.stdout.write(pem)
return 0
if args.command == "csr":
if args.csr_cmd == "create":
# Parse subject: accept both /C=... style or comma separated key=value
subject = {}
subj = args.subject or ""
if subj.startswith("/"):
# convert /C=US/ST=... into dict
for part in subj.split("/"):
if not part:
continue
if "=" in part:
k, v = part.split("=", 1)
subject[k] = v
elif subj:
for part in subj.split(","):
if "=" in part:
k, v = part.split("=", 1)
subject[k.strip()] = v.strip()
csr_pem = None
from . import csr as csrmod
if args.from_cert:
# Populate subject and SANs from an existing cert
csr_pem = csrmod.create_csr_from_cert(args.from_cert, args.key_file, passphrase=args.passphrase)
else:
subj_str = subj if subj and subj.startswith("/") else _build_subj_from_dict(subject) if subject else None
# If no subject and no SANs provided, prompt interactively
if not subj_str and not args.san:
subj_dict, san_list = csrmod.prompt_for_subject_and_sans()
subj_str = _build_subj_from_dict(subj_dict) if subj_dict else None
csr_pem = csrmod.create_csr_from_key(args.key_file, subj_str, sans=san_list, passphrase=args.passphrase)
else:
# Validate SANs supplied on CLI
norm_sans = []
if args.san:
wildcard_sans = []
try:
for s in args.san:
ns = csrmod.normalize_and_validate_san(s)
norm_sans.append(ns)
if ns.startswith("DNS:*.") or "*" in ns:
wildcard_sans.append(ns)
except ValueError as e:
from .term import print_error
print_error(f"Invalid SAN provided: {e}")
return 1
# Handle wildcard SANs: interactive confirmation or explicit allow
if wildcard_sans:
if args.allow_wildcard:
# Use formatted warning helper for reusable behavior
from .term import print_warning
print_warning("WARNING: --allow-wildcard used. Wildcard SANs broaden certificate scope and can be risky.")
else:
# If interactive terminal, confirm each wildcard SAN
try:
is_tty = sys.stdin.isatty()
except Exception:
is_tty = False
if not is_tty:
from .term import print_error
print_error("Wildcard SAN detected in CLI arguments; rerun with --allow-wildcard to allow it in non-interactive mode.")
return 1
# interactive confirm
for w in list(wildcard_sans):
ans = input(f"Wildcard SAN detected: {w}. Type 'yes' to include it, or anything else to exclude: ").strip().lower()
if ans != "yes":
# remove from normalized list
norm_sans = [x for x in norm_sans if x != w]
from .term import print_info
print_info(f"Excluded {w}")
# it as None so OpenSSL will not add subjectAltName.
san_to_use = norm_sans if args.san else None
csr_pem = csrmod.create_csr_from_key(args.key_file, subj_str, sans=san_to_use, passphrase=args.passphrase)
if args.out:
with open(args.out, "w", encoding="utf-8") as f:
f.write(csr_pem)
else:
sys.stdout.write(csr_pem)
return 0
if args.csr_cmd == "show":
with open(args.csr_file, "r", encoding="utf-8") as f:
csr_pem = f.read()
from . import csr as csrmod
# Rudimentary output: show the CSR text parsed by openssl
import subprocess
openssl = csrmod._openssl_bin()
p = subprocess.run([openssl, "req", "-in", "/dev/stdin", "-noout", "-text"], input=csr_pem.encode("utf-8"), check=True, capture_output=True)
sys.stdout.write(p.stdout.decode("utf-8"))
return 0
if args.csr_cmd == "submit":
with open(args.csr_file, "r", encoding="utf-8") as f:
csr_pem = f.read()
# validate SANs if present and wildcard rules
from . import csr as csrmod
# parse SANs in the CSR via openssl text
import subprocess
openssl = csrmod._openssl_bin()
p = subprocess.run([openssl, "req", "-in", "/dev/stdin", "-noout", "-text"], input=csr_pem.encode("utf-8"), check=True, capture_output=True)
txt = p.stdout.decode("utf-8")
import re
sans = [f"DNS:{m.group(1)}" for m in re.finditer(r"DNS:([^,\s]+)", txt)]
for m in re.finditer(r"IP:?\s*Address:?\s*([^,\s]+)", txt, flags=re.IGNORECASE):
sans.append(f"IP:{m.group(1)}")
# wildcard checks
if any(s.startswith("DNS:*." ) or "*" in s for s in sans) and not args.allow_wildcard:
from .term import print_error
print_error("Wildcard SAN detected in CSR; pass --allow-wildcard to confirm you want to submit it.")
return 1
# dispatch to adapter
from .ca.mock import MockCA
from .ca.sectigo import SectigoCA
if args.ca == "mock":
adapter = MockCA()
else:
adapter = SectigoCA()
req_id = adapter.submit_csr(csr_pem, args.name)
from .term import print_info
print_info(f"Submitted CSR, request id: {req_id}")
if args.wait:
status = adapter.poll_status(req_id, timeout=300)
from .term import print_info
print_info(f"Final status: {status}")
if status == "issued":
cert = adapter.download_certificate(req_id)
if args.out:
with open(args.out, "w", encoding="utf-8") as f:
f.write(cert)
else:
sys.stdout.write(cert)
else:
from .term import print_error
print_error(f"Request ended with status: {status}")
return 0
parser.print_help()
return 2
if __name__ == "__main__":
raise SystemExit(main())

237
legacy/nscert/csr.py Normal file
View File

@@ -0,0 +1,237 @@
"""CSR creation helpers using the OpenSSL CLI.
Provides utilities to create a CSR from a private key file or PEM content,
supporting Subject fields and SubjectAltName entries.
"""
from typing import List, Optional, Dict
import shutil
import subprocess
import tempfile
import os
class OpenSSLNotFound(Exception):
pass
def _openssl_bin() -> str:
path = shutil.which("openssl")
if not path:
raise OpenSSLNotFound("`openssl` not found in PATH")
return path
def _build_subject_string(subject: Dict[str, str]) -> str:
# Accepts keys like C, ST, L, O, OU, CN
parts = []
for k in ["C", "ST", "L", "O", "OU", "CN"]:
v = subject.get(k)
if v:
parts.append(f"/{k}={v}")
return "".join(parts)
def normalize_and_validate_san(s: str) -> str:
"""Normalize and validate a SAN entry.
Accepts 'dns.example.com', 'IP', or prefixed 'DNS:...' / 'IP:...'.
Returns 'DNS:...' or 'IP:...' normalized string. Raises ValueError on invalid input.
"""
import ipaddress
s = s.strip()
if not s:
raise ValueError("empty SAN")
if s.upper().startswith("DNS:") or s.upper().startswith("IP:"):
prefix, val = s.split(":", 1)
val = val.strip()
else:
val = s
prefix = None
# Try IP first
try:
ipaddress.ip_address(val)
return f"IP:{val}"
except Exception:
pass
# Validate DNS name roughly (allow wildcard prefixes like '*.example.com')
import re
# simplified hostname regex (labels separated by dots)
HOST_RE = re.compile(r"^(?:[A-Za-z0-9](?:[A-Za-z0-9-]{0,61}[A-Za-z0-9])?)(?:\.(?:[A-Za-z0-9](?:[A-Za-z0-9-]{0,61}[A-Za-z0-9])?))*$")
if val.startswith("*."):
rest = val[2:]
if HOST_RE.match(rest):
return f"DNS:{val}"
if HOST_RE.match(val):
return f"DNS:{val}"
raise ValueError(f"Invalid SAN value: {s}")
def create_csr_from_key(key_path: str, subject: Optional[Dict[str, str]] = None, sans: Optional[List[str]] = None, passphrase: Optional[str] = None) -> str:
"""Create a CSR using OpenSSL and return the CSR PEM text.
- `key_path` must point to a private key file (PEM); if it is encrypted,
supply `passphrase` which will be passed to OpenSSL via `-passin`.
- `subject` may be provided either as a dict with fields like C, ST, L, O, OU, CN
or as a subject string starting with a leading slash, e.g. '/C=US/ST=CA/CN=example.com'.
- `sans` is a list of DNS names/IPs for SubjectAltName.
"""
openssl = _openssl_bin()
if isinstance(subject, dict):
subj = _build_subject_string(subject)
else:
subj = subject
# If sans look like ['DNS:...','IP:...'] normalize to plain names for config
norm_sans = []
if sans:
for s in sans:
# Normalize and validate; allow ValueError to bubble to caller
ns = normalize_and_validate_san(s)
norm_sans.append(ns)
with tempfile.TemporaryDirectory() as td:
conf_path = os.path.join(td, "csr.conf")
csr_path = os.path.join(td, "req.csr")
# Build minimal OpenSSL config with SANs if provided
conf_lines = ["[ req ]", "distinguished_name = req_distinguished_name", "prompt = no"]
if norm_sans:
conf_lines.append("req_extensions = v3_req")
conf_lines.append("")
conf_lines.append("[ req_distinguished_name ]")
# No need to fill DN here when using -subj, but keep the section present
conf_lines.append("")
if norm_sans:
conf_lines.append("[ v3_req ]")
conf_lines.append(f"subjectAltName = {', '.join(norm_sans)}")
with open(conf_path, "w", encoding="utf-8") as f:
f.write("\n".join(conf_lines))
cmd = [openssl, "req", "-new", "-key", key_path, "-out", csr_path, "-config", conf_path]
if subj:
cmd += ["-subj", subj]
if passphrase:
cmd += ["-passin", f"pass:{passphrase}"]
subprocess.run(cmd, check=True, capture_output=True)
with open(csr_path, "r", encoding="utf-8") as f:
return f.read()
def csr_has_san(csr_pem: str, san: str) -> bool:
"""Return True if the CSR PEM contains the provided SAN entry.
This uses `openssl req -in - -noout -text` to parse the CSR contents.
"""
openssl = _openssl_bin()
p = subprocess.run([openssl, "req", "-in", "/dev/stdin", "-noout", "-text"], input=csr_pem.encode("utf-8"), check=True, capture_output=True)
out = p.stdout.decode("utf-8")
# Accept a few representations: 'DNS:...' or 'IP:...' -> just look for the name/value
if san.startswith("DNS:"):
return san[4:] in out
if san.startswith("IP:"):
return san[3:] in out
return san in out
def extract_subject_and_sans_from_cert(cert_pem: str) -> (Dict[str, str], List[str]):
"""Extract subject fields and SANs from a certificate PEM using openssl.
Returns (subject_dict, san_list)
"""
openssl = _openssl_bin()
# Get subject in RFC2253 style for easier parsing
p = subprocess.run([openssl, "x509", "-in", "/dev/stdin", "-noout", "-subject", "-nameopt", "RFC2253"], input=cert_pem.encode("utf-8"), check=True, capture_output=True)
subj_out = p.stdout.decode("utf-8").strip()
# subj_out is like: "subject=CN=example.com,O=Example,C=US"
subj = subj_out.split("=", 1)[1] if "=" in subj_out else ""
subject_parts = {}
for part in subj.split(","):
if "=" in part:
k, v = part.split("=", 1)
subject_parts[k.strip()] = v.strip()
# Get SANs from the text representation
p2 = subprocess.run([openssl, "x509", "-in", "/dev/stdin", "-noout", "-text"], input=cert_pem.encode("utf-8"), check=True, capture_output=True)
txt = p2.stdout.decode("utf-8")
sans = []
import re
for m in re.finditer(r"DNS:([^,\s]+)", txt):
sans.append(f"DNS:{m.group(1)}")
# OpenSSL sometimes prints IPs as 'IP:' or 'IP Address: '
for m in re.finditer(r"IP:?\s*Address:?\s*([^,\s]+)", txt, flags=re.IGNORECASE):
sans.append(f"IP:{m.group(1)}")
for m in re.finditer(r"IP:([^,\s]+)", txt):
sans.append(f"IP:{m.group(1)}")
return subject_parts, sans
def create_csr_from_cert(cert_path: str, key_path: str, passphrase: Optional[str] = None) -> str:
"""Create a CSR based on an existing certificate's subject and SANs, signing with `key_path`."""
with open(cert_path, "r", encoding="utf-8") as f:
cert_pem = f.read()
subject, sans = extract_subject_and_sans_from_cert(cert_pem)
# convert subject dict into subject string
subj_str = _build_subject_string(subject) if subject else None
return create_csr_from_key(key_path, subj_str, sans=sans, passphrase=passphrase)
def prompt_for_subject_and_sans() -> (Dict[str, str], List[str]):
"""Interactively prompt the user for subject fields and SANs.
Returns a (subject_dict, san_list) tuple.
"""
fields = ["C", "ST", "L", "O", "OU", "CN"]
subject: Dict[str, str] = {}
from .term import print_info
print_info("Enter subject fields (press Enter to skip a field)")
for f in fields:
try:
val = input(f"{f}: ").strip()
except EOFError:
val = ""
if val:
subject[f] = val
print_info("Enter SANs (one per line). Examples: 'www.example.com' or '10.0.0.1'. Leave blank to finish.")
sans: List[str] = []
while True:
try:
s = input("SAN: ").strip()
except EOFError:
break
if not s:
break
try:
norm = normalize_and_validate_san(s)
except ValueError as e:
from .term import print_error
print_error(f"Invalid SAN: {e}. Try again.")
continue
# If this is a wildcard SAN, ask for explicit confirmation
if norm.startswith("DNS:*."):
ans = input(
f"Wildcard SAN detected: {norm}. Wildcard SANs (e.g., *.example.com) broaden certificate scope and can be risky. Type 'yes' to include it, or anything else to exclude: "
).strip().lower()
if ans != "yes":
from .term import print_info
print_info(f"Excluded {norm}")
continue
else:
from .term import print_info
print_info(f"Included {norm}")
# store the normalized SAN
sans.append(norm)
return subject, sans

120
legacy/nscert/keygen.py Normal file
View File

@@ -0,0 +1,120 @@
"""Key generation helpers using the OpenSSL CLI.
This module intentionally uses the system `openssl` command for portability on
macOS systems where the `cryptography` library may not be available by
default.
Functions return the PEM text of the generated private key.
"""
from typing import Optional
import shutil
import subprocess
class OpenSSLNotFound(Exception):
pass
def _openssl_bin() -> str:
path = shutil.which("openssl")
if not path:
raise OpenSSLNotFound("`openssl` not found in PATH")
return path
def generate_rsa_key(bits: int = 4096, passphrase: Optional[str] = None, cipher: Optional[str] = "des3", keychain_service: Optional[str] = None, save_to_keychain: bool = False, keychain_account: str = "nscert") -> str:
"""Generate an RSA private key in PEM format.
If `passphrase` is provided or available in `keychain_service`, the PEM
will be encrypted with the provided `cipher` (defaults to 3DES via
OpenSSL's `des3` v2 option).
If `save_to_keychain` is True and `keychain_service` is provided and a
passphrase was supplied by the caller, the passphrase will be saved into
the macOS Keychain using `nscert.storage.keychain_set`.
Returns PEM text.
"""
# Optionally save provided passphrase to keychain
if passphrase and keychain_service and save_to_keychain:
try:
from . import storage
storage.keychain_set(keychain_service, passphrase, account=keychain_account)
except Exception:
# Best-effort only; do not fail the key generation on keychain errors
pass
# Resolve passphrase via keychain if requested and not provided
if not passphrase and keychain_service:
try:
from . import storage
passphrase = storage.keychain_get(keychain_service)
except Exception:
pass
openssl = _openssl_bin()
# Generate an unencrypted private key into stdout using genpkey
gen_cmd = [openssl, "genpkey", "-algorithm", "RSA", "-pkeyopt", f"rsa_keygen_bits:{bits}", "-outform", "PEM"]
gen = subprocess.run(gen_cmd, check=True, capture_output=True)
private_pem = gen.stdout
if passphrase:
# Convert to PKCS#8 and encrypt
# Use -v2 <cipher> for modern encryption (des3 corresponds to 3DES)
topk8_cmd = [openssl, "pkcs8", "-topk8", "-v2", cipher, "-passout", f"pass:{passphrase}", "-outform", "PEM"]
topk8 = subprocess.run(topk8_cmd, input=private_pem, check=True, capture_output=True)
return topk8.stdout.decode("utf-8")
return private_pem.decode("utf-8")
def generate_ec_key(curve: str = "prime256v1", passphrase: Optional[str] = None, cipher: Optional[str] = "des3", keychain_service: Optional[str] = None, save_to_keychain: bool = False, keychain_account: str = "nscert") -> str:
"""Generate an EC private key (PEM) for given curve name (e.g., prime256v1 / secp384r1).
If `passphrase` is provided or available in `keychain_service`, the PEM
will be encrypted using PKCS#8 v2.
If `save_to_keychain` is True and `keychain_service` is provided and a
passphrase was supplied by the caller, the passphrase will be saved into
the macOS Keychain using `nscert.storage.keychain_set`.
"""
# Optionally save provided passphrase to keychain
if passphrase and keychain_service and save_to_keychain:
try:
from . import storage
storage.keychain_set(keychain_service, passphrase, account=keychain_account)
except Exception:
pass
# Resolve passphrase via keychain if requested and not provided
if not passphrase and keychain_service:
try:
from . import storage
passphrase = storage.keychain_get(keychain_service)
except Exception:
pass
openssl = _openssl_bin()
gen_cmd = [openssl, "ecparam", "-name", curve, "-genkey", "-noout", "-outform", "PEM"]
gen = subprocess.run(gen_cmd, check=True, capture_output=True)
private_pem = gen.stdout
if passphrase:
topk8_cmd = [openssl, "pkcs8", "-topk8", "-v2", cipher, "-passout", f"pass:{passphrase}", "-outform", "PEM"]
topk8 = subprocess.run(topk8_cmd, input=private_pem, check=True, capture_output=True)
return topk8.stdout.decode("utf-8")
return private_pem.decode("utf-8")
def generate_private_key(kind: str = "rsa", **kwargs) -> str:
"""Generic helper to generate a private key of a given `kind` ('rsa' or 'ec')."""
kind = kind.lower()
if kind == "rsa":
return generate_rsa_key(**kwargs)
if kind == "ec":
return generate_ec_key(**kwargs)
raise ValueError("Unsupported key kind: use 'rsa' or 'ec'")

View File

@@ -0,0 +1,32 @@
"""Minimal Netscaler Console helpers.
This module provides a thin wrapper around the extractor and a clear place to
add Console-specific parsing and upload helpers later.
"""
from typing import Optional
from .utils import extract_pem_from_json
def extract_csr_text(response_json: dict) -> Optional[str]:
"""Return CSR PEM text from a Console/device response JSON.
Prioritize the Console `ns_ssl_csr` response shape, then fall back to a
recursive search for the PEM block.
"""
if not isinstance(response_json, dict):
return extract_pem_from_json(response_json)
# Prefer well-known ns_ssl_csr array
ns_ssl_csr = response_json.get("ns_ssl_csr")
if isinstance(ns_ssl_csr, list):
for entry in ns_ssl_csr:
if isinstance(entry, dict):
csr = entry.get("csr")
if csr:
found = extract_pem_from_json(csr)
if found:
return found
# Fallback general search
return extract_pem_from_json(response_json)

62
legacy/nscert/storage.py Normal file
View File

@@ -0,0 +1,62 @@
"""Credential storage abstraction for macOS Keychain.
Provides small helpers to get/set generic passwords using the `security`
command on macOS. Functions are intentionally simple and easily testable via
stubbing subprocess calls.
"""
from typing import Optional
import subprocess
import shlex
import getpass
def keychain_get(service: str) -> Optional[str]:
"""Return the password for `service` from macOS Keychain, or None if not found."""
try:
# 'security find-generic-password -s <service> -w' prints the password
completed = subprocess.run(["security", "find-generic-password", "-s", service, "-w"], check=True, capture_output=True)
return completed.stdout.decode("utf-8").rstrip("\n")
except subprocess.CalledProcessError:
return None
def keychain_set(service: str, password: str, account: str = "nscert") -> bool:
"""Set a generic password in the Keychain for the given service."""
# Use -U to update if exists
try:
subprocess.run(["security", "add-generic-password", "-a", account, "-s", service, "-w", password, "-U"], check=True, capture_output=True)
return True
except subprocess.CalledProcessError:
return False
def keychain_delete(service: str) -> bool:
try:
subprocess.run(["security", "delete-generic-password", "-s", service], check=True, capture_output=True)
return True
except subprocess.CalledProcessError:
return False
def get_or_prompt_passphrase(service: Optional[str], prompt: Optional[str] = None) -> str:
"""Get passphrase from keychain or prompt the user interactively.
If `service` is provided, on a user-entered passphrase we attempt to store
it back into the Keychain for future use.
"""
if service:
existing = keychain_get(service)
if existing:
return existing
# Fallback to interactive prompt
text = getpass.getpass(prompt or "Enter passphrase for private key: ")
if service and text:
# best effort: set into keychain but ignore failures
try:
keychain_set(service, text)
except Exception:
pass
return text

73
legacy/nscert/term.py Normal file
View File

@@ -0,0 +1,73 @@
"""Terminal/TTY helpers for CLI formatting and safe output.
Provides small helpers to format and print warnings with optional ANSI styling
when the target stream is a TTY.
"""
from typing import Optional, TextIO
import sys
def format_warning(msg: str, stream: Optional[TextIO] = None) -> str:
"""Return a formatted warning string.
If ``stream`` is a tty (supports ``.isatty()``) the message will be wrapped
with ANSI bold yellow codes for prominence; otherwise the plain message is
returned.
"""
if stream is None:
stream = sys.stderr
try:
is_tty = stream.isatty()
except Exception:
is_tty = False
if is_tty:
return f"\033[1;33m{msg}\033[0m"
return msg
def print_warning(msg: str, stream: Optional[TextIO] = None) -> None:
"""Write a warning message followed by a newline to ``stream`` (defaults to stderr)."""
if stream is None:
stream = sys.stderr
stream.write(format_warning(msg, stream) + "\n")
# Additional semantic helpers
def format_error(msg: str, stream: Optional[TextIO] = None) -> str:
"""Return a formatted error string (bold red on TTY)."""
if stream is None:
stream = sys.stderr
try:
is_tty = stream.isatty()
except Exception:
is_tty = False
if is_tty:
return f"\033[1;31m{msg}\033[0m"
return msg
def print_error(msg: str, stream: Optional[TextIO] = None) -> None:
"""Write an error message followed by a newline to ``stream`` (defaults to stderr)."""
if stream is None:
stream = sys.stderr
stream.write(format_error(msg, stream) + "\n")
def format_info(msg: str, stream: Optional[TextIO] = None) -> str:
"""Return a formatted info string (bold green on TTY)."""
if stream is None:
stream = sys.stderr
try:
is_tty = stream.isatty()
except Exception:
is_tty = False
if is_tty:
return f"\033[1;32m{msg}\033[0m"
return msg
def print_info(msg: str, stream: Optional[TextIO] = None) -> None:
"""Write an info message followed by a newline to ``stream`` (defaults to stderr)."""
if stream is None:
stream = sys.stderr
stream.write(format_info(msg, stream) + "\n")

62
legacy/nscert/utils.py Normal file
View File

@@ -0,0 +1,62 @@
"""Utility helpers for nscert package.
Includes a robust PEM extractor that searches arbitrary Console JSON payloads
for a PEM CSR block (-----BEGIN CERTIFICATE REQUEST----- ...
-----END CERTIFICATE REQUEST-----).
"""
from typing import Optional, Any, TextIO
import re
import sys
PEM_RE = re.compile(r"-----BEGIN CERTIFICATE REQUEST-----(?:.|\n)+?-----END CERTIFICATE REQUEST-----", re.DOTALL)
# Compatibility exports: delegate terminal formatting to nscert.term
from .term import format_warning, print_warning, format_error, print_error, format_info, print_info # re-exported for backwards compatibility
def extract_pem_from_json(obj: Any) -> Optional[str]:
"""Recursively search `obj` (dict/list/str) for the first PEM CSR block.
Returns the matched PEM string or None if not found.
"""
if isinstance(obj, str):
m = PEM_RE.search(obj)
if m:
return m.group(0)
return None
if isinstance(obj, dict):
# check common CSR locations quickly
# e.g., {"ns_ssl_csr": [{"csr": "-----BEGIN..."}]}
if "ns_ssl_csr" in obj:
val = obj.get("ns_ssl_csr")
if isinstance(val, list):
for item in val:
if isinstance(item, dict):
csr = item.get("csr")
if csr:
m = PEM_RE.search(csr)
if m:
return m.group(0)
# generic recursive walk
for v in obj.values():
res = extract_pem_from_json(v)
if res:
return res
if isinstance(obj, list):
# If it's a list of strings (rows), join them and search the whole block
if all(isinstance(i, str) for i in obj):
joined = "\n".join(obj)
m = PEM_RE.search(joined)
if m:
return m.group(0)
for item in obj:
res = extract_pem_from_json(item)
if res:
return res
return None

571
legacy/nscertkeycreate.py Normal file
View File

@@ -0,0 +1,571 @@
#!/usr/bin/env python3
"""
nscertkeycreate.py
NetScaler Console key + CSR workflow:
- Create key on Console (RSA 4096 or ECDSA prime256v1 by default)
- Download key locally
- Generate CSR locally with OpenSSL (supports SAN)
- Optionally upload CSR back to Console
Fixes included:
- Login once to /nitro/v2/config/login; store sessionid token
- Use Cookie: NITRO_AUTH_TOKEN=<token> for subsequent requests (avoid CookieConflictError)
- Correct OpenSSL SAN config generation:
[alt_names]
DNS.1 = example.com
DNS.2 = www.example.com
IP.1 = 10.0.0.1
- Optional: --insecure disables TLS verification (curl -k equivalent)
- Optional: --ca-bundle for proper TLS verification with internal CA
- Keychain storage (Console password + key passphrase) via keyring (optional)
"""
import argparse
import datetime as _dt
import getpass
import os
import pathlib
import shutil
import subprocess
import sys
import tempfile
import warnings
from dataclasses import dataclass
from typing import Any, Dict, Optional, Tuple, Union
try:
import requests
except ImportError:
print("ERROR: This script requires 'requests'. Install with: pip install requests", file=sys.stderr)
raise
try:
import keyring # type: ignore
except Exception:
keyring = None # type: ignore
APP = "nscertkeycreate"
DEFAULT_EC_CURVE = "prime256v1"
DEFAULT_RSA_BITS = 4096
class NitroError(RuntimeError):
def __init__(self, status_code: int, message: str, payload: Any = None, headers: Any = None):
super().__init__(f"NITRO error (HTTP {status_code}): {message}")
self.status_code = status_code
self.message = message
self.payload = payload
self.headers = headers
def _now_stamp() -> str:
return _dt.datetime.now().strftime("%Y%m%d-%H%M%S")
def _safe_filename(name: str) -> str:
name = name.strip().replace("/", "_").replace("\\", "_")
return name
def _prompt_yes_no(prompt: str, default_yes: bool = True) -> bool:
suffix = " [Y/n] " if default_yes else " [y/N] "
while True:
ans = input(prompt + suffix).strip().lower()
if not ans:
return default_yes
if ans in ("y", "yes"):
return True
if ans in ("n", "no"):
return False
print("Please answer y or n.")
def _require_openssl() -> str:
path = shutil.which("openssl")
if not path:
raise RuntimeError("OpenSSL not found in PATH. Install it (or ensure 'openssl' is available).")
return path
def _run(cmd: list, *, input_bytes: Optional[bytes] = None, env: Optional[Dict[str, str]] = None) -> Tuple[int, bytes, bytes]:
p = subprocess.Popen(
cmd,
stdin=subprocess.PIPE if input_bytes is not None else None,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env,
)
out, err = p.communicate(input=input_bytes)
return p.returncode, out, err
def _keyring_available() -> bool:
return keyring is not None
def _kr_service(console_url: str) -> str:
return f"{APP}:{console_url.rstrip('/')}"
def keyring_get(console_url: str, username: str, which: str) -> Optional[str]:
if not _keyring_available():
return None
try:
return keyring.get_password(_kr_service(console_url), f"{username}:{which}") # type: ignore
except Exception:
return None
def keyring_set(console_url: str, username: str, which: str, value: str) -> None:
if not _keyring_available():
raise RuntimeError("keyring is not available (pip install keyring).")
keyring.set_password(_kr_service(console_url), f"{username}:{which}", value) # type: ignore
def keyring_delete(console_url: str, username: str, which: str) -> None:
if not _keyring_available():
return
try:
keyring.delete_password(_kr_service(console_url), f"{username}:{which}") # type: ignore
except Exception:
pass
VerifyType = Union[bool, str]
@dataclass
class NitroConsoleClient:
base: str
verify: VerifyType
token: Optional[str] = None
timeout: int = 60
def _url(self, path: str) -> str:
return self.base.rstrip("/") + path
def _headers(self, extra: Optional[Dict[str, str]] = None) -> Dict[str, str]:
h: Dict[str, str] = {"Accept": "*/*"}
if self.token:
h["Cookie"] = f"NITRO_AUTH_TOKEN={self.token}"
if extra:
h.update(extra)
return h
def login(self, username: str, password: str) -> str:
url = self._url("/nitro/v2/config/login")
payload = {"login": {"username": username, "password": password}}
r = requests.post(
url,
headers={"Content-Type": "application/json", "Accept": "application/json"},
json=payload,
verify=self.verify,
timeout=self.timeout,
)
try:
j = r.json()
except Exception:
raise NitroError(r.status_code, r.text)
if r.status_code >= 400:
msg = j.get("message") if isinstance(j, dict) else r.text
raise NitroError(r.status_code, str(msg), j)
try:
sess = j["login"][0]["sessionid"]
except Exception:
raise NitroError(r.status_code, "Login response did not include sessionid", j)
if not isinstance(sess, str) or not sess:
raise NitroError(r.status_code, "Login returned empty sessionid", j)
self.token = sess
return sess
def _parse_json(self, r: requests.Response) -> Dict[str, Any]:
headers = dict(r.headers)
try:
j = r.json()
except Exception:
if r.status_code >= 400:
raise NitroError(r.status_code, r.text, headers=headers)
raise NitroError(r.status_code, "Expected JSON but got non-JSON response", headers=headers)
if r.status_code >= 400:
msg = j.get("message") if isinstance(j, dict) else r.text
raise NitroError(r.status_code, str(msg), j, headers=headers)
if isinstance(j, dict) and "errorcode" in j and j.get("errorcode") not in (0, "0", None):
msg = j.get("message", "Unknown NITRO error")
raise NitroError(r.status_code, str(msg), j, headers=headers)
return j
def post_json(self, path: str, payload: Dict[str, Any], *, params: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
url = self._url(path)
r = requests.post(
url,
headers=self._headers({"Content-Type": "application/json", "Accept": "application/json"}),
params=params,
json=payload,
verify=self.verify,
timeout=self.timeout,
)
return self._parse_json(r)
def get_json(self, path: str, *, params: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
url = self._url(path)
r = requests.get(
url,
headers=self._headers({"Accept": "application/json"}),
params=params,
verify=self.verify,
timeout=self.timeout,
)
return self._parse_json(r)
def download_bytes(self, path: str) -> bytes:
url = self._url(path)
r = requests.get(url, headers=self._headers(), verify=self.verify, timeout=self.timeout)
if r.status_code >= 400:
try:
j = r.json()
msg = j.get("message") or r.text
raise NitroError(r.status_code, str(msg), j, headers=dict(r.headers))
except ValueError:
raise NitroError(r.status_code, r.text, headers=dict(r.headers))
return r.content
# High-level operations
def create_key(
self,
key_name: str,
*,
algo: str,
keyform: str = "PEM",
keysize: Optional[int] = None,
ec_curve: Optional[str] = None,
password: Optional[str] = None,
file_location_path: str = "",
) -> Dict[str, Any]:
obj: Dict[str, Any] = {
"file_name": key_name,
"keyform": keyform,
"algo": algo,
"file_location_path": file_location_path,
}
if keysize is not None:
obj["keysize"] = int(keysize)
if ec_curve:
obj["ec_curve"] = ec_curve
if password:
obj["password"] = password
return self.post_json("/nitro/v2/config/ns_ssl_key", {"ns_ssl_key": obj}, params={"action": "create"})
def download_key(self, key_name: str) -> bytes:
return self.download_bytes(f"/nitro/v2/download/ns_ssl_key/{key_name}")
def _pick_key_type_interactive() -> Dict[str, Any]:
print("Select key type:")
print(f" 1) RSA {DEFAULT_RSA_BITS} (common/compatible)")
print(" 2) ECDSA (best-practice default curve) (default)")
while True:
sel = input("Select [1-2]: ").strip() or "2"
if sel == "1":
return {"algo": "RSA", "keysize": DEFAULT_RSA_BITS}
if sel == "2":
return {"algo": "ECDSA", "ec_curve": DEFAULT_EC_CURVE}
print("Invalid selection. Choose 1 or 2.")
def _parse_san_string(san: str) -> Dict[str, list]:
"""
Input: "DNS:example.com,DNS:www.example.com,IP:10.0.0.1"
Output: {"DNS": ["example.com","www.example.com"], "IP": ["10.0.0.1"]}
"""
out: Dict[str, list] = {}
if not san.strip():
return out
items = [x.strip() for x in san.split(",") if x.strip()]
for it in items:
if ":" not in it:
raise RuntimeError("SAN entry must be like DNS:example.com or IP:10.0.0.1 (comma-separated).")
k, v = it.split(":", 1)
k = k.strip().upper()
v = v.strip()
if k not in ("DNS", "IP", "EMAIL", "URI"):
raise RuntimeError(f"Unsupported SAN type '{k}'. Use DNS, IP, EMAIL, or URI.")
if not v:
raise RuntimeError("SAN value cannot be empty.")
out.setdefault(k, []).append(v)
return out
def _openssl_make_csr(
key_file: pathlib.Path,
csr_file: pathlib.Path,
*,
cn: str,
o: str = "",
ou: str = "",
l: str = "",
st: str = "",
c: str = "",
san: str = "",
passphrase: Optional[str] = None,
) -> None:
openssl = _require_openssl()
subj_parts = []
if c:
subj_parts.append(f"/C={c}")
if st:
subj_parts.append(f"/ST={st}")
if l:
subj_parts.append(f"/L={l}")
if o:
subj_parts.append(f"/O={o}")
if ou:
subj_parts.append(f"/OU={ou}")
subj_parts.append(f"/CN={cn}")
subj = "".join(subj_parts)
extra_args = []
with tempfile.TemporaryDirectory() as td:
tdpath = pathlib.Path(td)
san_map = _parse_san_string(san)
cfg_path = None
if san_map:
cfg_path = tdpath / "openssl.cnf"
lines = []
lines += [
"[ req ]",
"default_md = sha256",
"prompt = no",
"distinguished_name = dn",
"req_extensions = req_ext",
"",
"[ dn ]",
f"CN = {cn}",
"",
"[ req_ext ]",
"subjectAltName = @alt_names",
"",
"[ alt_names ]",
]
# Correct OpenSSL alt_names syntax: DNS.1=, IP.1=, etc.
for typ in ("DNS", "IP", "EMAIL", "URI"):
vals = san_map.get(typ, [])
for idx, v in enumerate(vals, start=1):
lines.append(f"{typ}.{idx} = {v}")
cfg_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
extra_args = ["-config", str(cfg_path), "-reqexts", "req_ext"]
cmd = [
openssl, "req", "-new", "-sha256",
"-key", str(key_file),
"-out", str(csr_file),
"-subj", subj,
] + extra_args
if passphrase:
env = os.environ.copy()
env["NSCERTKEY_PASSPHRASE"] = passphrase
cmd += ["-passin", "env:NSCERTKEY_PASSPHRASE"]
rc, out, err = _run(cmd, env=env)
else:
rc, out, err = _run(cmd)
if rc != 0:
raise RuntimeError(f"OpenSSL CSR generation failed:\n{err.decode(errors='ignore')}")
def _write_file(path: pathlib.Path, data: bytes, mode: int) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_bytes(data)
try:
os.chmod(path, mode)
except Exception:
pass
def _copy_latest(target: pathlib.Path, latest_name: str) -> None:
latest = target.parent / latest_name
try:
if latest.exists():
latest.unlink()
except Exception:
pass
shutil.copy2(target, latest)
def _console_password_flow(console_url: str, username: str, reset: bool) -> str:
if reset:
keyring_delete(console_url, username, "console_password")
pw = keyring_get(console_url, username, "console_password")
if pw:
return pw
pw = getpass.getpass(f"Console password for {username}@{console_url}: ")
if _keyring_available():
if _prompt_yes_no("Store Console password in keychain?", default_yes=True):
try:
keyring_set(console_url, username, "console_password", pw)
except Exception as e:
print(f"[warn] Could not store Console password in keychain: {e}", file=sys.stderr)
return pw
def _key_passphrase_flow(console_url: str, username: str, reset: bool, encrypt: bool) -> Optional[str]:
if not encrypt:
return None
if reset:
keyring_delete(console_url, username, "key_passphrase")
pp = keyring_get(console_url, username, "key_passphrase")
if pp:
return pp
pp = getpass.getpass("Key passphrase (will be stored in keychain if you choose): ")
if _keyring_available():
if _prompt_yes_no("Store key passphrase in keychain?", default_yes=True):
try:
keyring_set(console_url, username, "key_passphrase", pp)
except Exception as e:
print(f"[warn] Could not store key passphrase in keychain: {e}", file=sys.stderr)
return pp
def _prompt_subject() -> Dict[str, str]:
cn = input("CSR Common Name (CN): ").strip()
if not cn:
raise RuntimeError("CN is required.")
o = input("Organization (O) [optional]: ").strip()
ou = input("Org Unit (OU) [optional]: ").strip()
l = input("City/Locality (L) [optional]: ").strip()
st = input("State/Province (ST) [optional, spell out like 'Alabama' (not 'AL')]: ").strip()
c = input("Country (C) 2-letter [optional]: ").strip()
san = input("SubjectAltName string (e.g. DNS:example.com,DNS:www.example.com) [optional]: ").strip()
return {"cn": cn, "o": o, "ou": ou, "l": l, "st": st, "c": c, "san": san}
def build_names(app_name: str, *, rotate: bool) -> Tuple[str, str]:
app_name = _safe_filename(app_name)
if rotate:
stamp = _now_stamp()
return f"{app_name}_{stamp}.key", f"{app_name}_{stamp}.csr"
return f"{app_name}.key", f"{app_name}.csr"
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--console", required=True)
ap.add_argument("--user", required=True)
ap.add_argument("--app-name", required=True)
ap.add_argument("--out-dir", required=True)
ap.add_argument("--no-rotate", action="store_true")
ap.add_argument("--no-latest", action="store_true")
ap.add_argument("--insecure", action="store_true", help="Disable TLS verification (curl -k)")
ap.add_argument("--ca-bundle", help="Path to CA bundle PEM file to trust Console's issuing CA")
ap.add_argument("--reset-password", action="store_true")
ap.add_argument("--reset-passphrase", action="store_true")
args = ap.parse_args()
console = args.console.rstrip("/")
username = args.user
out_dir = pathlib.Path(args.out_dir).expanduser().resolve()
rotate = not args.no_rotate
write_latest = not args.no_latest
# TLS verification selection
if args.ca_bundle:
ca_path = str(pathlib.Path(args.ca_bundle).expanduser().resolve())
verify: VerifyType = ca_path
elif args.insecure:
verify = False
# avoid noisy urllib3 warnings if user explicitly requested insecure
try:
from urllib3.exceptions import InsecureRequestWarning # type: ignore
warnings.simplefilter("ignore", InsecureRequestWarning)
except Exception:
pass
else:
verify = True
password = _console_password_flow(console, username, reset=args.reset_password)
client = NitroConsoleClient(base=console, verify=verify)
print("[console] Logging in to establish session (for download/upload authorization)...")
client.login(username, password)
print("[console] Login OK (session established).")
key_name, csr_name = build_names(args.app_name, rotate=rotate)
print("\n[info] Derived names:")
print(f" Key: {key_name}")
print(f" CSR: {csr_name}")
print(f" Rotate: {'ON' if rotate else 'OFF'}")
print(f" Write latest: {'ON' if write_latest else 'OFF'}")
if not _prompt_yes_no("Proceed with these names?", default_yes=True):
print("Aborted.")
return 2
kcfg = _pick_key_type_interactive()
encrypt_key = _prompt_yes_no("Encrypt the private key with a passphrase?", default_yes=True)
passphrase = _key_passphrase_flow(console, username, reset=args.reset_passphrase, encrypt=encrypt_key)
subj = _prompt_subject()
print("\n[console] Creating key on Console (never reuse keys)...")
client.create_key(
key_name,
algo=kcfg.get("algo", ""),
keyform="PEM",
keysize=kcfg.get("keysize"),
ec_curve=kcfg.get("ec_curve"),
password=passphrase if encrypt_key else None,
)
print(f"[console] Key create accepted: {key_name}")
print("\n[console] Downloading KEY via /nitro/v2/download ...")
key_bytes = client.download_key(key_name)
key_out = out_dir / key_name
_write_file(key_out, key_bytes, 0o600)
print(f"[local] Wrote key: {key_out}")
if write_latest:
_copy_latest(key_out, "latest.key")
print(f"[local] Wrote latest.key -> {key_out.name}")
print("\n[local] Generating CSR with OpenSSL...")
csr_out = out_dir / csr_name
_openssl_make_csr(
key_out, csr_out,
cn=subj["cn"], o=subj["o"], ou=subj["ou"], l=subj["l"], st=subj["st"], c=subj["c"],
san=subj["san"],
passphrase=passphrase if encrypt_key else None,
)
try:
os.chmod(csr_out, 0o644)
except Exception:
pass
print(f"[local] Wrote CSR: {csr_out}")
if write_latest:
_copy_latest(csr_out, "latest.csr")
print(f"[local] Wrote latest.csr -> {csr_out.name}")
print("\nDone.")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,20 @@
def test_allow_wildcard_warning_colored_when_tty(monkeypatch, tmp_path, capsys):
from certctl import cli
from certctl import keygen
import sys
key_pem = keygen.generate_private_key(kind="rsa", bits=1024)
key_file = tmp_path / "k.pem"
key_file.write_text(key_pem)
# Simulate stderr being a TTY so colored output should be used
monkeypatch.setattr(sys.stderr, "isatty", lambda: True, raising=False)
out = tmp_path / "req.csr"
rc = cli.main(["csr", "create", "--key-file", str(key_file), "--subject", "/C=US/CN=example.com", "--san", "*.example.com", "--allow-wildcard", "--out", str(out)])
captured = capsys.readouterr()
assert rc == 0
# ANSI bold yellow prefix
assert "\033[1;33mWARNING: --allow-wildcard used" in captured.err
# ANSI reset present
assert "\033[0m" in captured.err

View File

@@ -0,0 +1,13 @@
def test_allow_wildcard_warning_prints(monkeypatch, tmp_path, capsys):
from certctl import cli
from certctl import keygen
key_pem = keygen.generate_private_key(kind="rsa", bits=1024)
key_file = tmp_path / "k.pem"
key_file.write_text(key_pem)
out = tmp_path / "req.csr"
rc = cli.main(["csr", "create", "--key-file", str(key_file), "--subject", "/C=US/CN=example.com", "--san", "*.example.com", "--allow-wildcard", "--out", str(out)])
captured = capsys.readouterr()
assert rc == 0
assert "WARNING: --allow-wildcard used" in captured.err

View File

@@ -0,0 +1,21 @@
"""Tests for CLI CSR commands."""
import subprocess
from certctl import cli
def test_cli_csr_create_and_show(monkeypatch, tmp_path):
# Generate a key and write to file
from certctl import keygen
key_pem = keygen.generate_private_key(kind="rsa", bits=1024)
key_file = tmp_path / "k.pem"
key_file.write_text(key_pem)
out = tmp_path / "req.csr"
rc = cli.main(["csr", "create", "--key-file", str(key_file), "--subject", "/C=US/ST=CA/CN=example.com", "--san", "www.example.com", "--san", "10.0.0.1", "--out", str(out)])
assert rc == 0
assert out.exists()
# Show prints text; just call show to ensure no exception
rc2 = cli.main(["csr", "show", "--csr-file", str(out)])
assert rc2 == 0

View File

@@ -0,0 +1,33 @@
"""CLI tests for creating CSR from an existing certificate."""
from certctl import cli, keygen
def test_cli_csr_create_from_cert(tmp_path):
# Generate key
key_pem = keygen.generate_private_key(kind="rsa", bits=1024)
key_file = tmp_path / "k.pem"
key_file.write_text(key_pem)
# Create a cert with SANs
conf = tmp_path / "conf.cnf"
conf.write_text("""[ req ]
distinguished_name = req_distinguished_name
req_extensions = v3_req
prompt = no
[ req_distinguished_name ]
[ v3_req ]
subjectAltName = DNS:www.example.com, IP:10.0.0.1
""")
cert_file = tmp_path / "cert.pem"
subj = "/C=US/ST=CA/CN=example.com"
import subprocess
subprocess.run(["openssl", "req", "-new", "-x509", "-key", str(key_file), "-out", str(cert_file), "-days", "1", "-config", str(conf), "-extensions", "v3_req", "-subj", subj], check=True)
out = tmp_path / "req.csr"
rc = cli.main(["csr", "create", "--key-file", str(key_file), "--from-cert", str(cert_file), "--out", str(out)])
assert rc == 0
assert out.exists()
# Show the CSR to ensure content
rc2 = cli.main(["csr", "show", "--csr-file", str(out)])
assert rc2 == 0

View File

@@ -0,0 +1,12 @@
import pytest
def test_csr_create_help_contains_wildcard(capsys):
from certctl import cli
p = cli.build_parser()
with pytest.raises(SystemExit):
p.parse_args(["csr", "create", "--help"])
captured = capsys.readouterr()
out = captured.out + captured.err
assert '--allow-wildcard' in out
assert 'wildcard' in out.lower()

View File

@@ -0,0 +1,35 @@
"""Tests for the CLI wiring of keygen --save-passphrase flag."""
from certctl import cli
def test_cli_keygen_save_passphrase(monkeypatch, capsys):
called = {}
def fake_gen_rsa(**kwargs):
called.update(kwargs)
return "PEM-KEY"
monkeypatch.setattr("certctl.keygen.generate_rsa_key", fake_gen_rsa)
# Simulate cli args
rc = cli.main(["keygen", "--kind", "rsa", "--bits", "1024", "--passphrase", "fromcli", "--keychain-service", "svc-cli", "--save-passphrase"])
assert rc == 0
assert called["passphrase"] == "fromcli"
assert called["keychain_service"] == "svc-cli"
assert called["save_to_keychain"] is True
def test_cli_keygen_write_file(monkeypatch, tmp_path):
def fake_gen_ec(**kwargs):
return "PEM-EC"
monkeypatch.setattr("certctl.keygen.generate_ec_key", fake_gen_ec)
out = tmp_path / "key.pem"
rc = cli.main(["keygen", "--kind", "ec", "--curve", "prime256v1", "--out", str(out)])
assert rc == 0
assert out.exists()
assert out.read_text() == "PEM-EC"

21
legacy/tests/test_csr.py Normal file
View File

@@ -0,0 +1,21 @@
"""Tests for CSR creation using OpenSSL CLI."""
import tempfile
from certctl import keygen
from certctl import csr
def test_create_csr_with_san(tmp_path):
# Generate a temporary key
key_pem = keygen.generate_private_key(kind="rsa", bits=1024)
key_file = tmp_path / "test.key"
key_file.write_text(key_pem)
subject = {"C": "US", "ST": "CA", "CN": "example.com"}
sans = ["www.example.com", "10.0.0.1"]
csr_pem = csr.create_csr_from_key(str(key_file), subject=subject, sans=sans)
assert csr_pem.startswith("-----BEGIN CERTIFICATE REQUEST-----")
# Ensure SANs are present in the CSR
assert csr.csr_has_san(csr_pem, "DNS:www.example.com")
assert csr.csr_has_san(csr_pem, "IP:10.0.0.1")

View File

@@ -0,0 +1,72 @@
"""Tests for CSR create when wildcard SANs are provided on the CLI."""
from certctl import cli
def test_cli_csr_create_cli_san_wildcard_interactive_confirm_yes(monkeypatch, tmp_path):
from certctl import keygen, csr as csrmod
key_pem = keygen.generate_private_key(kind="rsa", bits=1024)
key_file = tmp_path / "k.pem"
key_file.write_text(key_pem)
# Simulate interactive terminal and confirmation
monkeypatch.setattr("builtins.input", lambda prompt='': "yes")
import sys
monkeypatch.setattr(sys.stdin, "isatty", lambda: True, raising=False)
out = tmp_path / "req.csr"
rc = cli.main(["csr", "create", "--key-file", str(key_file), "--subject", "/C=US/CN=example.com", "--san", "*.example.com", "--out", str(out)])
assert rc == 0
assert out.exists()
csr_pem = out.read_text()
assert csrmod.csr_has_san(csr_pem, "DNS:*.example.com")
def test_cli_csr_create_cli_san_wildcard_interactive_confirm_no(monkeypatch, tmp_path):
from certctl import keygen, csr as csrmod
key_pem = keygen.generate_private_key(kind="rsa", bits=1024)
key_file = tmp_path / "k.pem"
key_file.write_text(key_pem)
# Simulate interactive terminal and decline confirmation
answers = iter(["no"]) # decline wildcard
monkeypatch.setattr("builtins.input", lambda prompt='': next(answers))
import sys
monkeypatch.setattr(sys.stdin, "isatty", lambda: True, raising=False)
out = tmp_path / "req.csr"
rc = cli.main(["csr", "create", "--key-file", str(key_file), "--subject", "/C=US/CN=example.com", "--san", "*.example.com", "--out", str(out)])
assert rc == 0
assert out.exists()
csr_pem = out.read_text()
assert not csrmod.csr_has_san(csr_pem, "DNS:*.example.com")
def test_cli_csr_create_cli_san_wildcard_noninteractive_fails(monkeypatch, tmp_path):
from certctl import keygen
key_pem = keygen.generate_private_key(kind="rsa", bits=1024)
key_file = tmp_path / "k.pem"
key_file.write_text(key_pem)
# Simulate non-interactive (isatty False)
import sys
monkeypatch.setattr(sys.stdin, "isatty", lambda: False, raising=False)
out = tmp_path / "req.csr"
rc = cli.main(["csr", "create", "--key-file", str(key_file), "--san", "*.example.com", "--out", str(out)])
assert rc != 0
def test_cli_csr_create_cli_san_wildcard_allow_flag_noninteractive(monkeypatch, tmp_path):
from certctl import keygen, csr as csrmod
key_pem = keygen.generate_private_key(kind="rsa", bits=1024)
key_file = tmp_path / "k.pem"
key_file.write_text(key_pem)
import sys
monkeypatch.setattr(sys.stdin, "isatty", lambda: False, raising=False)
out = tmp_path / "req.csr"
rc = cli.main(["csr", "create", "--key-file", str(key_file), "--subject", "/C=US/CN=example.com", "--san", "*.example.com", "--allow-wildcard", "--out", str(out)])
assert rc == 0
csr_pem = out.read_text()
assert csrmod.csr_has_san(csr_pem, "DNS:*.example.com")

View File

@@ -0,0 +1,38 @@
"""Tests for creating CSR from an existing certificate's metadata."""
from certctl import keygen, csr
def test_create_csr_from_cert(tmp_path):
# Generate a key
key_pem = keygen.generate_private_key(kind="rsa", bits=1024)
key_file = tmp_path / "k.pem"
key_file.write_text(key_pem)
# Create a certificate from the key with SANs using a temp OpenSSL config
conf = tmp_path / "conf.cnf"
conf.write_text("""[ req ]
distinguished_name = req_distinguished_name
req_extensions = v3_req
prompt = no
[ req_distinguished_name ]
[ v3_req ]
subjectAltName = DNS:www.example.com, IP:10.0.0.1
""")
cert_file = tmp_path / "cert.pem"
import subprocess
subj = "/C=US/ST=CA/CN=example.com"
subprocess.run(["openssl", "req", "-new", "-x509", "-key", str(key_file), "-out", str(cert_file), "-days", "1", "-config", str(conf), "-extensions", "v3_req", "-subj", subj], check=True)
csr_pem = csr.create_csr_from_cert(str(cert_file), str(key_file))
assert csr_pem.startswith("-----BEGIN CERTIFICATE REQUEST-----")
# Ensure SANs and CN are preserved
assert csr.csr_has_san(csr_pem, "DNS:www.example.com")
assert csr.csr_has_san(csr_pem, "IP:10.0.0.1")
# Check the CN shows up in the CSR subject
import subprocess
p = subprocess.run(["openssl", "req", "-in", "/dev/stdin", "-noout", "-subject"], input=csr_pem.encode("utf-8"), capture_output=True, check=True)
out = p.stdout.decode("utf-8")
import re
assert re.search(r"CN\s*=\s*example\.com", out)

View File

@@ -0,0 +1,102 @@
"""Tests for interactive CSR prompting."""
from certctl import csr
def test_prompt_for_subject_and_sans(monkeypatch):
answers = iter([
"US", # C
"CA", # ST
"", # L
"Example Org", # O
"", # OU
"example.com", # CN
"www.example.com", # SAN 1
"10.0.0.1", # SAN 2
"" # finish
])
monkeypatch.setattr("builtins.input", lambda prompt='': next(answers))
subj, sans = csr.prompt_for_subject_and_sans()
assert subj["C"] == "US"
assert subj["ST"] == "CA"
assert subj["O"] == "Example Org"
assert subj["CN"] == "example.com"
# prompt now returns normalized SANs like DNS:... or IP:...
assert "DNS:www.example.com" in sans
assert "IP:10.0.0.1" in sans
def test_cli_csr_create_interactive(monkeypatch, tmp_path):
# Generate a key
from certctl import keygen, cli
key_pem = keygen.generate_private_key(kind="rsa", bits=1024)
key_file = tmp_path / "k.pem"
key_file.write_text(key_pem)
# Simulate interactive inputs (subject fields + SANs)
answers = iter([
"US", # C
"CA", # ST
"", # L
"Example Org", # O
"", # OU
"example.com", # CN
"www.example.com", # SAN 1
"10.0.0.1", # SAN 2
"" # finish
])
monkeypatch.setattr("builtins.input", lambda prompt='': next(answers))
out = tmp_path / "req.csr"
rc = cli.main(["csr", "create", "--key-file", str(key_file), "--out", str(out)])
assert rc == 0
assert out.exists()
# Target SAN should be present
from certctl import csr as csrmod
csr_pem = out.read_text()
assert csrmod.csr_has_san(csr_pem, "DNS:www.example.com")
assert csrmod.csr_has_san(csr_pem, "IP:10.0.0.1")
def test_prompt_for_subject_and_sans_wildcard_confirm_yes(monkeypatch):
answers = iter([
"US", # C
"CA", # ST
"", # L
"Example Org", # O
"", # OU
"example.com", # CN
"*.example.com", # SAN 1 (wildcard)
"yes", # confirm wildcard
"", # finish
])
monkeypatch.setattr("builtins.input", lambda prompt='': next(answers))
subj, sans = csr.prompt_for_subject_and_sans()
assert subj["C"] == "US"
assert any(s.startswith("DNS:*.example.com") for s in sans)
def test_prompt_for_subject_and_sans_wildcard_confirm_no(monkeypatch):
answers = iter([
"US", # C
"CA", # ST
"", # L
"Example Org", # O
"", # OU
"example.com", # CN
"*.example.com", # SAN 1 (wildcard)
"no", # do not confirm
"www.example.com", # SAN 2
"", # finish
])
monkeypatch.setattr("builtins.input", lambda prompt='': next(answers))
subj, sans = csr.prompt_for_subject_and_sans()
assert subj["C"] == "US"
assert not any(s.startswith("DNS:*.example.com") for s in sans)
assert any(s.startswith("DNS:www.example.com") for s in sans)

View File

@@ -0,0 +1,37 @@
"""Tests for CSR submit flow using the Mock CA adapter."""
from certctl import cli
from certctl.ca.mock import MockCA
def test_csr_submit_mock(tmp_path):
# Create a key and a CSR
from certctl import keygen, csr
key_pem = keygen.generate_private_key(kind="rsa", bits=1024)
key_file = tmp_path / "k.pem"
key_file.write_text(key_pem)
csr_pem = csr.create_csr_from_key(str(key_file), subject={"C": "US", "CN": "test.local"}, sans=["www.test.local"])
csr_file = tmp_path / "r.csr"
csr_file.write_text(csr_pem)
# Submit via CLI using mock adapter
rc = cli.main(["csr", "submit", "--csr-file", str(csr_file), "--ca", "mock", "--name", "testreq", "--wait"])
assert rc == 0
def test_csr_submit_reject_wildcard(tmp_path):
from certctl import keygen, csr
key_pem = keygen.generate_private_key(kind="rsa", bits=1024)
key_file = tmp_path / "k.pem"
key_file.write_text(key_pem)
csr_pem = csr.create_csr_from_key(str(key_file), subject={"C": "US", "CN": "test.local"}, sans=["*.example.com"])
csr_file = tmp_path / "r.csr"
csr_file.write_text(csr_pem)
rc = cli.main(["csr", "submit", "--csr-file", str(csr_file), "--ca", "mock", "--name", "testreq"])
assert rc != 0
# Allow wildcard
rc2 = cli.main(["csr", "submit", "--csr-file", str(csr_file), "--ca", "mock", "--name", "testreq", "--allow-wildcard", "--wait"])
assert rc2 == 0

View File

@@ -0,0 +1,6 @@
def test_docs_mention_wildcard():
import pathlib
p = pathlib.Path(__file__).resolve().parents[1] / 'docs' / 'USAGE.md'
txt = p.read_text()
assert 'wildcard' in txt.lower()
assert '--allow-wildcard' in txt

View File

@@ -0,0 +1,39 @@
"""Tests for OpenSSL-based key generation helpers."""
import re
from certctl.keygen import generate_rsa_key, generate_ec_key, generate_private_key
# We accept PKCS#1 (RSA PRIVATE KEY), PKCS#8 (PRIVATE KEY) or encrypted forms.
PEM_PRIV_RE = re.compile(r"-----BEGIN [A-Z0-9 -]*PRIVATE KEY-----")
def test_generate_rsa_unencrypted():
pem = generate_rsa_key(bits=2048)
assert pem.strip().startswith("-----BEGIN")
# basic sanity checks for PEM content
assert "PRIVATE KEY" in pem or "RSA PRIVATE KEY" in pem
assert "BEGIN" in pem and "END" in pem
def test_generate_rsa_encrypted():
pem = generate_rsa_key(bits=2048, passphrase="s3cr3t")
assert "ENCRYPTED" in pem or "ENCRYPTED PRIVATE KEY" in pem or "Proc-Type: 4,ENCRYPTED" in pem
def test_generate_ec_unencrypted():
pem = generate_ec_key(curve="prime256v1")
assert pem.strip().startswith("-----BEGIN")
assert "PRIVATE KEY" in pem and "BEGIN" in pem and "END" in pem
def test_generate_ec_encrypted():
pem = generate_ec_key(curve="secp384r1", passphrase="s3cr3t")
assert "ENCRYPTED" in pem or "ENCRYPTED PRIVATE KEY" in pem
def test_generate_private_key_helper():
pem = generate_private_key(kind="rsa", bits=2048)
assert PEM_PRIV_RE.search(pem)
pem2 = generate_private_key(kind="ec", curve="prime256v1")
assert PEM_PRIV_RE.search(pem2)

View File

@@ -0,0 +1,29 @@
"""Integration tests to verify keygen resolves passphrases from keychain."""
from certctl import keygen
def test_generate_rsa_using_keychain(monkeypatch):
# Return a known passphrase from keychain and ensure result is encrypted
monkeypatch.setattr(keygen, "_openssl_bin", lambda: "/usr/bin/openssl")
def fake_keychain_get(service):
assert service == "svc1"
return "from-keychain"
monkeypatch.setattr("certctl.storage.keychain_get", fake_keychain_get)
pem = keygen.generate_rsa_key(bits=1024, passphrase=None, keychain_service="svc1")
assert "ENCRYPTED" in pem or "ENCRYPTED PRIVATE KEY" in pem
def test_generate_ec_using_keychain(monkeypatch):
monkeypatch.setattr(keygen, "_openssl_bin", lambda: "/usr/bin/openssl")
def fake_keychain_get(service):
assert service == "svc2"
return "from-keychain"
monkeypatch.setattr("certctl.storage.keychain_get", fake_keychain_get)
pem = keygen.generate_ec_key(curve="prime256v1", passphrase=None, keychain_service="svc2")
assert "ENCRYPTED" in pem or "ENCRYPTED PRIVATE KEY" in pem

View File

@@ -0,0 +1,43 @@
"""Tests verifying passphrase save-to-keychain behavior for keygen."""
from certctl import keygen
def test_save_rsa_passphrase_to_keychain(monkeypatch):
# stub openssl path to avoid requiring a specific binary
monkeypatch.setattr(keygen, "_openssl_bin", lambda: "/usr/bin/openssl")
called = {}
def fake_set(service, password, account="certctl"):
called['service'] = service
called['password'] = password
called['account'] = account
return True
monkeypatch.setattr("certctl.storage.keychain_set", fake_set)
# Provide passphrase and request save to keychain
pem = keygen.generate_rsa_key(bits=1024, passphrase="savetest", keychain_service="svc-save", save_to_keychain=True, keychain_account="acct1")
assert called['service'] == "svc-save"
assert called['password'] == "savetest"
assert called['account'] == "acct1"
def test_save_ec_passphrase_to_keychain(monkeypatch):
monkeypatch.setattr(keygen, "_openssl_bin", lambda: "/usr/bin/openssl")
called = {}
def fake_set(service, password, account="certctl"):
called['service'] = service
called['password'] = password
called['account'] = account
return True
monkeypatch.setattr("certctl.storage.keychain_set", fake_set)
pem = keygen.generate_ec_key(curve="prime256v1", passphrase="ecsave", keychain_service="svc-save-ec", save_to_keychain=True)
assert called['service'] == "svc-save-ec"
assert called['password'] == "ecsave"

View File

@@ -0,0 +1,16 @@
import pathlib
def test_no_direct_stderr_writes():
root = pathlib.Path(__file__).resolve().parents[1]
# Only check package sources and the top-level script(s); ignore virtualenv and tests
py_files = list((root / 'certctl').rglob('*.py'))
main_script = root / 'nscertkeycreate.py'
if main_script.exists():
py_files.append(main_script)
offending = []
for p in py_files:
txt = p.read_text()
if 'sys.stderr.write' in txt or 'file=sys.stderr' in txt:
offending.append(str(p))
assert not offending, f"Found direct stderr writes in: {offending}"

View File

@@ -0,0 +1,9 @@
"""Verify `bin/certctl` wrapper exists and is executable."""
import os
def test_nsctl_exists_and_executable():
path = os.path.join(os.path.dirname(__file__), "..", "bin", "certctl")
path = os.path.abspath(path)
assert os.path.exists(path)
assert os.access(path, os.X_OK)

View File

@@ -0,0 +1,7 @@
"""Test verifying `pyproject.toml` exposes `certctl` script entry."""
def test_pyproject_has_nsctl_entry():
content = open("pyproject.toml").read()
assert "certctl" in content
assert "certctl.cli:main" in content

View File

@@ -0,0 +1,31 @@
"""Tests for SAN validation and CLI rejection of invalid SANs."""
import pytest
from certctl.csr import normalize_and_validate_san
def test_normalize_dns():
assert normalize_and_validate_san("www.example.com") == "DNS:www.example.com"
assert normalize_and_validate_san("DNS:www.example.com") == "DNS:www.example.com"
def test_normalize_ip():
assert normalize_and_validate_san("10.2.3.4") == "IP:10.2.3.4"
assert normalize_and_validate_san("IP:10.2.3.4") == "IP:10.2.3.4"
def test_invalid_san_raises():
with pytest.raises(ValueError):
normalize_and_validate_san("*invalid_hostname*")
def test_cli_rejects_bad_san(tmp_path):
# Generate a key
from certctl import keygen, cli
key_pem = keygen.generate_private_key(kind="rsa", bits=1024)
key_file = tmp_path / "k.pem"
key_file.write_text(key_pem)
out = tmp_path / "req.csr"
rc = cli.main(["csr", "create", "--key-file", str(key_file), "--subject", "/C=US/CN=example.com", "--san", "bad@@@", "--out", str(out)])
assert rc != 0
assert not out.exists()

View File

@@ -0,0 +1,50 @@
"""Tests for macOS Keychain helpers in `certctl.storage`.
These tests stub out `subprocess.run` and `getpass.getpass` to simulate
macOS behavior.
"""
import subprocess
from certctl import storage
class DummyCompleted:
def __init__(self, out=b""):
self.stdout = out
def test_keychain_get_found(monkeypatch):
def fake_run(args, check, capture_output):
return DummyCompleted(out=b"s3cr3t\n")
monkeypatch.setattr(subprocess, "run", fake_run)
assert storage.keychain_get("svc-1") == "s3cr3t"
def test_keychain_get_not_found(monkeypatch):
def fake_run(args, check, capture_output):
raise subprocess.CalledProcessError(1, args)
monkeypatch.setattr(subprocess, "run", fake_run)
assert storage.keychain_get("svc-missing") is None
def test_get_or_prompt_passphrase_stores(monkeypatch):
# Simulate nothing in keychain, then interactive prompt, then store called
def fake_run_find(args, check, capture_output):
raise subprocess.CalledProcessError(1, args)
stored = {}
def fake_set(service, password, account="certctl"):
stored[service] = password
return True
monkeypatch.setattr(subprocess, "run", fake_run_find)
monkeypatch.setattr(storage, "keychain_set", fake_set)
monkeypatch.setattr("getpass.getpass", lambda prompt: "typed-pass")
val = storage.get_or_prompt_passphrase("myservice", prompt="enter:")
assert val == "typed-pass"
assert stored.get("myservice") == "typed-pass"

View File

@@ -0,0 +1,22 @@
def test_term_error_and_info_format(monkeypatch):
from certctl import term
class Fake:
def isatty(self):
return True
written = ''
def write(self, s):
Fake.written += s
fake = Fake()
# error
e = term.format_error('danger', stream=fake)
assert '\033[1;31m' in e and '\033[0m' in e
term.print_error('err', stream=fake)
assert '\033[1;31m' in Fake.written
# info
i = term.format_info('ok', stream=fake)
assert '\033[1;32m' in i and '\033[0m' in i
term.print_info('info', stream=fake)
assert '\033[1;32m' in Fake.written

View File

@@ -0,0 +1,25 @@
def test_term_format_and_print(monkeypatch):
import sys
from certctl import term
class Fake:
def isatty(self):
return False
def write(self, s):
Fake.written = getattr(Fake, 'written', '') + s
fake = Fake()
# plain
assert term.format_warning('w', stream=fake) == 'w'
term.print_warning('x', stream=fake)
assert 'x' in Fake.written
# TTY
class FakeTTY(Fake):
def isatty(self):
return True
fty = FakeTTY()
out = term.format_warning('z', stream=fty)
assert '\033[1;33m' in out and '\033[0m' in out
term.print_warning('y', stream=fty)
# ensure it wrote the ANSI code
assert '\033[1;33m' in Fake.written or '\033[1;33m' in fty.write('')

View File

@@ -0,0 +1,63 @@
"""Tests for certctl.utils.extract_pem_from_json"""
from certctl.utils import extract_pem_from_json
from certctl.netscaler import extract_csr_text
def test_extract_from_ns_ssl_csr():
resp = {
"ns_ssl_csr": [
{
"file_name": "app1.csr",
"csr": "-----BEGIN CERTIFICATE REQUEST-----\nMIIC123...\n-----END CERTIFICATE REQUEST-----",
"errorcode": 0,
}
],
"errorcode": 0,
}
pem = extract_pem_from_json(resp)
assert pem is not None
assert pem.startswith("-----BEGIN CERTIFICATE REQUEST-----")
# netscaler-specific helper should prefer the ns_ssl_csr field
pem2 = extract_csr_text(resp)
assert pem2 == pem
def test_extract_from_ns_command_stdout():
resp = {
"ns_command": {
"commands": [
{
"command": "shell cat /nsconfig/ssl/app1.csr",
"stdout": "-----BEGIN CERTIFICATE REQUEST-----\nMIIC_STDOUT...\n-----END CERTIFICATE REQUEST-----",
"response": None,
}
],
"errorcode": 0,
}
}
pem = extract_pem_from_json(resp)
assert pem is not None
assert "MIIC_STDOUT" in pem
def test_extract_from_rows_array():
resp = {
"ns_command": {
"commands": [
{"command": "cat ...", "rows": ["-----BEGIN CERTIFICATE REQUEST-----", "MIIC_ROWS...", "-----END CERTIFICATE REQUEST-----"]}
]
}
}
pem = extract_pem_from_json(resp)
assert pem is not None
assert "MIIC_ROWS" in pem
def test_no_pem_returns_none():
resp = {"foo": "bar", "nested": {"x": [1, 2, {"y": "nothing here"}]}}
assert extract_pem_from_json(resp) is None

View File

@@ -0,0 +1,38 @@
def test_format_warning_colored_and_plain(monkeypatch, capsys):
import sys
from certctl import utils
# Non-tty stream -> plain
class Fake:
def isatty(self):
return False
def write(self, s):
pass
fake = Fake()
out_plain = utils.format_warning("test", stream=fake)
assert out_plain == "test"
# TTY stream -> colored
class FakeTTY(Fake):
def isatty(self):
return True
fty = FakeTTY()
out_colored = utils.format_warning("test", stream=fty)
assert "\033[1;33mtest\033[0m" == out_colored
def test_print_warning_writes(monkeypatch, capsys):
import sys
from certctl import utils
class FakeTTY:
def isatty(self):
return True
written = ""
def write(self, s):
# accumulate
FakeTTY.written += s
f = FakeTTY()
utils.print_warning("hello", stream=f)
assert "hello" in FakeTTY.written
assert "\033[1;33m" in FakeTTY.written