Fix late expunging of complete texts

Previously, completed texts were compressed inline by worker threads and
only discovered by the incremental writer during its periodic global
scan, so raw and compressed text data could linger in memory well after
a text was ready to be written. Compression is now enqueued on the
shared compression executor as soon as all expected foundries have
finished a text, and freshly compressed texts are pushed onto a
ready-queue (readyKrillTextIds) that the writer drains directly, so
output and memory release no longer wait for a full scan over all
pending texts.

Also adds periodic KRILL-STATS pipeline logging (queue depths, peak
counters, heap usage) to make backlog growth visible, and a regression
test with sparse (cmc) and empty (gender) foundry ZIPs verifying that
such ZIPs neither block base-only texts from being written nor add
spurious foundry expectations.

Change-Id: Ibdec0a0c2e840279fe48dd172c7061761d4a13fe
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
index 174ddb2..83bc07e 100644
--- a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
@@ -1176,11 +1176,18 @@
     val krillData: ConcurrentHashMap<String, KrillJsonGenerator.KrillTextData> = ConcurrentHashMap()
     val krillCompressedData: ConcurrentHashMap<String, CompressedKrillData> = ConcurrentHashMap()
     val krillCompressionFutures: ConcurrentHashMap<String, java.util.concurrent.Future<*>> = ConcurrentHashMap()
+    val readyKrillTextIds: java.util.concurrent.BlockingQueue<String> = java.util.concurrent.LinkedBlockingQueue()
     val corpusMetadata: ConcurrentHashMap<String, MutableMap<String, Any>> = ConcurrentHashMap()
     val docMetadata: ConcurrentHashMap<String, MutableMap<String, Any>> = ConcurrentHashMap()
     val expectedFoundries: MutableSet<String> = mutableSetOf("base")
     val processedFoundries: MutableSet<String> = mutableSetOf()
     var krillOutputCount = java.util.concurrent.atomic.AtomicInteger(0)
+    private val krillPeakRawPending = AtomicInteger(0)
+    private val krillPeakCompressedPending = AtomicInteger(0)
+    private val krillPeakCompressionInFlight = AtomicInteger(0)
+    private val krillPeakReadyQueueDepth = AtomicInteger(0)
+    @Volatile private var lastKrillStateLogNanos = 0L
+    private val krillStateLogIntervalNanos = java.util.concurrent.TimeUnit.SECONDS.toNanos(30)
 
     // Compression thread pool for parallel GZIP compression
     var compressionExecutor: java.util.concurrent.ExecutorService? = null
@@ -1232,12 +1239,18 @@
             expectedFoundries.add("base")
             processedFoundries.clear()
             krillOutputCount.set(0)
+            krillPeakRawPending.set(0)
+            krillPeakCompressedPending.set(0)
+            krillPeakCompressionInFlight.set(0)
+            krillPeakReadyQueueDepth.set(0)
+            lastKrillStateLogNanos = 0L
             krillData.clear()
             corpusMetadata.clear()
             docMetadata.clear()
             zipInventory.clear()
             processedTextsPerZip.clear()
             outputTexts.clear()
+            readyKrillTextIds.clear()
         }
 
         // Initialize priority-based entry executor
@@ -2015,16 +2028,11 @@
                 // Check if all expected foundries have completed this text
                 val expectedFoundries = expectedFoundriesPerText[textId] ?: emptySet()
                 if (completedFoundries.containsAll(expectedFoundries)) {
-                    // All foundries complete - compress immediately in this worker thread
-                    // This avoids queueing overhead and ensures compression happens ASAP
+                    // All foundries complete - start compression immediately so the writer
+                    // thread can output and free the text without waiting for a global scan.
                     val textData = krillData[textId]
-                    if (textData != null && !krillCompressedData.containsKey(textId) && !krillCompressionFutures.containsKey(textId)) {
-                        // Mark as being compressed to prevent duplicate compression
-                        krillCompressionFutures[textId] = java.util.concurrent.CompletableFuture.completedFuture(null)
-                        
-                        // Compress inline in this worker thread (we have plenty of workers)
-                        compressKrillText(textId, textData)
-                        LOGGER.fine("Compressed completed text inline: $textId")
+                    if (textData != null) {
+                        enqueueKrillCompression(textId, textData)
                     }
                 }
             } catch (t: Throwable) {
@@ -3391,6 +3399,48 @@
         }
     }
 
