#!/usr/bin/env python3
"""
import-c2-groups.py – Import c2-grps group definitions into Kustvakt.

Input file format (one entry per line):

    G_groupName=username1,username2,username3

Blank lines and lines starting with '#' are ignored.
The 'G_' prefix is stripped from group names before importing.

The script obtains a Bearer token via the OAuth2 password grant and calls:

    PUT <base-url>/<api-version>/group/@<groupName>
    PUT <base-url>/<api-version>/group/@<groupName>/member

Usage:

    python3 import-c2-groups.py --file data/c2-grps.txt \\
        --url http://localhost:8080/api \\
        --user admin --password pass \\
        --client-id <client_id> --client-secret <client_secret>

Run with --help for all options.
"""
import argparse
import json
import logging
import sys
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
# Root logger set-up: INFO threshold, each record rendered as "<LEVEL> <message>".
logging.basicConfig(format="%(levelname)s %(message)s", level=logging.INFO)

# Module-level logger used by the functions in this script.
log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# HTTP helpers
# ---------------------------------------------------------------------------
def fetch_bearer_token(
    base_url: str,
    api_version: str,
    user: str,
    password: str,
    client_id: str,
    client_secret: str,
) -> str:
    """Obtain an OAuth2 Bearer token via the resource-owner password grant.

    POSTs the credentials as a form-encoded body to
    ``<base_url>/<api_version>/oauth2/token`` and reads ``access_token``
    from the JSON response.

    Args:
        base_url: Service base URL; it is joined with "/" as-is, so it
            should not end with a slash.
        api_version: API version path segment (e.g. "v1.0").
        user: Resource-owner username.
        password: Resource-owner password.
        client_id: OAuth2 client id.
        client_secret: OAuth2 client secret (may be empty).

    Returns:
        The raw access-token string (without any "Bearer " prefix).

    Raises:
        SystemExit: with status 1 when the request fails (HTTP or network
            error), when the response body is not a JSON object, or when
            it carries no ``access_token``.
    """
    token_url = f"{base_url}/{api_version}/oauth2/token"
    form = {
        "grant_type": "password",
        "username": user,
        "password": password,
        "client_id": client_id,
        "client_secret": client_secret,
    }
    req = urllib.request.Request(
        token_url,
        data=urllib.parse.urlencode(form).encode(),
        method="POST",
        headers={"Content-Type": "application/x-www-form-urlencoded"},
    )

    try:
        with urllib.request.urlopen(req) as resp:
            raw = resp.read()
    except urllib.error.HTTPError as exc:
        log.error("Token request failed (HTTP %s): %s",
                  exc.code, exc.read().decode(errors="replace"))
        sys.exit(1)
    except urllib.error.URLError as exc:
        log.error("Token request network error: %s", exc.reason)
        sys.exit(1)
    except OSError as exc:
        # Failures while the response is being read (e.g. a connection
        # reset) surface as plain OSError rather than URLError.
        log.error("Token request network error: %s", exc)
        sys.exit(1)

    try:
        # ValueError covers both undecodable bytes (UnicodeDecodeError)
        # and malformed JSON (json.JSONDecodeError).
        data = json.loads(raw.decode())
    except ValueError as exc:
        log.error("Token response is not valid JSON: %s", exc)
        sys.exit(1)

    # A JSON array/string/number has no .get(); treat it like a missing token.
    token = data.get("access_token") if isinstance(data, dict) else None
    if not token:
        log.error("No access_token in response: %s", data)
        sys.exit(1)

    log.debug("Bearer token obtained successfully.")
    return token
def _put(url: str, form: dict, auth_header: str, dry_run: bool) -> int:
    """Send a PUT request with an application/x-www-form-urlencoded body.

    Args:
        url: Full request URL.
        form: Form fields to url-encode into the body (may be empty).
        auth_header: Value for the ``Authorization`` header.
        dry_run: When true, only log the request and pretend it succeeded.

    Returns:
        The HTTP status code (for HTTP errors, the error status after the
        response body has been logged), 201 in dry-run mode, or 0 on a
        network/URL error.
    """
    if dry_run:
        # Nothing is sent, so there is no need to build the request at all.
        log.info("[DRY-RUN] PUT %s body=%s", url, form)
        return 201

    req = urllib.request.Request(
        url,
        data=urllib.parse.urlencode(form).encode(),
        method="PUT",
        headers={
            "Authorization": auth_header,
            "Content-Type": "application/x-www-form-urlencoded",
        },
    )
    try:
        with urllib.request.urlopen(req) as resp:
            return resp.status
    except urllib.error.HTTPError as exc:
        # Must precede the OSError handler: HTTPError is a subclass of it.
        body_text = exc.read().decode(errors="replace")
        log.error("HTTP %s for %s – %s", exc.code, url, body_text)
        return exc.code
    except OSError as exc:
        # Covers URLError (an OSError subclass) as well as plain socket
        # errors raised while waiting for the response, which urllib does
        # not wrap; without this they would abort the whole import.
        log.error("Network error for %s – %s", url, getattr(exc, "reason", exc))
        return 0
