Files
nscertkeycreate/certctl/console.py
deamonkai fc94008530 initial
2026-01-23 12:11:21 -06:00

209 lines
7.1 KiB
Python

"""NetScaler Console NITRO API helpers."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Dict, Optional, Union
import requests
from urllib3.exceptions import InsecureRequestWarning
VerifyType = Union[bool, str]
class NitroError(RuntimeError):
def __init__(self, status_code: int, message: str, payload: Any = None, headers: Any = None):
super().__init__(f"NITRO error (HTTP {status_code}): {message}")
self.status_code = status_code
self.message = message
self.payload = payload
self.headers = headers
@dataclass
class NitroConsoleClient:
base: str
verify: VerifyType
token: Optional[str] = None
timeout: int = 60
def __post_init__(self) -> None:
if self.verify is False:
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning) # type: ignore[attr-defined]
def _url(self, path: str) -> str:
return self.base.rstrip("/") + path
def _headers(self, extra: Optional[Dict[str, str]] = None) -> Dict[str, str]:
headers: Dict[str, str] = {"Accept": "*/*"}
if self.token:
headers["Cookie"] = f"NITRO_AUTH_TOKEN={self.token}"
if extra:
headers.update(extra)
return headers
def login(self, username: str, password: str) -> str:
url = self._url("/nitro/v2/config/login")
payload = {"login": {"username": username, "password": password}}
resp = requests.post(
url,
headers={"Content-Type": "application/json", "Accept": "application/json"},
json=payload,
verify=self.verify,
timeout=self.timeout,
)
data = self._parse_json(resp)
try:
session_id = data["login"][0]["sessionid"]
except Exception as exc: # pragma: no cover - defensive
raise NitroError(resp.status_code, "Login response did not include sessionid", data) from exc
if not session_id:
raise NitroError(resp.status_code, "Login returned empty sessionid", data)
self.token = session_id
return session_id
def _parse_json(self, resp: requests.Response) -> Dict[str, Any]:
headers = dict(resp.headers)
try:
data = resp.json()
except Exception:
if resp.status_code >= 400:
raise NitroError(resp.status_code, resp.text, headers=headers)
raise NitroError(resp.status_code, "Expected JSON but got non-JSON response", headers=headers)
if resp.status_code >= 400:
msg = data.get("message") if isinstance(data, dict) else resp.text
raise NitroError(resp.status_code, str(msg), data, headers=headers)
if isinstance(data, dict):
err = data.get("errorcode")
if err not in (0, "0", None):
msg = data.get("message", "Unknown NITRO error")
raise NitroError(resp.status_code, str(msg), data, headers=headers)
return data
def get_json(self, path: str, *, params: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
url = self._url(path)
resp = requests.get(
url,
headers=self._headers({"Accept": "application/json"}),
params=params,
verify=self.verify,
timeout=self.timeout,
)
return self._parse_json(resp)
def post_json(
self,
path: str,
payload: Dict[str, Any],
*,
params: Optional[Dict[str, str]] = None,
headers: Optional[Dict[str, str]] = None,
use_cookie: bool = True,
) -> Dict[str, Any]:
url = self._url(path)
base_headers: Dict[str, str] = {"Content-Type": "application/json", "Accept": "application/json"}
if use_cookie and self.token:
base_headers["Cookie"] = f"NITRO_AUTH_TOKEN={self.token}"
if headers:
base_headers.update(headers)
resp = requests.post(
url,
headers=base_headers,
params=params,
json=payload,
verify=self.verify,
timeout=self.timeout,
)
return self._parse_json(resp)
def put_json(self, path: str, payload: Dict[str, Any]) -> Dict[str, Any]:
url = self._url(path)
resp = requests.put(
url,
headers=self._headers({"Content-Type": "application/json", "Accept": "application/json"}),
json=payload,
verify=self.verify,
timeout=self.timeout,
)
return self._parse_json(resp)
def upload_file(
self,
path: str,
file_path: str,
*,
basic_user: Optional[str] = None,
basic_password: Optional[str] = None,
) -> Dict[str, Any]:
url = self._url(path)
headers = self._headers()
if basic_user and basic_password:
headers["X-NITRO-USER"] = basic_user
headers["X-NITRO-PASS"] = basic_password
with open(file_path, "rb") as handle:
resp = requests.post(
url,
headers=headers,
files={"file": handle},
verify=self.verify,
timeout=self.timeout,
)
return self._parse_json(resp)
def download_file(self, resource: str, file_name: str) -> bytes:
url = self._url(f"/nitro/v2/download/{resource}/{file_name}")
resp = requests.get(
url,
headers=self._headers(),
verify=self.verify,
timeout=self.timeout,
)
if resp.status_code >= 400:
raise NitroError(resp.status_code, resp.text, headers=dict(resp.headers))
return resp.content
def get_system_settings(self) -> Dict[str, Any]:
data = self.get_json("/nitro/v2/config/system_settings")
settings = data.get("system_settings")
if isinstance(settings, list) and settings:
return settings[0]
if isinstance(settings, dict):
return settings
return {}
def set_basicauth(self, enabled: bool) -> None:
settings = self.get_system_settings()
settings_id = settings.get("id")
if not settings_id:
raise NitroError(400, "system_settings id not found")
payload = {"system_settings": {"basicauth": bool(enabled)}}
self.put_json(f"/nitro/v2/config/system_settings/{settings_id}", payload)
def create_key(
self,
key_name: str,
*,
algo: str,
keyform: str = "PEM",
keysize: Optional[int] = None,
ec_curve: Optional[str] = None,
password: Optional[str] = None,
file_location_path: str = "",
) -> Dict[str, Any]:
obj: Dict[str, Any] = {
"file_name": key_name,
"keyform": keyform,
"algo": algo,
"file_location_path": file_location_path,
}
if keysize is not None:
obj["keysize"] = int(keysize)
if ec_curve:
obj["ec_curve"] = ec_curve
if password:
obj["password"] = password
return self.post_json("/nitro/v2/config/ns_ssl_key", {"ns_ssl_key": obj}, params={"action": "create"})