Krill output: Introduce watermark based output approach

Change-Id: Ide6ef9dd64df92b90d3f02ebceaea2ca14b0985c
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
index 6825c23..b5c08f5 100644
--- a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
@@ -399,6 +399,12 @@
     val processedFoundries: MutableSet<String> = mutableSetOf()
     var krillOutputCount = java.util.concurrent.atomic.AtomicInteger(0)
 
+    // Watermark-based incremental output: track max textId seen in each ZIP
+    val zipWatermarks: ConcurrentHashMap<String, String> = ConcurrentHashMap()
+    val activeZips: ConcurrentHashMap<String, Boolean> = ConcurrentHashMap()
+    val watermarkLock = Any()
+    var lastOutputWatermark: String = ""
+
     fun String.hasCorrespondingBaseZip(): Boolean {
         if (!this.matches(Regex(".*\\.([^/.]+)\\.zip$"))) return false
         val baseZip = this.replace(Regex("\\.([^/.]+)\\.zip$"), ".zip")
@@ -421,6 +427,9 @@
             krillData.clear()
             corpusMetadata.clear()
             docMetadata.clear()
+            zipWatermarks.clear()
+            activeZips.clear()
+            lastOutputWatermark = ""
         }
 
         // Initialize shared entry executor (used inside each zip)
@@ -772,6 +781,11 @@
     }
 
     private fun processZipFile(zipFilePath: String, foundry: String = "base") {
+        // Mark this ZIP as active for watermark-based incremental output
+        if (outputFormat == OutputFormat.KRILL && incrementalKrill) {
+            activeZips[zipFilePath] = true
+        }
+
         val ord = zipOrdinals[zipFilePath] ?: 0
         val size = zipSizes[zipFilePath] ?: 0L
         LOGGER.info("Processing zip ${if (ord>0) ord else "?"}/$totalZips: ${zipFilePath} (${humanBytes(size)}) in thread ${Thread.currentThread().threadId()}")
@@ -824,7 +838,7 @@
                     foundry  // Keep original foundry for non-krill formats
                 }
                 ApacheZipFile(File(zip)).use { zipFile ->
-                    processZipEntriesWithPool(zipFile, zipFoundry, true)
+                    processZipEntriesWithPool(zipFile, zip, zipFoundry, true)
                 }
             }
         } else {
@@ -832,7 +846,7 @@
             try {
                 ApacheZipFile(File(zipFilePath)).use { zipFile ->
                     LOGGER.fine("Calling processZipEntriesWithPool, foundry=$foundry")
-                    processZipEntriesWithPool(zipFile, foundry, false)
+                    processZipEntriesWithPool(zipFile, zipFilePath, foundry, false)
                     LOGGER.fine("Returned from processZipEntriesWithPool")
                 }
             } catch (e: Exception) {
@@ -853,10 +867,19 @@
         if (outputFormat == OutputFormat.KRILL && incrementalKrill) {
             processedFoundries.add(foundry)
             checkAndOutputCompleteTexts()
+
+            // Mark this ZIP as inactive for watermark-based incremental output
+            activeZips.remove(zipFilePath)
+            LOGGER.info("Marked ZIP as inactive: $zipFilePath (${activeZips.size} ZIPs still active)")
         }
     }
 
     private fun processZipFileSequentially(zipFilePath: String, foundry: String = "base") {
+        // Mark this ZIP as active for watermark-based incremental output
+        if (outputFormat == OutputFormat.KRILL && incrementalKrill) {
+            activeZips[zipFilePath] = true
+        }
+
         val ord = zipOrdinals[zipFilePath] ?: 0
         val size = zipSizes[zipFilePath] ?: 0L
         LOGGER.info("Processing zip ${if (ord>0) ord else "?"}/$totalZips: ${zipFilePath} (${humanBytes(size)}) in thread ${Thread.currentThread().threadId()}")
@@ -876,7 +899,7 @@
                         .filter { extractMetadataRegex.isNotEmpty() || !it.name.contains("header.xml") }
                         .sortedBy { it.name }
                         .forEach { zipEntry ->
-                            processZipEntry(zipFile, zipFoundry, zipEntry, true)
+                            processZipEntry(zipFile, zip, zipFoundry, zipEntry, true)
                         }
                 }
             }
@@ -886,7 +909,7 @@
                     .filter { extractMetadataRegex.isNotEmpty() || !it.name.contains("header.xml") }
                     .sortedBy { it.name }
                     .forEach { zipEntry ->
-                        processZipEntry(zipFile, foundry, zipEntry, false)
+                        processZipEntry(zipFile, zipFilePath, foundry, zipEntry, false)
                     }
             }
         }
