Use standard java zip for now format output

Change-Id: I1fae09e271df2a7503ec00db4374e3ab7193bdb8
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1bdb58f..498a763 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,12 @@
 # Changelog
 
+## [Unreleased]
+
+### Fixed
+
+- Plain NOW export now opens ZIP input with `java.util.zip.ZipFile` in streaming mode instead of Apache Commons `ZipFile`, removing the multi-minute startup delay on very large archives with huge entry counts and allowing extraction to begin almost immediately
+- Plain NOW export startup and progress diagnostics now log ZIP open time and first-output timing more explicitly, making it easier to distinguish ZIP indexing overhead from actual extraction work
+
 ## [v3.3.0] - 2026-03-26
 
 ### Fixed
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
index f402e00..7051227 100644
--- a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
@@ -1158,6 +1158,7 @@
     private val docsWrittenToZip = java.util.concurrent.atomic.AtomicInteger(0)
     private val totalDocsInInput = java.util.concurrent.atomic.AtomicInteger(0) // Track total documents for progress
     private val annotationStartTime = java.util.concurrent.atomic.AtomicLong(0) // Track when annotation started
+    private val firstTextOutputLogged = java.util.concurrent.atomic.AtomicBoolean(false)
     private var progressBar: ProgressBar? = null
     var taggerToolBridges: ConcurrentHashMap<Long, TaggerToolBridge> = ConcurrentHashMap()
     var parserToolBridges: ConcurrentHashMap<Long, ParserToolBridge> = ConcurrentHashMap()
