Stream Krill tail output

Generate Krill JSON directly into the compressing output stream
(KrillJsonGenerator.generateTo / appendStream) instead of materializing
the full JSON string in memory first. Extract the shared corpus/doc
metadata inheritance into applyInheritedKrillMetadata, route the
remaining-texts fallback through enqueueKrillCompression, and improve
progress logging by reporting texts still missing from the compressed
cache after parallel compression finishes. A regression test verifies
that generateTo produces byte-identical output to generate.

Change-Id: I15a971b891004ad3df124946f515d3fd76c2d368
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
index 5403f5b..7ff2526 100644
--- a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
@@ -1681,9 +1681,7 @@
remainingRawKeys.forEach { textId ->
val textData = krillData[textId]
if (textData != null && !krillCompressedData.containsKey(textId)) {
- compressionExecutor!!.submit {
- compressKrillText(textId, textData)
- }
+ enqueueKrillCompression(textId, textData)
}
}
@@ -1693,10 +1691,18 @@
var waitTime = 0L
while (!compressionExecutor!!.awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS)) {
waitTime += 5
- val pending = remainingKeys.size - krillCompressedData.size
+ val pending = remainingRawKeys.count { !krillCompressedData.containsKey(it) }
LOGGER.info("Compressing remaining texts... ($pending left, ${waitTime}s elapsed)")
}
- LOGGER.info("All remaining texts compressed")
+ val missingCompressed = remainingRawKeys.filterNot { krillCompressedData.containsKey(it) }
+ if (missingCompressed.isEmpty()) {
+ LOGGER.info("All remaining texts compressed")
+ } else {
+ LOGGER.warning(
+ "Parallel compression finished with ${missingCompressed.size} texts still missing from cache: " +
+ missingCompressed.joinToString(",")
+ )
+ }
} catch (e: InterruptedException) {
LOGGER.warning("Interrupted while compressing remaining texts")
Thread.currentThread().interrupt()
@@ -1733,14 +1739,15 @@
krillTarOutputStream!!.closeArchiveEntry()
}
krillOutputCount.incrementAndGet()
+ noteKrillOutputProgress()
incrementalProgressBar?.step()
} catch (e: Exception) {
LOGGER.severe("ERROR writing $textId to TAR: ${e.message}")
e.printStackTrace()
}
} else if (textData != null) {
- // Fallback: compress inline if not in compressed cache (shouldn't happen)
- LOGGER.warning("Text $textId not in compressed cache, compressing inline")
+ // Fallback: retry inline if parallel compression failed to populate the cache.
+ LOGGER.warning("Text $textId not in compressed cache after parallel compression, compressing inline")
val textFoundries = textData.morphoByFoundry.keys.toSet() + setOf("base")
val expectedForThisText = zipInventory.filter { (_, texts) -> texts.contains(textId) }.keys
.flatMap { zipPath ->
@@ -5537,56 +5544,12 @@
// Compress text data in parallel, then write to TAR sequentially
private fun compressKrillText(textId: String, textData: KrillJsonGenerator.KrillTextData) {
try {
- val json = synchronized(textData) {
+ val (jsonFileName, compressedData) = synchronized(textData) {
// Synchronize access to textData to prevent ConcurrentModificationException
- // during JSON generation while other threads might still be modifying it
- val corpusSigle = textId.substringBefore('_')
- val docSigle = textId.substringBeforeLast('.')
-
- // Apply corpus-level metadata (only if not already set with a non-empty value)
- corpusMetadata[corpusSigle]?.forEach { (key, value) ->
- val currentValue = textData.headerMetadata[key]
- // Inherit if: key doesn't exist, OR current value is empty/blank
- val shouldInherit = when (currentValue) {
- null -> true
- is String -> currentValue.isBlank()
- else -> false
- }
- if (shouldInherit && value != null) {
- // Only set non-empty values
- when (value) {
- is String -> if (value.isNotBlank()) textData.headerMetadata[key] = value
- is List<*> -> if (value.isNotEmpty()) textData.headerMetadata[key] = value
- is Map<*, *> -> if (value.isNotEmpty()) textData.headerMetadata[key] = value
- else -> textData.headerMetadata[key] = value
- }
- }
- }
-
- // Apply doc-level metadata (only if not already set with a non-empty value)
- docMetadata[docSigle]?.forEach { (key, value) ->
- val currentValue = textData.headerMetadata[key]
- // Inherit if: key doesn't exist, OR current value is empty/blank
- val shouldInherit = when (currentValue) {
- null -> true
- is String -> currentValue.isBlank()
- else -> false
- }
- if (shouldInherit) {
- // Only set non-empty values
- when (value) {
- is String -> if (value.isNotBlank()) textData.headerMetadata[key] = value
- is List<*> -> if (value.isNotEmpty()) textData.headerMetadata[key] = value
- is Map<*, *> -> if (value.isNotEmpty()) textData.headerMetadata[key] = value
- else -> textData.headerMetadata[key] = value
- }
- }
- }
-
- KrillJsonGenerator.generate(textData, corpusMetadata, docMetadata, includeNonWordTokens)
+ // during JSON generation while other threads might still be modifying it.
+ applyInheritedKrillMetadata(textId, textData)
+ compressKrillJson(textId, textData)
}
-
- val (jsonFileName, compressedData) = compressKrillJson(textId, json)
// Store compressed data for sequential TAR writing
krillCompressedData[textId] = CompressedKrillData(textId, jsonFileName, compressedData)
@@ -5608,6 +5571,45 @@
}
}
+ private fun applyInheritedKrillMetadata(textId: String, textData: KrillJsonGenerator.KrillTextData) {
+ val corpusSigle = textId.substringBefore('_')
+ val docSigle = textId.substringBeforeLast('.')
+
+ corpusMetadata[corpusSigle]?.forEach { (key, value) ->
+ val currentValue = textData.headerMetadata[key]
+ val shouldInherit = when (currentValue) {
+ null -> true
+ is String -> currentValue.isBlank()
+ else -> false
+ }
+ if (shouldInherit && value != null) {
+ when (value) {
+ is String -> if (value.isNotBlank()) textData.headerMetadata[key] = value
+ is List<*> -> if (value.isNotEmpty()) textData.headerMetadata[key] = value
+ is Map<*, *> -> if (value.isNotEmpty()) textData.headerMetadata[key] = value
+ else -> textData.headerMetadata[key] = value
+ }
+ }
+ }
+
+ docMetadata[docSigle]?.forEach { (key, value) ->
+ val currentValue = textData.headerMetadata[key]
+ val shouldInherit = when (currentValue) {
+ null -> true
+ is String -> currentValue.isBlank()
+ else -> false
+ }
+ if (shouldInherit && value != null) {
+ when (value) {
+ is String -> if (value.isNotBlank()) textData.headerMetadata[key] = value
+ is List<*> -> if (value.isNotEmpty()) textData.headerMetadata[key] = value
+ is Map<*, *> -> if (value.isNotEmpty()) textData.headerMetadata[key] = value
+ else -> textData.headerMetadata[key] = value
+ }
+ }
+ }
+ }
+
private fun enqueueKrillCompression(textId: String, textData: KrillJsonGenerator.KrillTextData) {
if (krillCompressedData.containsKey(textId)) return
@@ -5632,13 +5634,16 @@
}
}
- private fun compressKrillJson(textId: String, json: String): Pair<String, ByteArray> {
+ private fun compressKrillJson(
+ textId: String,
+ textData: KrillJsonGenerator.KrillTextData
+ ): Pair<String, ByteArray> {
return if (useLz4) {
val fileName = textId.replace("_", "-").replace(".", "-") + ".json.lz4"
val byteOut = ByteArrayOutputStream()
net.jpountz.lz4.LZ4FrameOutputStream(byteOut).use { lz4Out ->
OutputStreamWriter(lz4Out, StandardCharsets.UTF_8).use { writer ->
- writer.write(json)
+ KrillJsonGenerator.generateTo(writer, textData, corpusMetadata, docMetadata, includeNonWordTokens)
}
}
Pair(fileName, byteOut.toByteArray())
@@ -5652,7 +5657,7 @@
}
gzipOut.use { gzip ->
OutputStreamWriter(gzip, StandardCharsets.UTF_8).use { writer ->
- writer.write(json)
+ KrillJsonGenerator.generateTo(writer, textData, corpusMetadata, docMetadata, includeNonWordTokens)
}
}
Pair(fileName, byteOut.toByteArray())
@@ -5919,29 +5924,11 @@
// Output a single text to Krill TAR (thread-safe)
private fun outputKrillText(textId: String, textData: KrillJsonGenerator.KrillTextData) {
try {
- // Merge corpus and doc metadata
- val textIdWithSlashes = textData.textId.replace("_", "/").replace(".", "/")
- val corpusSigle = textIdWithSlashes.split("/")[0]
- val docSigle = textIdWithSlashes.split("/").take(2).joinToString("/")
-
- // Apply corpus-level metadata
- corpusMetadata[corpusSigle]?.forEach { (key, value) ->
- if (!textData.headerMetadata.containsKey(key)) {
- textData.headerMetadata[key] = value
- }
+ val (jsonFileName, compressedData) = synchronized(textData) {
+ applyInheritedKrillMetadata(textId, textData)
+ compressKrillJson(textId, textData)
}
- // Apply doc-level metadata
- docMetadata[docSigle]?.forEach { (key, value) ->
- if (!textData.headerMetadata.containsKey(key)) {
- textData.headerMetadata[key] = value
- }
- }
-
- val json = KrillJsonGenerator.generate(textData, corpusMetadata, docMetadata, includeNonWordTokens)
-
- val (jsonFileName, compressedData) = compressKrillJson(textId, json)
-
// Write to TAR (synchronized for thread safety)
synchronized(krillTarOutputStream!!) {
val tarEntry = TarArchiveEntry(jsonFileName)
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/formatters/KrillJsonGenerator.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/formatters/KrillJsonGenerator.kt
index 295265a..cf4a7d8 100644
--- a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/formatters/KrillJsonGenerator.kt
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/formatters/KrillJsonGenerator.kt
@@ -60,6 +60,18 @@
includeNonWordTokens: Boolean
): String {
val sb = StringBuilder()
+ generateTo(sb, textData, corpusMetadata, docMetadata, includeNonWordTokens)
+ return sb.toString()
+ }
+
+ fun generateTo(
+ out: Appendable,
+ textData: KrillTextData,
+ corpusMetadata: Map<String, MutableMap<String, Any>>,
+ docMetadata: Map<String, MutableMap<String, Any>>,
+ includeNonWordTokens: Boolean
+ ) {
+ val sb = StringBuilder()
sb.append("{")
// @context, @type, and version
@@ -378,20 +390,37 @@
// stream - token-level annotations
sb.append("\"stream\":[")
- if (textData.tokens != null) {
- val streamItems = generateStream(textData, includeNonWordTokens)
- sb.append(streamItems.joinToString(","))
- }
- sb.append("]")
+ out.append(sb.toString())
+ appendStream(out, textData, includeNonWordTokens)
+ out.append("]")
- sb.append("}") // close data
- sb.append("}") // close root
-
- return sb.toString()
+ out.append("}") // close data
+ out.append("}") // close root
}
private fun generateStream(textData: KrillTextData, includeNonWordTokens: Boolean): List<String> {
- val rawTokens = textData.tokens ?: return emptyList()
+ val result = mutableListOf<String>()
+ forEachStreamItem(textData, includeNonWordTokens) { result.add(it) }
+ return result
+ }
+
+ private fun appendStream(out: Appendable, textData: KrillTextData, includeNonWordTokens: Boolean) {
+ var first = true
+ forEachStreamItem(textData, includeNonWordTokens) { item ->
+ if (!first) {
+ out.append(",")
+ }
+ out.append(item)
+ first = false
+ }
+ }
+
+ private fun forEachStreamItem(
+ textData: KrillTextData,
+ includeNonWordTokens: Boolean,
+ emit: (String) -> Unit
+ ) {
+ val rawTokens = textData.tokens ?: return
val text = textData.textContent ?: NonBmpString("")
val sentences = textData.sentences ?: emptyArray()
val tokens: List<Span> = if (includeNonWordTokens || text.length == 0) {
@@ -401,9 +430,8 @@
}
if (tokens.isEmpty()) {
LOGGER.fine("No tokens remained for ${textData.textId} after filtering non-word tokens")
- return emptyList()
+ return
}
- val result = mutableListOf<String>()
data class FoundryMorphoData(
val foundry: String,
val prefix: String?,
@@ -787,10 +815,8 @@
// Surface form (always last)
tokenAnnotations.add(jsonString("s:${surfaceForm.escapeKrillValue()}"))
- result.add(jsonArray(tokenAnnotations))
+ emit(jsonArray(tokenAnnotations))
}
-
- return result
}
private fun lowerBoundTokenFrom(tokens: List<Span>, target: Int): Int {
diff --git a/app/src/test/kotlin/de/ids_mannheim/korapxmltools/KrillJsonGeneratorTest.kt b/app/src/test/kotlin/de/ids_mannheim/korapxmltools/KrillJsonGeneratorTest.kt
index a703331..94a0af7 100644
--- a/app/src/test/kotlin/de/ids_mannheim/korapxmltools/KrillJsonGeneratorTest.kt
+++ b/app/src/test/kotlin/de/ids_mannheim/korapxmltools/KrillJsonGeneratorTest.kt
@@ -1,5 +1,6 @@
package de.ids_mannheim.korapxmltools
+import de.ids_mannheim.korapxmltools.formatters.KrillJsonGenerator
import net.jpountz.lz4.LZ4FrameInputStream
import org.junit.After
import org.junit.AfterClass
@@ -692,6 +693,50 @@
}
@Test
+ fun krillGenerateToMatchesGenerate() {
+ val textData = KrillJsonGenerator.KrillTextData(
+ textId = "TEST_DOC.1",
+ textContent = NonBmpString("Alpha beta."),
+ headerMetadata = mutableMapOf("title" to "Synthetic test")
+ ).apply {
+ tokens = arrayOf(
+ KorapXmlTool.Span(0, 5),
+ KorapXmlTool.Span(6, 10),
+ KorapXmlTool.Span(10, 11)
+ )
+ sentences = arrayOf(KorapXmlTool.Span(0, 11))
+ morphoByFoundry["spacy"] = mutableMapOf(
+ "0-5" to KorapXmlTool.MorphoSpan(lemma = "Alpha", upos = "PROPN", xpos = "NE"),
+ "6-10" to KorapXmlTool.MorphoSpan(lemma = "beta", upos = "NOUN", xpos = "NN")
+ )
+ structureSpans.add(
+ KrillJsonGenerator.StructureSpan(
+ layer = "dereko/s:p",
+ from = 0,
+ to = 11,
+ tokenFrom = 0,
+ tokenTo = 3,
+ depth = 1
+ )
+ )
+ }
+
+ val corpusMetadata = mutableMapOf<String, MutableMap<String, Any>>(
+ "TEST" to mutableMapOf("publisher" to "Publisher")
+ )
+ val docMetadata = mutableMapOf<String, MutableMap<String, Any>>(
+ "TEST_DOC" to mutableMapOf("docTitle" to "Doc title")
+ )
+
+ val generated = KrillJsonGenerator.generate(textData, corpusMetadata, docMetadata, includeNonWordTokens = true)
+ val streamed = buildString {
+ KrillJsonGenerator.generateTo(this, textData, corpusMetadata, docMetadata, includeNonWordTokens = true)
+ }
+
+ assertEquals(generated, streamed)
+ }
+
+ @Test
fun testProbabilitySortingInKrillJsonOutput() {
// Test that multiple POS annotations are sorted by descending probability in Krill JSON output
// Use the base sample ZIP which should contain POS annotations with probabilities