Import c2-grps group definitions into Kustvakt.
c2-grps.dat file is required
Change-Id: I6a7e200e380f797ba4fefad8737ea08073d14fe7
diff --git a/bin/import-c2-groups.py b/bin/import-c2-groups.py
new file mode 100755
index 0000000..04dce16
--- /dev/null
+++ b/bin/import-c2-groups.py
@@ -0,0 +1,326 @@
+#!/usr/bin/env python3
+"""
+import-c2-groups.py – Import c2-grps group definitions into Kustvakt.
+
+Input file format (one entry per line):
+ G_groupName=username1,username2,username3
+
+Lines starting with '#' or blank lines are ignored.
+The 'G_' prefix is stripped from group names before importing.
+
+The script obtains a Bearer token via the OAuth2 password grant and calls:
+ PUT <base-url>/<api-version>/group/@<groupName>
+ PUT <base-url>/<api-version>/group/@<groupName>/member
+
+Usage:
+ python3 import-c2-groups.py --file data/c2-grps.txt \\
+ --url http://localhost:8080/api \\
+ --user admin --password pass \\
+ --client-id <client_id> --client-secret <client_secret>
+
+Run with --help for all options.
+"""
+
+import argparse
+import json
+import logging
+import sys
+import urllib.error
+import urllib.parse
+import urllib.request
+from pathlib import Path
+
+# Root logger: INFO by default; main() switches it to DEBUG for --verbose.
+_LOG_FORMAT = "%(levelname)s %(message)s"
+logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
+# Module-level logger used by every function in this script.
+log = logging.getLogger(__name__)
+
+
+# ---------------------------------------------------------------------------
+# HTTP helpers
+# ---------------------------------------------------------------------------
+
+def fetch_bearer_token(
+    base_url: str,
+    api_version: str,
+    user: str,
+    password: str,
+    client_id: str,
+    client_secret: str,
+) -> str:
+    """Obtain an OAuth2 Bearer token via the resource-owner password grant.
+
+    POSTs the credentials to ``<base_url>/<api_version>/oauth2/token`` and
+    returns the raw ``access_token`` string from the JSON response.
+    Raises SystemExit (code 1) on an HTTP/network error, on a body that is
+    not a JSON object, or when no ``access_token`` is present.
+    """
+    token_url = f"{base_url}/{api_version}/oauth2/token"
+    body = urllib.parse.urlencode({
+        "grant_type": "password",
+        "username": user,
+        "password": password,
+        "client_id": client_id,
+        "client_secret": client_secret,
+    }).encode()
+    req = urllib.request.Request(
+        token_url, data=body, method="POST",
+        headers={"Content-Type": "application/x-www-form-urlencoded"},
+    )
+    try:
+        with urllib.request.urlopen(req) as resp:
+            data = json.loads(resp.read().decode())
+    except urllib.error.HTTPError as exc:  # subclass of URLError: keep first
+        log.error("Token request failed (HTTP %s): %s",
+                  exc.code, exc.read().decode(errors="replace"))
+        sys.exit(1)
+    except urllib.error.URLError as exc:
+        log.error("Token request network error: %s", exc.reason)
+        sys.exit(1)
+    except ValueError as exc:  # undecodable bytes or malformed JSON
+        log.error("Token response is not valid JSON: %s", exc)
+        sys.exit(1)
+    token = data.get("access_token") if isinstance(data, dict) else None
+    if not token:
+        log.error("No access_token in response: %s", data)
+        sys.exit(1)
+    log.debug("Bearer token obtained successfully.")
+    return token
+
+
+def _put(url: str, form: dict, auth_header: str, dry_run: bool) -> int:
+    """Issue a form-encoded PUT to *url* and report the outcome as a status.
+
+    With *dry_run* set, the request is only logged and 201 is returned.
+    Otherwise returns the HTTP status code (HTTP error responses are logged
+    and their code returned), or 0 on a network/URL error.
+    """
+    request_headers = {
+        "Authorization": auth_header,
+        "Content-Type": "application/x-www-form-urlencoded",
+    }
+    payload = urllib.parse.urlencode(form).encode()
+    request = urllib.request.Request(
+        url, data=payload, method="PUT", headers=request_headers
+    )
+    if dry_run:
+        log.info("[DRY-RUN] PUT %s body=%s", url, form)
+        return 201
+    try:
+        with urllib.request.urlopen(request) as response:
+            status = response.status
+    except urllib.error.HTTPError as http_err:
+        detail = http_err.read().decode(errors="replace")
+        log.error("HTTP %s for %s – %s", http_err.code, url, detail)
+        status = http_err.code
+    except urllib.error.URLError as net_err:
+        log.error("Network error for %s – %s", url, net_err.reason)
+        status = 0
+    return status
+
+
+# ---------------------------------------------------------------------------
+# Parsing
+# ---------------------------------------------------------------------------
+
+def parse_groups(path: Path) -> list[tuple[str, list[str]]]:
+    """Parse the c2-grps file and return [(groupName, [member, ...]), ...].
+
+    Skips blank/'#' lines silently and lines lacking '=' or a group name
+    with a warning; strips a leading 'G_'; de-duplicates members in order.
+    """
+    groups = []
+    # utf-8-sig reads plain UTF-8 too, but drops a BOM that would hide 'G_'.
+    for lineno, raw in enumerate(path.read_text(encoding="utf-8-sig").splitlines(), 1):
+        line = raw.strip()
+        if not line or line.startswith("#"):
+            continue
+        left, sep, right = line.partition("=")
+        if not sep:
+            log.warning("Line %d: no '=' found, skipping: %s", lineno, line)
+            continue
+        group_name = left.strip().removeprefix("G_")
+        if not group_name:
+            log.warning("Line %d: empty group name, skipping: %s", lineno, line)
+            continue
+        members = [m.strip() for m in right.split(",") if m.strip()]
+        groups.append((group_name, list(dict.fromkeys(members))))
+    return groups
+
+
+# ---------------------------------------------------------------------------
+# Import
+# ---------------------------------------------------------------------------
+
+def import_groups(
+    groups: list[tuple[str, list[str]]],
+    base_url: str,
+    api_version: str,
+    auth_header: str,
+    add_members: bool,
+    dry_run: bool,
+) -> None:
+    """Create every group via PUT and, if *add_members*, PUT its members.
+
+    A group whose creation fails is logged, counted and skipped (its members
+    are not sent).  A summary of all counters is logged at the end.
+    """
+    created = existing = group_err = member_ok = member_err = 0
+    for group_name, members in groups:
+        # Percent-encode the name so ' ', '/', '?' or '#' cannot corrupt the URL.
+        quoted_name = urllib.parse.quote(group_name, safe="")
+        group_url = f"{base_url}/{api_version}/group/@{quoted_name}"
+        # --- create / update group ---
+        status = _put(group_url, {}, auth_header, dry_run)
+        if status == 201:
+            log.info("Created group '%s'", group_name)
+            created += 1
+        elif status == 204:
+            log.info("Exists group '%s' (no-op)", group_name)
+            existing += 1
+        else:
+            log.error("Failed to create/update group '%s' (HTTP %s)", group_name, status)
+            group_err += 1
+            continue
+        # --- add members ---
+        if not add_members:
+            continue
+        if not members:
+            log.debug(" No members listed for '%s'", group_name)
+            continue
+        member_form = {"members": ",".join(members)}
+        status = _put(f"{group_url}/member", member_form, auth_header, dry_run)
+        if status in (200, 201, 204):
+            log.info(" Added %d member(s) to '%s'", len(members), group_name)
+            member_ok += 1
+        else:
+            log.error(" Failed to add members to '%s' (HTTP %s)", group_name, status)
+            member_err += 1
+
+    log.info(
+        "Done: %d created, %d already existed, %d failed; "
+        "%d member imports OK, %d member imports failed.",
+        created, existing, group_err, member_ok, member_err,
+    )
+
+
+# ---------------------------------------------------------------------------
+# CLI
+# ---------------------------------------------------------------------------
+
+def build_parser() -> argparse.ArgumentParser:
+    """Build the command-line parser for this script.
+
+    The module docstring doubles as the --help description and is shown
+    verbatim (RawDescriptionHelpFormatter).  --client-id is the only
+    required option; --password defaults to None so that main() can
+    prompt for it, and every other option has a default.
+
+    Returns the configured argparse.ArgumentParser.
+    """
+    parser = argparse.ArgumentParser(
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        description=__doc__,
+    )
+    option = parser.add_argument
+
+    # Input file and target server
+    option(
+        "--file", metavar="PATH", default="data/c2-grps.txt",
+        help="Path to the c2-grps input file (default: data/c2-grps.txt)",
+    )
+    option(
+        "--url", metavar="URL", default="http://localhost:8080/api",
+        help="Kustvakt base URL, without trailing slash "
+             "(default: http://localhost:8080/api)",
+    )
+    option(
+        "--api-version", metavar="VER", default="v1.0",
+        help="API version segment, e.g. v1.0 or v1.1 (default: v1.0)",
+    )
+
+    # OAuth2 password-grant credentials
+    option(
+        "--user", metavar="USERNAME", default="admin",
+        help="Resource-owner username (default: admin)",
+    )
+    option(
+        "--password", metavar="PASSWORD", default=None,
+        help="Resource-owner password (prompted if omitted)",
+    )
+    option(
+        "--client-id", metavar="CLIENT_ID", required=True,
+        help="OAuth2 client_id for the password grant",
+    )
+    option(
+        "--client-secret", metavar="CLIENT_SECRET", default="",
+        help="OAuth2 client_secret (default: empty, for public clients)",
+    )
+
+    # Behaviour switches
+    option(
+        "--skip-members", action="store_true",
+        help="Create groups only, do not add members",
+    )
+    option(
+        "--dry-run", action="store_true",
+        help="Print what would be done without sending any requests",
+    )
+    option(
+        "--verbose", action="store_true",
+        help="Enable DEBUG-level logging",
+    )
+
+    return parser
+
+
+def main() -> None:
+    """Command-line entry point: parse the group file and import it.
+
+    The input file is parsed before any credentials are requested, so an
+    unreadable or empty file ends the run without a password prompt or a
+    token request.  Exits with status 1 if the file is missing or unreadable.
+    """
+    args = build_parser().parse_args()
+    if args.verbose:
+        logging.getLogger().setLevel(logging.DEBUG)
+    path = Path(args.file)
+    # is_file() rather than exists(): a directory must be rejected here too.
+    if not path.is_file():
+        log.error("Input file not found: %s", path)
+        sys.exit(1)
+    try:
+        groups = parse_groups(path)
+    except (OSError, UnicodeDecodeError) as exc:
+        log.error("Cannot read %s: %s", path, exc)
+        sys.exit(1)
+    log.info("Parsed %d group(s) from %s", len(groups), path)
+    if not groups:
+        log.warning("No groups found – nothing to do.")
+        return
+
+    base_url = args.url.rstrip("/")
+    if args.dry_run:
+        auth_header = "Bearer <dry-run>"  # placeholder; no request is sent
+    else:
+        password = args.password
+        if password is None:
+            import getpass  # only needed for the interactive prompt
+            password = getpass.getpass(f"Password for '{args.user}': ")
+        token = fetch_bearer_token(
+            base_url=base_url, api_version=args.api_version, user=args.user,
+            password=password, client_id=args.client_id,
+            client_secret=args.client_secret,
+        )
+        auth_header = f"Bearer {token}"
+
+    import_groups(
+        groups=groups, base_url=base_url, api_version=args.api_version,
+        auth_header=auth_header, add_members=not args.skip_members,
+        dry_run=args.dry_run,
+    )
+
+
+if __name__ == "__main__":  # run main() only when executed as a script, not on import
+ main()