Improve fairness of watermark scheduler
Change-Id: I687866587e3ada645ac0089b72c940a4dbf5a716
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
index 5347597..50e90d5 100644
--- a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
@@ -1793,9 +1793,33 @@
LOGGER.fine("Foundry watermarks before submitting $foundry entries: $watermarkStats")
}
- // Submit one task per text ID (each task processes all entries for that text)
+ // Submit tasks with watermark-aware throttling
+ // Only submit tasks within a window of the global minimum watermark
+ // This keeps all foundries progressing together
+ val windowSize = maxOf(1000, maxThreads * 10) // Allow some lead but not too much
val latch = java.util.concurrent.CountDownLatch(textIds.size)
+
textIds.forEach { textId ->
+ // Check if we should throttle submission based on watermarks
+ var shouldWait = true
+ while (shouldWait) {
+ val minWatermark = foundryWatermarks.values.minByOrNull { it } ?: textId
+ val currentWatermark = foundryWatermarks[foundry] ?: textId
+
+ // Look up the positions of the minimum watermark, this foundry's watermark and textId in textIds (a watermark not found there counts as index 0)
+ val minIndex = textIds.indexOf(minWatermark).takeIf { it >= 0 } ?: 0
+ val currentIndex = textIds.indexOf(currentWatermark).takeIf { it >= 0 } ?: 0
+ val textIdIndex = textIds.indexOf(textId)
+
+ // Allow submission if textId is not found in textIds, is at most windowSize positions past the minimum, or this foundry is at or behind the minimum
+ if (textIdIndex < 0 || textIdIndex <= minIndex + windowSize || currentIndex <= minIndex) {
+ shouldWait = false
+ } else {
+ // Wait briefly for slower foundries to catch up
+ Thread.sleep(10)
+ }
+ }
+
val textEntries = entriesByTextId[textId] ?: emptyList()
val prioritizedTask = PrioritizedTask(foundry, textId, Runnable {