Stabilize Krill scheduling and compression backpressure

Bound the Krill compression executor with a fixed-capacity queue
(2x thread count, minimum 16) and a CallerRunsPolicy so that, under
saturation, submitters throttle themselves instead of growing an
unbounded work queue. Surface the compression pool's live stats
(active, queued, threads) in the KRILL-STATS log line to make
saturation observable.

Also stop the single writer/scheduler thread from re-enqueueing
compression for texts already tracked by the work-stealing flow
(expectedFoundriesPerText): completion handlers enqueue those texts
themselves, and with caller-runs in effect a saturated pool could
bounce that work back onto the scheduler thread and stall output
entirely. The writer-side enqueue now applies only to the legacy
ZIP-tracking flow.

Change-Id: I267f9c4f079769fd486646c1da7381de92c6fd5b
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
index a919d47..f7087d1 100644
--- a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
@@ -3440,13 +3440,17 @@
         val maxMB = rt.maxMemory() / (1024 * 1024)
         val outputCount = krillOutputCount.get()
         val totalTexts = expectedTextOrder.size
+        val compressionPoolStats = (compressionExecutor as? java.util.concurrent.ThreadPoolExecutor)?.let { executor ->
+            " compressionPool{active=${executor.activeCount},queued=${executor.queue.size},threads=${executor.poolSize}}"
+        } ?: ""
 
         LOGGER.info(
             "KRILL-STATS reason=$reason output=$outputCount/$totalTexts " +
                 "heapMB{used=$usedMB,total=$totalMB,max=$maxMB} " +
                 "queues{raw=$rawPending,compressed=$compressedPending,inFlight=$compressionInFlight,ready=$readyQueueDepth} " +
                 "peaks{raw=${krillPeakRawPending.get()},compressed=${krillPeakCompressedPending.get()}," +
-                "inFlight=${krillPeakCompressionInFlight.get()},ready=${krillPeakReadyQueueDepth.get()}}"
+                "inFlight=${krillPeakCompressionInFlight.get()},ready=${krillPeakReadyQueueDepth.get()}}" +
+                compressionPoolStats
         )
     }
 
@@ -5426,10 +5430,20 @@
         
         // Start compression thread pool (parallel compression)
         val compressionThreads = maxThreads.coerceAtLeast(2)
-        compressionExecutor = java.util.concurrent.Executors.newFixedThreadPool(compressionThreads) { r ->
-            Thread(r, "KrillCompressor-${Thread.currentThread().threadId()}")
-        }
-        LOGGER.info("Started compression thread pool with $compressionThreads threads")
+        val compressionQueueLimit = maxOf(compressionThreads * 2, 16)
+        compressionExecutor = java.util.concurrent.ThreadPoolExecutor(
+            compressionThreads,
+            compressionThreads,
+            0L,
+            java.util.concurrent.TimeUnit.MILLISECONDS,
+            java.util.concurrent.ArrayBlockingQueue(compressionQueueLimit),
+            { r -> Thread(r, "KrillCompressor-${Thread.currentThread().threadId()}") },
+            java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy()
+        )
+        LOGGER.info(
+            "Started compression thread pool with $compressionThreads threads " +
+                "(queueLimit=$compressionQueueLimit, saturationPolicy=caller-runs)"
+        )
         
         // Start single writer thread (sequential TAR writing)
         incrementalOutputScheduler = java.util.concurrent.Executors.newSingleThreadScheduledExecutor { r ->
@@ -5642,7 +5656,11 @@
             0
         }
 
-        // Phase 1: Find ALL ready texts (order doesn't matter for output)
+        // Phase 1: Find texts that are complete in the legacy ZIP-tracking flow.
+        // In the current work-stealing Krill flow, completion handlers already enqueue
+        // compression as soon as a text's expected foundries are done. Re-enqueueing from
+        // the writer thread is harmful because a saturated compression pool can bounce
+        // work back onto this single scheduler thread and stall output entirely.
         var checkedCount = 0
         var newFlowCount = 0
         var oldFlowCount = 0
@@ -5670,7 +5688,7 @@
                 }
             }
             
-            if (allProcessed) {
+            if (allProcessed && !expectedFoundriesPerText.containsKey(textId)) {
                 val textData = krillData[textId]
                 if (textData != null) {
                     enqueueKrillCompression(textId, textData)