@@ -896,6 +919,10 @@
         if (outputFormat == OutputFormat.KRILL && incrementalKrill) {
             processedFoundries.add(foundry)
             checkAndOutputCompleteTexts()
+
+            // Mark this ZIP as inactive for watermark-based incremental output
+            activeZips.remove(zipFilePath)
+            LOGGER.info("Marked ZIP as inactive: $zipFilePath (${activeZips.size} ZIPs still active)")
         }
     }
 
@@ -939,7 +966,7 @@
         return String.format(Locale.ROOT, "%02d:%02d:%02d", h, m, sec)
     }
 
-    private fun processZipEntriesWithPool(zipFile: ApacheZipFile, foundry: String, waitForMorpho: Boolean) {
+    private fun processZipEntriesWithPool(zipFile: ApacheZipFile, zipPath: String, foundry: String, waitForMorpho: Boolean) {
         // Collect entries first to avoid lazy evaluation surprises, filter header.xml unless metadata extraction is requested
         val entries: MutableList<ZipArchiveEntry> = ArrayList()
         var documentCount = 0
@@ -984,7 +1011,7 @@
 
         // If only one thread requested, do sequential to avoid pool overhead
         if (maxThreads <= 1) {
-            entries.forEach { entry -> processZipEntry(zipFile, foundry, entry, waitForMorpho) }
+            entries.forEach { entry -> processZipEntry(zipFile, zipPath, foundry, entry, waitForMorpho) }
             return
         }
 
@@ -994,7 +1021,7 @@
         entries.forEach { entry ->
             entryExecutor?.execute {
                 try {
-                    processZipEntry(zipFile, foundry, entry, waitForMorpho)
+                    processZipEntry(zipFile, zipPath, foundry, entry, waitForMorpho)
                 } catch (t: Throwable) {
                     LOGGER.warning("Failed to process entry ${entry.name}: ${t.message}")
                 } finally {
@@ -1009,7 +1036,7 @@
         }
     }
 
-    fun processZipEntry(zipFile: ApacheZipFile, _foundry: String, zipEntry: ZipArchiveEntry, passedWaitForMorpho: Boolean) {
+    fun processZipEntry(zipFile: ApacheZipFile, zipPath: String, _foundry: String, zipEntry: ZipArchiveEntry, passedWaitForMorpho: Boolean) {
         var foundry = _foundry
         var waitForMorpho = passedWaitForMorpho
         LOGGER.finer("Processing ${zipEntry.name} in thread ${Thread.currentThread().threadId()}")
@@ -1185,6 +1212,11 @@
                     }
                 }
 
+                // Advance watermark for incremental output (for Krill format with watermark-based output)
+                if (outputFormat == OutputFormat.KRILL && incrementalKrill) {
+                    advanceWatermarkAndOutput(zipPath, docId)
+                }
+
                 val morphoRequired = when {
                     // If tagger or parser is enabled, we generate annotations in processText
                     taggerName != null || parserName != null -> false
@@ -2859,6 +2891,42 @@
         }
     }
 
+    // Watermark-based incremental output: output all texts below the global watermark
+    // Watermark = minimum of max textIds seen across all active ZIPs
+    // This ensures no more annotations will arrive for texts below the watermark
+    private fun advanceWatermarkAndOutput(zipPath: String, currentTextId: String) {
+        if (outputFormat != OutputFormat.KRILL || !incrementalKrill || krillTarOutputStream == null) return
+
+        synchronized(watermarkLock) {
+            // Update this ZIP's watermark
+            zipWatermarks[zipPath] = currentTextId
+
+            // Calculate global watermark (minimum across all active ZIPs)
+            val activeWatermarks = activeZips.keys.mapNotNull { zipWatermarks[it] }
+            if (activeWatermarks.isEmpty()) return
+
+            val globalWatermark = activeWatermarks.minOrNull() ?: return
+
+            // Output all texts strictly less than the global watermark
+            if (globalWatermark > lastOutputWatermark) {
+                val textsToOutput = krillData.keys.filter { it < globalWatermark }.sorted()
+
+                textsToOutput.forEach { textId ->
+                    val textData = krillData.remove(textId)
+                    if (textData != null) {
+                        outputKrillText(textId, textData)
+                    }
+                }
+
+                if (textsToOutput.isNotEmpty()) {
+                    LOGGER.info("Watermark advanced to $globalWatermark: output ${textsToOutput.size} texts (${krillData.size} still pending)")
+                }
+
+                lastOutputWatermark = globalWatermark
+            }
+        }
+    }
+
     // Output a single text to Krill TAR (thread-safe)
     private fun outputKrillText(textId: String, textData: KrillTextData) {
         try {