Use inventory instead of watermark
Change-Id: Id1e4501a2130342d6fd529cecf8c402c5e31f5d2
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
index b5c08f5..0395637 100644
--- a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
@@ -399,11 +399,30 @@
val processedFoundries: MutableSet<String> = mutableSetOf()
var krillOutputCount = java.util.concurrent.atomic.AtomicInteger(0)
- // Watermark-based incremental output: track max textId seen in each ZIP
- val zipWatermarks: ConcurrentHashMap<String, String> = ConcurrentHashMap()
- val activeZips: ConcurrentHashMap<String, Boolean> = ConcurrentHashMap()
- val watermarkLock = Any()
- var lastOutputWatermark: String = ""
+ // Inventory-based incremental output
+ // Per-ZIP inventory: which texts each ZIP should contain
+ val zipInventory: ConcurrentHashMap<String, MutableSet<String>> = ConcurrentHashMap()
+
+ // Track which texts have been processed from which ZIPs
+ val processedTextsPerZip: ConcurrentHashMap<String, MutableSet<String>> = ConcurrentHashMap()
+
+ // Lock for synchronized output
+ val incrementalOutputLock = Any()
+
+ // Progress bar for incremental output
+ var incrementalProgressBar: ProgressBar? = null
+
+ // Track which texts have been output to avoid counting duplicates (thread-safe)
+ val outputTexts: MutableSet<String> = ConcurrentHashMap.newKeySet()
+
+ // Scheduled executor for periodic scanning
+ var incrementalOutputScheduler: java.util.concurrent.ScheduledExecutorService? = null
+
+ // Shutdown flag for writer thread
+ @Volatile var shutdownIncrementalWriter = false
+
+ // Flag to track if TAR stream is still open
+ @Volatile var tarStreamOpen = true
fun String.hasCorrespondingBaseZip(): Boolean {
if (!this.matches(Regex(".*\\.([^/.]+)\\.zip$"))) return false
@@ -427,9 +446,9 @@
krillData.clear()
corpusMetadata.clear()
docMetadata.clear()
- zipWatermarks.clear()
- activeZips.clear()
- lastOutputWatermark = ""
+ zipInventory.clear()
+ processedTextsPerZip.clear()
+ outputTexts.clear()
}
// Initialize shared entry executor (used inside each zip)
@@ -459,6 +478,7 @@
val fileOutputStream = FileOutputStream(krillOutputFileName!!)
krillTarOutputStream = TarArchiveOutputStream(fileOutputStream)
+ tarStreamOpen = true // Stream is now open
LOGGER.info("Initialized krill TAR output stream")
// Extract expected foundries from input ZIP filenames for incremental output
@@ -484,6 +504,28 @@
}
}
LOGGER.info("Expected foundries for Krill output: ${expectedFoundries.sorted()}")
+
+ // Build inventory of which texts exist in which ZIPs for incremental output
+ if (incrementalKrill) {
+ buildZipInventory(args)
+
+ // Initialize progress bar for incremental output
+ if (!quiet) {
+ val totalTexts = zipInventory.values.flatten().toSet().size
+ if (totalTexts > 0) {
+ incrementalProgressBar = ProgressBarBuilder()
+ .setTaskName("Processing texts")
+ .setInitialMax(totalTexts.toLong())
+ .setStyle(ProgressBarStyle.COLORFUL_UNICODE_BAR)
+ .setUpdateIntervalMillis(500)
+ .showSpeed()
+ .build()
+ }
+ }
+
+ // Start dedicated writer thread for incremental output
+ startIncrementalWriterThread()
+ }
}
if (annotateWith.isNotEmpty()) {
@@ -653,6 +695,12 @@
// Shutdown entry executor
entryExecutor?.shutdown()
+ // Stop incremental writer thread if running
+ if (incrementalKrill) {
+ stopIncrementalWriterThread()
+ // Keep incrementalProgressBar open - continue using it for remaining texts
+ }
+
// Finalize krill output: output any remaining incomplete texts and close TAR
if (outputFormat == OutputFormat.KRILL && krillTarOutputStream != null) {
try {
@@ -663,33 +711,41 @@
LOGGER.info("All texts already output incrementally ($krillOutputCount total)")
}
- // Initialize progress bar for remaining texts
- val krillProgressBar = if (!quiet && remainingCount > 0) {
- ProgressBarBuilder()
- .setTaskName("Remaining texts")
- .setInitialMax(remainingCount.toLong())
- .setStyle(ProgressBarStyle.COLORFUL_UNICODE_BAR)
- .setUpdateIntervalMillis(500)
- .build()
- } else null
+ // Continue using the same progress bar for remaining texts (no separate bar)
+ // Output remaining texts (these weren't output incrementally, possibly incomplete)
+ // Copy keys to avoid ConcurrentModificationException if scanner is still running
+ val remainingKeys = krillData.keys.sorted()
+ remainingKeys.forEach { textId ->
+ val textData = krillData.remove(textId) ?: return@forEach // Skip if already removed by scanner
+ val textFoundries = textData.morphoByFoundry.keys.toSet() + setOf("base")
- try {
- // Output remaining texts (may be incomplete if not all ZIPs covered all texts)
- krillData.keys.sorted().forEach { textId ->
- val textData = krillData[textId]!!
- val textFoundries = textData.morphoByFoundry.keys.toSet() + setOf("base")
- if (!textFoundries.containsAll(expectedFoundries)) {
- LOGGER.warning("Outputting incomplete text $textId with foundries ${textFoundries} (expected: $expectedFoundries)")
- }
- outputKrillText(textId, textData)
- krillProgressBar?.step()
+ // Build expected foundries from inventory: which ZIPs contain this text?
+ val expectedForThisText = if (incrementalKrill) {
+ // Find which ZIPs should have contained this text and extract their foundries
+ zipInventory.filter { (_, texts) -> texts.contains(textId) }.keys
+ .map { zipPath -> getFoundryFromZipFileName(File(zipPath).name) }
+ .toSet()
+ } else {
+ expectedFoundries
}
- } finally {
- krillProgressBar?.close()
+
+ if (!textFoundries.containsAll(expectedForThisText)) {
+ LOGGER.warning("Outputting incomplete text $textId with foundries ${textFoundries.sorted()} (expected: ${expectedForThisText.sorted()})")
+ }
+ outputKrillText(textId, textData)
+ // Continue stepping the same progress bar
+ incrementalProgressBar?.step()
}
+ // Set flag before closing stream to prevent scanner from trying to write
+ tarStreamOpen = false
+
krillTarOutputStream!!.finish()
krillTarOutputStream!!.close()
+
+ // Close incremental progress bar if it was initialized
+ incrementalProgressBar?.close()
+
LOGGER.info("Closed krill TAR file: $krillOutputFileName (total texts output: $krillOutputCount)")
} catch (e: Exception) {
LOGGER.severe("ERROR generating krill output: ${e.message}")
@@ -781,11 +837,6 @@
}
private fun processZipFile(zipFilePath: String, foundry: String = "base") {
- // Mark this ZIP as active for watermark-based incremental output
- if (outputFormat == OutputFormat.KRILL && incrementalKrill) {
- activeZips[zipFilePath] = true
- }
-
val ord = zipOrdinals[zipFilePath] ?: 0
val size = zipSizes[zipFilePath] ?: 0L
LOGGER.info("Processing zip ${if (ord>0) ord else "?"}/$totalZips: ${zipFilePath} (${humanBytes(size)}) in thread ${Thread.currentThread().threadId()}")
@@ -863,23 +914,13 @@
}
logZipProgress(zipFilePath)
- // For Krill format with incremental mode, check if any texts are now complete and output them
+ // Track foundry as processed
if (outputFormat == OutputFormat.KRILL && incrementalKrill) {
processedFoundries.add(foundry)
- checkAndOutputCompleteTexts()
-
- // Mark this ZIP as inactive for watermark-based incremental output
- activeZips.remove(zipFilePath)
- LOGGER.info("Marked ZIP as inactive: $zipFilePath (${activeZips.size} ZIPs still active)")
}
}
private fun processZipFileSequentially(zipFilePath: String, foundry: String = "base") {
- // Mark this ZIP as active for watermark-based incremental output
- if (outputFormat == OutputFormat.KRILL && incrementalKrill) {
- activeZips[zipFilePath] = true
- }
-
val ord = zipOrdinals[zipFilePath] ?: 0
val size = zipSizes[zipFilePath] ?: 0L
LOGGER.info("Processing zip ${if (ord>0) ord else "?"}/$totalZips: ${zipFilePath} (${humanBytes(size)}) in thread ${Thread.currentThread().threadId()}")
@@ -915,14 +956,9 @@
}
logZipProgress(zipFilePath)
- // For Krill format with incremental mode, check if any texts are now complete and output them
+ // Track foundry as processed
if (outputFormat == OutputFormat.KRILL && incrementalKrill) {
processedFoundries.add(foundry)
- checkAndOutputCompleteTexts()
-
- // Mark this ZIP as inactive for watermark-based incremental output
- activeZips.remove(zipFilePath)
- LOGGER.info("Marked ZIP as inactive: $zipFilePath (${activeZips.size} ZIPs still active)")
}
}
@@ -1212,9 +1248,10 @@
}
}
- // Advance watermark for incremental output (for Krill format with watermark-based output)
+ // Mark text as processed from this ZIP for incremental output
if (outputFormat == OutputFormat.KRILL && incrementalKrill) {
- advanceWatermarkAndOutput(zipPath, docId)
+ // Mark this text as processed from this ZIP (writer thread will scan periodically)
+ processedTextsPerZip.getOrPut(zipPath) { mutableSetOf() }.add(docId)
}
val morphoRequired = when {
@@ -2248,6 +2285,9 @@
// Collect structural spans from structure.xml for krill format
private fun collectKrillStructureSpans(docId: String, spans: NodeList) {
+ // Skip if already output (thread-safe check via set from ConcurrentHashMap.newKeySet())
+ if (incrementalKrill && outputTexts.contains(docId)) return
+
val textData = krillData.getOrPut(docId) {
KrillTextData(textId = docId)
}
@@ -2317,6 +2357,9 @@
// Collect rich metadata from header.xml for krill format
private fun collectKrillMetadata(docId: String, headerXml: String) {
+ // Skip if already output (thread-safe check via set from ConcurrentHashMap.newKeySet())
+ if (incrementalKrill && outputTexts.contains(docId)) return
+
val textData = krillData.getOrPut(docId) {
KrillTextData(textId = docId)
}
@@ -2619,6 +2662,9 @@
// Collect base text data (text, tokens, sentences) for krill format
private fun collectKrillBaseData(docId: String) {
+ // Skip if already output (thread-safe check via set from ConcurrentHashMap.newKeySet())
+ if (incrementalKrill && outputTexts.contains(docId)) return
+
LOGGER.info("Collecting krill base data for $docId: text=${texts[docId] != null}, tokens=${tokens[docId] != null}, sentences=${sentences[docId] != null}")
val textData = krillData.getOrPut(docId) {
@@ -2671,6 +2717,9 @@
// Collect morpho data directly from parsed data (for krill format, bypasses shared morpho map)
// This version takes the morpho data as a parameter to avoid contamination from other foundries
private fun collectKrillMorphoDataDirect(docId: String, foundry: String, morphoDataMap: MutableMap<String, MorphoSpan>, annotationType: String = "morpho") {
+ // Skip if already output (thread-safe check via set from ConcurrentHashMap.newKeySet())
+ if (incrementalKrill && outputTexts.contains(docId)) return
+
LOGGER.info("Collecting krill $annotationType data (direct) for $docId, foundry=$foundry, morpho=${morphoDataMap.size}")
val textData = krillData.getOrPut(docId) {
@@ -2740,6 +2789,9 @@
// Collect morpho data from a specific foundry for krill format (OLD VERSION - reads from shared morpho map)
// annotationType: "morpho" = collect POS/lemma/features, "dependency" = collect head/deprel only
private fun collectKrillMorphoData(docId: String, foundry: String, annotationType: String = "morpho") {
+ // Skip if already output (thread-safe check via set from ConcurrentHashMap.newKeySet())
+ if (incrementalKrill && outputTexts.contains(docId)) return
+
LOGGER.info("Collecting krill $annotationType data for $docId, foundry=$foundry, morpho=${morpho[docId]?.size ?: 0}")
val textData = krillData.getOrPut(docId) {
@@ -2817,6 +2869,9 @@
// Old collectKrillData - no longer used, kept for reference
private fun collectKrillData(docId: String, foundry: String) {
+ // Skip if already output (thread-safe check via set from ConcurrentHashMap.newKeySet())
+ if (incrementalKrill && outputTexts.contains(docId)) return
+
LOGGER.info("Collecting krill data for $docId, foundry=$foundry, morpho=${morpho[docId]?.size ?: 0}")
// Get or create KrillTextData for this text
@@ -2864,67 +2919,160 @@
}
}
- // Check if a text has all expected foundries and output it if complete
- private fun checkAndOutputCompleteTexts() {
- if (outputFormat != OutputFormat.KRILL || krillTarOutputStream == null) return
+ // Start timer-based scanner for incremental output
+ private fun startIncrementalWriterThread() {
+ if (outputFormat != OutputFormat.KRILL || !incrementalKrill || krillTarOutputStream == null) return
- val textsToOutput = mutableListOf<String>()
+ shutdownIncrementalWriter = false
+ incrementalOutputScheduler = java.util.concurrent.Executors.newSingleThreadScheduledExecutor { r ->
+ Thread(r, "KrillWriterThread")
+ }
- // Find texts that have all expected foundries
- krillData.forEach { (textId, textData) ->
- val textFoundries = textData.morphoByFoundry.keys.toSet() + setOf("base")
- if (textFoundries.containsAll(expectedFoundries)) {
- textsToOutput.add(textId)
+ // Scan for complete texts every 500ms
+ incrementalOutputScheduler?.scheduleAtFixedRate({
+ try {
+ scanAndOutputCompleteTexts()
+ } catch (e: Exception) {
+ LOGGER.severe("Error in incremental writer: ${e.message}")
+ e.printStackTrace()
+ }
+ }, 500, 500, java.util.concurrent.TimeUnit.MILLISECONDS)
+
+ LOGGER.info("Incremental writer scheduler started (scanning every 500ms)")
+ }
+
+ // Scan all texts and output any that are complete
+ private fun scanAndOutputCompleteTexts() {
+ if (shutdownIncrementalWriter || !tarStreamOpen) return
+
+ // Get all texts that we know about (from zipInventory)
+ val allTexts = zipInventory.values.flatten().toSet()
+
+ var outputCount = 0
+ for (textId in allTexts) {
+ // Check again if stream is still open (may have been closed during iteration)
+ if (!tarStreamOpen) break
+
+ // Skip if already output
+ if (outputTexts.contains(textId)) continue
+
+ // Find all ZIPs that should contain this text
+ val relevantZips = zipInventory.filter { (_, texts) -> texts.contains(textId) }.keys
+
+ // Check if all relevant ZIPs have processed this text
+ val allProcessed = relevantZips.all { path ->
+ processedTextsPerZip[path]?.contains(textId) == true
+ }
+
+ if (allProcessed && krillData.containsKey(textId)) {
+ // Atomically claim this text for output (add returns false if already present)
+ if (outputTexts.add(textId)) {
+ // We successfully claimed it - output now
+ val textData = krillData.remove(textId)
+ if (textData != null) {
+ // Double-check stream is still open before writing
+ if (tarStreamOpen) {
+ try {
+ outputKrillText(textId, textData)
+ incrementalProgressBar?.step()
+ outputCount++
+ LOGGER.fine("Output text $textId (processed by ${relevantZips.size} ZIPs, ${krillData.size} still pending)")
+ } catch (e: IOException) {
+ // Stream may have been closed - stop trying to output
+ LOGGER.warning("Cannot output text $textId: stream closed")
+ tarStreamOpen = false
+ break
+ }
+ }
+ }
+
+ // Clean up tracking data for this text
+ relevantZips.forEach { path ->
+ zipInventory[path]?.remove(textId)
+ processedTextsPerZip[path]?.remove(textId)
+ }
+ }
}
}
- // Output and remove complete texts
- textsToOutput.sorted().forEach { textId ->
- val textData = krillData.remove(textId)
- if (textData != null) {
- outputKrillText(textId, textData)
- }
- }
-
- if (textsToOutput.isNotEmpty()) {
- LOGGER.info("Output ${textsToOutput.size} complete texts (${krillData.size} still pending)")
+ if (outputCount > 0) {
+ LOGGER.fine("Batch output: $outputCount texts (${krillData.size} still pending)")
}
}
- // Watermark-based incremental output: output all texts below the global watermark
- // Watermark = minimum of max textIds seen across all active ZIPs
- // This ensures no more annotations will arrive for texts below the watermark
- private fun advanceWatermarkAndOutput(zipPath: String, currentTextId: String) {
- if (outputFormat != OutputFormat.KRILL || !incrementalKrill || krillTarOutputStream == null) return
+ // Stop the incremental writer thread
+ private fun stopIncrementalWriterThread() {
+ if (incrementalOutputScheduler != null) {
+ LOGGER.info("Stopping incremental writer scheduler")
+ shutdownIncrementalWriter = true
- synchronized(watermarkLock) {
- // Update this ZIP's watermark
- zipWatermarks[zipPath] = currentTextId
+ // Do one final scan before shutting down
+ scanAndOutputCompleteTexts()
- // Calculate global watermark (minimum across all active ZIPs)
- val activeWatermarks = activeZips.keys.mapNotNull { zipWatermarks[it] }
- if (activeWatermarks.isEmpty()) return
+ // Use shutdownNow() to cancel any scheduled tasks immediately
+ incrementalOutputScheduler?.shutdownNow()
+ try {
+ if (!incrementalOutputScheduler!!.awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS)) {
+ LOGGER.warning("Writer scheduler did not terminate within timeout")
+ }
+ } catch (e: InterruptedException) {
+ LOGGER.warning("Interrupted while stopping writer scheduler")
+ Thread.currentThread().interrupt()
+ }
+ incrementalOutputScheduler = null
+ }
+ }
- val globalWatermark = activeWatermarks.minOrNull() ?: return
+ // Build per-ZIP inventory of which texts each ZIP contains
+ // This allows us to know when a text has been processed by all ZIPs that should contain it
+ private fun buildZipInventory(zipPaths: Array<String>) {
+ LOGGER.info("Building per-ZIP inventory to track text completeness...")
+ zipInventory.clear()
- // Output all texts strictly less than the global watermark
- if (globalWatermark > lastOutputWatermark) {
- val textsToOutput = krillData.keys.filter { it < globalWatermark }.sorted()
+ val dbFactory = DocumentBuilderFactory.newInstance()
+ val dBuilder = dbFactory.newDocumentBuilder()
- textsToOutput.forEach { textId ->
- val textData = krillData.remove(textId)
- if (textData != null) {
- outputKrillText(textId, textData)
+ zipPaths.forEach { zipPath ->
+ val textsInThisZip = mutableSetOf<String>()
+ LOGGER.info("Scanning $zipPath...")
+
+ try {
+ ApacheZipFile(File(zipPath)).use { zipFile ->
+ val entries = zipFile.entries
+ while (entries.hasMoreElements()) {
+ val entry = entries.nextElement()
+ // Look for data.xml or tokens.xml to identify texts
+ if (entry.name.matches(Regex(".*(data|tokens)\\.xml$"))) {
+ try {
+ // Parse XML to extract docId attribute
+ val doc = zipFile.getInputStream(entry).use { inputStream ->
+ XMLCommentFilterReader(inputStream, "UTF-8").use { reader ->
+ dBuilder.parse(InputSource(reader))
+ }
+ }
+ doc.documentElement.normalize()
+ val docId = doc.documentElement.getAttribute("docid")
+ if (docId.isNotEmpty()) {
+ textsInThisZip.add(docId)
+ }
+ } catch (e: Exception) {
+ // Skip entries that can't be parsed
+ LOGGER.fine("Skipped entry ${entry.name}: ${e.message}")
+ }
+ }
}
}
-
- if (textsToOutput.isNotEmpty()) {
- LOGGER.info("Watermark advanced to $globalWatermark: output ${textsToOutput.size} texts (${krillData.size} still pending)")
- }
-
- lastOutputWatermark = globalWatermark
+ zipInventory[zipPath] = textsInThisZip
+ LOGGER.info(" $zipPath contains ${textsInThisZip.size} texts")
+ } catch (e: Exception) {
+ LOGGER.warning("Failed to scan $zipPath: ${e.message}")
}
}
+
+ LOGGER.info("ZIP inventory built: ${zipPaths.size} ZIPs scanned")
+ // Calculate total unique texts
+ val allTexts = zipInventory.values.flatten().toSet()
+ LOGGER.info(" Total unique texts across all ZIPs: ${allTexts.size}")
}
// Output a single text to Krill TAR (thread-safe)