| #!/usr/bin/env python3 |
| """ |
| import-c2-groups.py – Import c2-grps group definitions into Kustvakt. |
| |
| Input file format (one entry per line): |
| G_groupName=username1,username2,username3 |
| |
| Lines starting with '#' or blank lines are ignored. |
| The 'G_' prefix is stripped from group names before importing. |
| |
| The script obtains a Bearer token via the OAuth2 password grant and calls: |
| PUT <base-url>/<api-version>/group/@<groupName> |
| PUT <base-url>/<api-version>/group/@<groupName>/member |
| |
| Usage: |
| python3 import-c2-groups.py --file data/c2-grps.txt \\ |
| --url http://localhost:8080/api \\ |
| --user admin --password pass \\ |
| --client-id <client_id> --client-secret <client_secret> |
| |
| Run with --help for all options. |
| """ |
| |
| import argparse |
| import json |
| import logging |
| import sys |
| import urllib.error |
| import urllib.parse |
| import urllib.request |
| from pathlib import Path |
| |
| logging.basicConfig( |
| level=logging.INFO, |
| format="%(levelname)s %(message)s", |
| ) |
| log = logging.getLogger(__name__) |
| |
| |
| # --------------------------------------------------------------------------- |
| # HTTP helpers |
| # --------------------------------------------------------------------------- |
| |
| def fetch_bearer_token( |
| base_url: str, |
| api_version: str, |
| user: str, |
| password: str, |
| client_id: str, |
| client_secret: str, |
| ) -> str: |
| """Obtain an OAuth2 Bearer token via the resource-owner password grant. |
| |
| Returns the raw access-token string. |
| Raises SystemExit on failure. |
| """ |
| token_url = f"{base_url}/{api_version}/oauth2/token" |
| body = urllib.parse.urlencode({ |
| "grant_type": "password", |
| "username": user, |
| "password": password, |
| "client_id": client_id, |
| "client_secret": client_secret, |
| }).encode() |
| |
| req = urllib.request.Request( |
| token_url, |
| data=body, |
| method="POST", |
| headers={"Content-Type": "application/x-www-form-urlencoded"}, |
| ) |
| try: |
| with urllib.request.urlopen(req) as resp: |
| data = json.loads(resp.read().decode()) |
| except urllib.error.HTTPError as exc: |
| log.error("Token request failed (HTTP %s): %s", |
| exc.code, exc.read().decode(errors="replace")) |
| sys.exit(1) |
| except urllib.error.URLError as exc: |
| log.error("Token request network error: %s", exc.reason) |
| sys.exit(1) |
| |
| token = data.get("access_token") |
| if not token: |
| log.error("No access_token in response: %s", data) |
| sys.exit(1) |
| |
| log.debug("Bearer token obtained successfully.") |
| return token |
| |
| |
| def _put(url: str, form: dict, auth_header: str, dry_run: bool) -> int: |
| """Send a PUT request with application/x-www-form-urlencoded body. |
| |
| Returns the HTTP status code, or 0 on network/URL error. |
| """ |
| body = urllib.parse.urlencode(form).encode() |
| req = urllib.request.Request( |
| url, |
| data=body, |
| method="PUT", |
| headers={ |
| "Authorization": auth_header, |
| "Content-Type": "application/x-www-form-urlencoded", |
| }, |
| ) |
| if dry_run: |
| log.info("[DRY-RUN] PUT %s body=%s", url, form) |
| return 201 |
| |
| try: |
| with urllib.request.urlopen(req) as resp: |
| return resp.status |
| except urllib.error.HTTPError as exc: |
| body_text = exc.read().decode(errors="replace") |
| log.error("HTTP %s for %s – %s", exc.code, url, body_text) |
| return exc.code |
| except urllib.error.URLError as exc: |
| log.error("Network error for %s – %s", url, exc.reason) |
| return 0 |
| |
| |
| # --------------------------------------------------------------------------- |
| # Parsing |
| # --------------------------------------------------------------------------- |
| |
| def parse_groups(path: Path) -> list[tuple[str, list[str]]]: |
| """Parse the c2-grps file and return [(groupName, [member, ...]), ...].""" |
| groups = [] |
| for lineno, raw in enumerate(path.read_text(encoding="utf-8").splitlines(), 1): |
| line = raw.strip() |
| if not line or line.startswith("#"): |
| continue |
| if "=" not in line: |
| log.warning("Line %d: no '=' found, skipping: %s", lineno, line) |
| continue |
| left, _, right = line.partition("=") |
| raw_name = left.strip() |
| group_name = raw_name[2:] if raw_name.startswith("G_") else raw_name |
| members = [m.strip() for m in right.split(",") if m.strip()] |
| # deduplicate while preserving order |
| seen: set[str] = set() |
| unique_members: list[str] = [] |
| for m in members: |
| if m not in seen: |
| seen.add(m) |
| unique_members.append(m) |
| groups.append((group_name, unique_members)) |
| return groups |
| |
| |
| # --------------------------------------------------------------------------- |
| # Import |
| # --------------------------------------------------------------------------- |
| |
| def import_groups( |
| groups: list[tuple[str, list[str]]], |
| base_url: str, |
| api_version: str, |
| auth_header: str, |
| add_members: bool, |
| dry_run: bool, |
| ) -> None: |
| created = updated = member_ok = member_err = 0 |
| |
| for group_name, members in groups: |
| group_url = f"{base_url}/{api_version}/group/@{group_name}" |
| |
| # --- create / update group --- |
| status = _put(group_url, {}, auth_header, dry_run) |
| if status == 201: |
| log.info("Created group '%s'", group_name) |
| created += 1 |
| elif status == 204: |
| log.info("Exists group '%s' (no-op)", group_name) |
| updated += 1 |
| else: |
| log.error("Failed to create/update group '%s' (HTTP %s)", group_name, status) |
| continue |
| |
| # --- add members --- |
| if add_members and members: |
| member_url = f"{group_url}/member" |
| status = _put( |
| member_url, |
| {"members": ",".join(members)}, |
| auth_header, |
| dry_run, |
| ) |
| if status in (200, 201, 204): |
| log.info(" Added %d member(s) to '%s'", len(members), group_name) |
| member_ok += 1 |
| else: |
| log.error( |
| " Failed to add members to '%s' (HTTP %s)", group_name, status |
| ) |
| member_err += 1 |
| elif add_members and not members: |
| log.debug(" No members listed for '%s'", group_name) |
| |
| log.info( |
| "Done: %d created, %d already existed, " |
| "%d member imports OK, %d member imports failed.", |
| created, updated, member_ok, member_err, |
| ) |
| |
| |
| # --------------------------------------------------------------------------- |
| # CLI |
| # --------------------------------------------------------------------------- |
| |
| def build_parser() -> argparse.ArgumentParser: |
| p = argparse.ArgumentParser( |
| description=__doc__, |
| formatter_class=argparse.RawDescriptionHelpFormatter, |
| ) |
| p.add_argument( |
| "--file", |
| default="data/c2-grps.txt", |
| metavar="PATH", |
| help="Path to the c2-grps input file (default: data/c2-grps.txt)", |
| ) |
| p.add_argument( |
| "--url", |
| default="http://localhost:8080/api", |
| metavar="URL", |
| help="Kustvakt base URL, without trailing slash " |
| "(default: http://localhost:8080/api)", |
| ) |
| p.add_argument( |
| "--api-version", |
| default="v1.0", |
| metavar="VER", |
| help="API version segment, e.g. v1.0 or v1.1 (default: v1.0)", |
| ) |
| p.add_argument( |
| "--user", |
| default="admin", |
| metavar="USERNAME", |
| help="Resource-owner username (default: admin)", |
| ) |
| p.add_argument( |
| "--password", |
| default=None, |
| metavar="PASSWORD", |
| help="Resource-owner password (prompted if omitted)", |
| ) |
| p.add_argument( |
| "--client-id", |
| required=True, |
| metavar="CLIENT_ID", |
| help="OAuth2 client_id for the password grant", |
| ) |
| p.add_argument( |
| "--client-secret", |
| default="", |
| metavar="CLIENT_SECRET", |
| help="OAuth2 client_secret (default: empty, for public clients)", |
| ) |
| p.add_argument( |
| "--skip-members", |
| action="store_true", |
| help="Create groups only, do not add members", |
| ) |
| p.add_argument( |
| "--dry-run", |
| action="store_true", |
| help="Print what would be done without sending any requests", |
| ) |
| p.add_argument( |
| "--verbose", |
| action="store_true", |
| help="Enable DEBUG-level logging", |
| ) |
| return p |
| |
| |
| def main() -> None: |
| args = build_parser().parse_args() |
| |
| if args.verbose: |
| logging.getLogger().setLevel(logging.DEBUG) |
| |
| path = Path(args.file) |
| if not path.exists(): |
| log.error("Input file not found: %s", path) |
| sys.exit(1) |
| |
| base_url = args.url.rstrip("/") |
| |
| if args.dry_run: |
| auth_header = "Bearer <dry-run>" |
| else: |
| password = args.password |
| if password is None: |
| import getpass |
| password = getpass.getpass(f"Password for '{args.user}': ") |
| token = fetch_bearer_token( |
| base_url=base_url, |
| api_version=args.api_version, |
| user=args.user, |
| password=password, |
| client_id=args.client_id, |
| client_secret=args.client_secret, |
| ) |
| auth_header = f"Bearer {token}" |
| |
| groups = parse_groups(path) |
| log.info("Parsed %d group(s) from %s", len(groups), path) |
| |
| if not groups: |
| log.warning("No groups found – nothing to do.") |
| return |
| |
| import_groups( |
| groups=groups, |
| base_url=base_url, |
| api_version=args.api_version, |
| auth_header=auth_header, |
| add_members=not args.skip_members, |
| dry_run=args.dry_run, |
| ) |
| |
| |
| if __name__ == "__main__": |
| main() |