Make sure alle entries are processed in sorted order

Change-Id: I3f1f171533a0b8962436c5a8259848178b9850dd
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
index 7789bfd..65e83cc 100644
--- a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
@@ -338,41 +338,37 @@
 
     private var annotationWorkerPool : AnnotationWorkerPool? = null
 
-    // Track processed text count per foundry for priority scheduling
-    private val processedTextsPerFoundry: ConcurrentHashMap<String, java.util.concurrent.atomic.AtomicInteger> = ConcurrentHashMap()
-
-    // Map foundry to ZIP path for calculating expected text counts
-    private val foundryToZipPath: ConcurrentHashMap<String, String> = ConcurrentHashMap()
+    // Track the next text ID (watermark) each foundry needs to process for priority scheduling
+    // The foundry with the lexicographically smallest next text ID gets priority
+    private val foundryWatermarks: ConcurrentHashMap<String, String> = ConcurrentHashMap()
 
     // Priority-based task for foundry-aware scheduling
     private inner class PrioritizedTask(
         val foundry: String,
+        val textId: String,  // The text ID this task will process
         val task: Runnable,
         val submissionTime: Long = System.nanoTime()
     ) : Comparable<PrioritizedTask>, Runnable {
         override fun compareTo(other: PrioritizedTask): Int {
-            // Priority is based on percentage of texts processed, not absolute count
-            // This handles sparse foundries correctly (e.g., cmc foundry only has texts with emojis)
+            // Priority is based on text ID comparison
+            // Tasks with lexicographically smaller text IDs get higher priority
+            // This keeps all foundries progressing through the corpus together
+            // and handles sparse foundries naturally (they won't block on non-existent texts)
 
-            // Get expected text count for each foundry from zipInventory
-            val thisZipPath = foundryToZipPath[foundry]
-            val otherZipPath = foundryToZipPath[other.foundry]
+            // First, compare text IDs lexicographically
+            val textIdDiff = textId.compareTo(other.textId)
+            if (textIdDiff != 0) return textIdDiff
 
-            val thisExpected = thisZipPath?.let { zipInventory[it]?.size ?: 1 } ?: 1
-            val otherExpected = otherZipPath?.let { zipInventory[it]?.size ?: 1 } ?: 1
+            // If same text ID, prefer base foundry (it should be processed first)
+            if (foundry == "base" && other.foundry != "base") return -1
+            if (foundry != "base" && other.foundry == "base") return 1
 
-            val thisProcessed = processedTextsPerFoundry[foundry]?.get() ?: 0
-            val otherProcessed = processedTextsPerFoundry[other.foundry]?.get() ?: 0
+            // If same text ID and both base or both non-base, use foundry name
+            val foundryDiff = foundry.compareTo(other.foundry)
+            if (foundryDiff != 0) return foundryDiff
 
-            // Calculate percentage (as double to avoid integer division)
-            val thisPercentage = thisProcessed.toDouble() / thisExpected.toDouble()
-            val otherPercentage = otherProcessed.toDouble() / otherExpected.toDouble()
-
-            // Foundry with lower percentage gets higher priority (lower value)
-            val percentageDiff = thisPercentage.compareTo(otherPercentage)
-
-            // If same percentage, use submission time as tiebreaker
-            return if (percentageDiff != 0) percentageDiff else submissionTime.compareTo(other.submissionTime)
+            // Final tiebreaker: submission time
+            return submissionTime.compareTo(other.submissionTime)
         }
 
         override fun run() {
@@ -383,6 +379,16 @@
     // Single priority-based executor for all entry processing
     private var entryExecutor: java.util.concurrent.ExecutorService? = null
 
+    // Extract text ID from ZIP entry path (e.g., "ZGE24/JAN/00001/base/data.xml" -> "ZGE24_JAN.00001")
+    private fun getTextIdFromPath(path: String): String {
+        val parts = path.split('/')
+        return if (parts.size >= 3) {
+            "${parts[0]}_${parts[1]}.${parts[2]}"
+        } else {
+            parts[0]  // Fallback to first component
+        }
+    }
+
     val texts: ConcurrentHashMap<String, NonBmpString> = ConcurrentHashMap()
     val sentences: ConcurrentHashMap<String, Array<Span>> = ConcurrentHashMap()
     val tokens: ConcurrentHashMap<String, Array<Span>> = ConcurrentHashMap()
@@ -498,8 +504,18 @@
         }
 
         // Initialize priority-based entry executor
-        // Tasks for foundries with fewer processed texts get higher priority
-        val priorityQueue = java.util.concurrent.PriorityBlockingQueue<Runnable>()
+        // Tasks are scheduled based on text ID - foundry with smallest text ID gets priority
+        val priorityQueue = java.util.concurrent.PriorityBlockingQueue<Runnable>(
+            11,  // initial capacity
+            Comparator { r1, r2 ->
+                when {
+                    r1 is PrioritizedTask && r2 is PrioritizedTask -> r1.compareTo(r2)
+                    r1 is PrioritizedTask -> -1  // Prioritized tasks go first
+                    r2 is PrioritizedTask -> 1
+                    else -> 0  // Equal priority for non-prioritized tasks
+                }
+            }
+        )
         entryExecutor = java.util.concurrent.ThreadPoolExecutor(
             maxThreads,
             maxThreads,
@@ -507,7 +523,7 @@
             java.util.concurrent.TimeUnit.MILLISECONDS,
             priorityQueue
         )
-        LOGGER.info("Initialized priority-based entry executor with $maxThreads threads (foundries with lower completion percentage get priority)")
+        LOGGER.info("Initialized watermark-based entry executor with $maxThreads threads (foundries scheduled by text ID to progress together)")
 
         // Initialize TAR output for krill format
         if (outputFormat == OutputFormat.KRILL) {
@@ -997,10 +1013,10 @@
                     foundry  // Keep original foundry for non-krill formats
                 }
                 ApacheZipFile(File(zip)).use { zipFile ->
-                    // Iterate entries in a deterministic order to keep related files close together
+                    // Iterate entries sorted by text ID to ensure consistent processing order
                     zipFile.entries.toList()
                         .filter { extractMetadataRegex.isNotEmpty() || !it.name.contains("header.xml") }
-                        .sortedBy { it.name }
+                        .sortedBy { getTextIdFromPath(it.name) }
                         .forEach { zipEntry ->
                             processZipEntry(zipFile, zip, zipFoundry, zipEntry, true)
                         }
@@ -1010,7 +1026,7 @@
             ApacheZipFile(File(zipFilePath)).use { zipFile ->
                 zipFile.entries.toList()
                     .filter { extractMetadataRegex.isNotEmpty() || !it.name.contains("header.xml") }
-                    .sortedBy { it.name }
+                    .sortedBy { getTextIdFromPath(it.name) }
                     .forEach { zipEntry ->
                         processZipEntry(zipFile, zipFilePath, foundry, zipEntry, false)
                     }
@@ -1078,11 +1094,11 @@
         LOGGER.fine("Collected ${entries.size} entries from ZIP, foundry=$foundry")
         if (entries.isEmpty()) return
 
-        // Sort entries by text ID (first path component) to ensure texts complete as early as possible
+        // Sort entries by text ID to ensure texts complete as early as possible
         // This is crucial for incremental output - all ZIPs will process texts in the same order
         entries.sortBy { entry ->
-            // Extract text ID from path like "TEXT.ID/layer/file.xml"
-            entry.name.substringBefore('/')
+            // Extract text ID from path like "ZGE24/JAN/00001/base/data.xml" -> "ZGE24_JAN.00001"
+            getTextIdFromPath(entry.name)
         }
         LOGGER.fine("Sorted entries by text ID for incremental processing")
 
@@ -1121,33 +1137,42 @@
             return
         }
 
-        // Submit all entry tasks with priority based on foundry progress
-        val latch = java.util.concurrent.CountDownLatch(entries.size)
-        LOGGER.info("processZipEntriesWithPool: processing ${entries.size} entries with foundry=$foundry")
+        // Group entries by text ID to ensure all files for a text are processed together
+        val entriesByTextId = entries.groupBy { getTextIdFromPath(it.name) }
+        val textIds = entriesByTextId.keys.sorted()  // Process text IDs in lexicographic order
 
-        // Initialize counter for this foundry if not exists
-        processedTextsPerFoundry.putIfAbsent(foundry, java.util.concurrent.atomic.AtomicInteger(0))
+        LOGGER.info("processZipEntriesWithPool: processing ${entries.size} entries (${textIds.size} texts) with foundry=$foundry")
 
-        // Map foundry to ZIP path for percentage-based priority calculation
-        foundryToZipPath.putIfAbsent(foundry, zipPath)
-
-        // Log current foundry progress for debugging
-        val foundryStats = processedTextsPerFoundry.entries.sortedBy { it.key }.joinToString(", ") { entry ->
-            val expected = foundryToZipPath[entry.key]?.let { zipInventory[it]?.size ?: 1 } ?: 1
-            val processed = entry.value.get()
-            val percentage = (processed.toDouble() / expected.toDouble() * 100).toInt()
-            "${entry.key}=$processed/$expected (${percentage}%)"
-        }
-        if (foundryStats.isNotEmpty()) {
-            LOGGER.fine("Foundry progress before submitting $foundry entries: $foundryStats")
+        // Initialize watermark for this foundry if not exists (set to first text ID)
+        if (!foundryWatermarks.containsKey(foundry) && textIds.isNotEmpty()) {
+            foundryWatermarks.putIfAbsent(foundry, textIds.first())
+            LOGGER.fine("Initialized watermark for $foundry to ${textIds.first()}")
         }
 
-        entries.forEach { entry ->
-            val prioritizedTask = PrioritizedTask(foundry, Runnable {
+        // Log current foundry watermarks for debugging
+        val watermarkStats = foundryWatermarks.entries.sortedBy { it.key }.joinToString(", ") { entry ->
+            "${entry.key}=${entry.value}"
+        }
+        if (watermarkStats.isNotEmpty()) {
+            LOGGER.fine("Foundry watermarks before submitting $foundry entries: $watermarkStats")
+        }
+
+        // Submit one task per text ID (each task processes all entries for that text)
+        val latch = java.util.concurrent.CountDownLatch(textIds.size)
+        textIds.forEach { textId ->
+            val textEntries = entriesByTextId[textId] ?: emptyList()
+
+            val prioritizedTask = PrioritizedTask(foundry, textId, Runnable {
                 try {
-                    processZipEntry(zipFile, zipPath, foundry, entry, waitForMorpho)
+                    // Process all entries for this text ID sequentially
+                    textEntries.forEach { entry ->
+                        processZipEntry(zipFile, zipPath, foundry, entry, waitForMorpho)
+                    }
+
+                    // Update watermark after completing this text
+                    foundryWatermarks[foundry] = textId
                 } catch (t: Throwable) {
-                    LOGGER.warning("Failed to process entry ${entry.name}: ${t.message}")
+                    LOGGER.warning("Failed to process text $textId: ${t.message}")
                 } finally {
                     latch.countDown()
                 }
@@ -1341,8 +1366,6 @@
                 if (outputFormat == OutputFormat.KRILL && incrementalKrill) {
                     // Mark this text as processed from this ZIP (writer thread will scan periodically)
                     processedTextsPerZip.getOrPut(zipPath) { mutableSetOf() }.add(docId)
-                    // Increment foundry counter for priority-based scheduling
-                    processedTextsPerFoundry.getOrPut(foundry) { java.util.concurrent.atomic.AtomicInteger(0) }.incrementAndGet()
                 }
 
                 val morphoRequired = when {