Fix premature input zip closing with multpile threads

Change-Id: Ib775ac899d74553533e49b60b2be8250d613aaf6
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
index 869fe63..389ab32 100644
--- a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
@@ -306,6 +306,8 @@
     private val LOGGER: Logger = Logger.getLogger(KorapXmlTool::class.java.name)
 
     private var annotationWorkerPool : AnnotationWorkerPool? = null
+    // Shared executor for entry-level parallelism across all zips
+    private var entryExecutor: java.util.concurrent.ExecutorService? = null
 
     val texts: ConcurrentHashMap<String, NonBmpString> = ConcurrentHashMap()
     val sentences: ConcurrentHashMap<String, Array<Span>> = ConcurrentHashMap()
@@ -347,7 +349,8 @@
             LOGGER.severe("Shell command annotation is not yet supported with output format $outputFormat")
             exitProcess(1)
         }
-        Executors.newFixedThreadPool(maxThreads)
+        // Initialize shared entry executor (used inside each zip)
+        entryExecutor = Executors.newFixedThreadPool(maxThreads)
 
         if (annotateWith.isNotEmpty()) {
             annotationWorkerPool = AnnotationWorkerPool(annotateWith, maxThreads, LOGGER)
@@ -410,6 +413,8 @@
             LOGGER.info("closing worker pool")
             annotationWorkerPool?.close()
         }
+        // Shutdown entry executor
+        entryExecutor?.shutdown()
     }
 
     private fun processZipsWithQueue(zips: Array<String>, foundry: String, parallelism: Int) {
@@ -519,21 +524,16 @@
             }
         }
         if (zipFilePath.hasCorrespondingBaseZip()) {
-            val zips = arrayOf(zipFilePath, zipFilePath.correspondingBaseZip()!!)
-            Arrays.stream(zips).parallel().forEach { zip ->
+            val relatedZips = arrayOf(zipFilePath, zipFilePath.correspondingBaseZip()!!)
+            // Process related zips one after another to keep the ZipFile lifetime strictly bounded
+            relatedZips.forEach { zip ->
                 ZipFile(zip).use { zipFile ->
-                    zipFile.stream().filter({ extractMetadataRegex.isNotEmpty() || !it.name.contains("header.xml") })
-                        .parallel().forEach { zipEntry ->
-                            processZipEntry(zipFile, foundry, zipEntry, true)
-                        }
+                    processZipEntriesWithPool(zipFile, foundry, true)
                 }
             }
         } else {
             ZipFile(zipFilePath).use { zipFile ->
-                zipFile.stream().filter({ extractMetadataRegex.isNotEmpty() || !it.name.contains("header.xml") })
-                    .parallel().forEach { zipEntry ->
-                        processZipEntry(zipFile, foundry, zipEntry, false)
-                    }
+                processZipEntriesWithPool(zipFile, foundry, false)
             }
         }
         if (outputFormat == OutputFormat.KORAPXML) {
@@ -613,6 +613,43 @@
         return String.format(Locale.ROOT, "%02d:%02d:%02d", h, m, sec)
     }
 
+    private fun processZipEntriesWithPool(zipFile: ZipFile, foundry: String, waitForMorpho: Boolean) {
+        // Collect entries first to avoid lazy evaluation surprises, filter header.xml unless metadata extraction is requested
+        val entries: MutableList<ZipEntry> = ArrayList()
+        val enumEntries = zipFile.entries()
+        while (enumEntries.hasMoreElements()) {
+            val e = enumEntries.nextElement()
+            if (extractMetadataRegex.isEmpty() && e.name.contains("header.xml")) continue
+            entries.add(e)
+        }
+        if (entries.isEmpty()) return
+
+        // If only one thread requested, do sequential to avoid pool overhead
+        if (maxThreads <= 1) {
+            entries.forEach { entry -> processZipEntry(zipFile, foundry, entry, waitForMorpho) }
+            return
+        }
+
+        // Submit all entry tasks to the shared executor and await completion before closing the zip
+        val latch = java.util.concurrent.CountDownLatch(entries.size)
+        entries.forEach { entry ->
+            entryExecutor?.execute {
+                try {
+                    processZipEntry(zipFile, foundry, entry, waitForMorpho)
+                } catch (t: Throwable) {
+                    LOGGER.warning("Failed to process entry ${entry.name}: ${t.message}")
+                } finally {
+                    latch.countDown()
+                }
+            }
+        }
+        try {
+            latch.await()
+        } catch (ie: InterruptedException) {
+            Thread.currentThread().interrupt()
+        }
+    }
+
     fun processZipEntry(zipFile: ZipFile, _foundry: String, zipEntry: ZipEntry, passedWaitForMorpho: Boolean) {
         var foundry = _foundry
         var waitForMorpho = passedWaitForMorpho