| #!/usr/bin/env python3 |
| """ |
| @author margaretha with AI assistance |
| |
| |
| import-c2-project-vc.py – Parse cdef.all.txt and create project VCs in Kustvakt. |
| |
| Input file format (tab-separated, one entry per line): |
| [vcDescription]\tLOAD('[vcName]')\tG_[groupName] |
| |
| For each line the script: |
| 1. Creates (or updates) a PROJECT virtual corpus named <groupName>-VC |
| owned by --user, using a corpus query of the form corpusSigle="<sigle1>" | corpusSigle="<sigle2>" ... built from the sigles listed in LOAD() |
| 2. Shares that VC with the user group <groupName> |
| |
| Lines starting with '#' or blank lines are ignored. |
| |
| The script obtains a Bearer token via the OAuth2 password grant and calls: |
| PUT <base-url>/<api-version>/vc/~<user>/<groupName>-VC (JSON body) |
| POST <base-url>/<api-version>/vc/~<user>/<groupName>-VC/share/@<groupName> |
| |
| Usage: |
| python3 import-c2-project-vc.py --file data/cdef.all.txt \\ |
| --url http://localhost:8080/api \\ |
| --api-version v1.0 \\ |
| --user admin --password pass \\ |
| --client-id <client_id> [--client-secret <client_secret>] |
| |
| Run with --help for all options. |
| """ |
| |
| import argparse |
| import json |
| import logging |
| import re |
| import sys |
| import urllib.error |
| import urllib.parse |
| import urllib.request |
| from dataclasses import dataclass |
| from pathlib import Path |
| |
# Configure the root logger once at import time: INFO level, with each record
# rendered as "<LEVEL> <message>".
logging.basicConfig(format="%(levelname)s %(message)s", level=logging.INFO)

# Module-level logger used by every function in this script.
log = logging.getLogger(__name__)
| |
# A data line of cdef.all.txt has three tab-separated columns:
#   <vcDescription><TAB>LOAD('<vcName>')<TAB>G_<groupName>
# Named groups captured by the pattern:
#   vcDesc    - everything before the first tab (may be empty)
#   vcName    - the non-empty text between the single quotes of LOAD('...')
#   groupName - the run of non-whitespace characters after the literal "G_"
#               prefix, which must extend to the end of the line
_LINE_RE = re.compile(
    r"^(?P<vcDesc>[^\t]*?)"
    r"\tLOAD\('(?P<vcName>[^']+)'\)"
    r"\tG_(?P<groupName>\S+)$"
)

# Detects lines that end with a LOAD('...') column (optionally followed by
# whitespace), i.e. lines that have no G_<groupName> column. parse_cdef skips
# these with only a DEBUG message instead of a warning.
_NO_GROUP_RE = re.compile(r"\tLOAD\('[^']+'\)\s*$")
| |
| |
| # --------------------------------------------------------------------------- |
| # Data class |
| # --------------------------------------------------------------------------- |
| |
@dataclass
class CdefEntry:
    """One user group from cdef.all.txt together with its corpus sigles."""

    # Corpus sigles collected from LOAD('sigle1 sigle2 ...') for this group.
    vc_sigles: list[str]
    # Description column, taken from the first occurrence of the group.
    vc_desc: str
    # Group name, i.e. the text following the "G_" prefix.
    group_name: str

    @property
    def kustvakt_vc_name(self) -> str:
        """Return the name of the VC created in Kustvakt: ``<groupName>-VC``."""
        return "{}-VC".format(self.group_name)
