Do not sort texts in now output mode
Change-Id: I0c0a16b05bd1287586822f3cb999c63e442e6bef
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
index d43ec1f..7b70a6a 100644
--- a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
@@ -962,6 +962,39 @@
outputFormat == OutputFormat.WORD2VEC ||
outputFormat == OutputFormat.NOW)
+ internal fun canStreamNowEntriesImmediately(): Boolean =
+ outputFormat == OutputFormat.NOW &&
+ annotationWorkerPool == null &&
+ taggerName == null &&
+ parserName == null
+
+ internal fun registerZipProgress(zipPath: String, size: Long) {
+ zipSizes[zipPath] = size
+ zipProgressBytes[zipPath] = AtomicLong(0)
+ }
+
+ internal fun trackZipProgressBytes(zipPath: String, deltaBytes: Long): Long? {
+ val zipTotal = zipSizes[zipPath] ?: return null
+ if (deltaBytes <= 0) return processedZipBytes.get()
+ val trackedBytes = zipProgressBytes.computeIfAbsent(zipPath) { AtomicLong(0) }
+ var appliedDelta = 0L
+
+ while (true) {
+ val current = trackedBytes.get()
+ if (current >= zipTotal) return processedZipBytes.get()
+ val next = (current + deltaBytes).coerceAtMost(zipTotal)
+ if (trackedBytes.compareAndSet(current, next)) {
+ appliedDelta = next - current
+ break
+ }
+ }
+
+ if (appliedDelta <= 0) return processedZipBytes.get()
+ val doneBytes = processedZipBytes.addAndGet(appliedDelta)
+ updateSizeBasedProgressBar(doneBytes)
+ return doneBytes
+ }
+
private fun writeOutput(content: CharSequence) {
if (textOutputWriter != null) {
textOutputWriter!!.append(content)
@@ -1116,6 +1149,7 @@
private val zipOrdinals: ConcurrentHashMap<String, Int> = ConcurrentHashMap()
private var totalZips: Int = 0
private val zipSizes: ConcurrentHashMap<String, Long> = ConcurrentHashMap()
+ private val zipProgressBytes: ConcurrentHashMap<String, AtomicLong> = ConcurrentHashMap()
private val processedZipBytes: AtomicLong = AtomicLong(0)
private var totalZipBytes: Long = 0
private var startTimeMillis: Long = 0
@@ -1298,6 +1332,9 @@
Thread(r, "KrillWorker-${Thread.currentThread().threadId()}")
}
LOGGER.info("Initialized work-stealing scheduler with $maxThreads worker threads for Krill output")
+ } else if (canStreamNowEntriesImmediately()) {
+ entryExecutor = null
+ LOGGER.info("Initialized NOW streaming mode: archive-order entries, no text-ID scheduling")
} else {
// For other formats, use priority-based executor
entryExecutor = java.util.concurrent.ThreadPoolExecutor(
@@ -1500,7 +1537,10 @@
totalZips = zips.size
zipOrdinals.clear()
zipSizes.clear()
- zips.forEach { zip -> zipSizes[zip] = try { File(zip).length() } catch (_: Exception) { 0L } }
+ zipProgressBytes.clear()
+ zips.forEach { zip ->
+ registerZipProgress(zip, try { File(zip).length() } catch (_: Exception) { 0L })
+ }
totalZipBytes = zipSizes.values.sum()
// In lemma-only mode, process largest zips first
if (lemmaOnly) {
@@ -1553,7 +1593,11 @@
if (maxThreads > 1) {
val foundry = getFoundryFromZipFileNames(zips)
val parallelism = maxThreads.coerceAtLeast(1)
- LOGGER.info("Processing zips with ordered queue; parallelism=$parallelism; entries ${if (sequentialInZip) "sequential" else "parallel"}")
+ if (canStreamNowEntriesImmediately()) {
+ LOGGER.info("Processing zips in NOW streaming mode; zip parallelism=$parallelism; entry order=archive")
+ } else {
+ LOGGER.info("Processing zips with ordered queue; parallelism=$parallelism; entries ${if (sequentialInZip) "sequential" else "parallel"}")
+ }
processZipsWithQueue(zips, foundry, parallelism)
} else {
LOGGER.info("Processing zip files sequentially")
@@ -2296,17 +2340,27 @@
}
LOGGER.fine("About to process ZIP entries: hasCorrespondingBaseZip=${zipFilePath.hasCorrespondingBaseZip()}")
if (zipFilePath.hasCorrespondingBaseZip()) {
- val relatedZips = arrayOf(zipFilePath, zipFilePath.correspondingBaseZip()!!)
+ val baseZip = zipFilePath.correspondingBaseZip()!!
+ val relatedZips = if (canStreamNowEntriesImmediately() && !lemmaOnly) {
+ arrayOf(baseZip, zipFilePath)
+ } else {
+ arrayOf(zipFilePath, baseZip)
+ }
// Process related zips one after another to keep the ZipFile lifetime strictly bounded
relatedZips.forEach { zip ->
// For krill format, use per-ZIP foundry; for other formats, use the original foundry
val zipFoundry = if (outputFormat == OutputFormat.KRILL) {
- if (zip == zipFilePath.correspondingBaseZip()) "base" else foundry
+ if (zip == baseZip) "base" else foundry
} else {
foundry // Keep original foundry for non-krill formats
}
openZipFile(zip).use { zipFile ->
- processZipEntriesWithPool(zipFile, zip, zipFoundry, true)
+ if (canStreamNowEntriesImmediately()) {
+ LOGGER.info("Using NOW streaming mode for $zip: archive-order entries, no text-ID sorting")
+ processZipEntriesStreaming(zipFile, zip, zipFoundry, true)
+ } else {
+ processZipEntriesWithPool(zipFile, zip, zipFoundry, true)
+ }
}
}
} else {
@@ -2315,7 +2369,12 @@
// If no corresponding base ZIP exists, this IS the base ZIP
openZipFile(zipFilePath).use { zipFile ->
LOGGER.fine("Calling processZipEntriesWithPool, foundry=$foundry")
- processZipEntriesWithPool(zipFile, zipFilePath, foundry, false)
+ if (canStreamNowEntriesImmediately()) {
+ LOGGER.info("Using NOW streaming mode for $zipFilePath: archive-order entries, no text-ID sorting")
+ processZipEntriesStreaming(zipFile, zipFilePath, foundry, false)
+ } else {
+ processZipEntriesWithPool(zipFile, zipFilePath, foundry, false)
+ }
LOGGER.fine("Returned from processZipEntriesWithPool")
}
} catch (e: Exception) {
@@ -2344,32 +2403,47 @@
LOGGER.info("Processing zip ${if (ord>0) ord else "?"}/$totalZips: ${zipFilePath} (${humanBytes(size)}) in thread ${Thread.currentThread().threadId()}")
if (zipFilePath.hasCorrespondingBaseZip()) {
// Process the two related zips strictly sequentially to limit memory growth
- val zips = arrayOf(zipFilePath, zipFilePath.correspondingBaseZip()!!)
+ val baseZip = zipFilePath.correspondingBaseZip()!!
+ val zips = if (canStreamNowEntriesImmediately() && !lemmaOnly) {
+ arrayOf(baseZip, zipFilePath)
+ } else {
+ arrayOf(zipFilePath, baseZip)
+ }
zips.forEach { zip ->
// For krill format, use per-ZIP foundry; for other formats, use the original foundry
val zipFoundry = if (outputFormat == OutputFormat.KRILL) {
- if (zip == zipFilePath.correspondingBaseZip()) "base" else foundry
+ if (zip == baseZip) "base" else foundry
} else {
foundry // Keep original foundry for non-krill formats
}
openZipFile(zip).use { zipFile ->
- // Iterate entries sorted by text ID to ensure consistent processing order
- zipFile.entries.toList()
- .filter { extractMetadataRegex.isNotEmpty() || !it.name.contains("header.xml") }
- .sortedBy { getTextIdFromPath(it.name) }
- .forEach { zipEntry ->
- processZipEntry(zipFile, zip, zipFoundry, zipEntry, true)
- }
+ if (canStreamNowEntriesImmediately()) {
+ LOGGER.info("Using NOW streaming mode for $zip: archive-order entries, no text-ID sorting")
+ processZipEntriesStreaming(zipFile, zip, zipFoundry, true)
+ } else {
+ // Iterate entries sorted by text ID to ensure consistent processing order
+ zipFile.entries.toList()
+ .filter { extractMetadataRegex.isNotEmpty() || !it.name.contains("header.xml") }
+ .sortedBy { getTextIdFromPath(it.name) }
+ .forEach { zipEntry ->
+ processZipEntry(zipFile, zip, zipFoundry, zipEntry, true)
+ }
+ }
}
}
} else {
openZipFile(zipFilePath).use { zipFile ->
- zipFile.entries.toList()
- .filter { extractMetadataRegex.isNotEmpty() || !it.name.contains("header.xml") }
- .sortedBy { getTextIdFromPath(it.name) }
- .forEach { zipEntry ->
- processZipEntry(zipFile, zipFilePath, foundry, zipEntry, false)
- }
+ if (canStreamNowEntriesImmediately()) {
+ LOGGER.info("Using NOW streaming mode for $zipFilePath: archive-order entries, no text-ID sorting")
+ processZipEntriesStreaming(zipFile, zipFilePath, foundry, false)
+ } else {
+ zipFile.entries.toList()
+ .filter { extractMetadataRegex.isNotEmpty() || !it.name.contains("header.xml") }
+ .sortedBy { getTextIdFromPath(it.name) }
+ .forEach { zipEntry ->
+ processZipEntry(zipFile, zipFilePath, foundry, zipEntry, false)
+ }
+ }
}
}
logZipProgress(zipFilePath)
@@ -2383,7 +2457,13 @@
private fun logZipProgress(zipFilePath: String) {
try {
val size = zipSizes[zipFilePath] ?: 0L
- val done = processedZipBytes.addAndGet(size)
+ val tracked = zipProgressBytes[zipFilePath]?.get() ?: 0L
+ val remainder = (size - tracked).coerceAtLeast(0L)
+ val done = if (remainder > 0L) {
+ trackZipProgressBytes(zipFilePath, remainder) ?: processedZipBytes.get()
+ } else {
+ processedZipBytes.get()
+ }
val total = if (totalZipBytes > 0) totalZipBytes else 1L
val elapsedMs = (System.currentTimeMillis() - startTimeMillis).coerceAtLeast(1)
val speedBytesPerSec = (done * 1000.0) / elapsedMs
@@ -2398,12 +2478,6 @@
"(${humanBytes(size)}). Progress: ${String.format(Locale.ROOT, "%.1f", pct)}%, " +
"ETA ${etaStr} at ${humanSpeed}"
)
-
- // Update progress bar for text output formats (size-based progress in MB)
- if (!quiet && progressBar != null && usesSizeBasedTextProgress()) {
- val doneMB = done / (1024.0 * 1024.0)
- progressBar?.stepTo((doneMB * 100).toLong()) // Multiply by 100 to match initialization
- }
} catch (e: Exception) {
LOGGER.fine("Failed to log zip progress for $zipFilePath: ${e.message}")
}
@@ -2426,6 +2500,35 @@
return String.format(Locale.ROOT, "%02d:%02d:%02d", h, m, sec)
}
+ private fun updateSizeBasedProgressBar(doneBytes: Long) {
+ if (!quiet && progressBar != null && usesSizeBasedTextProgress()) {
+ val doneMB = doneBytes / (1024.0 * 1024.0)
+ progressBar?.stepTo((doneMB * 100).toLong())
+ }
+ }
+
+ private fun noteZipEntryProgress(zipPath: String, zipEntry: ZipArchiveEntry) {
+ if (!usesSizeBasedTextProgress()) return
+ val zipTotal = zipSizes[zipPath] ?: return
+ val deltaBytes = when {
+ zipEntry.compressedSize > 0L -> zipEntry.compressedSize
+ zipEntry.size > 0L -> zipEntry.size.coerceAtMost(zipTotal)
+ else -> 0L
+ }
+ trackZipProgressBytes(zipPath, deltaBytes)
+ }
+
+ private fun processZipEntriesStreaming(zipFile: ApacheZipFile, zipPath: String, foundry: String, waitForMorpho: Boolean) {
+ LOGGER.fine("Streaming NOW entries in archive order for $zipPath without text-ID sorting")
+ val enumEntries = zipFile.entries
+
+ while (enumEntries.hasMoreElements()) {
+ val entry = enumEntries.nextElement()
+ if (extractMetadataRegex.isEmpty() && entry.name.contains("header.xml")) continue
+ processZipEntry(zipFile, zipPath, foundry, entry, waitForMorpho)
+ }
+ }
+
private fun processZipEntriesWithPool(zipFile: ApacheZipFile, zipPath: String, foundry: String, waitForMorpho: Boolean) {
// Collect entries first to avoid lazy evaluation surprises, filter header.xml unless metadata extraction is requested
val entries: MutableList<ZipArchiveEntry> = ArrayList()
@@ -3005,6 +3108,8 @@
}
} catch (e: Exception) {
e.printStackTrace()
+ } finally {
+ noteZipEntryProgress(zipPath, zipEntry)
}
}
diff --git a/app/src/test/kotlin/de/ids_mannheim/korapxmltools/GeneralFeaturesTest.kt b/app/src/test/kotlin/de/ids_mannheim/korapxmltools/GeneralFeaturesTest.kt
index bd5dc78..7e7a2bc 100644
--- a/app/src/test/kotlin/de/ids_mannheim/korapxmltools/GeneralFeaturesTest.kt
+++ b/app/src/test/kotlin/de/ids_mannheim/korapxmltools/GeneralFeaturesTest.kt
@@ -136,6 +136,34 @@
assertTrue(!tool.usesSizeBasedTextProgress())
}
+ @Test
+ fun plainNowOutputCanStreamWithoutSorting() {
+ val tool = KorapXmlTool()
+ tool.outputFormat = OutputFormat.NOW
+
+ assertTrue(tool.canStreamNowEntriesImmediately())
+ }
+
+ @Test
+ fun nonNowOutputKeepsOrderedPipeline() {
+ val tool = KorapXmlTool()
+ tool.outputFormat = OutputFormat.CONLLU
+
+ assertTrue(!tool.canStreamNowEntriesImmediately())
+ }
+
+ @Test
+ fun zipProgressTrackingClampsAtRegisteredZipSize() {
+ val tool = KorapXmlTool()
+ tool.outputFormat = OutputFormat.NOW
+ tool.outputFile = "test.now"
+ tool.registerZipProgress("sample.zip", 100L)
+
+ assertEquals(40L, tool.trackZipProgressBytes("sample.zip", 40L))
+ assertEquals(100L, tool.trackZipProgressBytes("sample.zip", 80L))
+ assertEquals(100L, tool.trackZipProgressBytes("sample.zip", 10L))
+ }
+
private fun KorapXmlTool.compareTextIds(a: String, b: String): Int {
val m = KorapXmlTool::class.java.getDeclaredMethod("compareTextIds", String::class.java, String::class.java)
m.isAccessible = true