201 lines
6.9 KiB
Python
201 lines
6.9 KiB
Python
"""NetScaler Console (ADM/Console) NITRO helpers.
|
|
|
|
Design goals:
|
|
- Log in ONCE to obtain a session token ("sessionid" in the login response).
|
|
- Use that token on subsequent requests (Cookie: NITRO_AUTH_TOKEN=...).
|
|
- Keep credentials out of git: caller can provide password via keychain/env.
|
|
|
|
Notes:
|
|
- In some Console builds, download endpoints authorize *only* via the cookie,
|
|
even if Basic auth / X-NITRO headers are supplied.
|
|
- Requests' cookie jar can contain multiple cookies with the same name but
|
|
different paths, leading to CookieConflictError. To avoid that entire class
|
|
of issues, we do not rely on the cookie jar; we store the token ourselves
|
|
and set the Cookie header explicitly.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from dataclasses import dataclass
|
|
from typing import Any, Dict, Optional
|
|
|
|
import requests
|
|
|
|
|
|
class NitroError(RuntimeError):
|
|
def __init__(self, status: int, message: str, payload: Any | None = None, headers: Dict[str, str] | None = None):
|
|
super().__init__(f"NITRO error (HTTP {status}): {message}")
|
|
self.status = status
|
|
self.message = message
|
|
self.payload = payload
|
|
self.headers = headers or {}
|
|
|
|
|
|
@dataclass
|
|
class ConsoleSession:
|
|
base_url: str
|
|
username: str
|
|
password: str
|
|
insecure: bool = False
|
|
timeout: int = 30
|
|
|
|
_token: Optional[str] = None
|
|
_http: Optional[requests.Session] = None
|
|
|
|
def _sess(self) -> requests.Session:
|
|
if self._http is None:
|
|
self._http = requests.Session()
|
|
return self._http
|
|
|
|
@property
|
|
def token(self) -> str:
|
|
if not self._token:
|
|
raise RuntimeError("Not logged in yet")
|
|
return self._token
|
|
|
|
def login(self) -> str:
|
|
"""Login and return the session token (NITRO_AUTH_TOKEN value)."""
|
|
url = f"{self.base_url.rstrip('/')}/nitro/v2/config/login"
|
|
payload = {"login": {"username": self.username, "password": self.password}}
|
|
|
|
r = requests.post(
|
|
url,
|
|
json=payload,
|
|
auth=(self.username, self.password),
|
|
verify=not self.insecure,
|
|
timeout=self.timeout,
|
|
)
|
|
|
|
try:
|
|
j = r.json()
|
|
except Exception:
|
|
raise NitroError(r.status_code, r.text)
|
|
|
|
if r.status_code >= 400 or (isinstance(j, dict) and j.get("errorcode") not in (None, 0)):
|
|
msg = j.get("message") if isinstance(j, dict) else str(j)
|
|
raise NitroError(r.status_code, msg or "Login failed", j)
|
|
|
|
try:
|
|
token = j["login"][0]["sessionid"]
|
|
except Exception:
|
|
raise NitroError(r.status_code, "Login response missing sessionid", j)
|
|
|
|
self._token = token
|
|
return token
|
|
|
|
# ---------------------- low-level HTTP helpers ----------------------
|
|
|
|
def _headers(self, extra: Optional[Dict[str, str]] = None, include_auth_headers: bool = True) -> Dict[str, str]:
|
|
h: Dict[str, str] = {}
|
|
if self._token:
|
|
h["Cookie"] = f"NITRO_AUTH_TOKEN={self._token}"
|
|
if include_auth_headers:
|
|
h["X-NITRO-USER"] = self.username
|
|
h["X-NITRO-PASS"] = self.password
|
|
if extra:
|
|
h.update(extra)
|
|
return h
|
|
|
|
def request(self, method: str, path: str, *, params: Dict[str, str] | None = None,
|
|
json_body: Any | None = None, data: Any | None = None,
|
|
headers: Dict[str, str] | None = None,
|
|
include_auth_headers: bool = True,
|
|
stream: bool = False) -> requests.Response:
|
|
if not self._token:
|
|
self.login()
|
|
|
|
url = f"{self.base_url.rstrip('/')}{path}"
|
|
r = self._sess().request(
|
|
method,
|
|
url,
|
|
params=params,
|
|
json=json_body,
|
|
data=data,
|
|
headers=self._headers(headers, include_auth_headers=include_auth_headers),
|
|
verify=not self.insecure,
|
|
timeout=self.timeout,
|
|
stream=stream,
|
|
)
|
|
return r
|
|
|
|
def _raise_for_nitro(self, r: requests.Response) -> None:
|
|
if r.status_code < 400:
|
|
return
|
|
# try to decode JSON error payload
|
|
msg = r.text
|
|
payload: Any = None
|
|
try:
|
|
payload = r.json()
|
|
if isinstance(payload, dict) and "message" in payload:
|
|
msg = payload.get("message") or msg
|
|
except Exception:
|
|
payload = None
|
|
raise NitroError(r.status_code, msg, payload, headers=dict(r.headers))
|
|
|
|
# ---------------------- Console operations we need ----------------------
|
|
|
|
def create_ssl_key(self, *, file_name: str, algo: str, keysize: int | None = None,
|
|
ec_curve: str | None = None, keyform: str = "PEM",
|
|
encrypt: bool = False, passphrase: str | None = None,
|
|
des_type: str = "AES256") -> Dict[str, Any]:
|
|
"""Create a private key file stored on Console.
|
|
|
|
This maps to the ns_ssl_key resource in Console NITRO v2.
|
|
We send form-urlencoded with an `object=` payload because Console
|
|
rejects raw JSON for some POSTs.
|
|
"""
|
|
obj: Dict[str, Any] = {
|
|
"file_name": file_name,
|
|
"keyform": keyform,
|
|
"algo": algo,
|
|
}
|
|
if keysize is not None:
|
|
obj["keysize"] = str(keysize)
|
|
if ec_curve:
|
|
obj["ec_curve"] = ec_curve
|
|
if encrypt:
|
|
# Console uses des_type naming even for AES in some builds
|
|
obj["des_type"] = des_type
|
|
if passphrase:
|
|
obj["passphrase"] = passphrase
|
|
|
|
payload = {"ns_ssl_key": obj}
|
|
data = {"object": json.dumps(payload)}
|
|
|
|
r = self.request(
|
|
"POST",
|
|
"/nitro/v2/config/ns_ssl_key",
|
|
params={"action": "create"},
|
|
data=data,
|
|
headers={"Content-Type": "application/x-www-form-urlencoded"},
|
|
)
|
|
self._raise_for_nitro(r)
|
|
return r.json()
|
|
|
|
def download_file(self, resource: str, file_name: str) -> bytes:
|
|
"""Download a stored file via /nitro/v2/download/<resource>/<file_name>."""
|
|
# For download endpoints, some builds want ONLY the cookie.
|
|
r = self.request(
|
|
"GET",
|
|
f"/nitro/v2/download/{resource}/{file_name}",
|
|
include_auth_headers=False,
|
|
stream=True,
|
|
)
|
|
self._raise_for_nitro(r)
|
|
return r.content
|
|
|
|
def list_ssl_certkeys(self, *, attrs: str | None = None) -> Dict[str, Any]:
|
|
"""List certificate-key objects stored on Console.
|
|
|
|
This uses the Console NITRO resource `ns_ssl_certkey`.
|
|
The response includes expiry-related fields such as `valid_to`
|
|
and `days_to_expiry` (stringified).
|
|
"""
|
|
params: Dict[str, str] = {}
|
|
if attrs:
|
|
params["attrs"] = attrs
|
|
r = self.request("GET", "/nitro/v2/config/ns_ssl_certkey", params=params)
|
|
self._raise_for_nitro(r)
|
|
return r.json()
|