Fix premature input zip closing with multiple threads
Change-Id: Ib775ac899d74553533e49b60b2be8250d613aaf6
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
index 869fe63..389ab32 100644
--- a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
@@ -306,6 +306,8 @@
private val LOGGER: Logger = Logger.getLogger(KorapXmlTool::class.java.name)
private var annotationWorkerPool : AnnotationWorkerPool? = null
+ // Shared executor for entry-level parallelism across all zips
+ private var entryExecutor: java.util.concurrent.ExecutorService? = null
val texts: ConcurrentHashMap<String, NonBmpString> = ConcurrentHashMap()
val sentences: ConcurrentHashMap<String, Array<Span>> = ConcurrentHashMap()
@@ -347,7 +349,8 @@
LOGGER.severe("Shell command annotation is not yet supported with output format $outputFormat")
exitProcess(1)
}
- Executors.newFixedThreadPool(maxThreads)
+ // Initialize shared entry executor (used inside each zip)
+ entryExecutor = Executors.newFixedThreadPool(maxThreads)
if (annotateWith.isNotEmpty()) {
annotationWorkerPool = AnnotationWorkerPool(annotateWith, maxThreads, LOGGER)
@@ -410,6 +413,8 @@
LOGGER.info("closing worker pool")
annotationWorkerPool?.close()
}
+ // Shutdown entry executor
+ entryExecutor?.shutdown()
}
private fun processZipsWithQueue(zips: Array<String>, foundry: String, parallelism: Int) {
@@ -519,21 +524,16 @@
}
}
if (zipFilePath.hasCorrespondingBaseZip()) {
- val zips = arrayOf(zipFilePath, zipFilePath.correspondingBaseZip()!!)
- Arrays.stream(zips).parallel().forEach { zip ->
+ val relatedZips = arrayOf(zipFilePath, zipFilePath.correspondingBaseZip()!!)
+ // Process related zips one after another to keep the ZipFile lifetime strictly bounded
+ relatedZips.forEach { zip ->
ZipFile(zip).use { zipFile ->
- zipFile.stream().filter({ extractMetadataRegex.isNotEmpty() || !it.name.contains("header.xml") })
- .parallel().forEach { zipEntry ->
- processZipEntry(zipFile, foundry, zipEntry, true)
- }
+ processZipEntriesWithPool(zipFile, foundry, true)
}
}
} else {
ZipFile(zipFilePath).use { zipFile ->
- zipFile.stream().filter({ extractMetadataRegex.isNotEmpty() || !it.name.contains("header.xml") })
- .parallel().forEach { zipEntry ->
- processZipEntry(zipFile, foundry, zipEntry, false)
- }
+ processZipEntriesWithPool(zipFile, foundry, false)
}
}
if (outputFormat == OutputFormat.KORAPXML) {
@@ -613,6 +613,43 @@
return String.format(Locale.ROOT, "%02d:%02d:%02d", h, m, sec)
}
+ private fun processZipEntriesWithPool(zipFile: ZipFile, foundry: String, waitForMorpho: Boolean) {
+ // Collect entries first to avoid lazy evaluation surprises, filter header.xml unless metadata extraction is requested
+ val entries: MutableList<ZipEntry> = ArrayList()
+ val enumEntries = zipFile.entries()
+ while (enumEntries.hasMoreElements()) {
+ val e = enumEntries.nextElement()
+ if (extractMetadataRegex.isEmpty() && e.name.contains("header.xml")) continue
+ entries.add(e)
+ }
+ if (entries.isEmpty()) return
+
+ // If only one thread requested, do sequential to avoid pool overhead
+ if (maxThreads <= 1) {
+ entries.forEach { entry -> processZipEntry(zipFile, foundry, entry, waitForMorpho) }
+ return
+ }
+
+ // Submit all entry tasks to the shared executor and await completion before closing the zip
+ val latch = java.util.concurrent.CountDownLatch(entries.size)
+ entries.forEach { entry ->
+ entryExecutor?.execute {
+ try {
+ processZipEntry(zipFile, foundry, entry, waitForMorpho)
+ } catch (t: Throwable) {
+ LOGGER.warning("Failed to process entry ${entry.name}: ${t.message}")
+ } finally {
+ latch.countDown()
+ }
+ }
+ }
+ try {
+ latch.await()
+ } catch (ie: InterruptedException) {
+ Thread.currentThread().interrupt()
+ }
+ }
+
fun processZipEntry(zipFile: ZipFile, _foundry: String, zipEntry: ZipEntry, passedWaitForMorpho: Boolean) {
var foundry = _foundry
var waitForMorpho = passedWaitForMorpho