| |
| |
| # --------------------------------------------------------------------------- |
| # Parsing |
| # --------------------------------------------------------------------------- |
| |
def parse_cdef(path: Path) -> list[CdefEntry]:
    """Parse a cdef.all.txt file into one CdefEntry per group.

    Blank lines and lines whose first non-blank character is '#' are
    ignored. Lines matching ``_LINE_RE`` contribute their sigles to the
    entry of their group; when a group occurs on several lines, the sigles
    are merged and the description of the first occurrence is kept. Lines
    with a LOAD() column but no group column are skipped with a DEBUG
    message, any other non-matching line with a WARNING.

    Args:
        path: Location of the UTF-8 encoded input file.

    Returns:
        The entries in order of each group's first occurrence. Every
        entry's ``vc_sigles`` is free of duplicates and keeps the order in
        which the sigles were first seen.

    Raises:
        Whatever ``Path.read_text`` raises (e.g. ``OSError`` or
        ``UnicodeDecodeError``); these are not handled here.
    """
    # A plain dict preserves insertion order, so groups come out in the
    # order of their first occurrence in the file.
    by_group: dict[str, CdefEntry] = {}

    lines = path.read_text(encoding="utf-8").splitlines()
    for lineno, line in enumerate(lines, 1):
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            continue

        m = _LINE_RE.match(line)
        if m is None:
            # Lines with LOAD() but no G_<groupName> are expected and only
            # reported at DEBUG level; anything else is a format problem.
            if _NO_GROUP_RE.search(line):
                log.debug("Line %d: no G_<groupName>, skipping: %s", lineno, line)
            else:
                log.warning("Line %d: unexpected format, skipping: %s", lineno, line)
            continue

        group_name = m.group("groupName")
        # LOAD() contains a space-separated list of corpus sigles.
        # dict.fromkeys drops repeats within this one LOAD() while keeping
        # order, so first occurrences are de-duplicated just like merges.
        new_sigles = list(dict.fromkeys(m.group("vcName").split()))

        existing = by_group.get(group_name)
        if existing is None:
            by_group[group_name] = CdefEntry(
                vc_sigles=new_sigles,
                vc_desc=m.group("vcDesc").strip(),
                group_name=group_name,
            )
            continue

        # Merge: append only the sigles the group does not have yet.
        seen = set(existing.vc_sigles)
        added = [s for s in new_sigles if s not in seen]
        existing.vc_sigles.extend(added)
        log.debug("Line %d: merged %d sigle(s) into group '%s'",
                  lineno, len(added), group_name)

    return list(by_group.values())
| |
| |
| # --------------------------------------------------------------------------- |
| # HTTP helpers |
| # --------------------------------------------------------------------------- |
| |
def fetch_bearer_token(
    base_url: str,
    api_version: str,
    user: str,
    password: str,
    client_id: str,
    client_secret: str,
) -> str:
    """Obtain an OAuth2 Bearer token via the resource-owner password grant.

    Sends a form-encoded POST to ``<base_url>/<api_version>/oauth2/token``
    and returns the ``access_token`` field of the JSON response.

    Args:
        base_url: Service base URL without a trailing slash.
        api_version: API version path segment, e.g. ``v1.0``.
        user: Resource-owner username.
        password: Resource-owner password.
        client_id: OAuth2 client id.
        client_secret: OAuth2 client secret (may be empty).

    Returns:
        The access token string.

    Any failure (HTTP error status, network error, a response body that is
    not a JSON object, or a missing/empty ``access_token``) is logged and
    terminates the process via ``sys.exit(1)``.
    """
    token_url = f"{base_url}/{api_version}/oauth2/token"
    body = urllib.parse.urlencode({
        "grant_type": "password",
        "username": user,
        "password": password,
        "client_id": client_id,
        "client_secret": client_secret,
    }).encode()
    req = urllib.request.Request(
        token_url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/x-www-form-urlencoded"},
    )
    try:
        with urllib.request.urlopen(req) as resp:
            raw = resp.read()
    except urllib.error.HTTPError as exc:
        log.error("Token request failed (HTTP %s): %s",
                  exc.code, exc.read().decode(errors="replace"))
        sys.exit(1)
    except urllib.error.URLError as exc:
        log.error("Token request network error: %s", exc.reason)
        sys.exit(1)

    # A 2xx answer is not guaranteed to carry JSON (e.g. an HTML page from a
    # proxy). Both UnicodeDecodeError and json.JSONDecodeError are ValueError
    # subclasses, so one handler turns either into a clean error exit.
    try:
        data = json.loads(raw.decode())
    except ValueError as exc:
        log.error("Token response is not valid JSON: %s", exc)
        sys.exit(1)

    # Valid JSON may still be a list or scalar; only an object can hold the token.
    token = data.get("access_token") if isinstance(data, dict) else None
    if not token:
        log.error("No access_token in response: %s", data)
        sys.exit(1)

    log.debug("Bearer token obtained successfully.")
    return token
| |
| |
def _put_json(url: str, payload: dict, auth_header: str, dry_run: bool) -> int:
    """Send *payload* as a JSON body in a PUT request to *url*.

    Returns the HTTP status code of the response. An HTTP error status is
    logged and returned as-is; a network-level failure is logged and
    reported as 0. In dry-run mode nothing is sent: the request is only
    logged and 201 is returned.
    """
    request_headers = {
        "Authorization": auth_header,
        "Content-Type": "application/json;charset=utf-8",
    }
    encoded = json.dumps(payload).encode()
    # The request object is built even for a dry run, exactly as before.
    request = urllib.request.Request(
        url, data=encoded, method="PUT", headers=request_headers
    )

    if dry_run:
        log.info("[DRY-RUN] PUT %s body=%s", url, payload)
        return 201

    try:
        with urllib.request.urlopen(request) as response:
            status = response.status
    except urllib.error.HTTPError as err:
        detail = err.read().decode(errors="replace")
        log.error("HTTP %s for PUT %s – %s", err.code, url, detail)
        return err.code
    except urllib.error.URLError as err:
        log.error("Network error for PUT %s – %s", url, err.reason)
        return 0
    return status
