Files
nscertkeycreate/certctl/scripts/imperva_deploy.py
deamonkai fc94008530 initial
2026-01-23 12:11:21 -06:00

240 lines
8.3 KiB
Python

#!/usr/bin/env python3
"""Deploy a Console certificate to Imperva WAF using a combined PEM."""
from __future__ import annotations
import argparse
import base64
import getpass
import json
import os
import re
from pathlib import Path
from typing import Any, Dict, List, Optional
import requests
from certctl.console import NitroConsoleClient
def _parse_cn(subject: str) -> Optional[str]:
    """Extract the Common Name (CN) value from a certificate subject string.

    Both comma-separated (``CN=host,O=Org``) and slash-separated
    (``/C=US/CN=host``) subjects are accepted, and whitespace around the
    ``=`` sign is tolerated.

    Args:
        subject: Subject / distinguished-name text; may be empty.

    Returns:
        The CN value with surrounding whitespace stripped, or ``None`` when
        ``subject`` is empty or contains no CN attribute.
    """
    if not subject:
        return None
    # CN must appear at the start of the string or right after a separator
    # (comma, slash or whitespace) so that attribute names merely ending in
    # "CN" are not picked up. The escapes must be single backslashes: inside a
    # raw string "\\s" is a literal backslash followed by "s", which never
    # matches an ordinary "CN=..." subject.
    match = re.search(r"(?:^|[,/\s])CN\s*=\s*([^,/]+)", subject)
    return match.group(1).strip() if match else None
def _normalize_certkey_name(name: str) -> str:
    """Turn ``name`` into an underscore-separated alphanumeric identifier.

    Every run of characters outside ``A-Z``, ``a-z`` and ``0-9`` (underscores
    included) is collapsed into a single ``_``; leading and trailing
    underscores are then trimmed. A falsy ``name`` yields ``""``.
    """
    source = name if name else ""
    underscored = re.sub(r"[^A-Za-z0-9]+", "_", source)
    return underscored.strip("_")
def _load_password(username: str, console: str, provided: Optional[str]) -> str:
    """Resolve the Console password.

    Sources are tried in order: the explicitly ``provided`` value, the
    ``CERTCTL_CONSOLE_PASSWORD`` environment variable, and finally an
    interactive prompt that names ``username`` and ``console``.
    """
    secret = provided or os.environ.get("CERTCTL_CONSOLE_PASSWORD")
    if not secret:
        # Nothing supplied non-interactively: ask without echoing.
        secret = getpass.getpass(f"Console password for {username}@{console}: ")
    return secret
def _get_key_passphrase(provided: Optional[str]) -> str:
    """Resolve the private-key passphrase.

    Sources are tried in order: the explicitly ``provided`` value, the
    ``CERTCTL_KEY_PASSPHRASE`` environment variable, and finally an
    interactive prompt (whose answer is returned as-is, even when empty).
    """
    if not provided:
        from_env = os.environ.get("CERTCTL_KEY_PASSPHRASE")
        if not from_env:
            return getpass.getpass("Key passphrase (AES-256): ")
        return from_env
    return provided
def _list_certkeys(client: NitroConsoleClient) -> List[Dict[str, Any]]:
    """Fetch certkey records from the Console as a list of dicts.

    A single request is made with ``pagesize`` set to ``"200"``. The
    ``ns_ssl_certkey`` entry of the response may be a single dict or a list;
    either form is normalised to a list, and non-dict list members are
    dropped. Any other shape yields an empty list.
    """
    response = client.get_json("/nitro/v2/config/ns_ssl_certkey", params={"pagesize": "200"})
    entries = response.get("ns_ssl_certkey", [])
    if isinstance(entries, dict):
        return [entries]
    if not isinstance(entries, list):
        return []
    return [entry for entry in entries if isinstance(entry, dict)]
def _find_certkey(client: NitroConsoleClient, name: str) -> Dict[str, Any]:
    """Locate a single Console certkey record by name or by subject CN.

    Lookup strategy, in order:

    1. Filter on ``certkeypair_name`` using ``name`` verbatim.
    2. If that finds nothing and the normalised form of ``name`` is non-empty
       and different, filter on the normalised name.
    3. List certkeys and compare the CN parsed from each record's ``subject``
       (or ``certificate_dn``) with ``name``, case-insensitively.

    Args:
        client: Logged-in Console client used for the queries.
        name: Certkeypair name or certificate Common Name.

    Returns:
        The matching certkey record.

    Raises:
        SystemExit: When no record matches, or when the CN comparison matches
            more than one record.
    """
    endpoint = "/nitro/v2/config/ns_ssl_certkey"

    # Name-based attempts: the exact name first, then its normalised form.
    attempts = [name]
    normalized = _normalize_certkey_name(name)
    if normalized and normalized != name:
        attempts.append(normalized)

    for candidate in attempts:
        response = client.get_json(
            endpoint,
            params={"filter": f"certkeypair_name:{candidate}"},
        )
        found = response.get("ns_ssl_certkey", [])
        if isinstance(found, dict):
            return found
        if isinstance(found, list) and found:
            return found[0]

    # Fall back to matching on the certificate's Common Name.
    cn_matches = []
    for record in _list_certkeys(client):
        subject = str(record.get("subject") or record.get("certificate_dn") or "")
        common_name = _parse_cn(subject)
        if common_name and common_name.lower() == name.lower():
            cn_matches.append(record)

    if not cn_matches:
        raise SystemExit(f"Certkeypair not found: {name}")
    if len(cn_matches) > 1:
        names = ", ".join(sorted({m.get("certkeypair_name", "unknown") for m in cn_matches}))
        raise SystemExit(f"Multiple certkeypairs match CN {name}: {names}")
    return cn_matches[0]