+    private fun updatePeakCounter(counter: AtomicInteger, value: Int) {
+        while (true) {
+            val current = counter.get()
+            if (value <= current) return
+            if (counter.compareAndSet(current, value)) return
+        }
+    }
+
+    private fun logKrillPipelineState(reason: String, force: Boolean = false) {
+        if (outputFormat != OutputFormat.KRILL) return
+
+        val now = System.nanoTime()
+        if (!force && now - lastKrillStateLogNanos < krillStateLogIntervalNanos) return
+
+        val rawPending = krillData.size
+        val compressedPending = krillCompressedData.size
+        val compressionInFlight = krillCompressionFutures.size
+        val readyQueueDepth = readyKrillTextIds.size
+
+        updatePeakCounter(krillPeakRawPending, rawPending)
+        updatePeakCounter(krillPeakCompressedPending, compressedPending)
+        updatePeakCounter(krillPeakCompressionInFlight, compressionInFlight)
+        updatePeakCounter(krillPeakReadyQueueDepth, readyQueueDepth)
+
+        lastKrillStateLogNanos = now
+
+        val rt = Runtime.getRuntime()
+        val usedMB = (rt.totalMemory() - rt.freeMemory()) / (1024 * 1024)
+        val totalMB = rt.totalMemory() / (1024 * 1024)
+        val maxMB = rt.maxMemory() / (1024 * 1024)
+        val outputCount = krillOutputCount.get()
+        val totalTexts = expectedTextOrder.size
+
+        LOGGER.info(
+            "KRILL-STATS reason=$reason output=$outputCount/$totalTexts " +
+                "heapMB{used=$usedMB,total=$totalMB,max=$maxMB} " +
+                "queues{raw=$rawPending,compressed=$compressedPending,inFlight=$compressionInFlight,ready=$readyQueueDepth} " +
+                "peaks{raw=${krillPeakRawPending.get()},compressed=${krillPeakCompressedPending.get()}," +
+                "inFlight=${krillPeakCompressionInFlight.get()},ready=${krillPeakReadyQueueDepth.get()}}"
+        )
+    }
+
     private fun conlluOutput(foundry: String, docId: String): StringBuilder {
         var token_index = 0
         var real_token_index = 0
@@ -5388,6 +5438,7 @@
         }, 500, 500, java.util.concurrent.TimeUnit.MILLISECONDS)
 
         LOGGER.info("Incremental writer scheduler started (scanning every 500ms)")
+        logKrillPipelineState("writer-start", force = true)
     }
 
     // Compress text data in parallel, then write to TAR sequentially
@@ -5475,21 +5526,114 @@
             // large corpora: without this, raw data and compressed bytes both sit in
             // memory simultaneously until the incremental writer catches up.
             krillData.remove(textId)
+            readyKrillTextIds.put(textId)
+            updatePeakCounter(krillPeakCompressedPending, krillCompressedData.size)
+            updatePeakCounter(krillPeakReadyQueueDepth, readyKrillTextIds.size)
             LOGGER.finer("Compressed text $textId (${compressedData.size} bytes), released raw data")
         } catch (e: Exception) {
             LOGGER.severe("ERROR compressing $textId: ${e.message}")
             e.printStackTrace()
+        } finally {
+            krillCompressionFutures.remove(textId)
         }
     }
 
