209 lines
7.1 KiB
Python
209 lines
7.1 KiB
Python
"""NetScaler Console NITRO API helpers."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from typing import Any, Dict, Optional, Union
|
|
|
|
import requests
|
|
from urllib3.exceptions import InsecureRequestWarning
|
|
|
|
# Type of the value handed to ``requests`` as its ``verify=`` argument by the
# client in this module: a bool switches TLS certificate verification on/off;
# a str is presumably a path to a CA bundle, as in requests' API — confirm.
VerifyType = Union[bool, str]
|
|
|
|
|
|
class NitroError(RuntimeError):
    """Error raised when a NITRO call fails.

    The exception text is ``"NITRO error (HTTP <status_code>): <message>"``;
    the individual pieces stay available as attributes for programmatic
    handling.

    Attributes:
        status_code: HTTP status code associated with the failure.
        message: Error description without the ``NITRO error`` prefix.
        payload: Decoded response body, when the caller supplied one.
        headers: Response headers, when the caller supplied them.
    """

    def __init__(self, status_code: int, message: str, payload: Any = None, headers: Any = None):
        # Record the structured details first, then build the human-readable
        # text that RuntimeError exposes through str() / args.
        self.status_code, self.message = status_code, message
        self.payload, self.headers = payload, headers
        text = f"NITRO error (HTTP {status_code}): {message}"
        super().__init__(text)
|
|
|
|
|
|
@dataclass
class NitroConsoleClient:
    """Small client for the NetScaler Console NITRO v2 HTTP API.

    Each public method issues a single ``requests`` call (no shared session)
    and, for JSON endpoints, routes the response through ``_parse_json``,
    which raises ``NitroError`` for HTTP-level and NITRO-level failures.

    Attributes:
        base: Base URL of the console; a trailing ``/`` is tolerated.
        verify: Forwarded unchanged as ``verify=`` to every ``requests`` call.
        token: Session id sent as the ``NITRO_AUTH_TOKEN`` cookie. Filled in
            by ``login`` when not supplied up front.
        timeout: Forwarded as ``timeout=`` to every ``requests`` call.
    """

    base: str
    verify: "VerifyType"
    token: Optional[str] = None
    timeout: int = 60

    def __post_init__(self) -> None:
        """Silence urllib3's insecure-request warning when verification is off."""
        # Only an explicit ``False`` opts out; other falsy values do not.
        if self.verify is False:
            requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)  # type: ignore[attr-defined]

    def __repr__(self) -> str:
        """Return a dataclass-style repr that never exposes the session token."""
        # The generated dataclass repr would print the live session id, which
        # then ends up in logs and tracebacks; show only whether one is set.
        shown_token = "'***'" if self.token else repr(self.token)
        return (
            f"{self.__class__.__qualname__}(base={self.base!r}, verify={self.verify!r}, "
            f"token={shown_token}, timeout={self.timeout!r})"
        )

    def _url(self, path: str) -> str:
        """Join ``path`` onto ``base`` with exactly one separating slash.

        An empty ``path`` yields the base URL without its trailing slash.
        """
        # A path given without a leading slash would otherwise be glued
        # directly onto the host name.
        if path and not path.startswith("/"):
            path = "/" + path
        return self.base.rstrip("/") + path

    def _headers(self, extra: Optional[Dict[str, str]] = None) -> Dict[str, str]:
        """Build request headers: default Accept, auth cookie, then ``extra``.

        Entries in ``extra`` override the defaults (including the cookie).
        """
        headers: Dict[str, str] = {"Accept": "*/*"}
        if self.token:
            headers["Cookie"] = f"NITRO_AUTH_TOKEN={self.token}"
        if extra:
            headers.update(extra)
        return headers

    def login(self, username: str, password: str) -> str:
        """Log in, store the returned session id in ``token`` and return it.

        Raises:
            NitroError: If the request fails, or the response carries no
                (or an empty) ``sessionid``.
        """
        url = self._url("/nitro/v2/config/login")
        payload = {"login": {"username": username, "password": password}}
        # The login request deliberately bypasses _headers(): no cookie is sent.
        resp = requests.post(
            url,
            headers={"Content-Type": "application/json", "Accept": "application/json"},
            json=payload,
            verify=self.verify,
            timeout=self.timeout,
        )
        data = self._parse_json(resp)
        try:
            session_id = data["login"][0]["sessionid"]
        except (KeyError, IndexError, TypeError) as exc:  # pragma: no cover - defensive
            raise NitroError(resp.status_code, "Login response did not include sessionid", data) from exc
        if not session_id:
            raise NitroError(resp.status_code, "Login returned empty sessionid", data)
        self.token = session_id
        return session_id

    def _parse_json(self, resp: "requests.Response") -> Dict[str, Any]:
        """Decode a response as JSON and raise ``NitroError`` on any failure.

        Failure means: the body is not JSON, the HTTP status is >= 400, or
        the decoded dict carries an ``errorcode`` other than ``0``/``"0"``.

        Returns:
            The decoded JSON document (returned as-is, even if not a dict).
        """
        headers = dict(resp.headers)
        try:
            data = resp.json()
        except ValueError as exc:
            # JSON decode errors are ValueError subclasses. For HTTP errors the
            # raw body is the most useful message available.
            if resp.status_code >= 400:
                raise NitroError(resp.status_code, resp.text, headers=headers) from exc
            raise NitroError(
                resp.status_code, "Expected JSON but got non-JSON response", headers=headers
            ) from exc

        if resp.status_code >= 400:
            # Prefer the API's own message; fall back to the raw body so the
            # error never reads "None" when the key is absent.
            api_message = data.get("message") if isinstance(data, dict) else None
            raise NitroError(resp.status_code, str(api_message or resp.text), data, headers=headers)

        if isinstance(data, dict):
            err = data.get("errorcode")
            # A missing errorcode is treated as success.
            if err not in (0, "0", None):
                msg = data.get("message", "Unknown NITRO error")
                raise NitroError(resp.status_code, str(msg), data, headers=headers)

        return data

    def get_json(self, path: str, *, params: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """GET ``path`` (with optional query ``params``) and return parsed JSON."""
        url = self._url(path)
        resp = requests.get(
            url,
            headers=self._headers({"Accept": "application/json"}),
            params=params,
            verify=self.verify,
            timeout=self.timeout,
        )
        return self._parse_json(resp)

    def post_json(
        self,
        path: str,
        payload: Dict[str, Any],
        *,
        params: Optional[Dict[str, str]] = None,
        headers: Optional[Dict[str, str]] = None,
        use_cookie: bool = True,
    ) -> Dict[str, Any]:
        """POST ``payload`` as JSON to ``path`` and return parsed JSON.

        Args:
            path: API path appended to ``base``.
            payload: Object serialized as the JSON request body.
            params: Optional query-string parameters.
            headers: Extra headers; these override the defaults.
            use_cookie: When False, the session cookie is not attached even
                if a token is set.
        """
        url = self._url(path)
        base_headers: Dict[str, str] = {"Content-Type": "application/json", "Accept": "application/json"}
        if use_cookie and self.token:
            base_headers["Cookie"] = f"NITRO_AUTH_TOKEN={self.token}"
        if headers:
            base_headers.update(headers)
        resp = requests.post(
            url,
            headers=base_headers,
            params=params,
            json=payload,
            verify=self.verify,
            timeout=self.timeout,
        )
        return self._parse_json(resp)

    def put_json(self, path: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        """PUT ``payload`` as JSON to ``path`` and return parsed JSON."""
        url = self._url(path)
        resp = requests.put(
            url,
            headers=self._headers({"Content-Type": "application/json", "Accept": "application/json"}),
            json=payload,
            verify=self.verify,
            timeout=self.timeout,
        )
        return self._parse_json(resp)

    def upload_file(
        self,
        path: str,
        file_path: str,
        *,
        basic_user: Optional[str] = None,
        basic_password: Optional[str] = None,
    ) -> Dict[str, Any]:
        """POST the file at ``file_path`` as multipart field ``file``.

        When both ``basic_user`` and ``basic_password`` are given they are
        sent as the ``X-NITRO-USER`` / ``X-NITRO-PASS`` headers.
        """
        url = self._url(path)
        headers = self._headers()
        if basic_user and basic_password:
            headers["X-NITRO-USER"] = basic_user
            headers["X-NITRO-PASS"] = basic_password
        # The handle must stay open for the duration of the request.
        with open(file_path, "rb") as handle:
            resp = requests.post(
                url,
                headers=headers,
                files={"file": handle},
                verify=self.verify,
                timeout=self.timeout,
            )
        return self._parse_json(resp)

    def download_file(self, resource: str, file_name: str) -> bytes:
        """GET ``/nitro/v2/download/<resource>/<file_name>`` and return the raw body.

        Raises:
            NitroError: If the HTTP status is >= 400.
        """
        url = self._url(f"/nitro/v2/download/{resource}/{file_name}")
        resp = requests.get(
            url,
            headers=self._headers(),
            verify=self.verify,
            timeout=self.timeout,
        )
        # The body is not JSON, so only the HTTP status is checked here.
        if resp.status_code >= 400:
            raise NitroError(resp.status_code, resp.text, headers=dict(resp.headers))
        return resp.content

    def get_system_settings(self) -> Dict[str, Any]:
        """Return the ``system_settings`` object, or ``{}`` if it is absent.

        The value may arrive as a list (first element is returned) or as a
        single dict.
        """
        data = self.get_json("/nitro/v2/config/system_settings")
        # A non-dict JSON document has no settings to extract.
        if not isinstance(data, dict):
            return {}
        settings = data.get("system_settings")
        if isinstance(settings, list) and settings:
            return settings[0]
        if isinstance(settings, dict):
            return settings
        return {}

    def set_basicauth(self, enabled: bool) -> None:
        """Set the ``basicauth`` flag on the current system settings object.

        Raises:
            NitroError: If the settings id cannot be determined, or the
                update request fails.
        """
        settings = self.get_system_settings()
        settings_id = settings.get("id")
        if not settings_id:
            raise NitroError(400, "system_settings id not found")
        payload = {"system_settings": {"basicauth": bool(enabled)}}
        self.put_json(f"/nitro/v2/config/system_settings/{settings_id}", payload)

    def create_key(
        self,
        key_name: str,
        *,
        algo: str,
        keyform: str = "PEM",
        keysize: Optional[int] = None,
        ec_curve: Optional[str] = None,
        password: Optional[str] = None,
        file_location_path: str = "",
    ) -> Dict[str, Any]:
        """POST an ``ns_ssl_key`` object with ``action=create``.

        ``keysize``, ``ec_curve`` and ``password`` are only included in the
        request when provided.

        Returns:
            The parsed JSON response.
        """
        obj: Dict[str, Any] = {
            "file_name": key_name,
            "keyform": keyform,
            "algo": algo,
            "file_location_path": file_location_path,
        }
        if keysize is not None:
            obj["keysize"] = int(keysize)
        if ec_curve:
            obj["ec_curve"] = ec_curve
        if password:
            obj["password"] = password
        return self.post_json("/nitro/v2/config/ns_ssl_key", {"ns_ssl_key": obj}, params={"action": "create"})
|