Do not sort texts in NOW output mode

Change-Id: I0c0a16b05bd1287586822f3cb999c63e442e6bef
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
index d43ec1f..7b70a6a 100644
--- a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
@@ -962,6 +962,39 @@
             outputFormat == OutputFormat.WORD2VEC ||
             outputFormat == OutputFormat.NOW)
 
+    internal fun canStreamNowEntriesImmediately(): Boolean =  // gate for the archive-order NOW streaming path
+        outputFormat == OutputFormat.NOW &&  // only the NOW output format qualifies
+            annotationWorkerPool == null &&  // and no annotation worker pool is set
+            taggerName == null &&  // and taggerName is unset
+            parserName == null  // and parserName is unset
+
+    internal fun registerZipProgress(zipPath: String, size: Long) {  // record a zip's total size and reset its progress counter
+        zipSizes[zipPath] = size  // the total that trackZipProgressBytes clamps against
+        zipProgressBytes[zipPath] = AtomicLong(0)  // fresh per-zip counter; replaces any previous one for this path
+    }
+
+    internal fun trackZipProgressBytes(zipPath: String, deltaBytes: Long): Long? {  // returns the global processed-byte total, or null if zipPath has no registered size
+        val zipTotal = zipSizes[zipPath] ?: return null  // unregistered zip: nothing to track
+        if (deltaBytes <= 0) return processedZipBytes.get()  // non-positive delta: report the current total unchanged
+        val trackedBytes = zipProgressBytes.computeIfAbsent(zipPath) { AtomicLong(0) }  // per-zip counter, created on demand
+        var appliedDelta = 0L
+
+        while (true) {  // compare-and-set retry loop on the per-zip counter
+            val current = trackedBytes.get()
+            if (current >= zipTotal) return processedZipBytes.get()  // this zip is already fully accounted for
+            val next = (current + deltaBytes).coerceAtMost(zipTotal)  // never count past the registered zip size
+            if (trackedBytes.compareAndSet(current, next)) {  // retry if the counter changed since it was read
+                appliedDelta = next - current  // portion of deltaBytes actually credited to this zip
+                break
+            }
+        }
+
+        if (appliedDelta <= 0) return processedZipBytes.get()
+        val doneBytes = processedZipBytes.addAndGet(appliedDelta)  // add only the credited portion to the global total
+        updateSizeBasedProgressBar(doneBytes)
+        return doneBytes
+    }
+
     private fun writeOutput(content: CharSequence) {
         if (textOutputWriter != null) {
             textOutputWriter!!.append(content)
@@ -1116,6 +1149,7 @@
     private val zipOrdinals: ConcurrentHashMap<String, Int> = ConcurrentHashMap()
     private var totalZips: Int = 0
     private val zipSizes: ConcurrentHashMap<String, Long> = ConcurrentHashMap()
+    private val zipProgressBytes: ConcurrentHashMap<String, AtomicLong> = ConcurrentHashMap()
     private val processedZipBytes: AtomicLong = AtomicLong(0)
     private var totalZipBytes: Long = 0
     private var startTimeMillis: Long = 0
@@ -1298,6 +1332,9 @@
                 Thread(r, "KrillWorker-${Thread.currentThread().threadId()}")
             }
             LOGGER.info("Initialized work-stealing scheduler with $maxThreads worker threads for Krill output")
+        } else if (canStreamNowEntriesImmediately()) {
+            entryExecutor = null
+            LOGGER.info("Initialized NOW streaming mode: archive-order entries, no text-ID scheduling")
         } else {
             // For other formats, use priority-based executor
             entryExecutor = java.util.concurrent.ThreadPoolExecutor(
@@ -1500,7 +1537,10 @@
         totalZips = zips.size
         zipOrdinals.clear()
         zipSizes.clear()
-        zips.forEach { zip -> zipSizes[zip] = try { File(zip).length() } catch (_: Exception) { 0L } }
+        zipProgressBytes.clear()
+        zips.forEach { zip ->
+            registerZipProgress(zip, try { File(zip).length() } catch (_: Exception) { 0L })
+        }
         totalZipBytes = zipSizes.values.sum()
         // In lemma-only mode, process largest zips first
         if (lemmaOnly) {
@@ -1553,7 +1593,11 @@
         if (maxThreads > 1) {
             val foundry = getFoundryFromZipFileNames(zips)
             val parallelism = maxThreads.coerceAtLeast(1)
-            LOGGER.info("Processing zips with ordered queue; parallelism=$parallelism; entries ${if (sequentialInZip) "sequential" else "parallel"}")
+            if (canStreamNowEntriesImmediately()) {
+                LOGGER.info("Processing zips in NOW streaming mode; zip parallelism=$parallelism; entry order=archive")
+            } else {
+                LOGGER.info("Processing zips with ordered queue; parallelism=$parallelism; entries ${if (sequentialInZip) "sequential" else "parallel"}")
+            }
             processZipsWithQueue(zips, foundry, parallelism)
         } else {
             LOGGER.info("Processing zip files sequentially")
@@ -2296,17 +2340,27 @@
         }
         LOGGER.fine("About to process ZIP entries: hasCorrespondingBaseZip=${zipFilePath.hasCorrespondingBaseZip()}")
         if (zipFilePath.hasCorrespondingBaseZip()) {
-            val relatedZips = arrayOf(zipFilePath, zipFilePath.correspondingBaseZip()!!)
+            val baseZip = zipFilePath.correspondingBaseZip()!!
+            val relatedZips = if (canStreamNowEntriesImmediately() && !lemmaOnly) {
+                arrayOf(baseZip, zipFilePath)
+            } else {
+                arrayOf(zipFilePath, baseZip)
+            }
             // Process related zips one after another to keep the ZipFile lifetime strictly bounded
             relatedZips.forEach { zip ->
                 // For krill format, use per-ZIP foundry; for other formats, use the original foundry
                 val zipFoundry = if (outputFormat == OutputFormat.KRILL) {
-                    if (zip == zipFilePath.correspondingBaseZip()) "base" else foundry
+                    if (zip == baseZip) "base" else foundry
                 } else {
                     foundry  // Keep original foundry for non-krill formats
                 }
                 openZipFile(zip).use { zipFile ->
-                    processZipEntriesWithPool(zipFile, zip, zipFoundry, true)
+                    if (canStreamNowEntriesImmediately()) {
+                        LOGGER.info("Using NOW streaming mode for $zip: archive-order entries, no text-ID sorting")
+                        processZipEntriesStreaming(zipFile, zip, zipFoundry, true)
+                    } else {
+                        processZipEntriesWithPool(zipFile, zip, zipFoundry, true)
+                    }
                 }
             }
         } else {
@@ -2315,7 +2369,12 @@
                 // If no corresponding base ZIP exists, this IS the base ZIP
                 openZipFile(zipFilePath).use { zipFile ->
                     LOGGER.fine("Calling processZipEntriesWithPool, foundry=$foundry")
-                    processZipEntriesWithPool(zipFile, zipFilePath, foundry, false)
+                    if (canStreamNowEntriesImmediately()) {
+                        LOGGER.info("Using NOW streaming mode for $zipFilePath: archive-order entries, no text-ID sorting")
+                        processZipEntriesStreaming(zipFile, zipFilePath, foundry, false)
+                    } else {
+                        processZipEntriesWithPool(zipFile, zipFilePath, foundry, false)
+                    }
                     LOGGER.fine("Returned from processZipEntriesWithPool")
                 }
             } catch (e: Exception) {
@@ -2344,32 +2403,47 @@
         LOGGER.info("Processing zip ${if (ord>0) ord else "?"}/$totalZips: ${zipFilePath} (${humanBytes(size)}) in thread ${Thread.currentThread().threadId()}")
         if (zipFilePath.hasCorrespondingBaseZip()) {
             // Process the two related zips strictly sequentially to limit memory growth
-            val zips = arrayOf(zipFilePath, zipFilePath.correspondingBaseZip()!!)
+            val baseZip = zipFilePath.correspondingBaseZip()!!
+            val zips = if (canStreamNowEntriesImmediately() && !lemmaOnly) {
+                arrayOf(baseZip, zipFilePath)
+            } else {
+                arrayOf(zipFilePath, baseZip)
+            }
             zips.forEach { zip ->
                 // For krill format, use per-ZIP foundry; for other formats, use the original foundry
                 val zipFoundry = if (outputFormat == OutputFormat.KRILL) {
-                    if (zip == zipFilePath.correspondingBaseZip()) "base" else foundry
+                    if (zip == baseZip) "base" else foundry
                 } else {
                     foundry  // Keep original foundry for non-krill formats
                 }
                 openZipFile(zip).use { zipFile ->
-                    // Iterate entries sorted by text ID to ensure consistent processing order
-                    zipFile.entries.toList()
-                        .filter { extractMetadataRegex.isNotEmpty() || !it.name.contains("header.xml") }
-                        .sortedBy { getTextIdFromPath(it.name) }
-                        .forEach { zipEntry ->
-                            processZipEntry(zipFile, zip, zipFoundry, zipEntry, true)
-                        }
+                    if (canStreamNowEntriesImmediately()) {
+                        LOGGER.info("Using NOW streaming mode for $zip: archive-order entries, no text-ID sorting")
+                        processZipEntriesStreaming(zipFile, zip, zipFoundry, true)
+                    } else {
+                        // Iterate entries sorted by text ID to ensure consistent processing order
+                        zipFile.entries.toList()
+                            .filter { extractMetadataRegex.isNotEmpty() || !it.name.contains("header.xml") }
+                            .sortedBy { getTextIdFromPath(it.name) }
+                            .forEach { zipEntry ->
+                                processZipEntry(zipFile, zip, zipFoundry, zipEntry, true)
+                            }
+                    }
                 }
             }
         } else {
             openZipFile(zipFilePath).use { zipFile ->
-                zipFile.entries.toList()
-                    .filter { extractMetadataRegex.isNotEmpty() || !it.name.contains("header.xml") }
-                    .sortedBy { getTextIdFromPath(it.name) }
-                    .forEach { zipEntry ->
-                        processZipEntry(zipFile, zipFilePath, foundry, zipEntry, false)
-                    }
+                if (canStreamNowEntriesImmediately()) {
+                    LOGGER.info("Using NOW streaming mode for $zipFilePath: archive-order entries, no text-ID sorting")
+                    processZipEntriesStreaming(zipFile, zipFilePath, foundry, false)
+                } else {
+                    zipFile.entries.toList()
+                        .filter { extractMetadataRegex.isNotEmpty() || !it.name.contains("header.xml") }
+                        .sortedBy { getTextIdFromPath(it.name) }
+                        .forEach { zipEntry ->
+                            processZipEntry(zipFile, zipFilePath, foundry, zipEntry, false)
+                        }
+                }
             }
         }
         logZipProgress(zipFilePath)
@@ -2383,7 +2457,13 @@
     private fun logZipProgress(zipFilePath: String) {
         try {
             val size = zipSizes[zipFilePath] ?: 0L
-            val done = processedZipBytes.addAndGet(size)
+            val tracked = zipProgressBytes[zipFilePath]?.get() ?: 0L
+            val remainder = (size - tracked).coerceAtLeast(0L)
+            val done = if (remainder > 0L) {
+                trackZipProgressBytes(zipFilePath, remainder) ?: processedZipBytes.get()
+            } else {
+                processedZipBytes.get()
+            }
             val total = if (totalZipBytes > 0) totalZipBytes else 1L
             val elapsedMs = (System.currentTimeMillis() - startTimeMillis).coerceAtLeast(1)
             val speedBytesPerSec = (done * 1000.0) / elapsedMs
@@ -2398,12 +2478,6 @@
                         "(${humanBytes(size)}). Progress: ${String.format(Locale.ROOT, "%.1f", pct)}%, " +
                         "ETA ${etaStr} at ${humanSpeed}"
             )
-            
-            // Update progress bar for text output formats (size-based progress in MB)
-            if (!quiet && progressBar != null && usesSizeBasedTextProgress()) {
-                val doneMB = done / (1024.0 * 1024.0)
-                progressBar?.stepTo((doneMB * 100).toLong())  // Multiply by 100 to match initialization
-            }
         } catch (e: Exception) {
             LOGGER.fine("Failed to log zip progress for $zipFilePath: ${e.message}")
         }
@@ -2426,6 +2500,35 @@
         return String.format(Locale.ROOT, "%02d:%02d:%02d", h, m, sec)
     }
 
+    private fun updateSizeBasedProgressBar(doneBytes: Long) {  // move the progress bar to the given processed-byte total
+        if (!quiet && progressBar != null && usesSizeBasedTextProgress()) {  // no-op when quiet, without a bar, or without size-based progress
+            val doneMB = doneBytes / (1024.0 * 1024.0)  // bytes -> MiB
+            progressBar?.stepTo((doneMB * 100).toLong())  // Multiply by 100 to match initialization
+        }
+    }
+
+    private fun noteZipEntryProgress(zipPath: String, zipEntry: ZipArchiveEntry) {  // credit one entry's size to its zip's progress
+        if (!usesSizeBasedTextProgress()) return  // only tracked when size-based text progress is in use
+        val zipTotal = zipSizes[zipPath] ?: return  // zip has no registered size: nothing to credit
+        val deltaBytes = when {
+            zipEntry.compressedSize > 0L -> zipEntry.compressedSize  // prefer the compressed size when it is positive
+            zipEntry.size > 0L -> zipEntry.size.coerceAtMost(zipTotal)  // else the entry's size, capped at the zip's registered size
+            else -> 0L  // no usable size; trackZipProgressBytes leaves the totals unchanged for 0
+        }
+        trackZipProgressBytes(zipPath, deltaBytes)
+    }
+
+    private fun processZipEntriesStreaming(zipFile: ApacheZipFile, zipPath: String, foundry: String, waitForMorpho: Boolean) {  // process entries one at a time, in enumeration order
+        LOGGER.fine("Streaming NOW entries in archive order for $zipPath without text-ID sorting")
+        val enumEntries = zipFile.entries  // consumed directly: no intermediate list, no sorting
+
+        while (enumEntries.hasMoreElements()) {
+            val entry = enumEntries.nextElement()
+            if (extractMetadataRegex.isEmpty() && entry.name.contains("header.xml")) continue  // skip header.xml entries unless extractMetadataRegex is non-empty
+            processZipEntry(zipFile, zipPath, foundry, entry, waitForMorpho)
+        }
+    }
+
     private fun processZipEntriesWithPool(zipFile: ApacheZipFile, zipPath: String, foundry: String, waitForMorpho: Boolean) {
         // Collect entries first to avoid lazy evaluation surprises, filter header.xml unless metadata extraction is requested
         val entries: MutableList<ZipArchiveEntry> = ArrayList()
@@ -3005,6 +3108,8 @@
             }
         } catch (e: Exception) {
             e.printStackTrace()
+        } finally {
+            noteZipEntryProgress(zipPath, zipEntry)
         }
     }
 
diff --git a/app/src/test/kotlin/de/ids_mannheim/korapxmltools/GeneralFeaturesTest.kt b/app/src/test/kotlin/de/ids_mannheim/korapxmltools/GeneralFeaturesTest.kt
index bd5dc78..7e7a2bc 100644
--- a/app/src/test/kotlin/de/ids_mannheim/korapxmltools/GeneralFeaturesTest.kt
+++ b/app/src/test/kotlin/de/ids_mannheim/korapxmltools/GeneralFeaturesTest.kt
@@ -136,6 +136,34 @@
         assertTrue(!tool.usesSizeBasedTextProgress())
     }
 
