Fix text-submission scheduler slowdown for external annotators on large corpora
Change-Id: I5b98620ea87be8b53cb7204d18aec8989ea3709a
diff --git a/CHANGELOG.md b/CHANGELOG.md
index a821718..11f1cbe 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,7 @@
- Krill metadata inheritance now ignores empty text-level `creatDate`/`pubDate` elements, inherits metadata consistently from corpus and document headers, and backfills `creationDate` and `pubDate` from each other so both dates are always present once either one is available
- Corpus and document headers now expose the same common Krill metadata fields for downstream text-level inheritance, including title/author-style fields and publication metadata
+- External annotation with ZIP output no longer slows down progressively on very large corpora; the text-submission scheduler now avoids repeated text-order list scans and the hot document-processing path no longer forces periodic full GCs
## [v3.3.2] - 2026-04-06
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
index fd05dbb..066c0a8 100644
--- a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
@@ -1196,6 +1196,9 @@
var dBuilder: DocumentBuilder? = null
// Output stream for the "morpho" layer ZIP entries
internal var morphoZipOutputStream: ZipArchiveOutputStream? = null
+ // Dedicated single-thread writer for async ZIP annotation output
+ private var annotationZipWriteQueue: java.util.concurrent.LinkedBlockingQueue<Pair<String, AnnotationWorkerPool.AnnotationTask?>>? = null
+ private var annotationZipWriteExecutor: java.util.concurrent.ExecutorService? = null
var krillTarOutputStream: TarArchiveOutputStream? = null
var krillOutputFileName: String? = null
private var krillOutputPath: String? = null
@@ -1547,8 +1550,28 @@
}
if (outputFormat == OutputFormat.KORAP_XML) {
+ // Write to ZIP from a dedicated single thread so annotation workers are
+ // never blocked waiting for the ZIP write lock. Without this, all 12
+ // workers compete for synchronized(morphoZipOutputStream), each worker's
+ // reader coroutine stalls, the Docker container's stdout pipe fills up,
+ // the container blocks on write, and effective throughput collapses.
+ val writeQueue = java.util.concurrent.LinkedBlockingQueue<Pair<String, AnnotationWorkerPool.AnnotationTask?>>(maxThreads * 4)
+ val writeExecutor = java.util.concurrent.Executors.newSingleThreadExecutor()
+ annotationZipWriteQueue = writeQueue
+ annotationZipWriteExecutor = writeExecutor
+ writeExecutor.execute {
+ try {
+ while (true) {
+ val (text, task) = writeQueue.take()
+ if (text.isEmpty() && task == null) break // poison pill
+ parseAndWriteAnnotatedConllu(text, task)
+ }
+ } catch (e: InterruptedException) {
+ Thread.currentThread().interrupt()
+ }
+ }
annotationWorkerPool = AnnotationWorkerPool(annotateWith, maxThreads, LOGGER, { annotatedConllu, task ->
- parseAndWriteAnnotatedConllu(annotatedConllu, task)
+ writeQueue.put(Pair(annotatedConllu, task))
}, stderrLogPath = logFilePath)
} else {
val handler: ((String, AnnotationWorkerPool.AnnotationTask?) -> Unit)? = if (outputFile != null) {
@@ -1679,6 +1702,10 @@
LOGGER.info("closing worker pool")
LOGGER.info("Documents sent to annotation: ${docsSentToAnnotation.get()}")
annotationWorkerPool?.close()
+ // Signal the async ZIP writer thread to drain and stop, then wait for it
+ annotationZipWriteQueue?.put(Pair("", null)) // poison pill
+ annotationZipWriteExecutor?.shutdown()
+ annotationZipWriteExecutor?.awaitTermination(2, java.util.concurrent.TimeUnit.HOURS)
LOGGER.info("Documents written to ZIP: ${docsWrittenToZip.get()}")
// Close the ZIP file after worker pool is done (if using external annotation with ZIP output)
@@ -2759,6 +2786,10 @@
// Group entries by text ID to ensure all files for a text are processed together
val entriesByTextId = entries.groupBy { getTextIdFromPath(it.name) }
val textIds = entriesByTextId.keys.sortedWith(this::compareTextIds) // Process text IDs in month-aware order
+ val textIdPositions = HashMap<String, Int>(textIds.size)
+ textIds.forEachIndexed { index, textId ->
+ textIdPositions[textId] = index
+ }
// Initialize watermark for this foundry if not exists (set to first text ID)
if (!foundryWatermarks.containsKey(foundry) && textIds.isNotEmpty()) {
@@ -2801,8 +2832,8 @@
?: textId
// Calculate position difference
- val minIndex = textIds.indexOf(minNextTextId).takeIf { it >= 0 } ?: 0
- val currentIndex = textIds.indexOf(textId).takeIf { it >= 0 } ?: 0
+ val minIndex = textIdPositions[minNextTextId] ?: 0
+ val currentIndex = textIdPositions[textId] ?: 0
// Allow submission if within window or if we're behind
if (currentIndex <= minIndex + submissionWindow) {
@@ -2855,12 +2886,12 @@
// Check if we should throttle submission based on watermarks
var shouldWait = true
while (shouldWait) {
- val minWatermark = foundryWatermarks.values.minByOrNull { it } ?: textId
+ val minWatermark = foundryWatermarks.values.minWithOrNull(this::compareTextIds) ?: textId
val currentWatermark = foundryWatermarks[foundry] ?: textId
- val minIndex = textIds.indexOf(minWatermark).takeIf { it >= 0 } ?: 0
- val currentIndex = textIds.indexOf(currentWatermark).takeIf { it >= 0 } ?: 0
- val textIdIndex = textIds.indexOf(textId)
+ val minIndex = textIdPositions[minWatermark] ?: 0
+ val currentIndex = textIdPositions[currentWatermark] ?: 0
+ val textIdIndex = textIdPositions[textId] ?: -1
if (textIdIndex < 0 || textIdIndex <= minIndex + windowSize || currentIndex <= minIndex) {
shouldWait = false
@@ -3677,16 +3708,11 @@
if (!quiet) progressBar?.step()
}
- // Periodic GC hint after processing many docs (lightweight safeguard)
- if ((processedDocs.incrementAndGet() % 2000) == 0) {
- LOGGER.fine("Processed ${processedDocs.get()} docs – requesting GC hint")
- System.gc()
- }
+ val processedCount = processedDocs.incrementAndGet()
// Memory / cache statistics logging
if (memStatsInterval > 0) {
- val count = processedDocs.get()
- if (count % memStatsInterval == 0) {
- logMemoryStats(count)
+ if (processedCount % memStatsInterval == 0) {
+ logMemoryStats(processedCount)
}
}
}
@@ -4775,6 +4801,16 @@
morpho.remove(tempDocId)
sentences.remove(tempDocId)
+
+ // Free the original document data loaded during ZIP parsing – no longer needed after annotation is written.
+ // Without this cleanup, tokens/texts/metadata accumulate for every processed document, causing
+ // progressive GC pressure and throughput degradation in large corpora.
+ texts.remove(docId)
+ tokens.remove(docId)
+ sentences.remove(docId)
+ fnames.remove(docId)
+ metadata.remove(docId)
+ extraFeatures.remove(docId)
}
/**