Stream Krill tail output

Change-Id: I15a971b891004ad3df124946f515d3fd76c2d368
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
index 5403f5b..7ff2526 100644
--- a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
@@ -1681,9 +1681,7 @@
                     remainingRawKeys.forEach { textId ->
                         val textData = krillData[textId]
                         if (textData != null && !krillCompressedData.containsKey(textId)) {
-                            compressionExecutor!!.submit {
-                                compressKrillText(textId, textData)
-                            }
+                            enqueueKrillCompression(textId, textData)
                         }
                     }
                     
@@ -1693,10 +1691,18 @@
                         var waitTime = 0L
                         while (!compressionExecutor!!.awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS)) {
                             waitTime += 5
-                            val pending = remainingKeys.size - krillCompressedData.size
+                            val pending = remainingRawKeys.count { !krillCompressedData.containsKey(it) }
                             LOGGER.info("Compressing remaining texts... ($pending left, ${waitTime}s elapsed)")
                         }
-                        LOGGER.info("All remaining texts compressed")
+                        val missingCompressed = remainingRawKeys.filterNot { krillCompressedData.containsKey(it) }
+                        if (missingCompressed.isEmpty()) {
+                            LOGGER.info("All remaining texts compressed")
+                        } else {
+                            LOGGER.warning(
+                                "Parallel compression finished with ${missingCompressed.size} texts still missing from cache: " +
+                                    missingCompressed.joinToString(",")
+                            )
+                        }
                     } catch (e: InterruptedException) {
                         LOGGER.warning("Interrupted while compressing remaining texts")
                         Thread.currentThread().interrupt()
@@ -1733,14 +1739,15 @@
                                 krillTarOutputStream!!.closeArchiveEntry()
                             }
                             krillOutputCount.incrementAndGet()
