Improve fairness of watermark scheduler

Change-Id: I687866587e3ada645ac0089b72c940a4dbf5a716
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
index 5347597..50e90d5 100644
--- a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
@@ -1793,9 +1793,33 @@
             LOGGER.fine("Foundry watermarks before submitting $foundry entries: $watermarkStats")
         }
 
-        // Submit one task per text ID (each task processes all entries for that text)
+        // Submit tasks with watermark-aware throttling
+        // Only submit tasks within a window of the global minimum watermark
+        // This keeps all foundries progressing together
+        val windowSize = maxOf(1000, maxThreads * 10)  // Allow some lead but not too much
         val latch = java.util.concurrent.CountDownLatch(textIds.size)
+        
         textIds.forEach { textId ->
+            // Check if we should throttle submission based on watermarks
+            var shouldWait = true
+            while (shouldWait) {
+                val minWatermark = foundryWatermarks.values.minByOrNull { it } ?: textId
+                val currentWatermark = foundryWatermarks[foundry] ?: textId
+                
+                // Calculate how far ahead this foundry is from the minimum
+                val minIndex = textIds.indexOf(minWatermark).takeIf { it >= 0 } ?: 0
+                val currentIndex = textIds.indexOf(currentWatermark).takeIf { it >= 0 } ?: 0
+                val textIdIndex = textIds.indexOf(textId)
+                
+                // Allow submission if within window of minimum or if we're behind minimum
+                if (textIdIndex < 0 || textIdIndex <= minIndex + windowSize || currentIndex <= minIndex) {
+                    shouldWait = false
+                } else {
+                    // Wait briefly for slower foundries to catch up
+                    Thread.sleep(10)
+                }
+            }
+            
             val textEntries = entriesByTextId[textId] ?: emptyList()
 
             val prioritizedTask = PrioritizedTask(foundry, textId, Runnable {