# ---------------------------------------------------------------------------
# Parsing
# ---------------------------------------------------------------------------
def parse_groups(path: Path) -> list[tuple[str, list[str]]]:
    """Parse the c2-grps file and return [(groupName, [member, ...]), ...].

    Each non-blank, non-comment line has the form ``G_name=user1,user2``.
    A leading ``G_`` is removed from the group name, member names are
    stripped of surrounding whitespace, empty members are dropped and
    duplicates are removed while keeping first-seen order. Groups with no
    members are still returned (with an empty member list).

    Lines without ``=`` or with an empty group name are skipped with a
    warning.

    Args:
        path: Location of the UTF-8 encoded input file (a leading BOM is
            tolerated).

    Returns:
        Groups in file order, one tuple per accepted line.
    """
    groups: list[tuple[str, list[str]]] = []
    # "utf-8-sig" reads plain UTF-8 too, but drops a leading byte-order mark
    # that would otherwise hide a "G_" prefix or "#" on the first line.
    text = path.read_text(encoding="utf-8-sig")
    for lineno, raw in enumerate(text.splitlines(), 1):
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        name_part, sep, member_part = line.partition("=")
        if not sep:
            log.warning("Line %d: no '=' found, skipping: %s", lineno, line)
            continue
        group_name = name_part.strip().removeprefix("G_")
        if not group_name:
            # An empty name would produce a request for ".../group/@".
            log.warning("Line %d: empty group name, skipping: %s", lineno, line)
            continue
        stripped = (m.strip() for m in member_part.split(","))
        # dict.fromkeys de-duplicates while preserving insertion order.
        members = list(dict.fromkeys(m for m in stripped if m))
        groups.append((group_name, members))
    return groups
