Stabilize Krill scheduling and compression backpressure
Change-Id: I267f9c4f079769fd486646c1da7381de92c6fd5b
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
index a919d47..f7087d1 100644
--- a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
@@ -3440,13 +3440,17 @@
val maxMB = rt.maxMemory() / (1024 * 1024)
val outputCount = krillOutputCount.get()
val totalTexts = expectedTextOrder.size
+ val compressionPoolStats = (compressionExecutor as? java.util.concurrent.ThreadPoolExecutor)?.let { executor ->
+ " compressionPool{active=${executor.activeCount},queued=${executor.queue.size},threads=${executor.poolSize}}"
+ } ?: ""
LOGGER.info(
"KRILL-STATS reason=$reason output=$outputCount/$totalTexts " +
"heapMB{used=$usedMB,total=$totalMB,max=$maxMB} " +
"queues{raw=$rawPending,compressed=$compressedPending,inFlight=$compressionInFlight,ready=$readyQueueDepth} " +
"peaks{raw=${krillPeakRawPending.get()},compressed=${krillPeakCompressedPending.get()}," +
- "inFlight=${krillPeakCompressionInFlight.get()},ready=${krillPeakReadyQueueDepth.get()}}"
+ "inFlight=${krillPeakCompressionInFlight.get()},ready=${krillPeakReadyQueueDepth.get()}}" +
+ compressionPoolStats
)
}
@@ -5426,10 +5430,20 @@
// Start compression thread pool (parallel compression)
val compressionThreads = maxThreads.coerceAtLeast(2)
- compressionExecutor = java.util.concurrent.Executors.newFixedThreadPool(compressionThreads) { r ->
- Thread(r, "KrillCompressor-${Thread.currentThread().threadId()}")
- }
- LOGGER.info("Started compression thread pool with $compressionThreads threads")
+ val compressionQueueLimit = maxOf(compressionThreads * 2, 16)
+ compressionExecutor = java.util.concurrent.ThreadPoolExecutor(
+ compressionThreads,
+ compressionThreads,
+ 0L,
+ java.util.concurrent.TimeUnit.MILLISECONDS,
+ java.util.concurrent.ArrayBlockingQueue(compressionQueueLimit),
+ { r -> Thread(r).also { it.name = "KrillCompressor-${it.threadId()}" } }, // factory runs on the submitting thread, so name the worker after its own id
+ java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy()
+ )
+ LOGGER.info(
+ "Started compression thread pool with $compressionThreads threads " +
+ "(queueLimit=$compressionQueueLimit, saturationPolicy=caller-runs)"
+ )
// Start single writer thread (sequential TAR writing)
incrementalOutputScheduler = java.util.concurrent.Executors.newSingleThreadScheduledExecutor { r ->
@@ -5642,7 +5656,11 @@
0
}
- // Phase 1: Find ALL ready texts (order doesn't matter for output)
+ // Phase 1: Find texts that are complete in the legacy ZIP-tracking flow.
+ // In the current work-stealing Krill flow, completion handlers already enqueue
+ // compression as soon as a text's expected foundries are done. Re-enqueueing from
+ // the writer thread is harmful because a saturated compression pool can bounce
+ // work back onto this single scheduler thread and stall output entirely.
var checkedCount = 0
var newFlowCount = 0
var oldFlowCount = 0
@@ -5670,7 +5688,7 @@
}
}
- if (allProcessed) {
+ if (allProcessed && !expectedFoundriesPerText.containsKey(textId)) {
val textData = krillData[textId]
if (textData != null) {
enqueueKrillCompression(textId, textData)