Krill output: Introduce watermark-based output approach
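
Instead of buffering all texts until the end of a run, track for each input
ZIP the highest text ID seen so far and which ZIPs are still being processed.
The global watermark is the minimum of these per-ZIP maxima over all active
ZIPs; texts whose IDs lie strictly below it can no longer receive further
annotations, so they are written to the Krill TAR immediately and removed
from the in-memory krillData map.

Rough sketch of the flush rule in isolation (zip names, text IDs and the
helper function are illustrative only, not part of this change):

    fun globalWatermark(active: Set<String>, maxSeen: Map<String, String>): String? =
        active.mapNotNull { maxSeen[it] }.minOrNull()

    fun main() {
        val active = setOf("a.tree_tagger.zip", "a.marmot-malt.zip")
        val maxSeen = mapOf(
            "a.tree_tagger.zip" to "CORP_DOC.00007",
            "a.marmot-malt.zip" to "CORP_DOC.00003"
        )
        val watermark = globalWatermark(active, maxSeen)   // "CORP_DOC.00003"
        val pending = sortedSetOf("CORP_DOC.00001", "CORP_DOC.00003", "CORP_DOC.00005")
        val flushable = pending.filter { watermark != null && it < watermark }
        println("watermark=$watermark flush=$flushable")   // flush=[CORP_DOC.00001]
    }
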
Change-Id: Ide6ef9dd64df92b90d3f02ebceaea2ca14b0985c
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
index 6825c23..b5c08f5 100644
--- a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
@@ -399,6 +399,12 @@
val processedFoundries: MutableSet<String> = mutableSetOf()
var krillOutputCount = java.util.concurrent.atomic.AtomicInteger(0)
+ // Watermark-based incremental output: track max textId seen in each ZIP
+ val zipWatermarks: ConcurrentHashMap<String, String> = ConcurrentHashMap()
+ val activeZips: ConcurrentHashMap<String, Boolean> = ConcurrentHashMap()
+ val watermarkLock = Any()
+ var lastOutputWatermark: String = ""
+
fun String.hasCorrespondingBaseZip(): Boolean {
if (!this.matches(Regex(".*\\.([^/.]+)\\.zip$"))) return false
val baseZip = this.replace(Regex("\\.([^/.]+)\\.zip$"), ".zip")
@@ -421,6 +427,9 @@
krillData.clear()
corpusMetadata.clear()
docMetadata.clear()
+ zipWatermarks.clear()
+ activeZips.clear()
+ lastOutputWatermark = ""
}
// Initialize shared entry executor (used inside each zip)
@@ -772,6 +781,11 @@
}
private fun processZipFile(zipFilePath: String, foundry: String = "base") {
+ // Mark this ZIP as active for watermark-based incremental output
+ if (outputFormat == OutputFormat.KRILL && incrementalKrill) {
+ activeZips[zipFilePath] = true
+ }
+
val ord = zipOrdinals[zipFilePath] ?: 0
val size = zipSizes[zipFilePath] ?: 0L
LOGGER.info("Processing zip ${if (ord>0) ord else "?"}/$totalZips: ${zipFilePath} (${humanBytes(size)}) in thread ${Thread.currentThread().threadId()}")
@@ -824,7 +838,7 @@
foundry // Keep original foundry for non-krill formats
}
ApacheZipFile(File(zip)).use { zipFile ->
- processZipEntriesWithPool(zipFile, zipFoundry, true)
+ processZipEntriesWithPool(zipFile, zip, zipFoundry, true)
}
}
} else {
@@ -832,7 +846,7 @@
try {
ApacheZipFile(File(zipFilePath)).use { zipFile ->
LOGGER.fine("Calling processZipEntriesWithPool, foundry=$foundry")
- processZipEntriesWithPool(zipFile, foundry, false)
+ processZipEntriesWithPool(zipFile, zipFilePath, foundry, false)
LOGGER.fine("Returned from processZipEntriesWithPool")
}
} catch (e: Exception) {
@@ -853,10 +867,19 @@
if (outputFormat == OutputFormat.KRILL && incrementalKrill) {
processedFoundries.add(foundry)
checkAndOutputCompleteTexts()
+
+ // Mark this ZIP as inactive for watermark-based incremental output
+ activeZips.remove(zipFilePath)
+ LOGGER.info("Marked ZIP as inactive: $zipFilePath (${activeZips.size} ZIPs still active)")
}
}
private fun processZipFileSequentially(zipFilePath: String, foundry: String = "base") {
+ // Mark this ZIP as active for watermark-based incremental output
+ if (outputFormat == OutputFormat.KRILL && incrementalKrill) {
+ activeZips[zipFilePath] = true
+ }
+
val ord = zipOrdinals[zipFilePath] ?: 0
val size = zipSizes[zipFilePath] ?: 0L
LOGGER.info("Processing zip ${if (ord>0) ord else "?"}/$totalZips: ${zipFilePath} (${humanBytes(size)}) in thread ${Thread.currentThread().threadId()}")
@@ -876,7 +899,7 @@
.filter { extractMetadataRegex.isNotEmpty() || !it.name.contains("header.xml") }
.sortedBy { it.name }
.forEach { zipEntry ->
- processZipEntry(zipFile, zipFoundry, zipEntry, true)
+ processZipEntry(zipFile, zip, zipFoundry, zipEntry, true)
}
}
}
@@ -886,7 +909,7 @@
.filter { extractMetadataRegex.isNotEmpty() || !it.name.contains("header.xml") }
.sortedBy { it.name }
.forEach { zipEntry ->
- processZipEntry(zipFile, foundry, zipEntry, false)
+ processZipEntry(zipFile, zipFilePath, foundry, zipEntry, false)
}
}
}
@@ -896,6 +919,10 @@
if (outputFormat == OutputFormat.KRILL && incrementalKrill) {
processedFoundries.add(foundry)
checkAndOutputCompleteTexts()
+
+ // Mark this ZIP as inactive for watermark-based incremental output
+ activeZips.remove(zipFilePath)
+ LOGGER.info("Marked ZIP as inactive: $zipFilePath (${activeZips.size} ZIPs still active)")
}
}
@@ -939,7 +966,7 @@
return String.format(Locale.ROOT, "%02d:%02d:%02d", h, m, sec)
}
- private fun processZipEntriesWithPool(zipFile: ApacheZipFile, foundry: String, waitForMorpho: Boolean) {
+ private fun processZipEntriesWithPool(zipFile: ApacheZipFile, zipPath: String, foundry: String, waitForMorpho: Boolean) {
// Collect entries first to avoid lazy evaluation surprises, filter header.xml unless metadata extraction is requested
val entries: MutableList<ZipArchiveEntry> = ArrayList()
var documentCount = 0
@@ -984,7 +1011,7 @@
// If only one thread requested, do sequential to avoid pool overhead
if (maxThreads <= 1) {
- entries.forEach { entry -> processZipEntry(zipFile, foundry, entry, waitForMorpho) }
+ entries.forEach { entry -> processZipEntry(zipFile, zipPath, foundry, entry, waitForMorpho) }
return
}
@@ -994,7 +1021,7 @@
entries.forEach { entry ->
entryExecutor?.execute {
try {
- processZipEntry(zipFile, foundry, entry, waitForMorpho)
+ processZipEntry(zipFile, zipPath, foundry, entry, waitForMorpho)
} catch (t: Throwable) {
LOGGER.warning("Failed to process entry ${entry.name}: ${t.message}")
} finally {
@@ -1009,7 +1036,7 @@
}
}
- fun processZipEntry(zipFile: ApacheZipFile, _foundry: String, zipEntry: ZipArchiveEntry, passedWaitForMorpho: Boolean) {
+ fun processZipEntry(zipFile: ApacheZipFile, zipPath: String, _foundry: String, zipEntry: ZipArchiveEntry, passedWaitForMorpho: Boolean) {
var foundry = _foundry
var waitForMorpho = passedWaitForMorpho
LOGGER.finer("Processing ${zipEntry.name} in thread ${Thread.currentThread().threadId()}")
@@ -1185,6 +1212,11 @@
}
}
+ // Advance this ZIP's watermark and flush any texts now below the global watermark (Krill incremental output)
+ if (outputFormat == OutputFormat.KRILL && incrementalKrill) {
+ advanceWatermarkAndOutput(zipPath, docId)
+ }
+
val morphoRequired = when {
// If tagger or parser is enabled, we generate annotations in processText
taggerName != null || parserName != null -> false
@@ -2859,6 +2891,42 @@
}
}
+ // Watermark-based incremental output: output all texts below the global watermark
+ // Watermark = minimum of max textIds seen across all active ZIPs
+ // This ensures no more annotations will arrive for texts below the watermark
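+ // Assumes each ZIP yields its texts in ascending text-ID order, so the most recently
+ // seen textId per ZIP is also its maximum and nothing below it can still arrive from that ZIP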
+ private fun advanceWatermarkAndOutput(zipPath: String, currentTextId: String) {
+ if (outputFormat != OutputFormat.KRILL || !incrementalKrill || krillTarOutputStream == null) return
+
+ synchronized(watermarkLock) {
+ // Update this ZIP's watermark
+ zipWatermarks[zipPath] = currentTextId
+
+ // Calculate global watermark (minimum across all active ZIPs)
+ val activeWatermarks = activeZips.keys.mapNotNull { zipWatermarks[it] }
+ if (activeWatermarks.isEmpty()) return
+
+ val globalWatermark = activeWatermarks.minOrNull() ?: return
+
+ // Output all texts strictly less than the global watermark
+ if (globalWatermark > lastOutputWatermark) {
+ val textsToOutput = krillData.keys.filter { it < globalWatermark }.sorted()
+
+ textsToOutput.forEach { textId ->
+ val textData = krillData.remove(textId)
+ if (textData != null) {
+ outputKrillText(textId, textData)
+ }
+ }
+
+ if (textsToOutput.isNotEmpty()) {
+ LOGGER.info("Watermark advanced to $globalWatermark: output ${textsToOutput.size} texts (${krillData.size} still pending)")
+ }
+
+ lastOutputWatermark = globalWatermark
+ }
+ }
+ }
+
// Output a single text to Krill TAR (thread-safe)
private fun outputKrillText(textId: String, textData: KrillTextData) {
try {