@@ -1303,6 +1304,8 @@
     }
 
     fun korapxml2conllu(args: Array<String>) {
+        outputTexts.clear()
+        firstTextOutputLogged.set(false)
         // Reset Krill state for fresh run (important for tests)
         if (outputFormat == OutputFormat.KRILL) {
             expectedFoundries.clear()
@@ -1351,9 +1354,14 @@
             }
             LOGGER.info("Initialized work-stealing scheduler with $maxThreads worker threads for Krill output")
         } else if (canStreamNowEntriesImmediately()) {
-            entryExecutor = null
+            entryExecutor = java.util.concurrent.Executors.newFixedThreadPool(maxThreads) { r ->
+                Thread(r, "NowEntryWorker-${Thread.currentThread().threadId()}")
+            }
             val textParserMode = if (shouldParseDataXmlWithStax()) "StAX" else "DOM"
-            LOGGER.info("Initialized NOW streaming mode: archive-order entries, no text-ID scheduling, data.xml via $textParserMode")
+            LOGGER.info(
+                "Initialized NOW streaming mode: archive-order entries, parallel entry processing, " +
+                    "no text-ID scheduling, data.xml via $textParserMode"
+            )
         } else {
             // For other formats, use priority-based executor
             entryExecutor = java.util.concurrent.ThreadPoolExecutor(
@@ -2214,12 +2222,57 @@
     private fun compareTextIds(a: String, b: String): Int =
         monthAwareSortKey(a).compareTo(monthAwareSortKey(b))
 
-    private fun openZipFile(path: String): ApacheZipFile =
-        ApacheZipFile.builder()
+    private fun shouldUseUnicodeExtraFieldsOnOpen(): Boolean = !canStreamNowEntriesImmediately()
+
+    private interface ReadableZipEntry {
+        val name: String
+        val size: Long
+        val compressedSize: Long
+        fun openInputStream(): InputStream
+    }
+
+    private class ApacheReadableZipEntry(
+        private val zipFile: ApacheZipFile,
+        private val entry: ZipArchiveEntry
+    ) : ReadableZipEntry {
+        override val name: String get() = entry.name
+        override val size: Long get() = entry.size
+        override val compressedSize: Long get() = entry.compressedSize
+        override fun openInputStream(): InputStream = zipFile.getInputStream(entry)
+    }
+
+    private class JavaReadableZipEntry(
+        private val zipFile: ZipFile,
+        private val entry: java.util.zip.ZipEntry
+    ) : ReadableZipEntry {
+        override val name: String get() = entry.name
+        override val size: Long get() = entry.size
+        override val compressedSize: Long get() = entry.compressedSize
+        override fun openInputStream(): InputStream = zipFile.getInputStream(entry)
+    }
+
+    private fun openZipFile(path: String): ApacheZipFile {
+        val startedAt = System.currentTimeMillis()
+        val useUnicodeExtraFields = shouldUseUnicodeExtraFieldsOnOpen()
+        LOGGER.info("Opening ZIP file: $path (unicodeExtraFields=${if (useUnicodeExtraFields) "on" else "off"})")
+        val zipFile = ApacheZipFile.builder()
             .setFile(File(path))
             .setCharset(StandardCharsets.UTF_8)
-            .setUseUnicodeExtraFields(true)
+            .setUseUnicodeExtraFields(useUnicodeExtraFields)
             .get()
+        val elapsedSeconds = ((System.currentTimeMillis() - startedAt).coerceAtLeast(0L)) / 1000L
+        LOGGER.info("Opened ZIP file: $path in ${formatDuration(elapsedSeconds)}")
+        return zipFile
+    }
+
+    private fun openJavaZipFile(path: String): ZipFile {
+        val startedAt = System.currentTimeMillis()
+        LOGGER.info("Opening ZIP file with java.util.zip.ZipFile: $path")
+        val zipFile = ZipFile(path, StandardCharsets.UTF_8)
+        val elapsedSeconds = ((System.currentTimeMillis() - startedAt).coerceAtLeast(0L)) / 1000L
+        LOGGER.info("Opened ZIP file with java.util.zip.ZipFile: $path in ${formatDuration(elapsedSeconds)}")
+        return zipFile
+    }
 
     private fun getFoundryFromZipFileName(zipFileName: String): String {
         if (!zipFileName.matches(Regex(".*\\.([^/.]+)\\.zip$"))) {
@@ -2238,6 +2291,8 @@
         return "base"
     }
 
+    private fun useJavaZipForNow(): Boolean = canStreamNowEntriesImmediately()
+
     private fun processZipFile(zipFilePath: String, foundry: String = "base") {
         val ord = zipOrdinals[zipFilePath] ?: 0
         val size = zipSizes[zipFilePath] ?: 0L
@@ -2373,11 +2428,13 @@
                 } else {
                     foundry  // Keep original foundry for non-krill formats
                 }
-                openZipFile(zip).use { zipFile ->
-                    if (canStreamNowEntriesImmediately()) {
+                if (useJavaZipForNow()) {
+                    openJavaZipFile(zip).use { zipFile ->
                         LOGGER.info("Using NOW streaming mode for $zip: archive-order entries, no text-ID sorting")
                         processZipEntriesStreaming(zipFile, zip, zipFoundry, true)
-                    } else {
+                    }
+                } else {
+                    openZipFile(zip).use { zipFile ->
                         processZipEntriesWithPool(zipFile, zip, zipFoundry, true)
                     }
                 }
@@ -2386,16 +2443,18 @@
             LOGGER.fine("Opening ZipFile for processing: $zipFilePath")
             try {
                 // If no corresponding base ZIP exists, this IS the base ZIP
-                openZipFile(zipFilePath).use { zipFile ->
-                    LOGGER.fine("Calling processZipEntriesWithPool, foundry=$foundry")
-                    if (canStreamNowEntriesImmediately()) {
+                if (useJavaZipForNow()) {
+                    openJavaZipFile(zipFilePath).use { zipFile ->
                         LOGGER.info("Using NOW streaming mode for $zipFilePath: archive-order entries, no text-ID sorting")
                         processZipEntriesStreaming(zipFile, zipFilePath, foundry, false)
-                    } else {
-                        processZipEntriesWithPool(zipFile, zipFilePath, foundry, false)
                     }
-                    LOGGER.fine("Returned from processZipEntriesWithPool")
-                }
+                } else {
+                    openZipFile(zipFilePath).use { zipFile ->
+                        LOGGER.fine("Calling processZipEntriesWithPool, foundry=$foundry")
+                        processZipEntriesWithPool(zipFile, zipFilePath, foundry, false)
+                        LOGGER.fine("Returned from processZipEntriesWithPool")
+                    }
+                } 
             } catch (e: Exception) {
                 LOGGER.severe("Error processing ZIP: ${e.message}")
                 e.printStackTrace()
@@ -2435,11 +2494,13 @@
                 } else {
                     foundry  // Keep original foundry for non-krill formats
                 }
-                openZipFile(zip).use { zipFile ->
-                    if (canStreamNowEntriesImmediately()) {
+                if (useJavaZipForNow()) {
+                    openJavaZipFile(zip).use { zipFile ->
                         LOGGER.info("Using NOW streaming mode for $zip: archive-order entries, no text-ID sorting")
                         processZipEntriesStreaming(zipFile, zip, zipFoundry, true)
-                    } else {
+                    }
+                } else {
+                    openZipFile(zip).use { zipFile ->
                         // Iterate entries sorted by text ID to ensure consistent processing order
                         zipFile.entries.toList()
                             .filter { extractMetadataRegex.isNotEmpty() || !it.name.contains("header.xml") }
@@ -2451,11 +2512,13 @@
                 }
             }
         } else {
-            openZipFile(zipFilePath).use { zipFile ->
-                if (canStreamNowEntriesImmediately()) {
+            if (useJavaZipForNow()) {
+                openJavaZipFile(zipFilePath).use { zipFile ->
                     LOGGER.info("Using NOW streaming mode for $zipFilePath: archive-order entries, no text-ID sorting")
                     processZipEntriesStreaming(zipFile, zipFilePath, foundry, false)
-                } else {
+                }
+            } else {
+                openZipFile(zipFilePath).use { zipFile ->
                     zipFile.entries.toList()
                         .filter { extractMetadataRegex.isNotEmpty() || !it.name.contains("header.xml") }
                         .sortedBy { getTextIdFromPath(it.name) }
@@ -2526,7 +2589,7 @@
         }
     }
 
-    private fun noteZipEntryProgress(zipPath: String, zipEntry: ZipArchiveEntry) {
+    private fun noteZipEntryProgress(zipPath: String, zipEntry: ReadableZipEntry) {
         if (!usesSizeBasedTextProgress()) return
         val zipTotal = zipSizes[zipPath] ?: return
         val deltaBytes = when {
@@ -2537,15 +2600,81 @@
         trackZipProgressBytes(zipPath, deltaBytes)
     }
 
+    private fun noteFirstTextOutput(docId: String) {
+        if (firstTextOutputLogged.compareAndSet(false, true)) {
+            val elapsedSeconds = ((System.currentTimeMillis() - startTimeMillis).coerceAtLeast(0L)) / 1000L
+            LOGGER.info("First text output after ${formatDuration(elapsedSeconds)}: $docId")
+        }
+    }
+
+    private fun tryProcessReadyText(docId: String, foundry: String): Boolean {
+        if (!outputTexts.add(docId)) return false
+        return try {
+            processText(docId, foundry)
+            noteFirstTextOutput(docId)
+            true
+        } catch (t: Throwable) {
+            outputTexts.remove(docId)
+            throw t
+        }
+    }
+
     private fun processZipEntriesStreaming(zipFile: ApacheZipFile, zipPath: String, foundry: String, waitForMorpho: Boolean) {
         LOGGER.fine("Streaming NOW entries in archive order for $zipPath without text-ID sorting")
         val enumEntries = zipFile.entries
+        val phaser = java.util.concurrent.Phaser(1)
+        val maxInFlight = maxOf(maxThreads * 4, 32)
+        val permits = java.util.concurrent.Semaphore(maxInFlight)
 
         while (enumEntries.hasMoreElements()) {
             val entry = enumEntries.nextElement()
             if (extractMetadataRegex.isEmpty() && entry.name.contains("header.xml")) continue
-            processZipEntry(zipFile, zipPath, foundry, entry, waitForMorpho)
+            if (entryExecutor != null && maxThreads > 1 && !sequentialInZip) {
+                permits.acquireUninterruptibly()
+                phaser.register()
+                entryExecutor!!.execute {
+                    try {
+                        processZipEntry(zipFile, zipPath, foundry, entry, waitForMorpho)
+                    } finally {
+                        permits.release()
+                        phaser.arriveAndDeregister()
+                    }
+                }
+            } else {
+                processZipEntry(zipFile, zipPath, foundry, entry, waitForMorpho)
+            }
         }
+
+        phaser.arriveAndAwaitAdvance()
+    }
+
+    private fun processZipEntriesStreaming(zipFile: ZipFile, zipPath: String, foundry: String, waitForMorpho: Boolean) {
+        LOGGER.fine("Streaming NOW entries in archive order for $zipPath without text-ID sorting")
+        val enumEntries = zipFile.entries()
+        val phaser = java.util.concurrent.Phaser(1)
+        val maxInFlight = maxOf(maxThreads * 4, 32)
+        val permits = java.util.concurrent.Semaphore(maxInFlight)
+
+        while (enumEntries.hasMoreElements()) {
+            val entry = enumEntries.nextElement()
+            if (extractMetadataRegex.isEmpty() && entry.name.contains("header.xml")) continue
+            if (entryExecutor != null && maxThreads > 1 && !sequentialInZip) {
+                permits.acquireUninterruptibly()
+                phaser.register()
+                entryExecutor!!.execute {
+                    try {
+                        processZipEntry(zipFile, zipPath, foundry, entry, waitForMorpho)
+                    } finally {
+                        permits.release()
+                        phaser.arriveAndDeregister()
+                    }
+                }
+            } else {
+                processZipEntry(zipFile, zipPath, foundry, entry, waitForMorpho)
+            }
+        }
+
+        phaser.arriveAndAwaitAdvance()
     }
 
     private fun processZipEntriesWithPool(zipFile: ApacheZipFile, zipPath: String, foundry: String, waitForMorpho: Boolean) {
@@ -2795,6 +2924,14 @@
     }
 
     fun processZipEntry(zipFile: ApacheZipFile, zipPath: String, _foundry: String, zipEntry: ZipArchiveEntry, passedWaitForMorpho: Boolean) {
+        processZipEntry(zipPath, _foundry, ApacheReadableZipEntry(zipFile, zipEntry), passedWaitForMorpho)
+    }
+
+    fun processZipEntry(zipFile: ZipFile, zipPath: String, _foundry: String, zipEntry: java.util.zip.ZipEntry, passedWaitForMorpho: Boolean) {
+        processZipEntry(zipPath, _foundry, JavaReadableZipEntry(zipFile, zipEntry), passedWaitForMorpho)
+    }
+
+    private fun processZipEntry(zipPath: String, _foundry: String, zipEntry: ReadableZipEntry, passedWaitForMorpho: Boolean) {
         var foundry = _foundry
         var waitForMorpho = passedWaitForMorpho
         LOGGER.finer("Processing ${zipEntry.name} in thread ${Thread.currentThread().threadId()}")
@@ -2849,7 +2986,7 @@
                  // Use DOM for data.xml (large text content) and structure/constituency (complex parsing)
                  // Use StAX for annotation files (morpho, dependency, tokens, sentences) for better performance
                  if (!needsDom && !isConstituency && (!isData || useStaxForData)) {
-                     processXmlEntryStax(zipFile, zipPath, zipEntry, foundry, waitForMorpho)
+                     processXmlEntryStax(zipPath, zipEntry, foundry, waitForMorpho)
                      return
                  }
 
@@ -2864,7 +3001,7 @@
                     return
                 }
                 val doc: Document = try {
-                    zipFile.getInputStream(zipEntry).use { inputStream ->
+                    zipEntry.openInputStream().use { inputStream ->
                         XMLCommentFilterReader(inputStream, "UTF-8").use { reader ->
                             dBuilder.parse(InputSource(reader))
                         }
@@ -3061,15 +3198,14 @@
                 if ((texts[docId] != null || !textRequired) && sentences[docId] != null && tokens[docId] != null
                     && (!morphoRequired || morpho[docId] != null)
                     && (extractMetadataRegex.isEmpty() || metadata[docId] != null)
-                    && !outputTexts.contains(docId)  // Skip if already output
                 ) {
                     LOGGER.fine("All data ready for $docId, calling processText")
-                    processText(docId, foundry)
+                    tryProcessReadyText(docId, foundry)
                 } else {
                     LOGGER.fine("NOT ready to process $docId yet: textOK=${texts[docId] != null || !textRequired}, sentencesOK=${sentences[docId] != null}, tokensOK=${tokens[docId] != null}, morphoOK=${!morphoRequired || morpho[docId] != null}")
                 }
             } else if ((extractMetadataRegex.isNotEmpty() || outputFormat == OutputFormat.KRILL) && zipEntry.name.matches(Regex(".*/header\\.xml$"))) {
-                val headerBytes = zipFile.getInputStream(zipEntry).use { it.readBytes() }
+                val headerBytes = zipEntry.openInputStream().use { it.readBytes() }
                 val text = headerBytes.toString(Charsets.UTF_8)
                 val headerDoc = safeDomFactory.newDocumentBuilder().parse(ByteArrayInputStream(headerBytes))
                 val headerRoot = headerDoc.documentElement
@@ -3119,10 +3255,9 @@
                     }
                     if ((texts[docId] != null || !textRequired) && sentences[docId] != null && tokens[docId] != null
                         && (!morphoRequired || morpho[docId] != null)
-                        && !outputTexts.contains(docId)  // Skip if already output
                     ) {
                         LOGGER.info("Processing text (meta-ready): $docId in thread ${Thread.currentThread().threadId()}")
-                        processText(docId, foundry)
+                        tryProcessReadyText(docId, foundry)
                     }
                 }
             }
@@ -3133,10 +3268,10 @@
         }
     }
 
-    private fun processXmlEntryStax(zipFile: ApacheZipFile, zipPath: String, zipEntry: ZipArchiveEntry, foundry: String, waitForMorpho: Boolean) {
+    private fun processXmlEntryStax(zipPath: String, zipEntry: ReadableZipEntry, foundry: String, waitForMorpho: Boolean) {
         LOGGER.finer("Processing entry (StAX): ${zipEntry.name}, foundry=$foundry")
         val factory = xmlInputFactory.get()
-        val inputStream = zipFile.getInputStream(zipEntry)
+        val inputStream = zipEntry.openInputStream()
         val entryFileName = zipEntry.name.replace(Regex(".*?/([^/]+\\.xml)$"), "$1")
         // For krill output and morpho.xml files, bypass XMLCommentFilterReader: large files (80+ MB)
         // cause Xerces to fall back to the single-char read() which throws UnsupportedOperationException
@@ -3275,7 +3410,7 @@
                 && (!finalMorphoRequired || morpho[docId] != null)
                 && (extractMetadataRegex.isEmpty() || metadata[docId] != null)
             ) {
-                processText(docId, foundry)
+                tryProcessReadyText(docId, foundry)
             }
 
         } catch (e: Exception) {