+                            noteKrillOutputProgress()
                             incrementalProgressBar?.step()
                         } catch (e: Exception) {
                             LOGGER.severe("ERROR writing $textId to TAR: ${e.message}")
                             e.printStackTrace()
                         }
                     } else if (textData != null) {
-                        // Fallback: compress inline if not in compressed cache (shouldn't happen)
-                        LOGGER.warning("Text $textId not in compressed cache, compressing inline")
+                        // Fallback: retry inline if parallel compression failed to populate the cache.
+                        LOGGER.warning("Text $textId not in compressed cache after parallel compression, compressing inline")
                         val textFoundries = textData.morphoByFoundry.keys.toSet() + setOf("base")
                         val expectedForThisText = zipInventory.filter { (_, texts) -> texts.contains(textId) }.keys
                             .flatMap { zipPath ->
@@ -5537,56 +5544,12 @@
     // Compress text data in parallel, then write to TAR sequentially
     private fun compressKrillText(textId: String, textData: KrillJsonGenerator.KrillTextData) {
         try {
-            val json = synchronized(textData) {
+            val (jsonFileName, compressedData) = synchronized(textData) {
                 // Synchronize access to textData to prevent ConcurrentModificationException
-                // during JSON generation while other threads might still be modifying it
-                val corpusSigle = textId.substringBefore('_')
-                val docSigle = textId.substringBeforeLast('.')
-
-                // Apply corpus-level metadata (only if not already set with a non-empty value)
-                corpusMetadata[corpusSigle]?.forEach { (key, value) ->
-                    val currentValue = textData.headerMetadata[key]
-                    // Inherit if: key doesn't exist, OR current value is empty/blank
-                    val shouldInherit = when (currentValue) {
-                        null -> true
-                        is String -> currentValue.isBlank()
-                        else -> false
-                    }
-                    if (shouldInherit && value != null) {
-                        // Only set non-empty values
-                        when (value) {
-                            is String -> if (value.isNotBlank()) textData.headerMetadata[key] = value
-                            is List<*> -> if (value.isNotEmpty()) textData.headerMetadata[key] = value
-                            is Map<*, *> -> if (value.isNotEmpty()) textData.headerMetadata[key] = value
-                            else -> textData.headerMetadata[key] = value
-                        }
-                    }
-                }
-
-                // Apply doc-level metadata (only if not already set with a non-empty value)
-                docMetadata[docSigle]?.forEach { (key, value) ->
-                    val currentValue = textData.headerMetadata[key]
-                    // Inherit if: key doesn't exist, OR current value is empty/blank
-                    val shouldInherit = when (currentValue) {
-                        null -> true
-                        is String -> currentValue.isBlank()
-                        else -> false
-                    }
-                    if (shouldInherit) {
-                        // Only set non-empty values
-                        when (value) {
-                            is String -> if (value.isNotBlank()) textData.headerMetadata[key] = value
-                            is List<*> -> if (value.isNotEmpty()) textData.headerMetadata[key] = value
-                            is Map<*, *> -> if (value.isNotEmpty()) textData.headerMetadata[key] = value
-                            else -> textData.headerMetadata[key] = value
-                        }
-                    }
-                }
-
-                KrillJsonGenerator.generate(textData, corpusMetadata, docMetadata, includeNonWordTokens)
+                // during JSON generation while other threads might still be modifying it.
+                applyInheritedKrillMetadata(textId, textData)
+                compressKrillJson(textId, textData)
             }
-            
-            val (jsonFileName, compressedData) = compressKrillJson(textId, json)
 
             // Store compressed data for sequential TAR writing
             krillCompressedData[textId] = CompressedKrillData(textId, jsonFileName, compressedData)
@@ -5608,6 +5571,45 @@
         }
     }
 
+    private fun applyInheritedKrillMetadata(textId: String, textData: KrillJsonGenerator.KrillTextData) {
+        val corpusSigle = textId.substringBefore('_')
+        val docSigle = textId.substringBeforeLast('.')
+
+        corpusMetadata[corpusSigle]?.forEach { (key, value) ->
+            val currentValue = textData.headerMetadata[key]
+            val shouldInherit = when (currentValue) {
+                null -> true
+                is String -> currentValue.isBlank()
+                else -> false
+            }
+            if (shouldInherit && value != null) {
+                when (value) {
+                    is String -> if (value.isNotBlank()) textData.headerMetadata[key] = value
+                    is List<*> -> if (value.isNotEmpty()) textData.headerMetadata[key] = value
+                    is Map<*, *> -> if (value.isNotEmpty()) textData.headerMetadata[key] = value
+                    else -> textData.headerMetadata[key] = value
+                }
+            }
+        }
+
+        docMetadata[docSigle]?.forEach { (key, value) ->
+            val currentValue = textData.headerMetadata[key]
+            val shouldInherit = when (currentValue) {
+                null -> true
+                is String -> currentValue.isBlank()
+                else -> false
+            }
+            if (shouldInherit && value != null) {
+                when (value) {
+                    is String -> if (value.isNotBlank()) textData.headerMetadata[key] = value
+                    is List<*> -> if (value.isNotEmpty()) textData.headerMetadata[key] = value
+                    is Map<*, *> -> if (value.isNotEmpty()) textData.headerMetadata[key] = value
+                    else -> textData.headerMetadata[key] = value
+                }
+            }
+        }
+    }
+
     private fun enqueueKrillCompression(textId: String, textData: KrillJsonGenerator.KrillTextData) {
         if (krillCompressedData.containsKey(textId)) return
 
@@ -5632,13 +5634,16 @@
         }
     }
 
-    private fun compressKrillJson(textId: String, json: String): Pair<String, ByteArray> {
+    private fun compressKrillJson(
+        textId: String,
+        textData: KrillJsonGenerator.KrillTextData
+    ): Pair<String, ByteArray> {
         return if (useLz4) {
             val fileName = textId.replace("_", "-").replace(".", "-") + ".json.lz4"
             val byteOut = ByteArrayOutputStream()
             net.jpountz.lz4.LZ4FrameOutputStream(byteOut).use { lz4Out ->
                 OutputStreamWriter(lz4Out, StandardCharsets.UTF_8).use { writer ->
-                    writer.write(json)
+                    KrillJsonGenerator.generateTo(writer, textData, corpusMetadata, docMetadata, includeNonWordTokens)
                 }
             }
             Pair(fileName, byteOut.toByteArray())
@@ -5652,7 +5657,7 @@
             }
             gzipOut.use { gzip ->
                 OutputStreamWriter(gzip, StandardCharsets.UTF_8).use { writer ->
-                    writer.write(json)
+                    KrillJsonGenerator.generateTo(writer, textData, corpusMetadata, docMetadata, includeNonWordTokens)
                 }
             }
             Pair(fileName, byteOut.toByteArray())
@@ -5919,29 +5924,11 @@
     // Output a single text to Krill TAR (thread-safe)
     private fun outputKrillText(textId: String, textData: KrillJsonGenerator.KrillTextData) {
         try {
-            // Merge corpus and doc metadata
-            val textIdWithSlashes = textData.textId.replace("_", "/").replace(".", "/")
-            val corpusSigle = textIdWithSlashes.split("/")[0]
-            val docSigle = textIdWithSlashes.split("/").take(2).joinToString("/")
-
-            // Apply corpus-level metadata
-            corpusMetadata[corpusSigle]?.forEach { (key, value) ->
-                if (!textData.headerMetadata.containsKey(key)) {
-                    textData.headerMetadata[key] = value
-                }
+            val (jsonFileName, compressedData) = synchronized(textData) {
+                applyInheritedKrillMetadata(textId, textData)
+                compressKrillJson(textId, textData)
             }
 
-            // Apply doc-level metadata
-            docMetadata[docSigle]?.forEach { (key, value) ->
-                if (!textData.headerMetadata.containsKey(key)) {
-                    textData.headerMetadata[key] = value
-                }
-            }
-
-            val json = KrillJsonGenerator.generate(textData, corpusMetadata, docMetadata, includeNonWordTokens)
-            
-            val (jsonFileName, compressedData) = compressKrillJson(textId, json)
-
             // Write to TAR (synchronized for thread safety)
             synchronized(krillTarOutputStream!!) {
                 val tarEntry = TarArchiveEntry(jsonFileName)
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/formatters/KrillJsonGenerator.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/formatters/KrillJsonGenerator.kt
index 295265a..cf4a7d8 100644
--- a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/formatters/KrillJsonGenerator.kt
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/formatters/KrillJsonGenerator.kt
@@ -60,6 +60,18 @@
         includeNonWordTokens: Boolean
     ): String {
         val sb = StringBuilder()
+        generateTo(sb, textData, corpusMetadata, docMetadata, includeNonWordTokens)
+        return sb.toString()
+    }
+
+    fun generateTo(
+        out: Appendable,
+        textData: KrillTextData,
+        corpusMetadata: Map<String, MutableMap<String, Any>>,
+        docMetadata: Map<String, MutableMap<String, Any>>,
+        includeNonWordTokens: Boolean
+    ) {
+        val sb = StringBuilder()
         sb.append("{")
 
         // @context, @type, and version
@@ -378,20 +390,37 @@
 
         // stream - token-level annotations
         sb.append("\"stream\":[")
-        if (textData.tokens != null) {
-            val streamItems = generateStream(textData, includeNonWordTokens)
-            sb.append(streamItems.joinToString(","))
-        }
-        sb.append("]")
+        out.append(sb.toString())
+        appendStream(out, textData, includeNonWordTokens)
+        out.append("]")
 
-        sb.append("}")  // close data
-        sb.append("}")  // close root
-
-        return sb.toString()
+        out.append("}")  // close data
+        out.append("}")  // close root
     }
 
     private fun generateStream(textData: KrillTextData, includeNonWordTokens: Boolean): List<String> {
-        val rawTokens = textData.tokens ?: return emptyList()
+        val result = mutableListOf<String>()
+        forEachStreamItem(textData, includeNonWordTokens) { result.add(it) }
+        return result
+    }
+
+    private fun appendStream(out: Appendable, textData: KrillTextData, includeNonWordTokens: Boolean) {
+        var first = true
+        forEachStreamItem(textData, includeNonWordTokens) { item ->
+            if (!first) {
+                out.append(",")
+            }
+            out.append(item)
+            first = false
+        }
+    }
+
+    private fun forEachStreamItem(
+        textData: KrillTextData,
+        includeNonWordTokens: Boolean,
+        emit: (String) -> Unit
+    ) {
+        val rawTokens = textData.tokens ?: return
         val text = textData.textContent ?: NonBmpString("")
         val sentences = textData.sentences ?: emptyArray()
         val tokens: List<Span> = if (includeNonWordTokens || text.length == 0) {
@@ -401,9 +430,8 @@
         }
         if (tokens.isEmpty()) {
             LOGGER.fine("No tokens remained for ${textData.textId} after filtering non-word tokens")
-            return emptyList()
+            return
         }
-        val result = mutableListOf<String>()
         data class FoundryMorphoData(
             val foundry: String,
             val prefix: String?,
@@ -787,10 +815,8 @@
             // Surface form (always last)
             tokenAnnotations.add(jsonString("s:${surfaceForm.escapeKrillValue()}"))
 
-            result.add(jsonArray(tokenAnnotations))
+            emit(jsonArray(tokenAnnotations))
         }
-
-        return result
     }
 
     private fun lowerBoundTokenFrom(tokens: List<Span>, target: Int): Int {
diff --git a/app/src/test/kotlin/de/ids_mannheim/korapxmltools/KrillJsonGeneratorTest.kt b/app/src/test/kotlin/de/ids_mannheim/korapxmltools/KrillJsonGeneratorTest.kt
index a703331..94a0af7 100644
--- a/app/src/test/kotlin/de/ids_mannheim/korapxmltools/KrillJsonGeneratorTest.kt
+++ b/app/src/test/kotlin/de/ids_mannheim/korapxmltools/KrillJsonGeneratorTest.kt
@@ -1,5 +1,6 @@
 package de.ids_mannheim.korapxmltools
 
+import de.ids_mannheim.korapxmltools.formatters.KrillJsonGenerator
 import net.jpountz.lz4.LZ4FrameInputStream
 import org.junit.After
 import org.junit.AfterClass
@@ -692,6 +693,50 @@
     }
 
     @Test
+    fun krillGenerateToMatchesGenerate() {
+        val textData = KrillJsonGenerator.KrillTextData(
+            textId = "TEST_DOC.1",
+            textContent = NonBmpString("Alpha beta."),
+            headerMetadata = mutableMapOf("title" to "Synthetic test")
+        ).apply {
+            tokens = arrayOf(
+                KorapXmlTool.Span(0, 5),
+                KorapXmlTool.Span(6, 10),
+                KorapXmlTool.Span(10, 11)
+            )
+            sentences = arrayOf(KorapXmlTool.Span(0, 11))
+            morphoByFoundry["spacy"] = mutableMapOf(
+                "0-5" to KorapXmlTool.MorphoSpan(lemma = "Alpha", upos = "PROPN", xpos = "NE"),
+                "6-10" to KorapXmlTool.MorphoSpan(lemma = "beta", upos = "NOUN", xpos = "NN")
+            )
+            structureSpans.add(
+                KrillJsonGenerator.StructureSpan(
+                    layer = "dereko/s:p",
+                    from = 0,
+                    to = 11,
+                    tokenFrom = 0,
+                    tokenTo = 3,
+                    depth = 1
+                )
+            )
+        }
+
+        val corpusMetadata = mutableMapOf<String, MutableMap<String, Any>>(
+            "TEST" to mutableMapOf("publisher" to "Publisher")
+        )
+        val docMetadata = mutableMapOf<String, MutableMap<String, Any>>(
+            "TEST_DOC" to mutableMapOf("docTitle" to "Doc title")
+        )
+
+        val generated = KrillJsonGenerator.generate(textData, corpusMetadata, docMetadata, includeNonWordTokens = true)
+        val streamed = buildString {
+            KrillJsonGenerator.generateTo(this, textData, corpusMetadata, docMetadata, includeNonWordTokens = true)
+        }
+
+        assertEquals(generated, streamed)
+    }
+
+    @Test
     fun testProbabilitySortingInKrillJsonOutput() {
         // Test that multiple POS annotations are sorted by descending probability in Krill JSON output
         // Use the base sample ZIP which should contain POS annotations with probabilities