+    @Test
+    fun plainNowOutputCanStreamWithoutSorting() {  // NOW format on an otherwise default tool enables streaming
+        val tool = KorapXmlTool()
+        tool.outputFormat = OutputFormat.NOW  // the only setting changed on the fresh instance
+
+        assertTrue(tool.canStreamNowEntriesImmediately())
+    }
+
+    @Test
+    fun nonNowOutputKeepsOrderedPipeline() {  // any format other than NOW must not take the streaming path
+        val tool = KorapXmlTool()
+        tool.outputFormat = OutputFormat.CONLLU  // a non-NOW format
+
+        assertTrue(!tool.canStreamNowEntriesImmediately())
+    }
+
+    @Test
+    fun zipProgressTrackingClampsAtRegisteredZipSize() {  // per-zip progress never exceeds the registered zip size
+        val tool = KorapXmlTool()
+        tool.outputFormat = OutputFormat.NOW
+        tool.outputFile = "test.now"
+        tool.registerZipProgress("sample.zip", 100L)  // the zip's total is 100 bytes
+
+        assertEquals(40L, tool.trackZipProgressBytes("sample.zip", 40L))  // 0 + 40
+        assertEquals(100L, tool.trackZipProgressBytes("sample.zip", 80L))  // 40 + 80, clamped to the registered 100
+        assertEquals(100L, tool.trackZipProgressBytes("sample.zip", 10L))  // already at 100: nothing more is added
+    }
+
     private fun KorapXmlTool.compareTextIds(a: String, b: String): Int {
         val m = KorapXmlTool::class.java.getDeclaredMethod("compareTextIds", String::class.java, String::class.java)
         m.isAccessible = true