Fix late expunging of complete texts
Change-Id: Ibdec0a0c2e840279fe48dd172c7061761d4a13fe
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
index 174ddb2..83bc07e 100644
--- a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
@@ -1176,11 +1176,18 @@
val krillData: ConcurrentHashMap<String, KrillJsonGenerator.KrillTextData> = ConcurrentHashMap()
val krillCompressedData: ConcurrentHashMap<String, CompressedKrillData> = ConcurrentHashMap()
val krillCompressionFutures: ConcurrentHashMap<String, java.util.concurrent.Future<*>> = ConcurrentHashMap()
+ val readyKrillTextIds: java.util.concurrent.BlockingQueue<String> = java.util.concurrent.LinkedBlockingQueue()
val corpusMetadata: ConcurrentHashMap<String, MutableMap<String, Any>> = ConcurrentHashMap()
val docMetadata: ConcurrentHashMap<String, MutableMap<String, Any>> = ConcurrentHashMap()
val expectedFoundries: MutableSet<String> = mutableSetOf("base")
val processedFoundries: MutableSet<String> = mutableSetOf()
var krillOutputCount = java.util.concurrent.atomic.AtomicInteger(0)
+ private val krillPeakRawPending = AtomicInteger(0)
+ private val krillPeakCompressedPending = AtomicInteger(0)
+ private val krillPeakCompressionInFlight = AtomicInteger(0)
+ private val krillPeakReadyQueueDepth = AtomicInteger(0)
+ @Volatile private var lastKrillStateLogNanos = 0L
+ private val krillStateLogIntervalNanos = java.util.concurrent.TimeUnit.SECONDS.toNanos(30)
// Compression thread pool for parallel GZIP compression
var compressionExecutor: java.util.concurrent.ExecutorService? = null
@@ -1232,12 +1239,18 @@
expectedFoundries.add("base")
processedFoundries.clear()
krillOutputCount.set(0)
+ krillPeakRawPending.set(0)
+ krillPeakCompressedPending.set(0)
+ krillPeakCompressionInFlight.set(0)
+ krillPeakReadyQueueDepth.set(0)
+ lastKrillStateLogNanos = 0L
krillData.clear()
corpusMetadata.clear()
docMetadata.clear()
zipInventory.clear()
processedTextsPerZip.clear()
outputTexts.clear()
+ readyKrillTextIds.clear()
}
// Initialize priority-based entry executor
@@ -2015,16 +2028,11 @@
// Check if all expected foundries have completed this text
val expectedFoundries = expectedFoundriesPerText[textId] ?: emptySet()
if (completedFoundries.containsAll(expectedFoundries)) {
- // All foundries complete - compress immediately in this worker thread
- // This avoids queueing overhead and ensures compression happens ASAP
+ // All foundries complete - start compression immediately so the writer
+ // thread can output and free the text without waiting for a global scan.
val textData = krillData[textId]
- if (textData != null && !krillCompressedData.containsKey(textId) && !krillCompressionFutures.containsKey(textId)) {
- // Mark as being compressed to prevent duplicate compression
- krillCompressionFutures[textId] = java.util.concurrent.CompletableFuture.completedFuture(null)
-
- // Compress inline in this worker thread (we have plenty of workers)
- compressKrillText(textId, textData)
- LOGGER.fine("Compressed completed text inline: $textId")
+ if (textData != null) {
+ enqueueKrillCompression(textId, textData)
}
}
} catch (t: Throwable) {
@@ -3391,6 +3399,48 @@
}
}
+ private fun updatePeakCounter(counter: AtomicInteger, value: Int) {
+ while (true) {
+ val current = counter.get()
+ if (value <= current) return
+ if (counter.compareAndSet(current, value)) return
+ }
+ }
+
+ private fun logKrillPipelineState(reason: String, force: Boolean = false) {
+ if (outputFormat != OutputFormat.KRILL) return
+
+ val now = System.nanoTime()
+ if (!force && now - lastKrillStateLogNanos < krillStateLogIntervalNanos) return
+
+ val rawPending = krillData.size
+ val compressedPending = krillCompressedData.size
+ val compressionInFlight = krillCompressionFutures.size
+ val readyQueueDepth = readyKrillTextIds.size
+
+ updatePeakCounter(krillPeakRawPending, rawPending)
+ updatePeakCounter(krillPeakCompressedPending, compressedPending)
+ updatePeakCounter(krillPeakCompressionInFlight, compressionInFlight)
+ updatePeakCounter(krillPeakReadyQueueDepth, readyQueueDepth)
+
+ lastKrillStateLogNanos = now
+
+ val rt = Runtime.getRuntime()
+ val usedMB = (rt.totalMemory() - rt.freeMemory()) / (1024 * 1024)
+ val totalMB = rt.totalMemory() / (1024 * 1024)
+ val maxMB = rt.maxMemory() / (1024 * 1024)
+ val outputCount = krillOutputCount.get()
+ val totalTexts = expectedTextOrder.size
+
+ LOGGER.info(
+ "KRILL-STATS reason=$reason output=$outputCount/$totalTexts " +
+ "heapMB{used=$usedMB,total=$totalMB,max=$maxMB} " +
+ "queues{raw=$rawPending,compressed=$compressedPending,inFlight=$compressionInFlight,ready=$readyQueueDepth} " +
+ "peaks{raw=${krillPeakRawPending.get()},compressed=${krillPeakCompressedPending.get()}," +
+ "inFlight=${krillPeakCompressionInFlight.get()},ready=${krillPeakReadyQueueDepth.get()}}"
+ )
+ }
+
private fun conlluOutput(foundry: String, docId: String): StringBuilder {
var token_index = 0
var real_token_index = 0
@@ -5388,6 +5438,7 @@
}, 500, 500, java.util.concurrent.TimeUnit.MILLISECONDS)
LOGGER.info("Incremental writer scheduler started (scanning every 500ms)")
+ logKrillPipelineState("writer-start", force = true)
}
// Compress text data in parallel, then write to TAR sequentially
@@ -5475,21 +5526,114 @@
// large corpora: without this, raw data and compressed bytes both sit in
// memory simultaneously until the incremental writer catches up.
krillData.remove(textId)
+ readyKrillTextIds.put(textId)
+ updatePeakCounter(krillPeakCompressedPending, krillCompressedData.size)
+ updatePeakCounter(krillPeakReadyQueueDepth, readyKrillTextIds.size)
LOGGER.finer("Compressed text $textId (${compressedData.size} bytes), released raw data")
} catch (e: Exception) {
LOGGER.severe("ERROR compressing $textId: ${e.message}")
e.printStackTrace()
+ } finally {
+ krillCompressionFutures.remove(textId)
}
}
+ private fun enqueueKrillCompression(textId: String, textData: KrillJsonGenerator.KrillTextData) {
+ if (krillCompressedData.containsKey(textId)) return
+
+ val executor = compressionExecutor
+ val future = if (executor != null && !executor.isShutdown) {
+ executor.submit {
+ compressKrillText(textId, textData)
+ }
+ } else {
+ java.util.concurrent.CompletableFuture.runAsync {
+ compressKrillText(textId, textData)
+ }
+ }
+
+ val existing = krillCompressionFutures.putIfAbsent(textId, future)
+ if (existing != null) {
+ future.cancel(false)
+ } else {
+ updatePeakCounter(krillPeakRawPending, krillData.size)
+ updatePeakCounter(krillPeakCompressionInFlight, krillCompressionFutures.size)
+ }
+ }
+
+ private fun writeReadyKrillTexts(textIds: Collection<String>): Int {
+ var outputCount = 0
+
+ for (textId in textIds) {
+ val compressedData = krillCompressedData[textId] ?: continue
+
+ if (outputTexts.add(textId)) {
+ val relevantZips = zipInventory.filter { (_, texts) -> texts.contains(textId) }.keys
+ tarStreamLock.lock()
+ try {
+ if (tarStreamOpen) {
+ try {
+ synchronized(krillTarOutputStream!!) {
+ val tarEntry = TarArchiveEntry(compressedData.fileName)
+ tarEntry.size = compressedData.compressedBytes.size.toLong()
+ krillTarOutputStream!!.putArchiveEntry(tarEntry)
+ krillTarOutputStream!!.write(compressedData.compressedBytes)
+ krillTarOutputStream!!.closeArchiveEntry()
+ }
+
+ krillOutputCount.incrementAndGet()
+ incrementalProgressBar?.step()
+ outputCount++
+ LOGGER.fine("Output text $textId (processed by ${relevantZips.size} ZIPs, ${krillData.size} raw pending)")
+ } catch (e: IOException) {
+ LOGGER.warning("Cannot output text $textId: stream closed")
+ tarStreamOpen = false
+ break
+ }
+ }
+ } finally {
+ tarStreamLock.unlock()
+ }
+ }
+
+ krillData.remove(textId)
+ krillCompressedData.remove(textId)
+ krillCompressionFutures.remove(textId)
+
+ val relevantZips = zipInventory.filter { (_, texts) -> texts.contains(textId) }.keys
+ relevantZips.forEach { path ->
+ zipInventory[path]?.remove(textId)
+ processedTextsPerZip[path]?.remove(textId)
+ }
+ }
+
+ return outputCount
+ }
+
// Scan all texts and output any that are complete
private fun scanAndOutputCompleteTexts(forceScan: Boolean = false) {
if ((shutdownIncrementalWriter && !forceScan) || !tarStreamOpen) return
if (expectedTextOrder.isEmpty()) return
+ updatePeakCounter(krillPeakRawPending, krillData.size)
+ updatePeakCounter(krillPeakCompressedPending, krillCompressedData.size)
+ updatePeakCounter(krillPeakCompressionInFlight, krillCompressionFutures.size)
+ updatePeakCounter(krillPeakReadyQueueDepth, readyKrillTextIds.size)
+
+ val readyFromCompression = mutableListOf<String>()
+ while (true) {
+ val textId = readyKrillTextIds.poll() ?: break
+ readyFromCompression.add(textId)
+ }
+
+ val writtenFromReadyQueue = if (readyFromCompression.isNotEmpty()) {
+ writeReadyKrillTexts(readyFromCompression)
+ } else {
+ 0
+ }
+
// Phase 1: Find ALL ready texts (order doesn't matter for output)
- val readyTexts = mutableListOf<String>()
var checkedCount = 0
var newFlowCount = 0
var oldFlowCount = 0
@@ -5518,92 +5662,21 @@
}
if (allProcessed) {
- readyTexts.add(textId)
-
- // Start compression immediately if not already started
- if (!krillCompressedData.containsKey(textId) && !krillCompressionFutures.containsKey(textId)) {
- val textData = krillData[textId]
- if (textData != null) {
- val future = compressionExecutor?.submit {
- compressKrillText(textId, textData)
- }
- if (future != null) {
- krillCompressionFutures[textId] = future
- }
- }
+ val textData = krillData[textId]
+ if (textData != null) {
+ enqueueKrillCompression(textId, textData)
}
}
}
- // Phase 2: Write compressed texts to TAR (only those already compressed, no waiting)
- // Event-driven: if compression is done, write it; otherwise skip and try next scan
- var outputCount = 0
- var waitingForCompression = 0
- for (textId in readyTexts) {
- // Check if already compressed (event-based: poll, don't wait)
- val compressedData = krillCompressedData[textId]
-
- // If not yet compressed, skip for now (will be picked up in next scan)
- if (compressedData == null) {
- waitingForCompression++
- // Check if future is done without blocking
- val future = krillCompressionFutures[textId]
- if (future?.isDone == true) {
- // Compression just finished but data not yet in map - race condition, will catch next scan
- LOGGER.finest("Compression finished for $textId but data not yet stored, retrying next scan")
- }
- continue
- }
-
- // Remove future since compression is complete
- krillCompressionFutures.remove(textId)
-
- if (outputTexts.add(textId)) {
- val relevantZips = zipInventory.filter { (_, texts) -> texts.contains(textId) }.keys
- tarStreamLock.lock()
- try {
- if (tarStreamOpen) {
- try {
- // Write to TAR
- synchronized(krillTarOutputStream!!) {
- val tarEntry = TarArchiveEntry(compressedData.fileName)
- tarEntry.size = compressedData.compressedBytes.size.toLong()
- krillTarOutputStream!!.putArchiveEntry(tarEntry)
- krillTarOutputStream!!.write(compressedData.compressedBytes)
- krillTarOutputStream!!.closeArchiveEntry()
- }
-
- val count = krillOutputCount.incrementAndGet()
- incrementalProgressBar?.step()
- outputCount++
- LOGGER.fine("Output text $textId (processed by ${relevantZips.size} ZIPs, ${krillData.size} still pending)")
- } catch (e: IOException) {
- LOGGER.warning("Cannot output text $textId: stream closed")
- tarStreamOpen = false
- break
- }
- }
- } finally {
- tarStreamLock.unlock()
- }
- }
-
- // Clean up all data structures
- krillData.remove(textId)
- krillCompressedData.remove(textId)
- krillCompressionFutures.remove(textId)
-
- // Clean up tracking data for this text
- val relevantZips = zipInventory.filter { (_, texts) -> texts.contains(textId) }.keys
- relevantZips.forEach { path ->
- zipInventory[path]?.remove(textId)
- processedTextsPerZip[path]?.remove(textId)
- }
+ if (writtenFromReadyQueue > 0 || !readyFromCompression.isNullOrEmpty()) {
+ LOGGER.fine(
+ "Scan: checked=$checkedCount, newFlow=$newFlowCount, oldFlow=$oldFlowCount, " +
+ "readyQueued=${readyFromCompression.size}, written=$writtenFromReadyQueue, rawPending=${krillData.size}, compressedPending=${krillCompressedData.size}"
+ )
}
- if (outputCount > 0 || readyTexts.isNotEmpty()) {
- LOGGER.fine("Scan: checked=$checkedCount, newFlow=$newFlowCount, oldFlow=$oldFlowCount, ready=${readyTexts.size}, compressed=$outputCount, waitingCompression=$waitingForCompression, pending=${krillData.size}")
- }
+ logKrillPipelineState(if (forceScan) "scan-final" else "scan")
}
// Stop the incremental writer thread
@@ -5638,6 +5711,7 @@
val remainingBeforeScan = krillData.size
LOGGER.info("Doing final scan for remaining texts...")
scanAndOutputCompleteTexts(forceScan = true)
+ logKrillPipelineState("writer-stop", force = true)
// Note: Don't shutdown compressionExecutor here - let the main finalization code
// handle remaining texts with parallel compression, then shut it down there
diff --git a/app/src/test/kotlin/de/ids_mannheim/korapxmltools/KrillJsonGeneratorTest.kt b/app/src/test/kotlin/de/ids_mannheim/korapxmltools/KrillJsonGeneratorTest.kt
index c859c64..180b332 100644
--- a/app/src/test/kotlin/de/ids_mannheim/korapxmltools/KrillJsonGeneratorTest.kt
+++ b/app/src/test/kotlin/de/ids_mannheim/korapxmltools/KrillJsonGeneratorTest.kt
@@ -74,6 +74,7 @@
extractDir.deleteRecursively()
}
}
+
}
private val outContent = ByteArrayOutputStream(10000000)
@@ -236,6 +237,26 @@
}
@Test
+ fun krillHandlesSparseFoundryWithoutBlockingOtherTexts() {
+ val baseZip = loadResource("ndy_sample.zip").path
+ val sparseCmcZip = loadResource("ndy_sample.cmc.zip").path
+ val emptyGenderZip = loadResource("ndy_sample.gender.zip").path
+
+ val generatedTar = ensureKrillTar("ndy_sparse_foundries", "ndy_sample.krill.tar") { outputDir ->
+ arrayOf("-t", "krill", "-q", "-D", outputDir.path, baseZip, sparseCmcZip, emptyGenderZip)
+ }
+ assertTrue(generatedTar.exists())
+
+ val jsonByFile = readKrillJson(generatedTar)
+ assertEquals(
+ setOf("NDY-115-005255.json", "NDY-266-006701.json", "NDY-269-017376.json"),
+ jsonByFile.keys,
+ "Sparse and empty foundry ZIPs must not block base-only texts from being written"
+ )
+ assertFalse(jsonByFile.values.any { it.contains("\"gender\"") }, "The empty gender ZIP must not add foundry expectations or block output")
+ }
+
+ @Test
fun krillOutputContainsBaseStructureSpans() {
val baseZip = loadResource("wud24_sample.zip").path
val spacyZip = loadResource("wud24_sample.spacy.zip").path
diff --git a/app/src/test/resources/ndy_sample.cmc.zip b/app/src/test/resources/ndy_sample.cmc.zip
new file mode 100644
index 0000000..d48d1aa
--- /dev/null
+++ b/app/src/test/resources/ndy_sample.cmc.zip
Binary files differ
diff --git a/app/src/test/resources/ndy_sample.gender.zip b/app/src/test/resources/ndy_sample.gender.zip
new file mode 100644
index 0000000..15cb0ec
--- /dev/null
+++ b/app/src/test/resources/ndy_sample.gender.zip
Binary files differ