Use thread pool for gzipping Krill output
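
Compress Krill JSON off the single writer thread: a fixed-size pool
compresses texts in parallel (fast GZIP at deflate level 1, or LZ4
when --lz4 is given), while TAR entries are still written
sequentially. The incremental scanner submits compression work as
soon as a text is complete and polls for finished results instead of
blocking; the final flush compresses all remaining texts in parallel
before writing them out.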

Change-Id: Iff2a08e5e7e840bb0760d4f6d307451b99a920a7
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
index 8708dd8..0178fcc 100644
--- a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
@@ -688,13 +688,25 @@
         } catch (_: Exception) {}
     }
 
+    // Data class to hold compressed Krill JSON ready for TAR writing
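+    // (the ByteArray field makes data-class equals/hashCode reference-based,
+    // which is fine here: instances are only stored and written, never compared)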
+    data class CompressedKrillData(
+        val textId: String,
+        val fileName: String,
+        val compressedBytes: ByteArray
+    )
+
     val krillData: ConcurrentHashMap<String, KrillJsonGenerator.KrillTextData> = ConcurrentHashMap()
+    val krillCompressedData: ConcurrentHashMap<String, CompressedKrillData> = ConcurrentHashMap()
+    val krillCompressionFutures: ConcurrentHashMap<String, java.util.concurrent.Future<*>> = ConcurrentHashMap()
     val corpusMetadata: ConcurrentHashMap<String, MutableMap<String, Any>> = ConcurrentHashMap()
     val docMetadata: ConcurrentHashMap<String, MutableMap<String, Any>> = ConcurrentHashMap()
     val expectedFoundries: MutableSet<String> = mutableSetOf("base")
     val processedFoundries: MutableSet<String> = mutableSetOf()
     var krillOutputCount = java.util.concurrent.atomic.AtomicInteger(0)
 
+    // Thread pool for compressing Krill JSON (GZIP or LZ4) in parallel
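+    // (started with the incremental writer, shut down during the final flush)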
+    var compressionExecutor: java.util.concurrent.ExecutorService? = null
+
     // Inventory-based incremental output
     // Per-ZIP inventory: which texts each ZIP should contain
     val zipInventory: ConcurrentHashMap<String, MutableSet<String>> = ConcurrentHashMap()
@@ -1043,25 +1055,87 @@
                 // Output remaining texts (these weren't output incrementally, possibly incomplete)
                 // Copy keys to avoid ConcurrentModificationException if scanner is still running
                 val remainingKeys = krillData.keys.sortedWith(this::compareTextIds)
-                remainingKeys.forEach { textId ->
-                    val textData = krillData.remove(textId) ?: return@forEach  // Skip if already removed by scanner
-                    val textFoundries = textData.morphoByFoundry.keys.toSet() + setOf("base")
-
-                    // Build expected foundries from inventory: which ZIPs contain this text?
-                    val expectedForThisText = zipInventory.filter { (_, texts) -> texts.contains(textId) }.keys
-                        .flatMap { zipPath ->
-                            val foundry = getFoundryFromZipFileName(File(zipPath).name)
-                            if (foundry.contains("-")) foundry.split("-") else listOf(foundry)
+                
+                // Phase 1: Submit all remaining texts for parallel compression
+                if (compressionExecutor != null && !compressionExecutor!!.isShutdown) {
+                    LOGGER.info("Submitting ${remainingKeys.size} remaining texts for parallel compression")
+                    remainingKeys.forEach { textId ->
+                        val textData = krillData[textId]
+                        // Skip texts already compressed or still in flight on the pool
+                        if (textData != null && !krillCompressedData.containsKey(textId) && !krillCompressionFutures.containsKey(textId)) {
+                            compressionExecutor!!.submit {
+                                compressKrillText(textId, textData)
+                            }
                         }
-                        .toSet()
-
-                    if (!textFoundries.containsAll(expectedForThisText)) {
-                        LOGGER.warning("Outputting incomplete text $textId with foundries ${textFoundries.sorted()} (expected: ${expectedForThisText.sorted()})")
                     }
                     
-                    outputKrillText(textId, textData)
-                    // Continue stepping the same progress bar
-                    incrementalProgressBar?.step()
+                    // Wait for compressions to complete
+                    compressionExecutor!!.shutdown()
+                    try {
+                        var waitTime = 0L
+                        while (!compressionExecutor!!.awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS)) {
+                            waitTime += 5
+                            val pending = remainingKeys.size - krillCompressedData.size
+                            LOGGER.info("Compressing remaining texts... ($pending left, ${waitTime}s elapsed)")
+                        }
+                        LOGGER.info("All remaining texts compressed")
+                    } catch (e: InterruptedException) {
+                        LOGGER.warning("Interrupted while compressing remaining texts")
+                        Thread.currentThread().interrupt()
+                    }
+                }
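+                // If no executor was available, Phase 2 below falls back to inline compression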
+                
+                // Phase 2: Write compressed data to TAR sequentially
+                remainingKeys.forEach { textId ->
+                    val textData = krillData.remove(textId) ?: return@forEach  // Skip if already removed
+                    val compressedData = krillCompressedData.remove(textId)
+                    
+                    if (compressedData != null) {
+                        // Already compressed - just write to TAR
+                        val textFoundries = textData.morphoByFoundry.keys.toSet() + setOf("base")
+                        val expectedForThisText = zipInventory.filter { (_, texts) -> texts.contains(textId) }.keys
+                            .flatMap { zipPath ->
+                                val foundry = getFoundryFromZipFileName(File(zipPath).name)
+                                if (foundry.contains("-")) foundry.split("-") else listOf(foundry)
+                            }
+                            .toSet()
+
+                        if (!textFoundries.containsAll(expectedForThisText)) {
+                            LOGGER.warning("Outputting incomplete text $textId with foundries ${textFoundries.sorted()} (expected: ${expectedForThisText.sorted()})")
+                        }
+                        
+                        // Write pre-compressed data directly to TAR
+                        try {
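+                            // synchronized: a still-running scanner pass may also write to the TAR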
+                            synchronized(krillTarOutputStream!!) {
+                                val tarEntry = TarArchiveEntry(compressedData.fileName)
+                                tarEntry.size = compressedData.compressedBytes.size.toLong()
+                                krillTarOutputStream!!.putArchiveEntry(tarEntry)
+                                krillTarOutputStream!!.write(compressedData.compressedBytes)
+                                krillTarOutputStream!!.closeArchiveEntry()
+                            }
+                            krillOutputCount.incrementAndGet()
+                            incrementalProgressBar?.step()
+                        } catch (e: Exception) {
+                            LOGGER.severe("ERROR writing $textId to TAR: ${e.message}")
+                            e.printStackTrace()
+                        }
+                    } else {
+                        // Fallback: compress inline if not in compressed cache (shouldn't happen)
+                        LOGGER.warning("Text $textId not in compressed cache, compressing inline")
+                        val textFoundries = textData.morphoByFoundry.keys.toSet() + setOf("base")
+                        val expectedForThisText = zipInventory.filter { (_, texts) -> texts.contains(textId) }.keys
+                            .flatMap { zipPath ->
+                                val foundry = getFoundryFromZipFileName(File(zipPath).name)
+                                if (foundry.contains("-")) foundry.split("-") else listOf(foundry)
+                            }
+                            .toSet()
+
+                        if (!textFoundries.containsAll(expectedForThisText)) {
+                            LOGGER.warning("Outputting incomplete text $textId with foundries ${textFoundries.sorted()} (expected: ${expectedForThisText.sorted()})")
+                        }
+                        
+                        outputKrillText(textId, textData)
+                        incrementalProgressBar?.step()
+                    }
                 }
 
                 // Acquire lock before closing stream to prevent concurrent scanner access
@@ -3507,6 +3581,15 @@
         if (outputFormat != OutputFormat.KRILL || krillTarOutputStream == null) return
 
         shutdownIncrementalWriter = false
+        
+        // Start compression thread pool (parallel GZIP/LZ4 compression)
+        val compressionThreads = maxThreads.coerceAtLeast(2)
+        val compressionThreadIndex = java.util.concurrent.atomic.AtomicInteger(0)
+        compressionExecutor = java.util.concurrent.Executors.newFixedThreadPool(compressionThreads) { r ->
+            // Name workers with a pool-local counter; Thread.currentThread().id in the
+            // factory would be the submitting thread's id, not the new worker's
+            Thread(r, "KrillCompressor-${compressionThreadIndex.incrementAndGet()}")
+        }
+        LOGGER.info("Started compression thread pool with $compressionThreads threads")
+        
+        // Start single writer thread (sequential TAR writing)
         incrementalOutputScheduler = java.util.concurrent.Executors.newSingleThreadScheduledExecutor { r ->
             Thread(r, "KrillWriterThread")
         }
@@ -3524,51 +3607,142 @@
         LOGGER.info("Incremental writer scheduler started (scanning every 500ms)")
     }
 
+    // Compress one text's Krill JSON on a compression-pool thread; the bytes are written to the TAR later, sequentially
+    private fun compressKrillText(textId: String, textData: KrillJsonGenerator.KrillTextData) {
+        try {
+            val corpusSigle = textId.substringBefore('_')
+            val docSigle = textId.substringBeforeLast('.')
+
+            // Apply corpus-level metadata
+            corpusMetadata[corpusSigle]?.forEach { (key, value) ->
+                if (!textData.headerMetadata.containsKey(key)) {
+                    textData.headerMetadata[key] = value
+                }
+            }
+
+            // Apply doc-level metadata
+            docMetadata[docSigle]?.forEach { (key, value) ->
+                if (!textData.headerMetadata.containsKey(key)) {
+                    textData.headerMetadata[key] = value
+                }
+            }
+
+            val json = KrillJsonGenerator.generate(textData, corpusMetadata, docMetadata, includeNonWordTokens)
+            
+            // Choose compression format based on --lz4 flag
+            val (jsonFileName, compressedData) = if (useLz4) {
+                val fileName = textId.replace("_", "-").replace(".", "-") + ".json.lz4"
+                val jsonBytes = json.toByteArray(Charsets.UTF_8)
+                val byteOut = ByteArrayOutputStream()
+                net.jpountz.lz4.LZ4FrameOutputStream(byteOut).use { lz4Out ->
+                    lz4Out.write(jsonBytes)
+                }
+                Pair(fileName, byteOut.toByteArray())
+            } else {
+                // Use fast GZIP (level 1) for better performance
+                val fileName = textId.replace("_", "-").replace(".", "-") + ".json.gz"
+                val jsonBytes = json.toByteArray(Charsets.UTF_8)
+                val byteOut = ByteArrayOutputStream()
+                // Write the 10-byte GZIP header first (magic, CM=8/deflate, no flags, zero mtime)
+                byteOut.write(byteArrayOf(0x1f, 0x8b.toByte(), 8, 0, 0, 0, 0, 0, 0, 0))
+                val deflater = java.util.zip.Deflater(1, true) // level 1, nowrap=true: raw deflate body
+                try {
+                    java.util.zip.DeflaterOutputStream(byteOut, deflater).use { deflateOut ->
+                        deflateOut.write(jsonBytes)
+                        deflateOut.finish()
+                    }
+                } finally {
+                    deflater.end() // close() does not end a caller-supplied Deflater; end it to free native memory
+                }
+                val crc = java.util.zip.CRC32()
+                crc.update(jsonBytes)
+                // Write GZIP trailer: CRC32 and uncompressed size (ISIZE), little-endian
+                val trailer = java.nio.ByteBuffer.allocate(8).order(java.nio.ByteOrder.LITTLE_ENDIAN)
+                trailer.putInt(crc.value.toInt())
+                trailer.putInt(jsonBytes.size)
+                byteOut.write(trailer.array())
+                Pair(fileName, byteOut.toByteArray())
+            }
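+            // compressedData is a complete .gz/.lz4 file image (header, body, trailer),
+            // so each TAR entry can be extracted and decompressed on its own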
+
+            // Store compressed data for sequential TAR writing
+            krillCompressedData[textId] = CompressedKrillData(textId, jsonFileName, compressedData)
+            LOGGER.finer("Compressed text $textId (${compressedData.size} bytes)")
+        } catch (e: Exception) {
+            LOGGER.severe("ERROR compressing $textId: ${e.message}")
+            e.printStackTrace()
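+            // On failure the text never enters krillCompressedData; the final
+            // flush falls back to inline compression via outputKrillText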
+        }
+    }
+
     // Scan all texts and output any that are complete
     private fun scanAndOutputCompleteTexts(forceScan: Boolean = false) {
         if ((shutdownIncrementalWriter && !forceScan) || !tarStreamOpen) return
 
         if (expectedTextOrder.isEmpty()) return
 
+        // Phase 1: Find ALL ready texts (order doesn't matter for output)
         val readyTexts = mutableListOf<String>()
-        var outputCount = 0
-        while (nextTextOrderIndex < expectedTextOrder.size) {
-            val textId = expectedTextOrder[nextTextOrderIndex]
-            // Check if shutdown was requested, thread was interrupted, or stream was closed
-            if (shutdownIncrementalWriter || Thread.currentThread().isInterrupted || !tarStreamOpen) break
-
+        
+        for (textId in krillData.keys) {
             // Skip if already output
             if (outputTexts.contains(textId)) continue
-
-            // Find all ZIPs that should contain this text
+            
+            // Check if this text has all expected data
             val relevantZips = zipInventory.filter { (_, texts) -> texts.contains(textId) }.keys
-
-            // Check if all relevant ZIPs have processed this text
             val allProcessed = relevantZips.all { path ->
                 processedTextsPerZip[path]?.contains(textId) == true
             }
-
-            if (!allProcessed || !krillData.containsKey(textId)) {
-                // Maintain global ordering across foundries: stop at first not-ready text
-                break
+            
+            if (allProcessed) {
+                readyTexts.add(textId)
+                
+                // Start compression immediately if not already started
+                if (!krillCompressedData.containsKey(textId) && !krillCompressionFutures.containsKey(textId)) {
+                    val textData = krillData[textId]
+                    if (textData != null) {
+                        val future = compressionExecutor?.submit {
+                            compressKrillText(textId, textData)
+                        }
+                        if (future != null) {
+                            krillCompressionFutures[textId] = future
+                        }
+                    }
+                }
             }
-
-            readyTexts.add(textId)
-            nextTextOrderIndex++
         }
 
-        readyTexts.sortWith { a, b -> compareTextIds(a, b) }
-
-        // Output ready texts in month-aware order (already in order because allTexts is sorted month-aware)
+        // Phase 2: Write compressed texts to TAR (only those already compressed, no waiting)
+        // Event-driven: if compression is done, write it; otherwise skip and try next scan
+        var outputCount = 0
         for (textId in readyTexts) {
-            val textData = krillData[textId] ?: continue
-            val relevantZips = zipInventory.filter { (_, texts) -> texts.contains(textId) }.keys
+            // Check if already compressed (event-based: poll, don't wait)
+            val compressedData = krillCompressedData[textId]
+            
+            // If not yet compressed, skip for now (will be picked up in next scan)
+            if (compressedData == null) {
+                // Check if future is done without blocking
+                val future = krillCompressionFutures[textId]
+                if (future?.isDone == true) {
+                    // Compression just finished but the result is not yet in the map; the next scan will pick it up
+                    LOGGER.finest("Compression finished for $textId but data not yet stored, retrying next scan")
+                }
+                continue
+            }
+            
+            // Remove future since compression is complete
+            krillCompressionFutures.remove(textId)
+            
             if (outputTexts.add(textId)) {
+                val relevantZips = zipInventory.filter { (_, texts) -> texts.contains(textId) }.keys
                 tarStreamLock.lock()
                 try {
                     if (tarStreamOpen) {
                         try {
-                            outputKrillText(textId, textData)
+                            // Write to TAR
+                            synchronized(krillTarOutputStream!!) {
+                                val tarEntry = TarArchiveEntry(compressedData.fileName)
+                                tarEntry.size = compressedData.compressedBytes.size.toLong()
+                                krillTarOutputStream!!.putArchiveEntry(tarEntry)
+                                krillTarOutputStream!!.write(compressedData.compressedBytes)
+                                krillTarOutputStream!!.closeArchiveEntry()
+                            }
+                            
+                            krillOutputCount.incrementAndGet()
                             incrementalProgressBar?.step()
                             outputCount++
                             LOGGER.fine("Output text $textId (processed by ${relevantZips.size} ZIPs, ${krillData.size} still pending)")
@@ -3583,9 +3757,13 @@
                 }
             }
 
+            // Clean up all data structures
             krillData.remove(textId)
+            krillCompressedData.remove(textId)
+            krillCompressionFutures.remove(textId)
 
             // Clean up tracking data for this text
+            val relevantZips = zipInventory.filter { (_, texts) -> texts.contains(textId) }.keys
             relevantZips.forEach { path ->
                 zipInventory[path]?.remove(textId)
                 processedTextsPerZip[path]?.remove(textId)
@@ -3629,9 +3807,14 @@
             val remainingBeforeScan = krillData.size
             LOGGER.info("Doing final scan for remaining texts...")
             scanAndOutputCompleteTexts(forceScan = true)
+            
+            // Note: don't shut down compressionExecutor here; the main finalization
+            // code compresses remaining texts in parallel and shuts it down there
+            
             LOGGER.info("Final scan completed, output ${remainingBeforeScan - krillData.size} texts")
 
             incrementalOutputScheduler = null
+            // Keep compressionExecutor non-null; it is still needed for the final compression
         }
     }