| |
| |
def _post(url: str, auth_header: str, dry_run: bool) -> int:
    """Send a POST request with an empty body to *url*.

    Returns the HTTP status code of the response. An HTTP error status is
    logged and returned as-is; a network-level failure is logged and
    reported as 0. In dry-run mode nothing is sent: the request is only
    logged and 200 is returned.
    """
    request_headers = {"Authorization": auth_header}
    # The request object is built even for a dry run, exactly as before.
    request = urllib.request.Request(
        url, data=b"", method="POST", headers=request_headers
    )

    if dry_run:
        log.info("[DRY-RUN] POST %s", url)
        return 200

    try:
        with urllib.request.urlopen(request) as response:
            status = response.status
    except urllib.error.HTTPError as err:
        detail = err.read().decode(errors="replace")
        log.error("HTTP %s for POST %s – %s", err.code, url, detail)
        return err.code
    except urllib.error.URLError as err:
        log.error("Network error for POST %s – %s", url, err.reason)
        return 0
    return status
| |
| |
| # --------------------------------------------------------------------------- |
| # QueryJson builder |
| # --------------------------------------------------------------------------- |
| |
def _build_query_json(entry: CdefEntry) -> dict:
    """Build the JSON request body for creating/updating the VC of *entry*.

    The returned dict has four keys, in this order:

      type         always "PROJECT"
      queryType    always "VIRTUAL_CORPUS"
      corpusQuery  one corpusSigle="<sigle>" term per sigle of the entry,
                   joined with " | " (OR)
      description  the entry's description

    According to the original author's notes, these keys map to Kustvakt's
    QueryJson POJO, the corpusQuery string is serialized to KoralQuery by
    Kustvakt itself, and queryType could be omitted because the controller
    defaults it to VIRTUAL_CORPUS; it is sent explicitly for clarity.
    """
    sigle_terms = ['corpusSigle="{}"'.format(sigle) for sigle in entry.vc_sigles]
    corpus_query = " | ".join(sigle_terms)
    description = entry.vc_desc
    return {
        "type": "PROJECT",
        "queryType": "VIRTUAL_CORPUS",
        "corpusQuery": corpus_query,
        "description": description,
    }
| |
| |
| # --------------------------------------------------------------------------- |
| # Import |
| # --------------------------------------------------------------------------- |
| |
def import_vc(
    entries: list[CdefEntry],
    base_url: str,
    api_version: str,
    creator: str,
    auth_header: str,
    share: bool,
    dry_run: bool,
) -> None:
    """Create or update one project VC per entry and optionally share it.

    For every entry a PUT is sent to
    ``<base_url>/<api_version>/vc/~<creator>/<groupName>-VC``. Status 201
    counts as created and 204 as updated; any other status is logged as a
    failure and the entry is not shared. When *share* is true, a POST to
    ``<vc-url>/share/@<groupName>`` follows, and status 200 counts as
    success. A summary of all counters is logged at the end.

    Args:
        entries: Parsed entries to import.
        base_url: Service base URL without a trailing slash.
        api_version: API version path segment, e.g. ``v1.0``.
        creator: Username that owns the VCs.
        auth_header: Complete value of the ``Authorization`` header.
        share: Whether to share each VC with its group.
        dry_run: Passed through to the HTTP helpers, which then only log.
    """
    created = updated = vc_failed = shared_ok = shared_err = 0

    # User, VC and group names become URL path segments. Percent-encode them
    # so reserved characters ('/', '?', '#', ...) cannot change the path and
    # non-ASCII characters do not make the HTTP client fail while encoding
    # the request line. Plain ASCII names pass through unchanged.
    creator_seg = urllib.parse.quote(creator, safe="")

    for e in entries:
        vc_seg = urllib.parse.quote(e.kustvakt_vc_name, safe="")
        vc_url = f"{base_url}/{api_version}/vc/~{creator_seg}/{vc_seg}"
        payload = _build_query_json(e)

        # --- create / update VC ---
        status = _put_json(vc_url, payload, auth_header, dry_run)
        if status == 201:
            log.info("Created VC '%s' (group '%s')", e.kustvakt_vc_name, e.group_name)
            created += 1
        elif status == 204:
            log.info("Updated VC '%s' (group '%s')", e.kustvakt_vc_name, e.group_name)
            updated += 1
        else:
            log.error("Failed to create/update VC '%s' (HTTP %s) – skipping share",
                      e.kustvakt_vc_name, status)
            vc_failed += 1
            continue

        if not share:
            continue

        # --- share with group ---
        group_seg = urllib.parse.quote(e.group_name, safe="")
        share_url = f"{vc_url}/share/@{group_seg}"
        status = _post(share_url, auth_header, dry_run)
        if status == 200:
            log.info(" Shared '%s' with group '%s'",
                     e.kustvakt_vc_name, e.group_name)
            shared_ok += 1
        else:
            log.error(" Failed to share '%s' with group '%s' (HTTP %s)",
                      e.kustvakt_vc_name, e.group_name, status)
            shared_err += 1

    # Failed create/update requests are reported too, so that the summary
    # accounts for every entry.
    log.info(
        "Done: %d created, %d updated, %d failed, %d shared OK, %d share failed.",
        created, updated, vc_failed, shared_ok, shared_err,
    )
