blob: d776da2a61174c667aaab98388d8e91dc539e892 [file] [log] [blame]
#!/usr/bin/env python3
"""
@author margaretha with AI assistance
import-c2-project-vc.py – Parse cdef.all.txt and create project VCs in Kustvakt.
Input file format (tab-separated, one entry per line):
[vcDescription]\tLOAD('[vcName]')\tG_[groupName]
For each line the script:
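1. Creates (or updates) a PROJECT virtual corpus named <groupName>-VC
owned by --user. Its corpus query ORs together the space-separated
corpus sigles given in LOAD('...'), e.g. corpusSigle="A" | corpusSigle="B".
Lines that name the same group are merged into a single VC.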
2. Shares that VC with the user group <groupName>
Lines starting with '#' or blank lines are ignored.
The script obtains a Bearer token via the OAuth2 password grant and calls:
PUT <base-url>/<api-version>/vc/~<user>/<groupName>-VC (JSON body)
POST <base-url>/<api-version>/vc/~<user>/<groupName>-VC/share/@<groupName>
Usage:
python3 import-c2-project-vc.py --file data/cdef.all.txt \\
--url http://localhost:8080/api \\
--api-version v1.0 \\
--user admin --password pass \\
--client-id <client_id> [--client-secret <client_secret>]
Run with --help for all options.
"""
import argparse
import json
import logging
import re
import sys
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass
from pathlib import Path
# Root logging configuration: INFO level, "<LEVEL> <message>" line format.
# main() raises the root level to DEBUG when --verbose is given.
logging.basicConfig(format="%(levelname)s %(message)s", level=logging.INFO)

# Module-level logger used by every function in this script.
log = logging.getLogger(__name__)
# cdef.all.txt line pattern (three tab-separated columns):
# <vcDescription><TAB>LOAD('<vcName>')<TAB>G_<groupName>
#
# Named groups captured by _LINE_RE:
#   vcDesc    - text before the first tab (may be empty)
#   vcName    - everything between the single quotes of LOAD('...');
#               parse_cdef() splits this on whitespace into corpus sigles
#   groupName - the run of non-whitespace characters after the "G_" prefix,
#               which must extend to the end of the line
_LINE_RE = re.compile(
    r"^(?P<vcDesc>[^\t]*?)"
    r"\tLOAD\('(?P<vcName>[^']+)'\)"
    r"\tG_(?P<groupName>\S+)$"
)
# Matches lines that end with a LOAD('...') column (optionally followed by
# whitespace), i.e. lines that have LOAD() but no G_<groupName> column.
# parse_cdef() skips such lines and reports them at DEBUG level only.
_NO_GROUP_RE = re.compile(r"\tLOAD\('[^']+'\)\s*$")
# ---------------------------------------------------------------------------
# Data class
# ---------------------------------------------------------------------------
@dataclass
class CdefEntry:
    """One project VC to import: a group and the corpus sigles it covers.

    parse_cdef() produces one instance per distinct group name found in
    the input file.
    """

    # Corpus sigles taken from LOAD('sigle1 sigle2 ...').
    vc_sigles: list[str]
    # Description, taken from the first occurrence of the group.
    vc_desc: str
    # Group name, i.e. the text following the "G_" prefix in the input.
    group_name: str

    @property
    def kustvakt_vc_name(self) -> str:
        """Return the name used for the VC in Kustvakt: <groupName>-VC."""
        vc_name = f"{self.group_name}-VC"
        return vc_name
# ---------------------------------------------------------------------------
# Parsing
# ---------------------------------------------------------------------------
def parse_cdef(path: Path) -> list[CdefEntry]:
    """Parse a cdef file into one CdefEntry per distinct group.

    Blank lines and lines whose first non-blank character is '#' are
    ignored. Lines that match neither expected pattern are skipped with a
    warning; lines with a LOAD() column but no G_<groupName> column are
    skipped with a DEBUG message only.

    When several lines name the same group, their sigles are merged into
    the entry created for the first such line (whose description is kept).
    Each sigle appears at most once per entry, in order of first appearance.

    Args:
        path: Location of the UTF-8 encoded, tab-separated input file.

    Returns:
        The entries in order of each group's first occurrence.

    Raises:
        OSError: If the file cannot be read.
        UnicodeDecodeError: If the file is not valid UTF-8.
    """
    # Plain dicts preserve insertion order, so groups are returned in the
    # order in which they first appear in the file.
    by_group: dict[str, CdefEntry] = {}

    lines = path.read_text(encoding="utf-8").splitlines()
    for lineno, line in enumerate(lines, 1):
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            continue

        m = _LINE_RE.match(line)
        if not m:
            # Lines with LOAD() but no G_<groupName> are silently skipped.
            if _NO_GROUP_RE.search(line):
                log.debug("Line %d: no G_<groupName>, skipping: %s", lineno, line)
            else:
                log.warning("Line %d: unexpected format, skipping: %s", lineno, line)
            continue

        group_name = m.group("groupName")
        # LOAD() contains a space-separated list of corpus sigles.
        # dict.fromkeys() drops repeats within the line while keeping order,
        # so a group's first line is de-duplicated just like merged lines.
        new_sigles = list(dict.fromkeys(m.group("vcName").split()))

        existing = by_group.get(group_name)
        if existing is None:
            by_group[group_name] = CdefEntry(
                vc_sigles=new_sigles,
                vc_desc=m.group("vcDesc").strip(),
                group_name=group_name,
            )
            continue

        # Merge: append only the sigles that are not already present.
        seen = set(existing.vc_sigles)
        added = [s for s in new_sigles if s not in seen]
        existing.vc_sigles.extend(added)
        # Report the number actually added, not the number on the line.
        log.debug("Line %d: merged %d sigle(s) into group '%s'",
                  lineno, len(added), group_name)

    return list(by_group.values())
# ---------------------------------------------------------------------------
# HTTP helpers
# ---------------------------------------------------------------------------
def fetch_bearer_token(
    base_url: str,
    api_version: str,
    user: str,
    password: str,
    client_id: str,
    client_secret: str,
) -> str:
    """Obtain an OAuth2 Bearer token via the resource-owner password grant.

    Sends a form-encoded POST to <base_url>/<api_version>/oauth2/token and
    returns the "access_token" field of the JSON response.

    Args:
        base_url: Service base URL without a trailing slash.
        api_version: API version path segment, e.g. "v1.0".
        user: Resource-owner username.
        password: Resource-owner password.
        client_id: OAuth2 client identifier.
        client_secret: OAuth2 client secret (may be empty).

    Returns:
        The access token string.

    Raises:
        SystemExit: With status 1 (after logging the reason) when the
            request fails, the response is not a JSON object, or it
            contains no access token.
    """
    token_url = f"{base_url}/{api_version}/oauth2/token"
    form = {
        "grant_type": "password",
        "username": user,
        "password": password,
        "client_id": client_id,
        "client_secret": client_secret,
    }
    req = urllib.request.Request(
        token_url,
        data=urllib.parse.urlencode(form).encode(),
        method="POST",
        headers={"Content-Type": "application/x-www-form-urlencoded"},
    )

    try:
        with urllib.request.urlopen(req) as resp:
            raw = resp.read()
    except urllib.error.HTTPError as exc:
        log.error("Token request failed (HTTP %s): %s",
                  exc.code, exc.read().decode(errors="replace"))
        sys.exit(1)
    except urllib.error.URLError as exc:
        log.error("Token request network error: %s", exc.reason)
        sys.exit(1)

    # A 2xx response is not guaranteed to carry JSON (e.g. an HTML page from
    # an intermediary). ValueError covers both json.JSONDecodeError and
    # UnicodeDecodeError, so such responses end in a clean error message
    # instead of a traceback.
    try:
        data = json.loads(raw.decode())
    except ValueError as exc:
        log.error("Token response is not valid JSON: %s", exc)
        sys.exit(1)

    # Valid JSON may still be a list/string/number, which has no .get().
    token = data.get("access_token") if isinstance(data, dict) else None
    if not token:
        log.error("No access_token in response: %s", data)
        sys.exit(1)

    log.debug("Bearer token obtained successfully.")
    return token
def _put_json(url: str, payload: dict, auth_header: str, dry_run: bool) -> int:
    """Send *payload* as a JSON PUT request and return the HTTP status code.

    Args:
        url: Target URL.
        payload: Object serialized with json.dumps() as the request body.
        auth_header: Value of the Authorization header.
        dry_run: If true, only log the request and report 201 without
            sending anything.

    Returns:
        The response status on success, the error status when the server
        answers with an HTTP error, or 0 when the request fails at the
        network level.
    """
    headers = {
        "Authorization": auth_header,
        "Content-Type": "application/json;charset=utf-8",
    }
    encoded = json.dumps(payload).encode()
    # The request object is built even for dry runs, as before.
    request = urllib.request.Request(
        url, data=encoded, method="PUT", headers=headers
    )

    if dry_run:
        log.info("[DRY-RUN] PUT %s body=%s", url, payload)
        return 201

    try:
        with urllib.request.urlopen(request) as response:
            status = response.status
    except urllib.error.HTTPError as err:
        detail = err.read().decode(errors="replace")
        log.error("HTTP %s for PUT %s – %s", err.code, url, detail)
        return err.code
    except urllib.error.URLError as err:
        log.error("Network error for PUT %s – %s", url, err.reason)
        return 0
    return status
def _post(url: str, auth_header: str, dry_run: bool) -> int:
    """Send a POST request with an empty body and return the HTTP status code.

    Args:
        url: Target URL.
        auth_header: Value of the Authorization header.
        dry_run: If true, only log the request and report 200 without
            sending anything.

    Returns:
        The response status on success, the error status when the server
        answers with an HTTP error, or 0 when the request fails at the
        network level.
    """
    # The request object is built even for dry runs, as before.
    request = urllib.request.Request(
        url,
        headers={"Authorization": auth_header},
        method="POST",
        data=b"",
    )

    if dry_run:
        log.info("[DRY-RUN] POST %s", url)
        return 200

    try:
        with urllib.request.urlopen(request) as response:
            status = response.status
    except urllib.error.HTTPError as err:
        detail = err.read().decode(errors="replace")
        log.error("HTTP %s for POST %s – %s", err.code, url, detail)
        return err.code
    except urllib.error.URLError as err:
        log.error("Network error for POST %s – %s", url, err.reason)
        return 0
    return status
# ---------------------------------------------------------------------------
# QueryJson builder
# ---------------------------------------------------------------------------
def _build_query_json(entry: CdefEntry) -> dict:
    """
    Build the request body for creating or updating one VC.

    The keys correspond to the fields of Kustvakt's QueryJson POJO:

        key          Java type      value sent
        -----------  -------------  ---------------------------------------
        type         ResourceType   "PROJECT"
        queryType    QueryType      "VIRTUAL_CORPUS"
        corpusQuery  String         corpusSigle="A" | corpusSigle="B" ...
        description  String         the entry's description

    corpusQuery is a KorAP collection query string; Kustvakt serializes it
    to KoralQuery JSON internally (via Koral QuerySerializer). One
    corpusSigle="..." clause is emitted per sigle of the entry and the
    clauses are joined with | (OR).

    The controller (VirtualCorpusController.createUpdateVC) defaults
    queryType to VIRTUAL_CORPUS when it is null, so the key could be
    omitted; it is sent explicitly for clarity.
    """
    clauses = []
    for sigle in entry.vc_sigles:
        clauses.append(f'corpusSigle="{sigle}"')
    corpus_query = " | ".join(clauses)

    body = {
        "type": "PROJECT",              # ResourceType.PROJECT
        "queryType": "VIRTUAL_CORPUS",  # QueryType.VIRTUAL_CORPUS
        "corpusQuery": corpus_query,
        "description": entry.vc_desc,
    }
    return body
# ---------------------------------------------------------------------------
# Import
# ---------------------------------------------------------------------------
def import_vc(
    entries: list[CdefEntry],
    base_url: str,
    api_version: str,
    creator: str,
    auth_header: str,
    share: bool,
    dry_run: bool,
) -> None:
    """Create or update one VC per entry and optionally share it.

    For every entry a PUT is sent to
    <base_url>/<api_version>/vc/~<creator>/<groupName>-VC. Status 201 is
    counted as created and 204 as updated; any other status is counted as
    failed and the share step is skipped for that entry. When *share* is
    true, a POST to <vc-url>/share/@<groupName> follows, with status 200
    counted as success. A summary line is logged at the end.

    Args:
        entries: Parsed entries to import.
        base_url: Service base URL without a trailing slash.
        api_version: API version path segment, e.g. "v1.0".
        creator: Username that owns the VCs.
        auth_header: Value of the Authorization header for all requests.
        share: Whether to share each VC with its group.
        dry_run: Passed through to the HTTP helpers, which then only log.
    """
    created = updated = vc_failed = shared_ok = shared_err = 0

    # User, VC and group names end up in the URL path. Percent-encode them
    # so that reserved characters ('/', '?', '#', ...) cannot change the
    # request target and non-ASCII names do not make urlopen() raise.
    # Names made of unreserved characters are left unchanged.
    owner = urllib.parse.quote(creator, safe="")

    for e in entries:
        vc_name = urllib.parse.quote(e.kustvakt_vc_name, safe="")
        vc_url = f"{base_url}/{api_version}/vc/~{owner}/{vc_name}"
        payload = _build_query_json(e)

        # --- create / update VC ---
        status = _put_json(vc_url, payload, auth_header, dry_run)
        if status == 201:
            log.info("Created VC '%s' (group '%s')", e.kustvakt_vc_name, e.group_name)
            created += 1
        elif status == 204:
            log.info("Updated VC '%s' (group '%s')", e.kustvakt_vc_name, e.group_name)
            updated += 1
        else:
            log.error("Failed to create/update VC '%s' (HTTP %s) – skipping share",
                      e.kustvakt_vc_name, status)
            vc_failed += 1
            continue

        # --- share with group ---
        if not share:
            continue
        group = urllib.parse.quote(e.group_name, safe="")
        share_url = f"{vc_url}/share/@{group}"
        status = _post(share_url, auth_header, dry_run)
        if status == 200:
            log.info(" Shared '%s' with group '%s'",
                     e.kustvakt_vc_name, e.group_name)
            shared_ok += 1
        else:
            log.error(" Failed to share '%s' with group '%s' (HTTP %s)",
                      e.kustvakt_vc_name, e.group_name, status)
            shared_err += 1

    # Include failed create/update calls so the summary cannot look clean
    # when VCs were not written.
    log.info(
        "Done: %d created, %d updated, %d failed, %d shared OK, %d share failed.",
        created, updated, vc_failed, shared_ok, shared_err,
    )
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def build_parser() -> argparse.ArgumentParser:
    """Return the command-line parser for this script.

    The module docstring is used verbatim as the help description. Only
    --client-id is required; every other option has a default.
    """
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=__doc__,
    )

    # Options that take a value: (flag, metavar, remaining keyword arguments).
    valued_options = [
        ("--file", "PATH", {
            "default": "data/cdef.all.txt",
            "help": "Path to cdef.all.txt (default: data/cdef.all.txt)",
        }),
        ("--url", "URL", {
            "default": "http://localhost:8080/api",
            "help": "Kustvakt base URL, without trailing slash "
                    "(default: http://localhost:8080/api)",
        }),
        ("--api-version", "VER", {
            "default": "v1.0",
            "help": "API version segment, e.g. v1.0 or v1.1 (default: v1.0)",
        }),
        ("--user", "USERNAME", {
            "default": "admin",
            "help": "VC creator / resource-owner username (default: admin)",
        }),
        ("--password", "PASSWORD", {
            "default": None,
            "help": "Resource-owner password (prompted if omitted)",
        }),
        ("--client-id", "CLIENT_ID", {
            "required": True,
            "help": "OAuth2 client_id for the password grant",
        }),
        ("--client-secret", "CLIENT_SECRET", {
            "default": "",
            "help": "OAuth2 client_secret (default: empty, for public clients)",
        }),
    ]
    for flag, metavar, extra in valued_options:
        parser.add_argument(flag, metavar=metavar, **extra)

    # Boolean switches: (flag, help text).
    switches = [
        ("--skip-share", "Create VCs only, do not share them with the groups"),
        ("--dry-run", "Print what would be done without sending any requests"),
        ("--verbose", "Enable DEBUG-level logging"),
    ]
    for flag, help_text in switches:
        parser.add_argument(flag, action="store_true", help=help_text)

    return parser
def main() -> None:
    """Command-line entry point: parse the input file and import its VCs.

    Steps:
      1. Parse the command line; --verbose switches the root logger to DEBUG.
      2. Read and parse the input file.
      3. Unless --dry-run is given, obtain a Bearer token (prompting for the
         password when --password was omitted).
      4. Create/update the VCs and, unless --skip-share is given, share them.

    Exits with status 1 when the input file is missing or unreadable, or
    when the token request fails. Returns normally when the file contains
    no entries.
    """
    args = build_parser().parse_args()
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    path = Path(args.file)
    # is_file() rather than exists(): a directory exists too, but reading it
    # would fail with a traceback further down.
    if not path.is_file():
        log.error("Input file not found: %s", path)
        sys.exit(1)

    # Parse before authenticating so that an unreadable or empty input file
    # is reported without first prompting for a password or contacting the
    # server.
    try:
        entries = parse_cdef(path)
    except (OSError, UnicodeDecodeError) as exc:
        log.error("Cannot read input file %s: %s", path, exc)
        sys.exit(1)
    log.info("Parsed %d entr%s from %s",
             len(entries), "y" if len(entries) == 1 else "ies", path)
    if not entries:
        log.warning("No entries found – nothing to do.")
        return

    base_url = args.url.rstrip("/")
    if args.dry_run:
        # No request is sent in dry-run mode, so a placeholder is enough.
        auth_header = "Bearer <dry-run>"
    else:
        password = args.password
        if password is None:
            # Imported lazily: only needed for the interactive prompt.
            import getpass
            password = getpass.getpass(f"Password for '{args.user}': ")
        token = fetch_bearer_token(
            base_url=base_url,
            api_version=args.api_version,
            user=args.user,
            password=password,
            client_id=args.client_id,
            client_secret=args.client_secret,
        )
        auth_header = f"Bearer {token}"

    import_vc(
        entries=entries,
        base_url=base_url,
        api_version=args.api_version,
        creator=args.user,
        auth_header=auth_header,
        share=not args.skip_share,
        dry_run=args.dry_run,
    )
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()