Use inventory instead of watermark

Change-Id: Id1e4501a2130342d6fd529cecf8c402c5e31f5d2
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
index b5c08f5..0395637 100644
--- a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
@@ -399,11 +399,30 @@
     val processedFoundries: MutableSet<String> = mutableSetOf()
     var krillOutputCount = java.util.concurrent.atomic.AtomicInteger(0)
 
-    // Watermark-based incremental output: track max textId seen in each ZIP
-    val zipWatermarks: ConcurrentHashMap<String, String> = ConcurrentHashMap()
-    val activeZips: ConcurrentHashMap<String, Boolean> = ConcurrentHashMap()
-    val watermarkLock = Any()
-    var lastOutputWatermark: String = ""
+    // Inventory-based incremental output
+    // Per-ZIP inventory: which texts each ZIP should contain
+    val zipInventory: ConcurrentHashMap<String, MutableSet<String>> = ConcurrentHashMap()
+
+    // Track which texts have been processed from which ZIPs
+    val processedTextsPerZip: ConcurrentHashMap<String, MutableSet<String>> = ConcurrentHashMap()
+
+    // Lock for synchronized output
+    val incrementalOutputLock = Any()
+
+    // Progress bar for incremental output
+    var incrementalProgressBar: ProgressBar? = null
+
+    // Track which texts have been output to avoid counting duplicates (thread-safe)
+    val outputTexts: MutableSet<String> = ConcurrentHashMap.newKeySet()
+
+    // Scheduled executor for periodic scanning
+    var incrementalOutputScheduler: java.util.concurrent.ScheduledExecutorService? = null
+
+    // Shutdown flag for writer thread
+    @Volatile var shutdownIncrementalWriter = false
+
+    // Flag to track if TAR stream is still open
+    @Volatile var tarStreamOpen = true
 
     fun String.hasCorrespondingBaseZip(): Boolean {
         if (!this.matches(Regex(".*\\.([^/.]+)\\.zip$"))) return false
@@ -427,9 +446,9 @@
             krillData.clear()
             corpusMetadata.clear()
             docMetadata.clear()
-            zipWatermarks.clear()
-            activeZips.clear()
-            lastOutputWatermark = ""
+            zipInventory.clear()
+            processedTextsPerZip.clear()
+            outputTexts.clear()
         }
 
         // Initialize shared entry executor (used inside each zip)
@@ -459,6 +478,7 @@
 
             val fileOutputStream = FileOutputStream(krillOutputFileName!!)
             krillTarOutputStream = TarArchiveOutputStream(fileOutputStream)
+            tarStreamOpen = true  // Stream is now open
             LOGGER.info("Initialized krill TAR output stream")
 
             // Extract expected foundries from input ZIP filenames for incremental output
@@ -484,6 +504,28 @@
                 }
             }
             LOGGER.info("Expected foundries for Krill output: ${expectedFoundries.sorted()}")