def _first_value(cert: Dict[str, Any], keys: List[str]) -> Optional[str]:
    """Return the first truthy value found under ``keys`` in ``cert``.

    Keys are tried in the given order; the value is converted with ``str``.
    ``None`` is returned when every key is missing or maps to a falsy value.
    """
    looked_up = (cert.get(key) for key in keys)
    return next((str(value) for value in looked_up if value), None)
def _extract_chain_files(cert: Dict[str, Any]) -> List[str]:
    """Collect the CA chain file names referenced by a certkey record.

    The ``certkeychain`` entry may be a single dict or a list of dicts. For
    each dict the first truthy value among ``certificate_file_name``,
    ``cert_name`` and ``linked_to`` is used.

    Args:
        cert: Certkey record as returned by the Console.

    Returns:
        Unique file names as strings, in first-seen order. Empty when the
        record has no usable chain metadata.
    """
    chain = cert.get("certkeychain") or []
    if isinstance(chain, dict):
        chain = [chain]
    elif not isinstance(chain, list):
        # Unexpected shape (e.g. a scalar): treat as "no chain metadata"
        # instead of failing while trying to iterate it.
        return []

    names: List[str] = []
    seen = set()
    for entry in chain:
        if not isinstance(entry, dict):
            continue
        raw = entry.get("certificate_file_name") or entry.get("cert_name") or entry.get("linked_to")
        if not raw:
            continue
        # De-duplicate on the stringified name, i.e. on exactly what is
        # stored, so non-string values cannot slip past the check.
        name = str(raw)
        if name not in seen:
            seen.add(name)
            names.append(name)
    return names
def _auth_type(cert: Dict[str, Any]) -> str:
    """Classify a certkey record as ``"ECC"`` or ``"RSA"``.

    The record's ``public_key_algorithm`` is inspected, falling back to
    ``signature_algorithm`` when the former is missing or empty. Any
    algorithm text containing ``EC`` (case-insensitive) maps to ``"ECC"``;
    everything else, including a record with neither field, maps to
    ``"RSA"``.
    """
    raw = cert.get("public_key_algorithm") or cert.get("signature_algorithm") or ""
    algo = str(raw).upper()
    # "ECDSA" itself contains "EC", so one substring test covers both.
    return "ECC" if "EC" in algo else "RSA"
def _read_pem(content: bytes) -> str:
    """Decode PEM bytes as UTF-8 text that always ends with a newline.

    Undecodable bytes are replaced rather than raising. A trailing ``"\\n"``
    is appended when missing (so empty input becomes ``"\\n"``), which keeps
    concatenated PEM blocks from running into each other.
    """
    decoded = content.decode("utf-8", errors="replace")
    if decoded.endswith("\n"):
        return decoded
    return decoded + "\n"
def _b64(content: str) -> str:
    """Return the standard Base64 encoding of ``content``'s UTF-8 bytes as ASCII text."""
    raw = content.encode("utf-8")
    encoded = base64.b64encode(raw)
    return encoded.decode("ascii")
