Make sure all entries are processed in sorted order
Change-Id: I3f1f171533a0b8962436c5a8259848178b9850dd
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
index 7789bfd..65e83cc 100644
--- a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
@@ -338,41 +338,37 @@
private var annotationWorkerPool : AnnotationWorkerPool? = null
- // Track processed text count per foundry for priority scheduling
- private val processedTextsPerFoundry: ConcurrentHashMap<String, java.util.concurrent.atomic.AtomicInteger> = ConcurrentHashMap()
-
- // Map foundry to ZIP path for calculating expected text counts
- private val foundryToZipPath: ConcurrentHashMap<String, String> = ConcurrentHashMap()
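+ // Per-foundry watermark: initialized to the foundry's first text ID, then overwritten with each text ID once that text has been processed
+ // Scheduling itself does not read this map: PrioritizedTask.compareTo orders tasks by their own text ID (lexicographically smallest first)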
+ private val foundryWatermarks: ConcurrentHashMap<String, String> = ConcurrentHashMap()
// Priority-based task for foundry-aware scheduling
private inner class PrioritizedTask(
val foundry: String,
+ val textId: String, // The text ID this task will process
val task: Runnable,
val submissionTime: Long = System.nanoTime()
) : Comparable<PrioritizedTask>, Runnable {
override fun compareTo(other: PrioritizedTask): Int {
- // Priority is based on percentage of texts processed, not absolute count
- // This handles sparse foundries correctly (e.g., cmc foundry only has texts with emojis)
+ // Priority is based on text ID comparison
+ // Tasks with lexicographically smaller text IDs get higher priority
+ // This keeps all foundries progressing through the corpus together
+ // and handles sparse foundries naturally (they won't block on non-existent texts)
- // Get expected text count for each foundry from zipInventory
- val thisZipPath = foundryToZipPath[foundry]
- val otherZipPath = foundryToZipPath[other.foundry]
+ // First, compare text IDs lexicographically
+ val textIdDiff = textId.compareTo(other.textId)
+ if (textIdDiff != 0) return textIdDiff
- val thisExpected = thisZipPath?.let { zipInventory[it]?.size ?: 1 } ?: 1
- val otherExpected = otherZipPath?.let { zipInventory[it]?.size ?: 1 } ?: 1
+ // If same text ID, prefer base foundry (it should be processed first)
+ if (foundry == "base" && other.foundry != "base") return -1
+ if (foundry != "base" && other.foundry == "base") return 1
- val thisProcessed = processedTextsPerFoundry[foundry]?.get() ?: 0
- val otherProcessed = processedTextsPerFoundry[other.foundry]?.get() ?: 0
+ // If same text ID and both base or both non-base, use foundry name
+ val foundryDiff = foundry.compareTo(other.foundry)
+ if (foundryDiff != 0) return foundryDiff
- // Calculate percentage (as double to avoid integer division)
- val thisPercentage = thisProcessed.toDouble() / thisExpected.toDouble()
- val otherPercentage = otherProcessed.toDouble() / otherExpected.toDouble()
-
- // Foundry with lower percentage gets higher priority (lower value)
- val percentageDiff = thisPercentage.compareTo(otherPercentage)
-
- // If same percentage, use submission time as tiebreaker
- return if (percentageDiff != 0) percentageDiff else submissionTime.compareTo(other.submissionTime)
+ // Final tiebreaker: submission time
+ return submissionTime.compareTo(other.submissionTime)
}
override fun run() {
@@ -383,6 +379,16 @@
// Single priority-based executor for all entry processing
private var entryExecutor: java.util.concurrent.ExecutorService? = null
+ // Build a text ID from the first three '/'-separated components of a ZIP entry path (e.g., "ZGE24/JAN/00001/base/data.xml" -> "ZGE24_JAN.00001"); shorter paths yield their first component
+ private fun getTextIdFromPath(path: String): String {
+ val parts = path.split('/')
+ return if (parts.size >= 3) {
+ "${parts[0]}_${parts[1]}.${parts[2]}"
+ } else {
+ parts[0] // Fallback to first component
+ }
+ }
+
val texts: ConcurrentHashMap<String, NonBmpString> = ConcurrentHashMap()
val sentences: ConcurrentHashMap<String, Array<Span>> = ConcurrentHashMap()
val tokens: ConcurrentHashMap<String, Array<Span>> = ConcurrentHashMap()
@@ -498,8 +504,18 @@
}
// Initialize priority-based entry executor
- // Tasks for foundries with fewer processed texts get higher priority
- val priorityQueue = java.util.concurrent.PriorityBlockingQueue<Runnable>()
+ // Tasks are scheduled based on text ID - foundry with smallest text ID gets priority
+ val priorityQueue = java.util.concurrent.PriorityBlockingQueue<Runnable>(
+ 11, // initial capacity
+ Comparator { r1, r2 ->
+ when {
+ r1 is PrioritizedTask && r2 is PrioritizedTask -> r1.compareTo(r2)
+ r1 is PrioritizedTask -> -1 // Prioritized tasks go first
+ r2 is PrioritizedTask -> 1
+ else -> 0 // Equal priority for non-prioritized tasks
+ }
+ }
+ )
entryExecutor = java.util.concurrent.ThreadPoolExecutor(
maxThreads,
maxThreads,
@@ -507,7 +523,7 @@
java.util.concurrent.TimeUnit.MILLISECONDS,
priorityQueue
)
- LOGGER.info("Initialized priority-based entry executor with $maxThreads threads (foundries with lower completion percentage get priority)")
+ LOGGER.info("Initialized watermark-based entry executor with $maxThreads threads (foundries scheduled by text ID to progress together)")
// Initialize TAR output for krill format
if (outputFormat == OutputFormat.KRILL) {
@@ -997,10 +1013,10 @@
foundry // Keep original foundry for non-krill formats
}
ApacheZipFile(File(zip)).use { zipFile ->
- // Iterate entries in a deterministic order to keep related files close together
+ // Iterate entries sorted by text ID to ensure consistent processing order (sortedBy is stable, so entries sharing a text ID keep the order in which the ZIP lists them)
zipFile.entries.toList()
.filter { extractMetadataRegex.isNotEmpty() || !it.name.contains("header.xml") }
- .sortedBy { it.name }
+ .sortedBy { getTextIdFromPath(it.name) }
.forEach { zipEntry ->
processZipEntry(zipFile, zip, zipFoundry, zipEntry, true)
}
@@ -1010,7 +1026,7 @@
ApacheZipFile(File(zipFilePath)).use { zipFile ->
zipFile.entries.toList()
.filter { extractMetadataRegex.isNotEmpty() || !it.name.contains("header.xml") }
- .sortedBy { it.name }
+ .sortedBy { getTextIdFromPath(it.name) }
.forEach { zipEntry ->
processZipEntry(zipFile, zipFilePath, foundry, zipEntry, false)
}
@@ -1078,11 +1094,11 @@
LOGGER.fine("Collected ${entries.size} entries from ZIP, foundry=$foundry")
if (entries.isEmpty()) return
- // Sort entries by text ID (first path component) to ensure texts complete as early as possible
+ // Sort entries by text ID to ensure texts complete as early as possible
// This is crucial for incremental output - all ZIPs will process texts in the same order
entries.sortBy { entry ->
- // Extract text ID from path like "TEXT.ID/layer/file.xml"
- entry.name.substringBefore('/')
+ // Extract text ID from path like "ZGE24/JAN/00001/base/data.xml" -> "ZGE24_JAN.00001"
+ getTextIdFromPath(entry.name)
}
LOGGER.fine("Sorted entries by text ID for incremental processing")
@@ -1121,33 +1137,42 @@
return
}
- // Submit all entry tasks with priority based on foundry progress
- val latch = java.util.concurrent.CountDownLatch(entries.size)
- LOGGER.info("processZipEntriesWithPool: processing ${entries.size} entries with foundry=$foundry")
+ // Group entries by text ID to ensure all files for a text are processed together
+ val entriesByTextId = entries.groupBy { getTextIdFromPath(it.name) }
+ val textIds = entriesByTextId.keys.sorted() // Process text IDs in lexicographic order
- // Initialize counter for this foundry if not exists
- processedTextsPerFoundry.putIfAbsent(foundry, java.util.concurrent.atomic.AtomicInteger(0))
+ LOGGER.info("processZipEntriesWithPool: processing ${entries.size} entries (${textIds.size} texts) with foundry=$foundry")
- // Map foundry to ZIP path for percentage-based priority calculation
- foundryToZipPath.putIfAbsent(foundry, zipPath)
-
- // Log current foundry progress for debugging
- val foundryStats = processedTextsPerFoundry.entries.sortedBy { it.key }.joinToString(", ") { entry ->
- val expected = foundryToZipPath[entry.key]?.let { zipInventory[it]?.size ?: 1 } ?: 1
- val processed = entry.value.get()
- val percentage = (processed.toDouble() / expected.toDouble() * 100).toInt()
- "${entry.key}=$processed/$expected (${percentage}%)"
- }
- if (foundryStats.isNotEmpty()) {
- LOGGER.fine("Foundry progress before submitting $foundry entries: $foundryStats")
+ // Initialize the watermark for this foundry if it does not exist yet (set to its first text ID)
+ if (!foundryWatermarks.containsKey(foundry) && textIds.isNotEmpty()) {
+ foundryWatermarks.putIfAbsent(foundry, textIds.first())
+ LOGGER.fine("Initialized watermark for $foundry to ${textIds.first()}")
}
- entries.forEach { entry ->
- val prioritizedTask = PrioritizedTask(foundry, Runnable {
+ // Log current foundry watermarks for debugging
+ val watermarkStats = foundryWatermarks.entries.sortedBy { it.key }.joinToString(", ") { entry ->
+ "${entry.key}=${entry.value}"
+ }
+ if (watermarkStats.isNotEmpty()) {
+ LOGGER.fine("Foundry watermarks before submitting $foundry entries: $watermarkStats")
+ }
+
+ // Submit one task per text ID (each task processes all entries for that text)
+ val latch = java.util.concurrent.CountDownLatch(textIds.size)
+ textIds.forEach { textId ->
+ val textEntries = entriesByTextId[textId] ?: emptyList()
+
+ val prioritizedTask = PrioritizedTask(foundry, textId, Runnable {
try {
- processZipEntry(zipFile, zipPath, foundry, entry, waitForMorpho)
+ // Process all entries for this text ID sequentially
+ textEntries.forEach { entry ->
+ processZipEntry(zipFile, zipPath, foundry, entry, waitForMorpho)
+ }
+
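+ // Record this text as the foundry's watermark once all of its entries have been processed
+ // NOTE(review): the write is unconditional, so a text finishing after a later one moves the watermark backwards - confirm this is intended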
+ foundryWatermarks[foundry] = textId
} catch (t: Throwable) {
- LOGGER.warning("Failed to process entry ${entry.name}: ${t.message}")
+ LOGGER.warning("Failed to process text $textId: ${t.message}")
} finally {
latch.countDown()
}
@@ -1341,8 +1366,6 @@
if (outputFormat == OutputFormat.KRILL && incrementalKrill) {
// Mark this text as processed from this ZIP (writer thread will scan periodically)
processedTextsPerZip.getOrPut(zipPath) { mutableSetOf() }.add(docId)
- // Increment foundry counter for priority-based scheduling
- processedTextsPerFoundry.getOrPut(foundry) { java.util.concurrent.atomic.AtomicInteger(0) }.incrementAndGet()
}
val morphoRequired = when {