# ---------------------------------------------------------------------------
# Import
# ---------------------------------------------------------------------------
def import_groups(
    groups: list[tuple[str, list[str]]],
    base_url: str,
    api_version: str,
    auth_header: str,
    add_members: bool,
    dry_run: bool,
) -> None:
    """Create each group and, optionally, add its members.

    For every ``(group_name, members)`` pair a PUT is sent to
    ``<base_url>/<api_version>/group/@<group_name>``. Status 201 is counted
    as "created" and 204 as "already existed"; any other status is logged
    as a failure and the group's members are skipped. When ``add_members``
    is true and the group lists members, a second PUT to ``.../member``
    sends them as one comma-separated ``members`` form field.

    A summary line with the counters is logged at the end.

    Args:
        groups: Parsed groups as ``(name, [member, ...])`` tuples.
        base_url: Service base URL without trailing slash.
        api_version: API version path segment.
        auth_header: Value for the ``Authorization`` header.
        add_members: Whether to send the member lists.
        dry_run: Passed through to ``_put``; no request is sent when true.
    """
    created = existing = member_ok = member_err = 0
    for group_name, members in groups:
        # Percent-encode the name so that spaces, '/', '?', '#' or
        # non-ASCII characters cannot corrupt the request path.
        encoded_name = urllib.parse.quote(group_name, safe="")
        group_url = f"{base_url}/{api_version}/group/@{encoded_name}"

        # --- create / update group ---
        status = _put(group_url, {}, auth_header, dry_run)
        if status == 201:
            log.info("Created group '%s'", group_name)
            created += 1
        elif status == 204:
            log.info("Exists group '%s' (no-op)", group_name)
            existing += 1
        else:
            log.error("Failed to create/update group '%s' (HTTP %s)", group_name, status)
            continue

        # --- add members ---
        if not add_members:
            continue
        if not members:
            log.debug(" No members listed for '%s'", group_name)
            continue
        status = _put(
            f"{group_url}/member",
            {"members": ",".join(members)},
            auth_header,
            dry_run,
        )
        if status in (200, 201, 204):
            log.info(" Added %d member(s) to '%s'", len(members), group_name)
            member_ok += 1
        else:
            log.error(
                " Failed to add members to '%s' (HTTP %s)", group_name, status
            )
            member_err += 1

    log.info(
        "Done: %d created, %d already existed, "
        "%d member imports OK, %d member imports failed.",
        created, existing, member_ok, member_err,
    )
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the import script.

    The module docstring is used verbatim as the help description. Options
    are registered in a fixed order: first those that take a value, then
    the boolean switches.
    """
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=__doc__,
    )

    # Options taking a value: (flag, metavar, extra add_argument kwargs, help).
    valued_options = [
        ("--file", "PATH", {"default": "data/c2-grps.txt"},
         "Path to the c2-grps input file (default: data/c2-grps.txt)"),
        ("--url", "URL", {"default": "http://localhost:8080/api"},
         "Kustvakt base URL, without trailing slash "
         "(default: http://localhost:8080/api)"),
        ("--api-version", "VER", {"default": "v1.0"},
         "API version segment, e.g. v1.0 or v1.1 (default: v1.0)"),
        ("--user", "USERNAME", {"default": "admin"},
         "Resource-owner username (default: admin)"),
        ("--password", "PASSWORD", {"default": None},
         "Resource-owner password (prompted if omitted)"),
        ("--client-id", "CLIENT_ID", {"required": True},
         "OAuth2 client_id for the password grant"),
        ("--client-secret", "CLIENT_SECRET", {"default": ""},
         "OAuth2 client_secret (default: empty, for public clients)"),
    ]
    for flag, metavar, extra, help_text in valued_options:
        parser.add_argument(flag, metavar=metavar, help=help_text, **extra)

    # Boolean switches: (flag, help); all default to False.
    switches = [
        ("--skip-members", "Create groups only, do not add members"),
        ("--dry-run", "Print what would be done without sending any requests"),
        ("--verbose", "Enable DEBUG-level logging"),
    ]
    for flag, help_text in switches:
        parser.add_argument(flag, action="store_true", help=help_text)

    return parser
def main() -> None:
    """Command-line entry point: parse the input file and import its groups.

    Steps:
      1. Parse the command line; ``--verbose`` raises the root logger to DEBUG.
      2. Read and parse the input file. This happens before any
         authentication so that a missing, unreadable or empty file never
         triggers a password prompt or a token request.
      3. Unless ``--dry-run`` is set, obtain a Bearer token (prompting for
         the password when ``--password`` was omitted).
      4. Create the groups and, unless ``--skip-members`` is set, add members.

    Raises:
        SystemExit: with status 1 when the input file is missing or cannot
            be read; ``fetch_bearer_token`` also exits on failure.
    """
    args = build_parser().parse_args()
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    path = Path(args.file)
    # is_file() rather than exists(): a directory passes exists() and would
    # then fail with a traceback when read.
    if not path.is_file():
        log.error("Input file not found: %s", path)
        sys.exit(1)

    try:
        groups = parse_groups(path)
    except (OSError, UnicodeDecodeError) as exc:
        log.error("Cannot read input file %s: %s", path, exc)
        sys.exit(1)
    log.info("Parsed %d group(s) from %s", len(groups), path)
    if not groups:
        log.warning("No groups found – nothing to do.")
        return

    base_url = args.url.rstrip("/")
    if args.dry_run:
        # No request is sent in dry-run mode, so a placeholder is enough.
        auth_header = "Bearer <dry-run>"
    else:
        password = args.password
        if password is None:
            import getpass
            password = getpass.getpass(f"Password for '{args.user}': ")
        token = fetch_bearer_token(
            base_url=base_url,
            api_version=args.api_version,
            user=args.user,
            password=password,
            client_id=args.client_id,
            client_secret=args.client_secret,
        )
        auth_header = f"Bearer {token}"

    import_groups(
        groups=groups,
        base_url=base_url,
        api_version=args.api_version,
        auth_header=auth_header,
        add_members=not args.skip_members,
        dry_run=args.dry_run,
    )


if __name__ == "__main__":
    main()