def run(args: argparse.Namespace) -> int:
    """Fetch a certificate and key from the Console and upload them to Imperva.

    Steps: validate arguments, log in to the Console, resolve the certkey
    record, download the leaf certificate, private key and any chain
    certificates, then PUT the Base64-encoded chain and key to the Imperva
    ``customCertificate`` endpoint of the given site. The parsed response
    (or its raw text when it is not JSON) is printed.

    Args:
        args: Parsed command-line arguments from ``build_arg_parser``.

    Returns:
        ``0`` on success.

    Raises:
        SystemExit: On missing required arguments, when the certkey record
            lacks a certificate or key file name, or when Imperva answers
            with an HTTP status of 400 or above.
    """
    if not (args.console and args.user):
        raise SystemExit("Console URL and user are required.")
    if not args.site_id:
        raise SystemExit("Imperva site id is required (--site-id).")

    # TLS verification for the Console: off, a custom CA bundle path, or the
    # default trust store.
    verify: object = True
    if args.insecure:
        verify = False
    elif args.ca_bundle:
        verify = args.ca_bundle

    console_password = _load_password(args.user, args.console, args.password)
    client = NitroConsoleClient(base=args.console, verify=verify, timeout=args.timeout)
    client.login(args.user, console_password)

    cert = _find_certkey(client, args.certkeypair)
    cert_file = _first_value(cert, ["certificate_file_name", "ssl_certificate", "file_name"])
    key_file = _first_value(cert, ["key_file_name", "ssl_key", "key_name"])
    if not cert_file:
        raise SystemExit("Console certkey did not include certificate_file_name.")
    if not key_file:
        raise SystemExit("Console certkey did not include key_file_name.")

    chain_files = _extract_chain_files(cert)
    print(f"Using certificate: {cert_file}")
    print(f"Using key: {key_file}")
    if not chain_files:
        print("No CA chain metadata found; Imperva upload will include only the leaf cert.")
    else:
        print(f"Using CA chain files: {', '.join(chain_files)}")

    # Download the PEM material; the combined chain is leaf first, then the
    # chain certificates in metadata order.
    cert_pem = _read_pem(client.download_file("ns_ssl_cert", cert_file))
    key_pem = _read_pem(client.download_file("ns_ssl_key", key_file))
    chain_pems = [
        _read_pem(client.download_file("ns_ssl_cert", chain_name))
        for chain_name in chain_files
    ]
    full_chain = "".join([cert_pem, *chain_pems])

    key_passphrase = _get_key_passphrase(args.key_passphrase)

    # Imperva credentials: CLI flag, then environment, then prompt.
    api_id = args.api_id or os.environ.get("IMPERVA_API_ID") or input("Imperva API Id: ")
    api_key = (
        args.api_key
        or os.environ.get("IMPERVA_API_KEY")
        or getpass.getpass("Imperva API Key: ")
    )

    payload = {
        "certificate": _b64(full_chain),
        "private_key": _b64(key_pem),
        "auth_type": _auth_type(cert),
    }
    if key_passphrase:
        # Only sent when non-empty (an empty prompt answer omits the field).
        payload["passphrase"] = key_passphrase

    base_url = args.imperva_base.rstrip("/")
    url = f"{base_url}/api/prov/v2/sites/{args.site_id}/customCertificate"
    headers = {
        "x-API-Id": api_id,
        "x-API-Key": api_key,
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    resp = requests.put(url, headers=headers, json=payload, timeout=args.imperva_timeout)
    if resp.status_code >= 400:
        raise SystemExit(f"Imperva upload failed ({resp.status_code}): {resp.text}")

    # Best effort: show the JSON body, or the raw text if it cannot be parsed.
    try:
        data = resp.json()
    except Exception:
        data = {"raw": resp.text}
    print(json.dumps(data, indent=2))
    return 0
def build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the Imperva deployment script.

    ``--certkeypair`` and ``--site-id`` are required; both HTTP timeouts
    default to 60 seconds and ``--imperva-base`` defaults to
    ``https://my.impervaservices.com``.
    """
    parser = argparse.ArgumentParser(
        description="Deploy a Console certificate to Imperva (combined PEM is built automatically)."
    )
    # (flag, add_argument keyword arguments), registered in this order.
    option_specs = (
        ("--certkeypair", {"required": True, "help": "Console certkeypair name or CN"}),
        ("--console", {"help": "Console base URL, e.g. https://console"}),
        ("--user", {"help": "Console username"}),
        ("--password", {"help": "Console password (or set CERTCTL_CONSOLE_PASSWORD)"}),
        ("--insecure", {"action": "store_true", "help": "Disable TLS verification for Console"}),
        ("--ca-bundle", {"help": "Path to CA bundle for Console TLS verification"}),
        ("--timeout", {"type": int, "default": 60, "help": "Console HTTP timeout in seconds"}),
        ("--key-passphrase", {"help": "Passphrase for the private key (or CERTCTL_KEY_PASSPHRASE)"}),
        (
            "--imperva-base",
            {"default": "https://my.impervaservices.com", "help": "Imperva API base URL"},
        ),
        ("--imperva-timeout", {"type": int, "default": 60, "help": "Imperva HTTP timeout in seconds"}),
        ("--site-id", {"required": True, "help": "Imperva site ID"}),
        ("--api-id", {"help": "Imperva API ID (or IMPERVA_API_ID)"}),
        ("--api-key", {"help": "Imperva API key (or IMPERVA_API_KEY)"}),
    )
    for flag, options in option_specs:
        parser.add_argument(flag, **options)
    return parser
def main() -> None:
    """Parse command-line arguments, run the deployment, and exit with its status."""
    args = build_arg_parser().parse_args()
    exit_code = run(args)
    raise SystemExit(exit_code)


# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()