Use standard java.util.zip for NOW format output
Change-Id: I1fae09e271df2a7503ec00db4374e3ab7193bdb8
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1bdb58f..498a763 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,12 @@
# Changelog
+## [Unreleased]
+
+### Fixed
+
+- Plain NOW export now opens ZIP input with `java.util.zip.ZipFile` in streaming mode instead of Apache Commons `ZipFile`, removing the multi-minute startup delay on very large archives with huge entry counts, and allowing extraction to begin almost immediately
+- Plain NOW export startup and progress diagnostics now log ZIP open time and first-output timing more explicitly, making it easier to distinguish ZIP indexing overhead from actual extraction work
+
## [v3.3.0] - 2026-03-26
### Fixed
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
index f402e00..7051227 100644
--- a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
@@ -1158,6 +1158,7 @@
private val docsWrittenToZip = java.util.concurrent.atomic.AtomicInteger(0)
private val totalDocsInInput = java.util.concurrent.atomic.AtomicInteger(0) // Track total documents for progress
private val annotationStartTime = java.util.concurrent.atomic.AtomicLong(0) // Track when annotation started
+ private val firstTextOutputLogged = java.util.concurrent.atomic.AtomicBoolean(false)
private var progressBar: ProgressBar? = null
var taggerToolBridges: ConcurrentHashMap<Long, TaggerToolBridge> = ConcurrentHashMap()
var parserToolBridges: ConcurrentHashMap<Long, ParserToolBridge> = ConcurrentHashMap()
@@ -1303,6 +1304,8 @@
}
fun korapxml2conllu(args: Array<String>) {
+ outputTexts.clear()
+ firstTextOutputLogged.set(false)
// Reset Krill state for fresh run (important for tests)
if (outputFormat == OutputFormat.KRILL) {
expectedFoundries.clear()
@@ -1351,9 +1354,14 @@
}
LOGGER.info("Initialized work-stealing scheduler with $maxThreads worker threads for Krill output")
} else if (canStreamNowEntriesImmediately()) {
- entryExecutor = null
+ entryExecutor = java.util.concurrent.Executors.newFixedThreadPool(maxThreads) { r ->
+ Thread(r, "NowEntryWorker-${Thread.currentThread().threadId()}")
+ }
val textParserMode = if (shouldParseDataXmlWithStax()) "StAX" else "DOM"
- LOGGER.info("Initialized NOW streaming mode: archive-order entries, no text-ID scheduling, data.xml via $textParserMode")
+ LOGGER.info(
+ "Initialized NOW streaming mode: archive-order entries, parallel entry processing, " +
+ "no text-ID scheduling, data.xml via $textParserMode"
+ )
} else {
// For other formats, use priority-based executor
entryExecutor = java.util.concurrent.ThreadPoolExecutor(
@@ -2214,12 +2222,57 @@
private fun compareTextIds(a: String, b: String): Int =
monthAwareSortKey(a).compareTo(monthAwareSortKey(b))
- private fun openZipFile(path: String): ApacheZipFile =
- ApacheZipFile.builder()
+ private fun shouldUseUnicodeExtraFieldsOnOpen(): Boolean = !canStreamNowEntriesImmediately()
+
+ private interface ReadableZipEntry {
+ val name: String
+ val size: Long
+ val compressedSize: Long
+ fun openInputStream(): InputStream
+ }
+
+ private class ApacheReadableZipEntry(
+ private val zipFile: ApacheZipFile,
+ private val entry: ZipArchiveEntry
+ ) : ReadableZipEntry {
+ override val name: String get() = entry.name
+ override val size: Long get() = entry.size
+ override val compressedSize: Long get() = entry.compressedSize
+ override fun openInputStream(): InputStream = zipFile.getInputStream(entry)
+ }
+
+ private class JavaReadableZipEntry(
+ private val zipFile: ZipFile,
+ private val entry: java.util.zip.ZipEntry
+ ) : ReadableZipEntry {
+ override val name: String get() = entry.name
+ override val size: Long get() = entry.size
+ override val compressedSize: Long get() = entry.compressedSize
+ override fun openInputStream(): InputStream = zipFile.getInputStream(entry)
+ }
+
+ private fun openZipFile(path: String): ApacheZipFile {
+ val startedAt = System.currentTimeMillis()
+ val useUnicodeExtraFields = shouldUseUnicodeExtraFieldsOnOpen()
+ LOGGER.info("Opening ZIP file: $path (unicodeExtraFields=${if (useUnicodeExtraFields) "on" else "off"})")
+ val zipFile = ApacheZipFile.builder()
.setFile(File(path))
.setCharset(StandardCharsets.UTF_8)
- .setUseUnicodeExtraFields(true)
+ .setUseUnicodeExtraFields(useUnicodeExtraFields)
.get()
+ val elapsedSeconds = ((System.currentTimeMillis() - startedAt).coerceAtLeast(0L)) / 1000L
+ LOGGER.info("Opened ZIP file: $path in ${formatDuration(elapsedSeconds)}")
+ return zipFile
+ }
+
+ private fun openJavaZipFile(path: String): ZipFile {
+ val startedAt = System.currentTimeMillis()
+ LOGGER.info("Opening ZIP file with java.util.zip.ZipFile: $path")
+ val zipFile = ZipFile(path, StandardCharsets.UTF_8)
+ val elapsedSeconds = ((System.currentTimeMillis() - startedAt).coerceAtLeast(0L)) / 1000L
+ LOGGER.info("Opened ZIP file with java.util.zip.ZipFile: $path in ${formatDuration(elapsedSeconds)}")
+ return zipFile
+ }
private fun getFoundryFromZipFileName(zipFileName: String): String {
if (!zipFileName.matches(Regex(".*\\.([^/.]+)\\.zip$"))) {
@@ -2238,6 +2291,8 @@
return "base"
}
+ private fun useJavaZipForNow(): Boolean = canStreamNowEntriesImmediately()
+
private fun processZipFile(zipFilePath: String, foundry: String = "base") {
val ord = zipOrdinals[zipFilePath] ?: 0
val size = zipSizes[zipFilePath] ?: 0L
@@ -2373,11 +2428,13 @@
} else {
foundry // Keep original foundry for non-krill formats
}
- openZipFile(zip).use { zipFile ->
- if (canStreamNowEntriesImmediately()) {
+ if (useJavaZipForNow()) {
+ openJavaZipFile(zip).use { zipFile ->
LOGGER.info("Using NOW streaming mode for $zip: archive-order entries, no text-ID sorting")
processZipEntriesStreaming(zipFile, zip, zipFoundry, true)
- } else {
+ }
+ } else {
+ openZipFile(zip).use { zipFile ->
processZipEntriesWithPool(zipFile, zip, zipFoundry, true)
}
}
@@ -2386,16 +2443,18 @@
LOGGER.fine("Opening ZipFile for processing: $zipFilePath")
try {
// If no corresponding base ZIP exists, this IS the base ZIP
- openZipFile(zipFilePath).use { zipFile ->
- LOGGER.fine("Calling processZipEntriesWithPool, foundry=$foundry")
- if (canStreamNowEntriesImmediately()) {
+ if (useJavaZipForNow()) {
+ openJavaZipFile(zipFilePath).use { zipFile ->
LOGGER.info("Using NOW streaming mode for $zipFilePath: archive-order entries, no text-ID sorting")
processZipEntriesStreaming(zipFile, zipFilePath, foundry, false)
- } else {
- processZipEntriesWithPool(zipFile, zipFilePath, foundry, false)
}
- LOGGER.fine("Returned from processZipEntriesWithPool")
- }
+ } else {
+ openZipFile(zipFilePath).use { zipFile ->
+ LOGGER.fine("Calling processZipEntriesWithPool, foundry=$foundry")
+ processZipEntriesWithPool(zipFile, zipFilePath, foundry, false)
+ LOGGER.fine("Returned from processZipEntriesWithPool")
+ }
+ }
} catch (e: Exception) {
LOGGER.severe("Error processing ZIP: ${e.message}")
e.printStackTrace()
@@ -2435,11 +2494,13 @@
} else {
foundry // Keep original foundry for non-krill formats
}
- openZipFile(zip).use { zipFile ->
- if (canStreamNowEntriesImmediately()) {
+ if (useJavaZipForNow()) {
+ openJavaZipFile(zip).use { zipFile ->
LOGGER.info("Using NOW streaming mode for $zip: archive-order entries, no text-ID sorting")
processZipEntriesStreaming(zipFile, zip, zipFoundry, true)
- } else {
+ }
+ } else {
+ openZipFile(zip).use { zipFile ->
// Iterate entries sorted by text ID to ensure consistent processing order
zipFile.entries.toList()
.filter { extractMetadataRegex.isNotEmpty() || !it.name.contains("header.xml") }
@@ -2451,11 +2512,13 @@
}
}
} else {
- openZipFile(zipFilePath).use { zipFile ->
- if (canStreamNowEntriesImmediately()) {
+ if (useJavaZipForNow()) {
+ openJavaZipFile(zipFilePath).use { zipFile ->
LOGGER.info("Using NOW streaming mode for $zipFilePath: archive-order entries, no text-ID sorting")
processZipEntriesStreaming(zipFile, zipFilePath, foundry, false)
- } else {
+ }
+ } else {
+ openZipFile(zipFilePath).use { zipFile ->
zipFile.entries.toList()
.filter { extractMetadataRegex.isNotEmpty() || !it.name.contains("header.xml") }
.sortedBy { getTextIdFromPath(it.name) }
@@ -2526,7 +2589,7 @@
}
}
- private fun noteZipEntryProgress(zipPath: String, zipEntry: ZipArchiveEntry) {
+ private fun noteZipEntryProgress(zipPath: String, zipEntry: ReadableZipEntry) {
if (!usesSizeBasedTextProgress()) return
val zipTotal = zipSizes[zipPath] ?: return
val deltaBytes = when {
@@ -2537,15 +2600,81 @@
trackZipProgressBytes(zipPath, deltaBytes)
}
+ private fun noteFirstTextOutput(docId: String) {
+ if (firstTextOutputLogged.compareAndSet(false, true)) {
+ val elapsedSeconds = ((System.currentTimeMillis() - startTimeMillis).coerceAtLeast(0L)) / 1000L
+ LOGGER.info("First text output after ${formatDuration(elapsedSeconds)}: $docId")
+ }
+ }
+
+ private fun tryProcessReadyText(docId: String, foundry: String): Boolean {
+ if (!outputTexts.add(docId)) return false
+ return try {
+ processText(docId, foundry)
+ noteFirstTextOutput(docId)
+ true
+ } catch (t: Throwable) {
+ outputTexts.remove(docId)
+ throw t
+ }
+ }
+
private fun processZipEntriesStreaming(zipFile: ApacheZipFile, zipPath: String, foundry: String, waitForMorpho: Boolean) {
LOGGER.fine("Streaming NOW entries in archive order for $zipPath without text-ID sorting")
val enumEntries = zipFile.entries
+ val phaser = java.util.concurrent.Phaser(1)
+ val maxInFlight = maxOf(maxThreads * 4, 32)
+ val permits = java.util.concurrent.Semaphore(maxInFlight)
while (enumEntries.hasMoreElements()) {
val entry = enumEntries.nextElement()
if (extractMetadataRegex.isEmpty() && entry.name.contains("header.xml")) continue
- processZipEntry(zipFile, zipPath, foundry, entry, waitForMorpho)
+ if (entryExecutor != null && maxThreads > 1 && !sequentialInZip) {
+ permits.acquireUninterruptibly()
+ phaser.register()
+ entryExecutor!!.execute {
+ try {
+ processZipEntry(zipFile, zipPath, foundry, entry, waitForMorpho)
+ } finally {
+ permits.release()
+ phaser.arriveAndDeregister()
+ }
+ }
+ } else {
+ processZipEntry(zipFile, zipPath, foundry, entry, waitForMorpho)
+ }
}
+
+ phaser.arriveAndAwaitAdvance()
+ }
+
+ private fun processZipEntriesStreaming(zipFile: ZipFile, zipPath: String, foundry: String, waitForMorpho: Boolean) {
+ LOGGER.fine("Streaming NOW entries in archive order for $zipPath without text-ID sorting")
+ val enumEntries = zipFile.entries()
+ val phaser = java.util.concurrent.Phaser(1)
+ val maxInFlight = maxOf(maxThreads * 4, 32)
+ val permits = java.util.concurrent.Semaphore(maxInFlight)
+
+ while (enumEntries.hasMoreElements()) {
+ val entry = enumEntries.nextElement()
+ if (extractMetadataRegex.isEmpty() && entry.name.contains("header.xml")) continue
+ if (entryExecutor != null && maxThreads > 1 && !sequentialInZip) {
+ permits.acquireUninterruptibly()
+ phaser.register()
+ entryExecutor!!.execute {
+ try {
+ processZipEntry(zipFile, zipPath, foundry, entry, waitForMorpho)
+ } finally {
+ permits.release()
+ phaser.arriveAndDeregister()
+ }
+ }
+ } else {
+ processZipEntry(zipFile, zipPath, foundry, entry, waitForMorpho)
+ }
+ }
+
+ phaser.arriveAndAwaitAdvance()
}
private fun processZipEntriesWithPool(zipFile: ApacheZipFile, zipPath: String, foundry: String, waitForMorpho: Boolean) {
@@ -2795,6 +2924,14 @@
}
fun processZipEntry(zipFile: ApacheZipFile, zipPath: String, _foundry: String, zipEntry: ZipArchiveEntry, passedWaitForMorpho: Boolean) {
+ processZipEntry(zipPath, _foundry, ApacheReadableZipEntry(zipFile, zipEntry), passedWaitForMorpho)
+ }
+
+ fun processZipEntry(zipFile: ZipFile, zipPath: String, _foundry: String, zipEntry: java.util.zip.ZipEntry, passedWaitForMorpho: Boolean) {
+ processZipEntry(zipPath, _foundry, JavaReadableZipEntry(zipFile, zipEntry), passedWaitForMorpho)
+ }
+
+ private fun processZipEntry(zipPath: String, _foundry: String, zipEntry: ReadableZipEntry, passedWaitForMorpho: Boolean) {
var foundry = _foundry
var waitForMorpho = passedWaitForMorpho
LOGGER.finer("Processing ${zipEntry.name} in thread ${Thread.currentThread().threadId()}")
@@ -2849,7 +2986,7 @@
// Use DOM for data.xml (large text content) and structure/constituency (complex parsing)
// Use StAX for annotation files (morpho, dependency, tokens, sentences) for better performance
if (!needsDom && !isConstituency && (!isData || useStaxForData)) {
- processXmlEntryStax(zipFile, zipPath, zipEntry, foundry, waitForMorpho)
+ processXmlEntryStax(zipPath, zipEntry, foundry, waitForMorpho)
return
}
@@ -2864,7 +3001,7 @@
return
}
val doc: Document = try {
- zipFile.getInputStream(zipEntry).use { inputStream ->
+ zipEntry.openInputStream().use { inputStream ->
XMLCommentFilterReader(inputStream, "UTF-8").use { reader ->
dBuilder.parse(InputSource(reader))
}
@@ -3061,15 +3198,14 @@
if ((texts[docId] != null || !textRequired) && sentences[docId] != null && tokens[docId] != null
&& (!morphoRequired || morpho[docId] != null)
&& (extractMetadataRegex.isEmpty() || metadata[docId] != null)
- && !outputTexts.contains(docId) // Skip if already output
) {
LOGGER.fine("All data ready for $docId, calling processText")
- processText(docId, foundry)
+ tryProcessReadyText(docId, foundry)
} else {
LOGGER.fine("NOT ready to process $docId yet: textOK=${texts[docId] != null || !textRequired}, sentencesOK=${sentences[docId] != null}, tokensOK=${tokens[docId] != null}, morphoOK=${!morphoRequired || morpho[docId] != null}")
}
} else if ((extractMetadataRegex.isNotEmpty() || outputFormat == OutputFormat.KRILL) && zipEntry.name.matches(Regex(".*/header\\.xml$"))) {
- val headerBytes = zipFile.getInputStream(zipEntry).use { it.readBytes() }
+ val headerBytes = zipEntry.openInputStream().use { it.readBytes() }
val text = headerBytes.toString(Charsets.UTF_8)
val headerDoc = safeDomFactory.newDocumentBuilder().parse(ByteArrayInputStream(headerBytes))
val headerRoot = headerDoc.documentElement
@@ -3119,10 +3255,9 @@
}
if ((texts[docId] != null || !textRequired) && sentences[docId] != null && tokens[docId] != null
&& (!morphoRequired || morpho[docId] != null)
- && !outputTexts.contains(docId) // Skip if already output
) {
LOGGER.info("Processing text (meta-ready): $docId in thread ${Thread.currentThread().threadId()}")
- processText(docId, foundry)
+ tryProcessReadyText(docId, foundry)
}
}
}
@@ -3133,10 +3268,10 @@
}
}
- private fun processXmlEntryStax(zipFile: ApacheZipFile, zipPath: String, zipEntry: ZipArchiveEntry, foundry: String, waitForMorpho: Boolean) {
+ private fun processXmlEntryStax(zipPath: String, zipEntry: ReadableZipEntry, foundry: String, waitForMorpho: Boolean) {
LOGGER.finer("Processing entry (StAX): ${zipEntry.name}, foundry=$foundry")
val factory = xmlInputFactory.get()
- val inputStream = zipFile.getInputStream(zipEntry)
+ val inputStream = zipEntry.openInputStream()
val entryFileName = zipEntry.name.replace(Regex(".*?/([^/]+\\.xml)$"), "$1")
// For krill output and morpho.xml files, bypass XMLCommentFilterReader: large files (80+ MB)
// cause Xerces to fall back to the single-char read() which throws UnsupportedOperationException
@@ -3275,7 +3410,7 @@
&& (!finalMorphoRequired || morpho[docId] != null)
&& (extractMetadataRegex.isEmpty() || metadata[docId] != null)
) {
- processText(docId, foundry)
+ tryProcessReadyText(docId, foundry)
}
} catch (e: Exception) {