Switch to priority-based scheduling

Change-Id: I23ec16a180f3da344f786311fbbe5b376eea581c
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
index f030d2a..7789bfd 100644
--- a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
@@ -337,7 +337,50 @@
     private val LOGGER: Logger = Logger.getLogger(KorapXmlTool::class.java.name)
 
     private var annotationWorkerPool : AnnotationWorkerPool? = null
-    // Shared executor for entry-level parallelism across all zips
+
+    // Processed text count per foundry, used for priority scheduling (in this change it is only incremented in incremental KRILL mode)
+    private val processedTextsPerFoundry: ConcurrentHashMap<String, java.util.concurrent.atomic.AtomicInteger> = ConcurrentHashMap()
+
+    // Map foundry to ZIP path for calculating expected text counts
+    private val foundryToZipPath: ConcurrentHashMap<String, String> = ConcurrentHashMap()
+
+    // Runnable wrapper whose ordering (see compareTo) lets the priority queue run tasks of the least-complete foundry first
+    private inner class PrioritizedTask(
+        val foundry: String,
+        val task: Runnable,
+        val submissionTime: Long = System.nanoTime()
+    ) : Comparable<PrioritizedTask>, Runnable {
+        override fun compareTo(other: PrioritizedTask): Int {
+            // Priority is based on the percentage of texts processed (processed / expected), not the absolute count.
+            // This handles sparse foundries correctly (e.g., the cmc foundry only has texts with emojis).
+            // NOTE(review): counters are read live, so the order of already queued tasks can go stale; the queue does not re-sort them.
+            // Expected text count per foundry comes from zipInventory; falls back to 1 if the foundry or its ZIP is unknown.
+            val thisZipPath = foundryToZipPath[foundry]
+            val otherZipPath = foundryToZipPath[other.foundry]
+
+            val thisExpected = thisZipPath?.let { zipInventory[it]?.size ?: 1 } ?: 1
+            val otherExpected = otherZipPath?.let { zipInventory[it]?.size ?: 1 } ?: 1
+
+            val thisProcessed = processedTextsPerFoundry[foundry]?.get() ?: 0
+            val otherProcessed = processedTextsPerFoundry[other.foundry]?.get() ?: 0
+
+            // Completion ratio as Double to avoid integer division (an expected count of 0 would yield NaN/Infinity — TODO confirm it cannot occur)
+            val thisPercentage = thisProcessed.toDouble() / thisExpected.toDouble()
+            val otherPercentage = otherProcessed.toDouble() / otherExpected.toDouble()
+
+            // The foundry with the lower ratio compares as smaller, i.e. gets higher priority in the queue
+            val percentageDiff = thisPercentage.compareTo(otherPercentage)
+
+            // On equal ratios, fall back to submissionTime (System.nanoTime() by default), so earlier tasks run first
+            return if (percentageDiff != 0) percentageDiff else submissionTime.compareTo(other.submissionTime)
+        }
+
+        override fun run() {
+            task.run()
+        }
+    }
+
+    // Single priority-based executor for all entry processing
     private var entryExecutor: java.util.concurrent.ExecutorService? = null
 
     val texts: ConcurrentHashMap<String, NonBmpString> = ConcurrentHashMap()
@@ -454,8 +497,17 @@
             outputTexts.clear()
         }
 
-        // Initialize shared entry executor (used inside each zip)
-        entryExecutor = Executors.newFixedThreadPool(maxThreads)
+        // Initialize priority-based entry executor
+        // Tasks for foundries with a lower completion percentage (processed / expected texts) get higher priority
+        val priorityQueue = java.util.concurrent.PriorityBlockingQueue<Runnable>()
+        entryExecutor = java.util.concurrent.ThreadPoolExecutor(
+            maxThreads,
+            maxThreads,
+            0L,
+            java.util.concurrent.TimeUnit.MILLISECONDS,
+            priorityQueue
+        )
+        LOGGER.info("Initialized priority-based entry executor with $maxThreads threads (foundries with lower completion percentage get priority)")
 
         // Initialize TAR output for krill format
         if (outputFormat == OutputFormat.KRILL) {
@@ -656,7 +708,7 @@
             if (entryExecutor != null) {
                 val terminated = entryExecutor!!.awaitTermination(7, java.util.concurrent.TimeUnit.DAYS)
                 if (!terminated) {
-                    LOGGER.warning("Entry executor did not terminate within timeout; proceeding to close worker pool.")
+                    LOGGER.warning("Entry executor did not terminate within timeout")
                 }
             }
         } catch (ie: InterruptedException) {
@@ -904,6 +956,7 @@
         } else {
             LOGGER.fine("Opening ZipFile for processing: $zipFilePath")
             try {
+                // If no corresponding base ZIP exists, this IS the base ZIP
                 ApacheZipFile(File(zipFilePath)).use { zipFile ->
                     LOGGER.fine("Calling processZipEntriesWithPool, foundry=$foundry")
                     processZipEntriesWithPool(zipFile, zipFilePath, foundry, false)
@@ -1068,11 +1121,29 @@
             return
         }
 
-        // Submit all entry tasks to the shared executor and await completion before closing the zip
+        // Submit all entry tasks with priority based on foundry progress
         val latch = java.util.concurrent.CountDownLatch(entries.size)
         LOGGER.info("processZipEntriesWithPool: processing ${entries.size} entries with foundry=$foundry")
+
+        // Initialize the counter for this foundry if it does not exist yet
+        processedTextsPerFoundry.putIfAbsent(foundry, java.util.concurrent.atomic.AtomicInteger(0))
+
+        // Map foundry to ZIP path for percentage-based priority calculation (putIfAbsent: the first ZIP seen for a foundry wins)
+        foundryToZipPath.putIfAbsent(foundry, zipPath)
+
+        // Log current foundry progress for debugging
+        val foundryStats = processedTextsPerFoundry.entries.sortedBy { it.key }.joinToString(", ") { entry ->
+            val expected = foundryToZipPath[entry.key]?.let { zipInventory[it]?.size ?: 1 } ?: 1
+            val processed = entry.value.get()
+            val percentage = (processed.toDouble() / expected.toDouble() * 100).toInt()
+            "${entry.key}=$processed/$expected (${percentage}%)"
+        }
+        if (foundryStats.isNotEmpty()) {
+            LOGGER.fine("Foundry progress before submitting $foundry entries: $foundryStats")
+        }
+
         entries.forEach { entry ->
-            entryExecutor?.execute {
+            val prioritizedTask = PrioritizedTask(foundry, Runnable {
                 try {
                     processZipEntry(zipFile, zipPath, foundry, entry, waitForMorpho)
                 } catch (t: Throwable) {
@@ -1080,7 +1151,8 @@
                 } finally {
                     latch.countDown()
                 }
-            }
+            })
+            entryExecutor?.execute(prioritizedTask)
         }
         try {
             latch.await()
@@ -1269,6 +1341,8 @@
                 if (outputFormat == OutputFormat.KRILL && incrementalKrill) {
                     // Mark this text as processed from this ZIP (writer thread will scan periodically)
                     processedTextsPerZip.getOrPut(zipPath) { mutableSetOf() }.add(docId)
+                    // Increment foundry counter for priority-based scheduling
+                    processedTextsPerFoundry.getOrPut(foundry) { java.util.concurrent.atomic.AtomicInteger(0) }.incrementAndGet()
                 }
 
                 val morphoRequired = when {