+
+            // Build inventory of which texts exist in which ZIPs for incremental output
+            if (incrementalKrill) {
+                buildZipInventory(args)
+
+                // Initialize progress bar for incremental output
+                if (!quiet) {
+                    val totalTexts = zipInventory.values.flatten().toSet().size
+                    if (totalTexts > 0) {
+                        incrementalProgressBar = ProgressBarBuilder()
+                            .setTaskName("Processing texts")
+                            .setInitialMax(totalTexts.toLong())
+                            .setStyle(ProgressBarStyle.COLORFUL_UNICODE_BAR)
+                            .setUpdateIntervalMillis(500)
+                            .showSpeed()
+                            .build()
+                    }
+                }
+
+                // Start dedicated writer thread for incremental output
+                startIncrementalWriterThread()
+            }
         }
 
         if (annotateWith.isNotEmpty()) {
@@ -653,6 +695,12 @@
         // Shutdown entry executor
         entryExecutor?.shutdown()
 
+        // Stop incremental writer thread if running
+        if (incrementalKrill) {
+            stopIncrementalWriterThread()
+            // Keep incrementalProgressBar open - continue using it for remaining texts
+        }
+
         // Finalize krill output: output any remaining incomplete texts and close TAR
         if (outputFormat == OutputFormat.KRILL && krillTarOutputStream != null) {
             try {
@@ -663,33 +711,41 @@
                     LOGGER.info("All texts already output incrementally ($krillOutputCount total)")
                 }
 
-                // Initialize progress bar for remaining texts
-                val krillProgressBar = if (!quiet && remainingCount > 0) {
-                    ProgressBarBuilder()
-                        .setTaskName("Remaining texts")
-                        .setInitialMax(remainingCount.toLong())
-                        .setStyle(ProgressBarStyle.COLORFUL_UNICODE_BAR)
-                        .setUpdateIntervalMillis(500)
-                        .build()
-                } else null
+                // Continue using the same progress bar for remaining texts (no separate bar)
+                // Output remaining texts (these weren't output incrementally, possibly incomplete)
+                // Copy keys to avoid ConcurrentModificationException if scanner is still running
+                val remainingKeys = krillData.keys.sorted()
+                remainingKeys.forEach { textId ->
+                    val textData = krillData.remove(textId) ?: return@forEach  // Skip if already removed by scanner
+                    val textFoundries = textData.morphoByFoundry.keys.toSet() + setOf("base")
 
-                try {
-                    // Output remaining texts (may be incomplete if not all ZIPs covered all texts)
-                    krillData.keys.sorted().forEach { textId ->
-                        val textData = krillData[textId]!!
-                        val textFoundries = textData.morphoByFoundry.keys.toSet() + setOf("base")
-                        if (!textFoundries.containsAll(expectedFoundries)) {
-                            LOGGER.warning("Outputting incomplete text $textId with foundries ${textFoundries} (expected: $expectedFoundries)")
-                        }
-                        outputKrillText(textId, textData)
-                        krillProgressBar?.step()
+                    // Build expected foundries from inventory: which ZIPs contain this text?
+                    val expectedForThisText = if (incrementalKrill) {
+                        // Find which ZIPs should have contained this text and extract their foundries
+                        zipInventory.filter { (_, texts) -> texts.contains(textId) }.keys
+                            .map { zipPath -> getFoundryFromZipFileName(File(zipPath).name) }
+                            .toSet()
+                    } else {
+                        expectedFoundries
                     }
-                } finally {
-                    krillProgressBar?.close()
+
+                    if (!textFoundries.containsAll(expectedForThisText)) {
+                        LOGGER.warning("Outputting incomplete text $textId with foundries ${textFoundries.sorted()} (expected: ${expectedForThisText.sorted()})")
+                    }
+                    outputKrillText(textId, textData)
+                    // Continue stepping the same progress bar
+                    incrementalProgressBar?.step()
                 }
 
+                // Set flag before closing stream to prevent scanner from trying to write
+                tarStreamOpen = false
+
                 krillTarOutputStream!!.finish()
                 krillTarOutputStream!!.close()
+
+                // Close incremental progress bar if it was initialized
+                incrementalProgressBar?.close()
+
                 LOGGER.info("Closed krill TAR file: $krillOutputFileName (total texts output: $krillOutputCount)")
             } catch (e: Exception) {
                 LOGGER.severe("ERROR generating krill output: ${e.message}")
@@ -781,11 +837,6 @@
     }
 
     private fun processZipFile(zipFilePath: String, foundry: String = "base") {
-        // Mark this ZIP as active for watermark-based incremental output
-        if (outputFormat == OutputFormat.KRILL && incrementalKrill) {
-            activeZips[zipFilePath] = true
-        }
-
         val ord = zipOrdinals[zipFilePath] ?: 0
         val size = zipSizes[zipFilePath] ?: 0L
         LOGGER.info("Processing zip ${if (ord>0) ord else "?"}/$totalZips: ${zipFilePath} (${humanBytes(size)}) in thread ${Thread.currentThread().threadId()}")
@@ -863,23 +914,13 @@
         }
         logZipProgress(zipFilePath)
 
-        // For Krill format with incremental mode, check if any texts are now complete and output them
+        // Track foundry as processed
         if (outputFormat == OutputFormat.KRILL && incrementalKrill) {
             processedFoundries.add(foundry)
-            checkAndOutputCompleteTexts()
-
-            // Mark this ZIP as inactive for watermark-based incremental output
-            activeZips.remove(zipFilePath)
-            LOGGER.info("Marked ZIP as inactive: $zipFilePath (${activeZips.size} ZIPs still active)")
         }
     }
 
     private fun processZipFileSequentially(zipFilePath: String, foundry: String = "base") {
-        // Mark this ZIP as active for watermark-based incremental output
-        if (outputFormat == OutputFormat.KRILL && incrementalKrill) {
-            activeZips[zipFilePath] = true
-        }
-
         val ord = zipOrdinals[zipFilePath] ?: 0
         val size = zipSizes[zipFilePath] ?: 0L
         LOGGER.info("Processing zip ${if (ord>0) ord else "?"}/$totalZips: ${zipFilePath} (${humanBytes(size)}) in thread ${Thread.currentThread().threadId()}")
@@ -915,14 +956,9 @@
         }
         logZipProgress(zipFilePath)
 
-        // For Krill format with incremental mode, check if any texts are now complete and output them
+        // Track foundry as processed
         if (outputFormat == OutputFormat.KRILL && incrementalKrill) {
             processedFoundries.add(foundry)
-            checkAndOutputCompleteTexts()
-
-            // Mark this ZIP as inactive for watermark-based incremental output
-            activeZips.remove(zipFilePath)
-            LOGGER.info("Marked ZIP as inactive: $zipFilePath (${activeZips.size} ZIPs still active)")
         }
     }
 
@@ -1212,9 +1248,10 @@
                     }
                 }
 