| |
| |
| # --------------------------------------------------------------------------- |
| # CLI |
| # --------------------------------------------------------------------------- |
| |
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for this script.

    The module docstring is used verbatim as the description. Options that
    take a value are registered first, followed by the boolean switches, so
    the order shown by --help is: --file, --url, --api-version, --user,
    --password, --client-id, --client-secret, --skip-share, --dry-run,
    --verbose.
    """
    # (flag, keyword arguments) for every option that takes a value.
    value_options = (
        ("--file", dict(
            default="data/cdef.all.txt",
            metavar="PATH",
            help="Path to cdef.all.txt (default: data/cdef.all.txt)",
        )),
        ("--url", dict(
            default="http://localhost:8080/api",
            metavar="URL",
            help="Kustvakt base URL, without trailing slash "
                 "(default: http://localhost:8080/api)",
        )),
        ("--api-version", dict(
            default="v1.0",
            metavar="VER",
            help="API version segment, e.g. v1.0 or v1.1 (default: v1.0)",
        )),
        ("--user", dict(
            default="admin",
            metavar="USERNAME",
            help="VC creator / resource-owner username (default: admin)",
        )),
        ("--password", dict(
            default=None,
            metavar="PASSWORD",
            help="Resource-owner password (prompted if omitted)",
        )),
        ("--client-id", dict(
            required=True,
            metavar="CLIENT_ID",
            help="OAuth2 client_id for the password grant",
        )),
        ("--client-secret", dict(
            default="",
            metavar="CLIENT_SECRET",
            help="OAuth2 client_secret (default: empty, for public clients)",
        )),
    )
    # (flag, help text) for every boolean switch.
    switches = (
        ("--skip-share", "Create VCs only, do not share them with the groups"),
        ("--dry-run", "Print what would be done without sending any requests"),
        ("--verbose", "Enable DEBUG-level logging"),
    )

    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    for flag, options in value_options:
        parser.add_argument(flag, **options)
    for flag, help_text in switches:
        parser.add_argument(flag, action="store_true", help=help_text)
    return parser
| |
| |
def main() -> None:
    """Command-line entry point: parse the input file and import its VCs.

    Steps:
      1. Parse the command line; --verbose switches the root logger to DEBUG.
      2. Check that the input path is a regular file and parse it. The
         process exits with status 1 if the path is unusable and returns
         early if the file yields no entries.
      3. Obtain the Authorization header: a placeholder for --dry-run,
         otherwise a Bearer token (the password is prompted for when
         --password was not given).
      4. Create/update the VCs and, unless --skip-share is set, share them.
    """
    args = build_parser().parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    path = Path(args.file)
    # is_file() also rejects directories, which exists() would let through
    # only to fail later with a traceback when the file is read.
    if not path.is_file():
        log.error("Input file not found or not a regular file: %s", path)
        sys.exit(1)

    # Parse before authenticating, so that an empty or unusable input file
    # does not cost a password prompt and a token request.
    entries = parse_cdef(path)
    log.info("Parsed %d entr%s from %s",
             len(entries), "y" if len(entries) == 1 else "ies", path)

    if not entries:
        log.warning("No entries found – nothing to do.")
        return

    base_url = args.url.rstrip("/")

    if args.dry_run:
        # No request is sent in dry-run mode, so no real token is needed.
        auth_header = "Bearer <dry-run>"
    else:
        password = args.password
        if password is None:
            import getpass
            password = getpass.getpass(f"Password for '{args.user}': ")
        token = fetch_bearer_token(
            base_url=base_url,
            api_version=args.api_version,
            user=args.user,
            password=password,
            client_id=args.client_id,
            client_secret=args.client_secret,
        )
        auth_header = f"Bearer {token}"

    import_vc(
        entries=entries,
        base_url=base_url,
        api_version=args.api_version,
        creator=args.user,
        auth_header=auth_header,
        share=not args.skip_share,
        dry_run=args.dry_run,
    )
| |
| |
# Run the CLI only when this file is executed as a script, not when it is
# imported as a module.
if __name__ == "__main__":
    main()