Fix scheduler for external annotators

Change-Id: I5b98620ea87be8b53cb7204d18aec8989ea3709a
diff --git a/CHANGELOG.md b/CHANGELOG.md
index a821718..11f1cbe 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,7 @@
 
 - Krill metadata inheritance now ignores empty text-level `creatDate`/`pubDate` elements, inherits metadata consistently from corpus and document headers, and backfills `creationDate` and `pubDate` from each other so both dates are always present once either one is available
 - Corpus and document headers now expose the same common Krill metadata fields for downstream text-level inheritance, including title/author-style fields and publication metadata
+- External annotation with ZIP output no longer slows down progressively on very large corpora; the text-submission scheduler now avoids repeated text-order list scans and the hot document-processing path no longer forces periodic full GCs
 
 ## [v3.3.2] - 2026-04-06
 
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
index fd05dbb..066c0a8 100644
--- a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
@@ -1196,6 +1196,9 @@
     var dBuilder: DocumentBuilder? = null
     // Output stream for the "morpho" layer ZIP entries
     internal var morphoZipOutputStream: ZipArchiveOutputStream? = null
+    // Dedicated single-thread writer for async ZIP annotation output
+    private var annotationZipWriteQueue: java.util.concurrent.LinkedBlockingQueue<Pair<String, AnnotationWorkerPool.AnnotationTask?>>? = null
+    private var annotationZipWriteExecutor: java.util.concurrent.ExecutorService? = null
     var krillTarOutputStream: TarArchiveOutputStream? = null
     var krillOutputFileName: String? = null
     private var krillOutputPath: String? = null
@@ -1547,8 +1550,28 @@
             }
 
             if (outputFormat == OutputFormat.KORAP_XML) {
+                // Write to ZIP from a dedicated single thread so annotation workers are
+                // never blocked waiting for the ZIP write lock. Without this, all
+                // workers compete for synchronized(morphoZipOutputStream), each worker's
+                // reader coroutine stalls, the Docker container's stdout pipe fills up,
+                // the container blocks on write, and effective throughput collapses.
+                val writeQueue = java.util.concurrent.LinkedBlockingQueue<Pair<String, AnnotationWorkerPool.AnnotationTask?>>(maxThreads * 4)
+                val writeExecutor = java.util.concurrent.Executors.newSingleThreadExecutor()
+                annotationZipWriteQueue = writeQueue
+                annotationZipWriteExecutor = writeExecutor
+                writeExecutor.execute {
+                    try {
+                        while (true) {
+                            val (text, task) = writeQueue.take()
+                            if (text.isEmpty() && task == null) break  // poison pill
+                            parseAndWriteAnnotatedConllu(text, task)
+                        }
+                    } catch (e: InterruptedException) {
+                        Thread.currentThread().interrupt()
+                    }
+                }
                 annotationWorkerPool = AnnotationWorkerPool(annotateWith, maxThreads, LOGGER, { annotatedConllu, task ->
-                     parseAndWriteAnnotatedConllu(annotatedConllu, task)
+                    writeQueue.put(Pair(annotatedConllu, task))
                 }, stderrLogPath = logFilePath)
             } else {
                 val handler: ((String, AnnotationWorkerPool.AnnotationTask?) -> Unit)? = if (outputFile != null) {
@@ -1679,6 +1702,10 @@
             LOGGER.info("closing worker pool")
             LOGGER.info("Documents sent to annotation: ${docsSentToAnnotation.get()}")
             annotationWorkerPool?.close()
+            // Signal the async ZIP writer thread to drain and stop, then wait for it
+            annotationZipWriteQueue?.put(Pair("", null))  // poison pill
+            annotationZipWriteExecutor?.shutdown()
+            annotationZipWriteExecutor?.awaitTermination(2, java.util.concurrent.TimeUnit.HOURS)
             LOGGER.info("Documents written to ZIP: ${docsWrittenToZip.get()}")
 
             // Close the ZIP file after worker pool is done (if using external annotation with ZIP output)
@@ -2759,6 +2786,10 @@
         // Group entries by text ID to ensure all files for a text are processed together
         val entriesByTextId = entries.groupBy { getTextIdFromPath(it.name) }
         val textIds = entriesByTextId.keys.sortedWith(this::compareTextIds)  // Process text IDs in month-aware order
+        val textIdPositions = HashMap<String, Int>(textIds.size)
+        textIds.forEachIndexed { index, textId ->
+            textIdPositions[textId] = index
+        }
 
         // Initialize watermark for this foundry if not exists (set to first text ID)
         if (!foundryWatermarks.containsKey(foundry) && textIds.isNotEmpty()) {
@@ -2801,8 +2832,8 @@
                         ?: textId
                     
                     // Calculate position difference
-                    val minIndex = textIds.indexOf(minNextTextId).takeIf { it >= 0 } ?: 0
-                    val currentIndex = textIds.indexOf(textId).takeIf { it >= 0 } ?: 0
+                    val minIndex = textIdPositions[minNextTextId] ?: 0
+                    val currentIndex = textIdPositions[textId] ?: 0
                     
                     // Allow submission if within window or if we're behind
                     if (currentIndex <= minIndex + submissionWindow) {
@@ -2855,12 +2886,12 @@
                 // Check if we should throttle submission based on watermarks
                 var shouldWait = true
                 while (shouldWait) {
-                    val minWatermark = foundryWatermarks.values.minByOrNull { it } ?: textId
+                    val minWatermark = foundryWatermarks.values.minWithOrNull(this::compareTextIds) ?: textId
                     val currentWatermark = foundryWatermarks[foundry] ?: textId
                     
-                    val minIndex = textIds.indexOf(minWatermark).takeIf { it >= 0 } ?: 0
-                    val currentIndex = textIds.indexOf(currentWatermark).takeIf { it >= 0 } ?: 0
-                    val textIdIndex = textIds.indexOf(textId)
+                    val minIndex = textIdPositions[minWatermark] ?: 0
+                    val currentIndex = textIdPositions[currentWatermark] ?: 0
+                    val textIdIndex = textIdPositions[textId] ?: -1
                     
                     if (textIdIndex < 0 || textIdIndex <= minIndex + windowSize || currentIndex <= minIndex) {
                         shouldWait = false
@@ -3677,16 +3708,11 @@
             if (!quiet) progressBar?.step()
         }
 
-        // Periodic GC hint after processing many docs (lightweight safeguard)
-        if ((processedDocs.incrementAndGet() % 2000) == 0) {
-            LOGGER.fine("Processed ${processedDocs.get()} docs – requesting GC hint")
-            System.gc()
-        }
+        val processedCount = processedDocs.incrementAndGet()
         // Memory / cache statistics logging
         if (memStatsInterval > 0) {
-            val count = processedDocs.get()
-            if (count % memStatsInterval == 0) {
-                logMemoryStats(count)
+            if (processedCount % memStatsInterval == 0) {
+                logMemoryStats(processedCount)
             }
         }
     }
@@ -4775,6 +4801,16 @@
 
         morpho.remove(tempDocId)
         sentences.remove(tempDocId)
+
+        // Free the original document data loaded during ZIP parsing – no longer needed after annotation is written.
+        // Without this cleanup, tokens/texts/metadata accumulate for every processed document, causing
+        // progressive GC pressure and throughput degradation in large corpora.
+        texts.remove(docId)
+        tokens.remove(docId)
+        sentences.remove(docId)
+        fnames.remove(docId)
+        metadata.remove(docId)
+        extraFeatures.remove(docId)
     }
 
     /**