-                // Advance watermark for incremental output (for Krill format with watermark-based output)
+                // Mark text as processed from this ZIP for incremental output
                 if (outputFormat == OutputFormat.KRILL && incrementalKrill) {
-                    advanceWatermarkAndOutput(zipPath, docId)
+                    // Mark this text as processed from this ZIP (writer thread will scan periodically)
+                    processedTextsPerZip.getOrPut(zipPath) { mutableSetOf() }.add(docId)
                 }
 
                 val morphoRequired = when {
@@ -2248,6 +2285,9 @@
 
     // Collect structural spans from structure.xml for krill format
     private fun collectKrillStructureSpans(docId: String, spans: NodeList) {
+        // Skip if already output (thread-safe check with ConcurrentHashMap.KeySet)
+        if (incrementalKrill && outputTexts.contains(docId)) return
+
         val textData = krillData.getOrPut(docId) {
             KrillTextData(textId = docId)
         }
@@ -2317,6 +2357,9 @@
 
     // Collect rich metadata from header.xml for krill format
     private fun collectKrillMetadata(docId: String, headerXml: String) {
+        // Skip if already output (thread-safe check with ConcurrentHashMap.KeySet)
+        if (incrementalKrill && outputTexts.contains(docId)) return
+
         val textData = krillData.getOrPut(docId) {
             KrillTextData(textId = docId)
         }
@@ -2619,6 +2662,9 @@
 
     // Collect base text data (text, tokens, sentences) for krill format
     private fun collectKrillBaseData(docId: String) {
+        // Skip if already output (thread-safe check with ConcurrentHashMap.KeySet)
+        if (incrementalKrill && outputTexts.contains(docId)) return
+
         LOGGER.info("Collecting krill base data for $docId: text=${texts[docId] != null}, tokens=${tokens[docId] != null}, sentences=${sentences[docId] != null}")
 
         val textData = krillData.getOrPut(docId) {
@@ -2671,6 +2717,9 @@
     // Collect morpho data directly from parsed data (for krill format, bypasses shared morpho map)
     // This version takes the morpho data as a parameter to avoid contamination from other foundries
     private fun collectKrillMorphoDataDirect(docId: String, foundry: String, morphoDataMap: MutableMap<String, MorphoSpan>, annotationType: String = "morpho") {
+        // Skip if already output (thread-safe check with ConcurrentHashMap.KeySet)
+        if (incrementalKrill && outputTexts.contains(docId)) return
+
         LOGGER.info("Collecting krill $annotationType data (direct) for $docId, foundry=$foundry, morpho=${morphoDataMap.size}")
 
         val textData = krillData.getOrPut(docId) {
@@ -2740,6 +2789,9 @@
     // Collect morpho data from a specific foundry for krill format (OLD VERSION - reads from shared morpho map)
     // annotationType: "morpho" = collect POS/lemma/features, "dependency" = collect head/deprel only
     private fun collectKrillMorphoData(docId: String, foundry: String, annotationType: String = "morpho") {
+        // Skip if already output (thread-safe check with ConcurrentHashMap.KeySet)
+        if (incrementalKrill && outputTexts.contains(docId)) return
+
         LOGGER.info("Collecting krill $annotationType data for $docId, foundry=$foundry, morpho=${morpho[docId]?.size ?: 0}")
 
         val textData = krillData.getOrPut(docId) {
@@ -2817,6 +2869,9 @@
 
     // Old collectKrillData - no longer used, kept for reference
     private fun collectKrillData(docId: String, foundry: String) {
+        // Skip if already output (thread-safe check with ConcurrentHashMap.KeySet)
+        if (incrementalKrill && outputTexts.contains(docId)) return
+
         LOGGER.info("Collecting krill data for $docId, foundry=$foundry, morpho=${morpho[docId]?.size ?: 0}")
 
         // Get or create KrillTextData for this text
@@ -2864,67 +2919,160 @@
         }
     }
 
-    // Check if a text has all expected foundries and output it if complete
-    private fun checkAndOutputCompleteTexts() {
-        if (outputFormat != OutputFormat.KRILL || krillTarOutputStream == null) return
+    // Start timer-based scanner for incremental output
+    private fun startIncrementalWriterThread() {
+        if (outputFormat != OutputFormat.KRILL || !incrementalKrill || krillTarOutputStream == null) return
 
-        val textsToOutput = mutableListOf<String>()
+        shutdownIncrementalWriter = false
+        incrementalOutputScheduler = java.util.concurrent.Executors.newSingleThreadScheduledExecutor { r ->
+            Thread(r, "KrillWriterThread")
+        }
 
-        // Find texts that have all expected foundries
-        krillData.forEach { (textId, textData) ->
-            val textFoundries = textData.morphoByFoundry.keys.toSet() + setOf("base")
-            if (textFoundries.containsAll(expectedFoundries)) {
-                textsToOutput.add(textId)
+        // Scan for complete texts every 500ms
+        incrementalOutputScheduler?.scheduleAtFixedRate({
+            try {
+                scanAndOutputCompleteTexts()
+            } catch (e: Exception) {
+                LOGGER.severe("Error in incremental writer: ${e.message}")
+                e.printStackTrace()
+            }
+        }, 500, 500, java.util.concurrent.TimeUnit.MILLISECONDS)
+
+        LOGGER.info("Incremental writer scheduler started (scanning every 500ms)")
+    }
+
+    // Scan all texts and output any that are complete
+    private fun scanAndOutputCompleteTexts() {
+        if (shutdownIncrementalWriter || !tarStreamOpen) return
+
+        // Get all texts that we know about (from zipInventory)
+        val allTexts = zipInventory.values.flatten().toSet()
+
+        var outputCount = 0
+        for (textId in allTexts) {
+            // Check again if stream is still open (may have been closed during iteration)
+            if (!tarStreamOpen) break
+
+            // Skip if already output
+            if (outputTexts.contains(textId)) continue
+
+            // Find all ZIPs that should contain this text
+            val relevantZips = zipInventory.filter { (_, texts) -> texts.contains(textId) }.keys
+
+            // Check if all relevant ZIPs have processed this text
+            val allProcessed = relevantZips.all { path ->
+                processedTextsPerZip[path]?.contains(textId) == true
+            }
+
+            if (allProcessed && krillData.containsKey(textId)) {
+                // Atomically claim this text for output (add returns false if already present)
+                if (outputTexts.add(textId)) {
+                    // We successfully claimed it - output now
+                    val textData = krillData.remove(textId)
+                    if (textData != null) {
+                        // Double-check stream is still open before writing
+                        if (tarStreamOpen) {
+                            try {
+                                outputKrillText(textId, textData)
+                                incrementalProgressBar?.step()
+                                outputCount++
+                                LOGGER.fine("Output text $textId (processed by ${relevantZips.size} ZIPs, ${krillData.size} still pending)")
+                            } catch (e: IOException) {
+                                // Stream may have been closed - stop trying to output
+                                LOGGER.warning("Cannot output text $textId: stream closed")
+                                tarStreamOpen = false
+                                break
+                            }
+                        }
+                    }
+
+                    // Clean up tracking data for this text
+                    relevantZips.forEach { path ->
+                        zipInventory[path]?.remove(textId)
+                        processedTextsPerZip[path]?.remove(textId)
+                    }
+                }
             }
         }
 
-        // Output and remove complete texts
-        textsToOutput.sorted().forEach { textId ->
-            val textData = krillData.remove(textId)
-            if (textData != null) {
-                outputKrillText(textId, textData)
-            }
-        }
-
-        if (textsToOutput.isNotEmpty()) {
-            LOGGER.info("Output ${textsToOutput.size} complete texts (${krillData.size} still pending)")
+        if (outputCount > 0) {
+            LOGGER.fine("Batch output: $outputCount texts (${krillData.size} still pending)")
         }
     }
 
-    // Watermark-based incremental output: output all texts below the global watermark
-    // Watermark = minimum of max textIds seen across all active ZIPs
-    // This ensures no more annotations will arrive for texts below the watermark
-    private fun advanceWatermarkAndOutput(zipPath: String, currentTextId: String) {
-        if (outputFormat != OutputFormat.KRILL || !incrementalKrill || krillTarOutputStream == null) return
+    // Stop the incremental writer thread
+    private fun stopIncrementalWriterThread() {
+        if (incrementalOutputScheduler != null) {
+            LOGGER.info("Stopping incremental writer scheduler")
+            shutdownIncrementalWriter = true
 
-        synchronized(watermarkLock) {
-            // Update this ZIP's watermark
-            zipWatermarks[zipPath] = currentTextId
+            // Do one final scan before shutting down
+            scanAndOutputCompleteTexts()
 
-            // Calculate global watermark (minimum across all active ZIPs)
-            val activeWatermarks = activeZips.keys.mapNotNull { zipWatermarks[it] }
-            if (activeWatermarks.isEmpty()) return
+            // Use shutdownNow() to cancel any scheduled tasks immediately
+            incrementalOutputScheduler?.shutdownNow()
+            try {
+                if (!incrementalOutputScheduler!!.awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS)) {
+                    LOGGER.warning("Writer scheduler did not terminate within timeout")
+                }
+            } catch (e: InterruptedException) {
+                LOGGER.warning("Interrupted while stopping writer scheduler")
+                Thread.currentThread().interrupt()
+            }
+            incrementalOutputScheduler = null
+        }
+    }
 
-            val globalWatermark = activeWatermarks.minOrNull() ?: return
+    // Build per-ZIP inventory of which texts each ZIP contains
+    // This allows us to know when a text has been processed by all ZIPs that should contain it
+    private fun buildZipInventory(zipPaths: Array<String>) {
+        LOGGER.info("Building per-ZIP inventory to track text completeness...")
+        zipInventory.clear()
 
-            // Output all texts strictly less than the global watermark
-            if (globalWatermark > lastOutputWatermark) {
-                val textsToOutput = krillData.keys.filter { it < globalWatermark }.sorted()
+        val dbFactory = DocumentBuilderFactory.newInstance()
+        val dBuilder = dbFactory.newDocumentBuilder()
 
-                textsToOutput.forEach { textId ->
-                    val textData = krillData.remove(textId)
-                    if (textData != null) {
-                        outputKrillText(textId, textData)
+        zipPaths.forEach { zipPath ->
+            val textsInThisZip = mutableSetOf<String>()
+            LOGGER.info("Scanning $zipPath...")
+
+            try {
+                ApacheZipFile(File(zipPath)).use { zipFile ->
+                    val entries = zipFile.entries
+                    while (entries.hasMoreElements()) {
+                        val entry = entries.nextElement()
+                        // Look for data.xml or tokens.xml to identify texts
+                        if (entry.name.matches(Regex(".*(data|tokens)\\.xml$"))) {
+                            try {
+                                // Parse XML to extract docId attribute
+                                val doc = zipFile.getInputStream(entry).use { inputStream ->
+                                    XMLCommentFilterReader(inputStream, "UTF-8").use { reader ->
+                                        dBuilder.parse(InputSource(reader))
+                                    }
+                                }
+                                doc.documentElement.normalize()
+                                val docId = doc.documentElement.getAttribute("docid")
+                                if (docId.isNotEmpty()) {
+                                    textsInThisZip.add(docId)
+                                }
+                            } catch (e: Exception) {
+                                // Skip entries that can't be parsed
+                                LOGGER.fine("Skipped entry ${entry.name}: ${e.message}")
+                            }
+                        }
                     }
                 }
-
-                if (textsToOutput.isNotEmpty()) {
-                    LOGGER.info("Watermark advanced to $globalWatermark: output ${textsToOutput.size} texts (${krillData.size} still pending)")
-                }
-
-                lastOutputWatermark = globalWatermark
+                zipInventory[zipPath] = textsInThisZip
+                LOGGER.info("  $zipPath contains ${textsInThisZip.size} texts")
+            } catch (e: Exception) {
+                LOGGER.warning("Failed to scan $zipPath: ${e.message}")
             }
         }
+
+        LOGGER.info("ZIP inventory built: ${zipPaths.size} ZIPs scanned")
+        // Calculate total unique texts
+        val allTexts = zipInventory.values.flatten().toSet()
+        LOGGER.info("  Total unique texts across all ZIPs: ${allTexts.size}")
     }
 
     // Output a single text to Krill TAR (thread-safe)