Switch to priority based scheduling
Change-Id: I23ec16a180f3da344f786311fbbe5b376eea581c
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
index f030d2a..7789bfd 100644
--- a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
@@ -337,7 +337,50 @@
private val LOGGER: Logger = Logger.getLogger(KorapXmlTool::class.java.name)
private var annotationWorkerPool : AnnotationWorkerPool? = null
- // Shared executor for entry-level parallelism across all zips
+
+ // Track processed text count per foundry for priority scheduling
+ private val processedTextsPerFoundry: ConcurrentHashMap<String, java.util.concurrent.atomic.AtomicInteger> = ConcurrentHashMap()
+
+ // Map foundry to ZIP path for calculating expected text counts
+ private val foundryToZipPath: ConcurrentHashMap<String, String> = ConcurrentHashMap()
+
+ /**
+  * Runnable wrapper that lets the entry executor's priority queue order work by foundry progress.
+  *
+  * Ordering contract (smaller compares first, i.e. runs earlier):
+  *  1. the foundry with the lower fraction of processed texts (processed / expected) wins;
+  *     a fraction is used instead of the absolute count so that sparse foundries
+  *     (e.g. a cmc foundry that only has texts with emojis) are handled correctly;
+  *  2. on equal fractions, the task with the earlier [submissionTime] wins.
+  *
+  * NOTE(review): the fraction is read from live counters each time two tasks are compared, so the
+  * relative order of tasks can change while they are queued. A heap-based queue does not re-sort
+  * elements that are already enqueued, so the resulting schedule is best-effort rather than strict.
+  *
+  * @property foundry foundry name used to look up the processed and expected text counts
+  * @property task the actual work; executed unchanged by [run]
+  * @property submissionTime creation timestamp from [System.nanoTime], used only as tiebreaker
+  */
+ private inner class PrioritizedTask(
+     val foundry: String,
+     val task: Runnable,
+     val submissionTime: Long = System.nanoTime()
+ ) : Comparable<PrioritizedTask>, Runnable {
+
+     override fun compareTo(other: PrioritizedTask): Int {
+         // Lower completion fraction => smaller value => higher priority
+         val ratioOrder = completionRatio(foundry).compareTo(completionRatio(other.foundry))
+
+         // Same fraction: fall back to submission order
+         return if (ratioOrder != 0) ratioOrder else submissionTime.compareTo(other.submissionTime)
+     }
+
+     override fun run() {
+         task.run()
+     }
+
+     /**
+      * Fraction of texts already processed for [foundryName].
+      *
+      * The expected count is the size of the zipInventory entry for the foundry's ZIP path and
+      * defaults to 1 when the foundry or its inventory entry is unknown. It is additionally
+      * clamped to at least 1: an empty inventory entry would otherwise yield NaN or Infinity,
+      * and NaN compares greater than every other value, which would push that foundry's
+      * tasks behind all others.
+      */
+     private fun completionRatio(foundryName: String): Double {
+         val expected = (foundryToZipPath[foundryName]?.let { zipInventory[it]?.size } ?: 1).coerceAtLeast(1)
+         val processed = processedTextsPerFoundry[foundryName]?.get() ?: 0
+         // Convert to Double before dividing to avoid integer division
+         return processed.toDouble() / expected.toDouble()
+     }
+ }
+
+ // Single priority-based executor for all entry processing
private var entryExecutor: java.util.concurrent.ExecutorService? = null
val texts: ConcurrentHashMap<String, NonBmpString> = ConcurrentHashMap()
@@ -454,8 +497,17 @@
outputTexts.clear()
}
- // Initialize shared entry executor (used inside each zip)
- entryExecutor = Executors.newFixedThreadPool(maxThreads)
+ // Initialize priority-based entry executor
+ // Tasks for foundries with a lower percentage of processed texts get higher priority
+ val priorityQueue = java.util.concurrent.PriorityBlockingQueue<Runnable>()
+ entryExecutor = java.util.concurrent.ThreadPoolExecutor(
+ maxThreads,
+ maxThreads,
+ 0L,
+ java.util.concurrent.TimeUnit.MILLISECONDS,
+ priorityQueue
+ )
+ LOGGER.info("Initialized priority-based entry executor with $maxThreads threads (foundries with lower completion percentage get priority)")
// Initialize TAR output for krill format
if (outputFormat == OutputFormat.KRILL) {
@@ -656,7 +708,7 @@
if (entryExecutor != null) {
val terminated = entryExecutor!!.awaitTermination(7, java.util.concurrent.TimeUnit.DAYS)
if (!terminated) {
- LOGGER.warning("Entry executor did not terminate within timeout; proceeding to close worker pool.")
+ LOGGER.warning("Entry executor did not terminate within timeout")
}
}
} catch (ie: InterruptedException) {
@@ -904,6 +956,7 @@
} else {
LOGGER.fine("Opening ZipFile for processing: $zipFilePath")
try {
+ // If no corresponding base ZIP exists, this IS the base ZIP
ApacheZipFile(File(zipFilePath)).use { zipFile ->
LOGGER.fine("Calling processZipEntriesWithPool, foundry=$foundry")
processZipEntriesWithPool(zipFile, zipFilePath, foundry, false)
@@ -1068,11 +1121,29 @@
return
}
- // Submit all entry tasks to the shared executor and await completion before closing the zip
+ // Submit all entry tasks with priority based on foundry progress
val latch = java.util.concurrent.CountDownLatch(entries.size)
LOGGER.info("processZipEntriesWithPool: processing ${entries.size} entries with foundry=$foundry")
+
+ // Initialize the counter for this foundry if it does not exist yet
+ processedTextsPerFoundry.putIfAbsent(foundry, java.util.concurrent.atomic.AtomicInteger(0))
+
+ // Map foundry to ZIP path for percentage-based priority calculation
+ foundryToZipPath.putIfAbsent(foundry, zipPath)
+
+ // Log current foundry progress for debugging
+ val foundryStats = processedTextsPerFoundry.entries.sortedBy { it.key }.joinToString(", ") { entry ->
+ val expected = foundryToZipPath[entry.key]?.let { zipInventory[it]?.size ?: 1 } ?: 1
+ val processed = entry.value.get()
+ val percentage = (processed.toDouble() / expected.toDouble() * 100).toInt()
+ "${entry.key}=$processed/$expected (${percentage}%)"
+ }
+ if (foundryStats.isNotEmpty()) {
+ LOGGER.fine("Foundry progress before submitting $foundry entries: $foundryStats")
+ }
+
entries.forEach { entry ->
- entryExecutor?.execute {
+ val prioritizedTask = PrioritizedTask(foundry, Runnable {
try {
processZipEntry(zipFile, zipPath, foundry, entry, waitForMorpho)
} catch (t: Throwable) {
@@ -1080,7 +1151,8 @@
} finally {
latch.countDown()
}
- }
+ })
+ entryExecutor?.execute(prioritizedTask)
}
try {
latch.await()
@@ -1269,6 +1341,8 @@
if (outputFormat == OutputFormat.KRILL && incrementalKrill) {
// Mark this text as processed from this ZIP (writer thread will scan periodically)
processedTextsPerZip.getOrPut(zipPath) { mutableSetOf() }.add(docId)
+ // Increment foundry counter for priority-based scheduling
+ processedTextsPerFoundry.getOrPut(foundry) { java.util.concurrent.atomic.AtomicInteger(0) }.incrementAndGet()
}
val morphoRequired = when {