Use thread pool for gzipping krill output
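
Move gzip/LZ4 compression of Krill JSON off the single TAR writer
thread into a fixed-size thread pool. Compressed entries are cached
in a ConcurrentHashMap and appended to the TAR sequentially, so the
archive stays valid while compression runs in parallel: the
incremental scanner submits a text for compression as soon as all of
its ZIPs have processed it and, on each pass, writes whichever texts
are already compressed; finalization compresses any remaining texts
in parallel before draining them to the TAR.

In miniature, the producer/consumer shape is the following (an
illustrative sketch only, not this tool's API; it uses the stock
GZIPOutputStream for brevity, while the patch hand-frames the stream
to get deflate level 1, which GZIPOutputStream does not expose):

    import java.io.ByteArrayOutputStream
    import java.util.concurrent.ConcurrentHashMap
    import java.util.concurrent.Executors
    import java.util.concurrent.TimeUnit
    import java.util.zip.GZIPOutputStream

    fun gzipAll(texts: Map<String, ByteArray>, threads: Int): Map<String, ByteArray> {
        val compressed = ConcurrentHashMap<String, ByteArray>()
        val pool = Executors.newFixedThreadPool(threads)
        texts.forEach { (id, bytes) ->
            pool.submit {
                val buf = ByteArrayOutputStream()
                GZIPOutputStream(buf).use { it.write(bytes) } // compress in parallel
                compressed[id] = buf.toByteArray()
            }
        }
        pool.shutdown()
        pool.awaitTermination(1, TimeUnit.HOURS)
        return compressed // a single writer then appends entries to the TAR in order
    }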
Change-Id: Iff2a08e5e7e840bb0760d4f6d307451b99a920a7
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
index 8708dd8..0178fcc 100644
--- a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
@@ -688,13 +688,25 @@
} catch (_: Exception) {}
}
+ // Data class to hold compressed Krill JSON ready for TAR writing
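+ // (ByteArray fields give the generated equals()/hashCode() reference semantics,
+ // which is acceptable here: instances are write-once and never compared.)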
+ data class CompressedKrillData(
+ val textId: String,
+ val fileName: String,
+ val compressedBytes: ByteArray
+ )
+
val krillData: ConcurrentHashMap<String, KrillJsonGenerator.KrillTextData> = ConcurrentHashMap()
+ val krillCompressedData: ConcurrentHashMap<String, CompressedKrillData> = ConcurrentHashMap()
+ val krillCompressionFutures: ConcurrentHashMap<String, java.util.concurrent.Future<*>> = ConcurrentHashMap()
val corpusMetadata: ConcurrentHashMap<String, MutableMap<String, Any>> = ConcurrentHashMap()
val docMetadata: ConcurrentHashMap<String, MutableMap<String, Any>> = ConcurrentHashMap()
val expectedFoundries: MutableSet<String> = mutableSetOf("base")
val processedFoundries: MutableSet<String> = mutableSetOf()
var krillOutputCount = java.util.concurrent.atomic.AtomicInteger(0)
+ // Thread pool for compressing Krill output (gzip or LZ4) in parallel
+ var compressionExecutor: java.util.concurrent.ExecutorService? = null
+
// Inventory-based incremental output
// Per-ZIP inventory: which texts each ZIP should contain
val zipInventory: ConcurrentHashMap<String, MutableSet<String>> = ConcurrentHashMap()
@@ -1043,25 +1055,87 @@
// Output remaining texts (these weren't output incrementally, possibly incomplete)
// Copy keys to avoid ConcurrentModificationException if scanner is still running
val remainingKeys = krillData.keys.sortedWith(this::compareTextIds)
- remainingKeys.forEach { textId ->
- val textData = krillData.remove(textId) ?: return@forEach // Skip if already removed by scanner
- val textFoundries = textData.morphoByFoundry.keys.toSet() + setOf("base")
-
- // Build expected foundries from inventory: which ZIPs contain this text?
- val expectedForThisText = zipInventory.filter { (_, texts) -> texts.contains(textId) }.keys
- .flatMap { zipPath ->
- val foundry = getFoundryFromZipFileName(File(zipPath).name)
- if (foundry.contains("-")) foundry.split("-") else listOf(foundry)
+
+ // Phase 1: Submit all remaining texts for parallel compression
+ if (compressionExecutor != null && !compressionExecutor!!.isShutdown) {
+ LOGGER.info("Submitting ${remainingKeys.size} remaining texts for parallel compression")
+ remainingKeys.forEach { textId ->
+ val textData = krillData[textId]
+ if (textData != null && !krillCompressedData.containsKey(textId)) {
+ compressionExecutor!!.submit {
+ compressKrillText(textId, textData)
+ }
}
- .toSet()
-
- if (!textFoundries.containsAll(expectedForThisText)) {
- LOGGER.warning("Outputting incomplete text $textId with foundries ${textFoundries.sorted()} (expected: ${expectedForThisText.sorted()})")
}
- outputKrillText(textId, textData)
- // Continue stepping the same progress bar
- incrementalProgressBar?.step()
+ // Wait for compressions to complete
+ compressionExecutor!!.shutdown()
+ try {
+ var waitTime = 0L
+ while (!compressionExecutor!!.awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS)) {
+ waitTime += 5
+ val pending = remainingKeys.size - krillCompressedData.size
+ LOGGER.info("Compressing remaining texts... ($pending left, ${waitTime}s elapsed)")
+ }
+ LOGGER.info("All remaining texts compressed")
+ } catch (e: InterruptedException) {
+ LOGGER.warning("Interrupted while compressing remaining texts")
+ Thread.currentThread().interrupt()
+ }
+ }
+
+ // Phase 2: Write compressed data to TAR sequentially
+ remainingKeys.forEach { textId ->
+ val textData = krillData.remove(textId) ?: return@forEach // Skip if already removed
+ val compressedData = krillCompressedData.remove(textId)
+
+ if (compressedData != null) {
+ // Already compressed - just write to TAR
+ val textFoundries = textData.morphoByFoundry.keys.toSet() + setOf("base")
+ val expectedForThisText = zipInventory.filter { (_, texts) -> texts.contains(textId) }.keys
+ .flatMap { zipPath ->
+ val foundry = getFoundryFromZipFileName(File(zipPath).name)
+ if (foundry.contains("-")) foundry.split("-") else listOf(foundry)
+ }
+ .toSet()
+
+ if (!textFoundries.containsAll(expectedForThisText)) {
+ LOGGER.warning("Outputting incomplete text $textId with foundries ${textFoundries.sorted()} (expected: ${expectedForThisText.sorted()})")
+ }
+
+ // Write pre-compressed data directly to TAR
+ try {
+ synchronized(krillTarOutputStream!!) {
+ val tarEntry = TarArchiveEntry(compressedData.fileName)
+ tarEntry.size = compressedData.compressedBytes.size.toLong()
+ krillTarOutputStream!!.putArchiveEntry(tarEntry)
+ krillTarOutputStream!!.write(compressedData.compressedBytes)
+ krillTarOutputStream!!.closeArchiveEntry()
+ }
+ krillOutputCount.incrementAndGet()
+ incrementalProgressBar?.step()
+ } catch (e: Exception) {
+ LOGGER.severe("ERROR writing $textId to TAR: ${e.message}")
+ e.printStackTrace()
+ }
+ } else {
+ // Fallback: compress inline if not in compressed cache (shouldn't happen)
+ LOGGER.warning("Text $textId not in compressed cache, compressing inline")
+ val textFoundries = textData.morphoByFoundry.keys.toSet() + setOf("base")
+ val expectedForThisText = zipInventory.filter { (_, texts) -> texts.contains(textId) }.keys
+ .flatMap { zipPath ->
+ val foundry = getFoundryFromZipFileName(File(zipPath).name)
+ if (foundry.contains("-")) foundry.split("-") else listOf(foundry)
+ }
+ .toSet()
+
+ if (!textFoundries.containsAll(expectedForThisText)) {
+ LOGGER.warning("Outputting incomplete text $textId with foundries ${textFoundries.sorted()} (expected: ${expectedForThisText.sorted()})")
+ }
+
+ outputKrillText(textId, textData)
+ incrementalProgressBar?.step()
+ }
}
// Acquire lock before closing stream to prevent concurrent scanner access
@@ -3507,6 +3581,15 @@
if (outputFormat != OutputFormat.KRILL || krillTarOutputStream == null) return
shutdownIncrementalWriter = false
+
+ // Start compression thread pool (parallel compression)
+ val compressionThreads = maxThreads.coerceAtLeast(2)
+ val compressionThreadCounter = java.util.concurrent.atomic.AtomicInteger(0)
+ compressionExecutor = java.util.concurrent.Executors.newFixedThreadPool(compressionThreads) { r ->
+ // Name workers via a counter; Thread.currentThread() inside a thread factory is the submitting thread
+ Thread(r, "KrillCompressor-${compressionThreadCounter.incrementAndGet()}")
+ }
+ LOGGER.info("Started compression thread pool with $compressionThreads threads")
+
+ // Start single writer thread (sequential TAR writing)
incrementalOutputScheduler = java.util.concurrent.Executors.newSingleThreadScheduledExecutor { r ->
Thread(r, "KrillWriterThread")
}
@@ -3524,51 +3607,142 @@
LOGGER.info("Incremental writer scheduler started (scanning every 500ms)")
}
+ // Compress a single text's Krill JSON on the compression pool and cache the
+ // result in krillCompressedData for the sequential TAR writer
+ private fun compressKrillText(textId: String, textData: KrillJsonGenerator.KrillTextData) {
+ try {
+ val corpusSigle = textId.substringBefore('_')
+ val docSigle = textId.substringBeforeLast('.')
+
+ // Apply corpus-level metadata
+ corpusMetadata[corpusSigle]?.forEach { (key, value) ->
+ if (!textData.headerMetadata.containsKey(key)) {
+ textData.headerMetadata[key] = value
+ }
+ }
+
+ // Apply doc-level metadata
+ docMetadata[docSigle]?.forEach { (key, value) ->
+ if (!textData.headerMetadata.containsKey(key)) {
+ textData.headerMetadata[key] = value
+ }
+ }
+
+ val json = KrillJsonGenerator.generate(textData, corpusMetadata, docMetadata, includeNonWordTokens)
+
+ // Choose compression format based on --lz4 flag
+ val (jsonFileName, compressedData) = if (useLz4) {
+ val fileName = textId.replace("_", "-").replace(".", "-") + ".json.lz4"
+ val jsonBytes = json.toByteArray(Charsets.UTF_8)
+ val byteOut = ByteArrayOutputStream()
+ net.jpountz.lz4.LZ4FrameOutputStream(byteOut).use { lz4Out ->
+ lz4Out.write(jsonBytes)
+ }
+ Pair(fileName, byteOut.toByteArray())
+ } else {
+ // Use fast GZIP (level 1) for better performance
+ val fileName = textId.replace("_", "-").replace(".", "-") + ".json.gz"
+ val jsonBytes = json.toByteArray(Charsets.UTF_8)
+ val byteOut = ByteArrayOutputStream()
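+ // java.util.zip.GZIPOutputStream does not expose the deflate level, so the
+ // GZIP framing (header + raw deflate body + trailer) is assembled by hand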
+ val deflater = java.util.zip.Deflater(1, true) // level 1, nowrap=true for raw deflate
+ java.util.zip.DeflaterOutputStream(byteOut, deflater).use { deflateOut ->
+ // Write the 10-byte GZIP header: magic 0x1f 0x8b, CM=8 (deflate), FLG=0, MTIME=0, XFL=0, OS=0
+ byteOut.write(byteArrayOf(0x1f, 0x8b.toByte(), 8, 0, 0, 0, 0, 0, 0, 0))
+ val crc = java.util.zip.CRC32()
+ crc.update(jsonBytes)
+ deflateOut.write(jsonBytes)
+ deflateOut.finish()
+ // Write GZIP trailer: CRC32 and uncompressed size (ISIZE), both little-endian
+ val trailer = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN)
+ trailer.putInt(crc.value.toInt())
+ trailer.putInt(jsonBytes.size)
+ byteOut.write(trailer.array())
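+ // The result round-trips through java.util.zip.GZIPInputStream (FLG=0, no optional header fields)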
+ }
+ deflater.end() // close() only ends default-created deflaters; release native buffers explicitly
+ Pair(fileName, byteOut.toByteArray())
+ }
+
+ // Store compressed data for sequential TAR writing
+ krillCompressedData[textId] = CompressedKrillData(textId, jsonFileName, compressedData)
+ LOGGER.finer("Compressed text $textId (${compressedData.size} bytes)")
+ } catch (e: Exception) {
+ LOGGER.severe("ERROR compressing $textId: ${e.message}")
+ e.printStackTrace()
+ }
+ }
+
// Scan all texts and output any that are complete
private fun scanAndOutputCompleteTexts(forceScan: Boolean = false) {
if ((shutdownIncrementalWriter && !forceScan) || !tarStreamOpen) return
if (expectedTextOrder.isEmpty()) return
+ // Phase 1: Find ALL ready texts (order doesn't matter for output)
val readyTexts = mutableListOf<String>()
- var outputCount = 0
- while (nextTextOrderIndex < expectedTextOrder.size) {
- val textId = expectedTextOrder[nextTextOrderIndex]
- // Check if shutdown was requested, thread was interrupted, or stream was closed
- if (shutdownIncrementalWriter || Thread.currentThread().isInterrupted || !tarStreamOpen) break
-
+
+ for (textId in krillData.keys) {
// Skip if already output
if (outputTexts.contains(textId)) continue
-
- // Find all ZIPs that should contain this text
+
+ // Check if this text has all expected data
val relevantZips = zipInventory.filter { (_, texts) -> texts.contains(textId) }.keys
-
- // Check if all relevant ZIPs have processed this text
val allProcessed = relevantZips.all { path ->
processedTextsPerZip[path]?.contains(textId) == true
}
-
- if (!allProcessed || !krillData.containsKey(textId)) {
- // Maintain global ordering across foundries: stop at first not-ready text
- break
+
+ if (allProcessed) {
+ readyTexts.add(textId)
+
+ // Start compression immediately if not already started
+ if (!krillCompressedData.containsKey(textId) && !krillCompressionFutures.containsKey(textId)) {
+ val textData = krillData[textId]
+ if (textData != null) {
+ val future = compressionExecutor?.submit {
+ compressKrillText(textId, textData)
+ }
+ if (future != null) {
+ krillCompressionFutures[textId] = future
+ }
+ }
+ }
}
-
- readyTexts.add(textId)
- nextTextOrderIndex++
}
- readyTexts.sortWith { a, b -> compareTextIds(a, b) }
-
- // Output ready texts in month-aware order (already in order because allTexts is sorted month-aware)
+ // Phase 2: Write compressed texts to TAR (only those already compressed, no waiting)
+ // Event-driven: if compression is done, write it; otherwise skip and try next scan
+ var outputCount = 0
for (textId in readyTexts) {
- val textData = krillData[textId] ?: continue
- val relevantZips = zipInventory.filter { (_, texts) -> texts.contains(textId) }.keys
+ // Check if already compressed (event-based: poll, don't wait)
+ val compressedData = krillCompressedData[textId]
+
+ // If not yet compressed, skip for now (will be picked up in next scan)
+ if (compressedData == null) {
+ // Check if future is done without blocking
+ val future = krillCompressionFutures[textId]
+ if (future?.isDone == true) {
+ // Compression just finished but data not yet in map - race condition, will catch next scan
+ LOGGER.finest("Compression finished for $textId but data not yet stored, retrying next scan")
+ }
+ continue
+ }
+
+ // Remove future since compression is complete
+ krillCompressionFutures.remove(textId)
+
if (outputTexts.add(textId)) {
+ val relevantZips = zipInventory.filter { (_, texts) -> texts.contains(textId) }.keys
tarStreamLock.lock()
try {
if (tarStreamOpen) {
try {
- outputKrillText(textId, textData)
+ // Write to TAR
+ synchronized(krillTarOutputStream!!) {
+ val tarEntry = TarArchiveEntry(compressedData.fileName)
+ tarEntry.size = compressedData.compressedBytes.size.toLong()
+ krillTarOutputStream!!.putArchiveEntry(tarEntry)
+ krillTarOutputStream!!.write(compressedData.compressedBytes)
+ krillTarOutputStream!!.closeArchiveEntry()
+ }
+
+ krillOutputCount.incrementAndGet()
incrementalProgressBar?.step()
outputCount++
LOGGER.fine("Output text $textId (processed by ${relevantZips.size} ZIPs, ${krillData.size} still pending)")
@@ -3583,9 +3757,13 @@
}
}
+ // Clean up all data structures
krillData.remove(textId)
+ krillCompressedData.remove(textId)
+ krillCompressionFutures.remove(textId)
// Clean up tracking data for this text
+ val relevantZips = zipInventory.filter { (_, texts) -> texts.contains(textId) }.keys
relevantZips.forEach { path ->
zipInventory[path]?.remove(textId)
processedTextsPerZip[path]?.remove(textId)
@@ -3629,9 +3807,14 @@
val remainingBeforeScan = krillData.size
LOGGER.info("Doing final scan for remaining texts...")
scanAndOutputCompleteTexts(forceScan = true)
+
+ // Note: don't shut down compressionExecutor here; the main finalization code
+ // still compresses any remaining texts in parallel and shuts the pool down itself
+
LOGGER.info("Final scan completed, output ${remainingBeforeScan - krillData.size} texts")
incrementalOutputScheduler = null
+ // Keep compressionExecutor non-null; the final compression pass still needs it
}
}