+    private fun enqueueKrillCompression(textId: String, textData: KrillJsonGenerator.KrillTextData) {
+        if (krillCompressedData.containsKey(textId)) return
+
+        val executor = compressionExecutor
+        val future = if (executor != null && !executor.isShutdown) {
+            executor.submit {
+                compressKrillText(textId, textData)
+            }
+        } else {
+            java.util.concurrent.CompletableFuture.runAsync {
+                compressKrillText(textId, textData)
+            }
+        }
+
+        val existing = krillCompressionFutures.putIfAbsent(textId, future)
+        if (existing != null) {
+            future.cancel(false)
+        } else {
+            updatePeakCounter(krillPeakRawPending, krillData.size)
+            updatePeakCounter(krillPeakCompressionInFlight, krillCompressionFutures.size)
+        }
+    }
+
+    private fun writeReadyKrillTexts(textIds: Collection<String>): Int {
+        var outputCount = 0
+
+        for (textId in textIds) {
+            val compressedData = krillCompressedData[textId] ?: continue
+
+            if (outputTexts.add(textId)) {
+                val relevantZips = zipInventory.filter { (_, texts) -> texts.contains(textId) }.keys
+                tarStreamLock.lock()
+                try {
+                    if (tarStreamOpen) {
+                        try {
+                            synchronized(krillTarOutputStream!!) {
+                                val tarEntry = TarArchiveEntry(compressedData.fileName)
+                                tarEntry.size = compressedData.compressedBytes.size.toLong()
+                                krillTarOutputStream!!.putArchiveEntry(tarEntry)
+                                krillTarOutputStream!!.write(compressedData.compressedBytes)
+                                krillTarOutputStream!!.closeArchiveEntry()
+                            }
+
+                            krillOutputCount.incrementAndGet()
+                            incrementalProgressBar?.step()
+                            outputCount++
+                            LOGGER.fine("Output text $textId (processed by ${relevantZips.size} ZIPs, ${krillData.size} raw pending)")
+                        } catch (e: IOException) {
+                            LOGGER.warning("Cannot output text $textId: stream closed")
+                            tarStreamOpen = false
+                            break
+                        }
+                    }
+                } finally {
+                    tarStreamLock.unlock()
+                }
+            }
+
+            krillData.remove(textId)
+            krillCompressedData.remove(textId)
+            krillCompressionFutures.remove(textId)
+
+            val relevantZips = zipInventory.filter { (_, texts) -> texts.contains(textId) }.keys
+            relevantZips.forEach { path ->
+                zipInventory[path]?.remove(textId)
+                processedTextsPerZip[path]?.remove(textId)
+            }
+        }
+
+        return outputCount
+    }
+
     // Scan all texts and output any that are complete
     private fun scanAndOutputCompleteTexts(forceScan: Boolean = false) {
         if ((shutdownIncrementalWriter && !forceScan) || !tarStreamOpen) return
 
         if (expectedTextOrder.isEmpty()) return
 
+        updatePeakCounter(krillPeakRawPending, krillData.size)
+        updatePeakCounter(krillPeakCompressedPending, krillCompressedData.size)
+        updatePeakCounter(krillPeakCompressionInFlight, krillCompressionFutures.size)
+        updatePeakCounter(krillPeakReadyQueueDepth, readyKrillTextIds.size)
+
+        val readyFromCompression = mutableListOf<String>()
+        while (true) {
+            val textId = readyKrillTextIds.poll() ?: break
+            readyFromCompression.add(textId)
+        }
+
+        val writtenFromReadyQueue = if (readyFromCompression.isNotEmpty()) {
+            writeReadyKrillTexts(readyFromCompression)
+        } else {
+            0
+        }
+
         // Phase 1: Find ALL ready texts (order doesn't matter for output)
-        val readyTexts = mutableListOf<String>()
         var checkedCount = 0
         var newFlowCount = 0
         var oldFlowCount = 0
@@ -5518,92 +5662,21 @@
             }
             
             if (allProcessed) {
-                readyTexts.add(textId)
-                
-                // Start compression immediately if not already started
-                if (!krillCompressedData.containsKey(textId) && !krillCompressionFutures.containsKey(textId)) {
-                    val textData = krillData[textId]
-                    if (textData != null) {
-                        val future = compressionExecutor?.submit {
-                            compressKrillText(textId, textData)
-                        }
-                        if (future != null) {
-                            krillCompressionFutures[textId] = future
-                        }
-                    }
+                val textData = krillData[textId]
+                if (textData != null) {
+                    enqueueKrillCompression(textId, textData)
                 }
             }
         }
 
-        // Phase 2: Write compressed texts to TAR (only those already compressed, no waiting)
-        // Event-driven: if compression is done, write it; otherwise skip and try next scan
-        var outputCount = 0
-        var waitingForCompression = 0
-        for (textId in readyTexts) {
-            // Check if already compressed (event-based: poll, don't wait)
-            val compressedData = krillCompressedData[textId]
-            
-            // If not yet compressed, skip for now (will be picked up in next scan)
-            if (compressedData == null) {
-                waitingForCompression++
-                // Check if future is done without blocking
-                val future = krillCompressionFutures[textId]
-                if (future?.isDone == true) {
-                    // Compression just finished but data not yet in map - race condition, will catch next scan
-                    LOGGER.finest("Compression finished for $textId but data not yet stored, retrying next scan")
-                }
-                continue
-            }
-            
-            // Remove future since compression is complete
-            krillCompressionFutures.remove(textId)
-            
-            if (outputTexts.add(textId)) {
-                val relevantZips = zipInventory.filter { (_, texts) -> texts.contains(textId) }.keys
-                tarStreamLock.lock()
-                try {
-                    if (tarStreamOpen) {
-                        try {
-                            // Write to TAR
-                            synchronized(krillTarOutputStream!!) {
-                                val tarEntry = TarArchiveEntry(compressedData.fileName)
-                                tarEntry.size = compressedData.compressedBytes.size.toLong()
-                                krillTarOutputStream!!.putArchiveEntry(tarEntry)
-                                krillTarOutputStream!!.write(compressedData.compressedBytes)
-                                krillTarOutputStream!!.closeArchiveEntry()
-                            }
-                            
-                            val count = krillOutputCount.incrementAndGet()
-                            incrementalProgressBar?.step()
-                            outputCount++
-                            LOGGER.fine("Output text $textId (processed by ${relevantZips.size} ZIPs, ${krillData.size} still pending)")
-                        } catch (e: IOException) {
-                            LOGGER.warning("Cannot output text $textId: stream closed")
-                            tarStreamOpen = false
-                            break
-                        }
-                    }
-                } finally {
-                    tarStreamLock.unlock()
-                }
-            }
-
-            // Clean up all data structures
-            krillData.remove(textId)
-            krillCompressedData.remove(textId)
-            krillCompressionFutures.remove(textId)
-
-            // Clean up tracking data for this text
-            val relevantZips = zipInventory.filter { (_, texts) -> texts.contains(textId) }.keys
-            relevantZips.forEach { path ->
-                zipInventory[path]?.remove(textId)
-                processedTextsPerZip[path]?.remove(textId)
-            }
+        if (writtenFromReadyQueue > 0 || !readyFromCompression.isNullOrEmpty()) {
+            LOGGER.fine(
+                "Scan: checked=$checkedCount, newFlow=$newFlowCount, oldFlow=$oldFlowCount, " +
+                    "readyQueued=${readyFromCompression.size}, written=$writtenFromReadyQueue, rawPending=${krillData.size}, compressedPending=${krillCompressedData.size}"
+            )
         }
 
-        if (outputCount > 0 || readyTexts.isNotEmpty()) {
-            LOGGER.fine("Scan: checked=$checkedCount, newFlow=$newFlowCount, oldFlow=$oldFlowCount, ready=${readyTexts.size}, compressed=$outputCount, waitingCompression=$waitingForCompression, pending=${krillData.size}")
-        }
+        logKrillPipelineState(if (forceScan) "scan-final" else "scan")
     }
 
     // Stop the incremental writer thread
@@ -5638,6 +5711,7 @@
             val remainingBeforeScan = krillData.size
             LOGGER.info("Doing final scan for remaining texts...")
             scanAndOutputCompleteTexts(forceScan = true)
+            logKrillPipelineState("writer-stop", force = true)
             
             // Note: Don't shutdown compressionExecutor here - let the main finalization code
             // handle remaining texts with parallel compression, then shut it down there
diff --git a/app/src/test/kotlin/de/ids_mannheim/korapxmltools/KrillJsonGeneratorTest.kt b/app/src/test/kotlin/de/ids_mannheim/korapxmltools/KrillJsonGeneratorTest.kt
index c859c64..180b332 100644
--- a/app/src/test/kotlin/de/ids_mannheim/korapxmltools/KrillJsonGeneratorTest.kt
+++ b/app/src/test/kotlin/de/ids_mannheim/korapxmltools/KrillJsonGeneratorTest.kt
@@ -74,6 +74,7 @@
                 extractDir.deleteRecursively()
             }
         }
+
     }
 
     private val outContent = ByteArrayOutputStream(10000000)
@@ -236,6 +237,26 @@
     }
 
     @Test
+    fun krillHandlesSparseFoundryWithoutBlockingOtherTexts() {
+        val baseZip = loadResource("ndy_sample.zip").path
+        val sparseCmcZip = loadResource("ndy_sample.cmc.zip").path
+        val emptyGenderZip = loadResource("ndy_sample.gender.zip").path
+
+        val generatedTar = ensureKrillTar("ndy_sparse_foundries", "ndy_sample.krill.tar") { outputDir ->
+            arrayOf("-t", "krill", "-q", "-D", outputDir.path, baseZip, sparseCmcZip, emptyGenderZip)
+        }
+        assertTrue(generatedTar.exists())
+
+        val jsonByFile = readKrillJson(generatedTar)
+        assertEquals(
+            setOf("NDY-115-005255.json", "NDY-266-006701.json", "NDY-269-017376.json"),
+            jsonByFile.keys,
+            "Sparse and empty foundry ZIPs must not block base-only texts from being written"
+        )
+        assertFalse(jsonByFile.values.any { it.contains("\"gender\"") }, "The empty gender ZIP must not add foundry expectations or block output")
+    }
+
+    @Test
     fun krillOutputContainsBaseStructureSpans() {
         val baseZip = loadResource("wud24_sample.zip").path
         val spacyZip = loadResource("wud24_sample.spacy.zip").path
diff --git a/app/src/test/resources/ndy_sample.cmc.zip b/app/src/test/resources/ndy_sample.cmc.zip
new file mode 100644
index 0000000..d48d1aa
--- /dev/null
+++ b/app/src/test/resources/ndy_sample.cmc.zip
Binary files differ
diff --git a/app/src/test/resources/ndy_sample.gender.zip b/app/src/test/resources/ndy_sample.gender.zip
new file mode 100644
index 0000000..15cb0ec
--- /dev/null
+++ b/app/src/test/resources/ndy_sample.gender.zip
Binary files differ