Allow -f zip for -A

Change-Id: Ic40174a524aed909cad233e80aacebc6ebbdf970
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/AnnotationWorkerPool.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/AnnotationWorkerPool.kt
index 7a2e765..9fd108f 100644
--- a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/AnnotationWorkerPool.kt
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/AnnotationWorkerPool.kt
@@ -16,12 +16,16 @@
 class AnnotationWorkerPool(
     private val command: String,
     private val numWorkers: Int,
-    private val LOGGER: Logger
+    private val LOGGER: Logger,
+    private val outputHandler: ((String, AnnotationTask?) -> Unit)? = null
 ) {
-    private val queue: BlockingQueue<String> = LinkedBlockingQueue()
+    private val queue: BlockingQueue<AnnotationTask> = LinkedBlockingQueue()
     private val threads = mutableListOf<Thread>()
     private val threadCount = AtomicInteger(0)
     private val threadsLock = Any()
+    private val pendingOutputHandlers = AtomicInteger(0) // Track pending outputHandler calls
+
+    data class AnnotationTask(val text: String, val docId: String?, val entryPath: String?)
 
     init {
         openWorkerPool()
@@ -33,13 +37,24 @@
             Thread {
                 val self = currentThread()
                 var successfullyInitialized = false
-                try {
-                    synchronized(threadsLock) {
-                        threads.add(self)
+                var workerAttempts = 0
+                val maxRestarts = 50 // Allow up to 50 restarts per worker to handle crashes
+
+                while (workerAttempts < maxRestarts && !Thread.currentThread().isInterrupted) {
+                    workerAttempts++
+                    if (workerAttempts > 1) {
+                        LOGGER.info("Worker $workerIndex: Restart attempt $workerAttempts")
                     }
-                    threadCount.incrementAndGet()
-                    successfullyInitialized = true
-                    LOGGER.info("Worker $workerIndex (thread ${self.threadId()}) started.")
+
+                try {
+                    if (workerAttempts == 1) {
+                        synchronized(threadsLock) {
+                            threads.add(self)
+                        }
+                        threadCount.incrementAndGet()
+                        successfullyInitialized = true
+                        LOGGER.info("Worker $workerIndex (thread ${self.threadId()}) started.")
+                    }
 
                     val process = ProcessBuilder("/bin/sh", "-c", command)
                         .redirectOutput(ProcessBuilder.Redirect.PIPE).redirectInput(ProcessBuilder.Redirect.PIPE)
@@ -50,6 +65,10 @@
                         LOGGER.severe("Worker $workerIndex (thread ${self.threadId()}) failed to open pipe for command '$command'")
                         return@Thread // Exits thread, finally block will run
                     }
+
+                    // Declare pendingTasks here so it's accessible after process exits
+                    val pendingTasks: BlockingQueue<AnnotationTask> = LinkedBlockingQueue()
+
                     // Using try-with-resources for streams to ensure they are closed
                     process.outputStream.buffered(BUFFER_SIZE).use { procOutStream ->
                         process.inputStream.buffered(BUFFER_SIZE).use { procInStream ->
@@ -62,15 +81,15 @@
                                 val outputStreamWriter = OutputStreamWriter(procOutStream)
                                 try {
                                     while (true) { // Loop until EOF is received
-                                        val text = queue.poll(50, TimeUnit.MILLISECONDS) // Reduced timeout for more responsiveness
-                                        if (text == null) { // Timeout, continue waiting for more data
+                                        val task = queue.poll(50, TimeUnit.MILLISECONDS) // Reduced timeout for more responsiveness
+                                        if (task == null) { // Timeout, continue waiting for more data
                                             if (Thread.currentThread().isInterrupted) {
                                                 LOGGER.info("Worker $workerIndex (thread ${self.threadId()}) writer interrupted, stopping")
                                                 break
                                             }
                                             continue
                                         }
-                                        if (text == "#eof") {
+                                        if (task.text == "#eof") {
                                             try {
                                                 outputStreamWriter.write("\n# eof\n") // Send EOF to process
                                                 outputStreamWriter.flush()
@@ -83,9 +102,14 @@
                                             LOGGER.info("Worker $workerIndex (thread ${self.threadId()}) sent EOF to process and writer is stopping.")
                                             break // Exit while loop
                                         }
+                                        pendingTasks.put(task)
                                         try {
-                                            outputStreamWriter.write(text + "\n# eot\n")
+                                            val dataToSend = task.text + "\n# eot\n\n"
+                                            LOGGER.fine("Worker $workerIndex: Sending ${dataToSend.length} chars to external process")
+                                            LOGGER.finer("Worker $workerIndex: First 500 chars of data to send:\n${dataToSend.take(500)}")
+                                            outputStreamWriter.write(dataToSend)
                                             outputStreamWriter.flush()
+                                            LOGGER.fine("Worker $workerIndex: Data sent and flushed")
                                         } catch (e: IOException) {
                                             LOGGER.severe("Worker $workerIndex (thread ${self.threadId()}) failed to write to process: ${e.message}")
                                             break // Exit the loop
@@ -99,8 +123,11 @@
                             // Reader coroutine
                             coroutineScope.launch {
                                 val output = StringBuilder()
+                                var lastLineWasEmpty = false
+                                var linesRead = 0
                                 try {
                                     procInStream.bufferedReader().use { reader ->
+                                        LOGGER.fine("Worker $workerIndex: Reader started, waiting for input from external process")
                                         while (!inputGotEof) {
                                             if (Thread.currentThread().isInterrupted) {
                                                 LOGGER.info("Worker $workerIndex (thread ${self.threadId()}) reader interrupted, stopping")
@@ -112,31 +139,107 @@
                                                     sleep(5) // Very short sleep when waiting for more output
                                                     continue
                                                 } else {
+                                                    LOGGER.fine("Worker $workerIndex: External process died, no more input")
                                                     break
                                                 }
                                             }
-                                            when (line) {
-                                                "# eof" -> {
+                                            linesRead++
+                                            when {
+                                                line == "# eof" -> {
                                                     LOGGER.info("Worker $workerIndex (thread ${self.threadId()}) got EOF in output")
                                                     inputGotEof = true
                                                     if (output.isNotEmpty()) {
-                                                        printOutput(output.toString()) // Print any remaining output
+                                                        val task = pendingTasks.poll(500, TimeUnit.MILLISECONDS)
+                                                        if (outputHandler != null) {
+                                                            if (task == null) {
+                                                                LOGGER.warning("Worker $workerIndex: Got # eof but no task in pendingTasks queue!")
+                                                            }
+                                                            LOGGER.fine("Worker $workerIndex: Invoking outputHandler with ${output.length} chars (EOF)")
+                                                            pendingOutputHandlers.incrementAndGet()
+                                                            try {
+                                                                outputHandler.invoke(output.toString(), task)
+                                                            } finally {
+                                                                pendingOutputHandlers.decrementAndGet()
+                                                            }
+                                                        } else {
+                                                            printOutput(output.toString())
+                                                        }
                                                         output.clear()
                                                     }
                                                     break
                                                 }
-                                                "# eot" -> {
-                                                    printOutput(output.toString()) // Assuming printOutput is thread-safe
+                                                line == "# eot" -> {
+                                                    val task = pendingTasks.poll(500, TimeUnit.MILLISECONDS)
+                                                    if (outputHandler != null) {
+                                                        if (task == null) {
+                                                            LOGGER.warning("Worker $workerIndex: Got # eot but no task in pendingTasks queue!")
+                                                        }
+                                                        LOGGER.fine("Worker $workerIndex: Invoking outputHandler with ${output.length} chars (EOT)")
+                                                        pendingOutputHandlers.incrementAndGet()
+                                                        try {
+                                                            outputHandler.invoke(output.toString(), task)
+                                                        } finally {
+                                                            pendingOutputHandlers.decrementAndGet()
+                                                        }
+                                                    } else {
+                                                        LOGGER.fine("Worker $workerIndex: Printing output (${output.length} chars)")
+                                                        printOutput(output.toString())
+                                                    }
                                                     output.clear()
+                                                    lastLineWasEmpty = false
+                                                }
+                                                line.isEmpty() -> {
+                                                    // Empty line - potential document separator
+                                                    // In CoNLL-U, double empty line marks end of document
+                                                    if (lastLineWasEmpty && output.isNotEmpty()) {
+                                                        // This is the second empty line - end of document
+                                                        if (outputHandler != null) {
+                                                            val task = pendingTasks.poll(500, TimeUnit.MILLISECONDS)
+                                                            if (task == null) {
+                                                                LOGGER.warning("Worker $workerIndex: Double empty line detected but no task in pendingTasks queue!")
+                                                            }
+                                                            LOGGER.fine("Worker $workerIndex: Invoking outputHandler with ${output.length} chars (double empty line)")
+                                                            pendingOutputHandlers.incrementAndGet()
+                                                            try {
+                                                                outputHandler.invoke(output.toString(), task)
+                                                            } finally {
+                                                                pendingOutputHandlers.decrementAndGet()
+                                                            }
+                                                            output.clear()
+                                                            lastLineWasEmpty = false
+                                                        } else {
+                                                            // For stdout mode, just add the empty line
+                                                            output.append('\n')
+                                                            lastLineWasEmpty = true
+                                                        }
+                                                    } else {
+                                                        output.append('\n')
+                                                        lastLineWasEmpty = true
+                                                    }
                                                 }
                                                 else -> {
                                                     output.append(line).append('\n')
+                                                    lastLineWasEmpty = false
                                                 }
                                             }
                                         }
                                     }
                                     if (output.isNotEmpty()) { // Print any remaining output
-                                        printOutput(output.toString())
+                                        val task = pendingTasks.poll(500, TimeUnit.MILLISECONDS)
+                                        if (outputHandler != null) {
+                                            if (task == null) {
+                                                LOGGER.fine("Worker $workerIndex: Remaining output but no task in pendingTasks queue!")
+                                            }
+                                            LOGGER.fine("Worker $workerIndex: Invoking outputHandler with ${output.length} chars (remaining)")
+                                            pendingOutputHandlers.incrementAndGet()
+                                            try {
+                                                outputHandler.invoke(output.toString(), task)
+                                            } finally {
+                                                pendingOutputHandlers.decrementAndGet()
+                                            }
+                                        } else {
+                                            printOutput(output.toString())
+                                        }
                                     }
                                 } catch (e: Exception) {
                                     LOGGER.severe("Reader coroutine in worker $workerIndex (thread ${self.threadId()}) failed: ${e.message}")
@@ -163,27 +266,58 @@
                     val exitCode = process.waitFor()
                     if (exitCode != 0) {
                         LOGGER.warning("Worker $workerIndex (thread ${self.threadId()}) process exited with code $exitCode")
+
+                        // Return any pending tasks back to the queue for other workers to process
+                        val remainingTasks = mutableListOf<AnnotationTask>()
+                        pendingTasks.drainTo(remainingTasks)
+                        if (remainingTasks.isNotEmpty()) {
+                            LOGGER.warning("Worker $workerIndex: Returning ${remainingTasks.size} unprocessed task(s) to queue after process failure")
+                            remainingTasks.forEach { task ->
+                                if (task.text != "#eof") { // Don't re-queue EOF markers
+                                    try {
+                                        queue.put(task)
+                                    } catch (e: InterruptedException) {
+                                        LOGGER.severe("Failed to return task to queue: ${e.message}")
+                                    }
+                                }
+                            }
+                        }
+
+                        // Check if there are more items in the queue to process
+                        if (queue.isEmpty()) {
+                            LOGGER.info("Worker $workerIndex: Queue is empty after crash, exiting")
+                            break // Exit the restart loop
+                        } else {
+                            LOGGER.info("Worker $workerIndex: Restarting to process remaining ${queue.size} items in queue")
+                            continue // Restart the worker
+                        }
                     } else {
                         LOGGER.info("Worker $workerIndex (thread ${self.threadId()}) process finished normally")
+                        break // Normal exit
                     }
                 } catch (e: IOException) {
                     LOGGER.severe("Worker $workerIndex (thread ${self.threadId()}) failed: ${e.message}")
+                    break // Exit restart loop on IOException
                 } catch (e: InterruptedException) {
                     LOGGER.info("Worker $workerIndex (thread ${self.threadId()}) was interrupted during processing")
                     Thread.currentThread().interrupt() // Restore interrupt status
+                    break // Exit restart loop on interrupt
                 } catch (e: Exception) { // Catch any other unexpected exceptions during setup or process handling
                     LOGGER.severe("Unhandled exception in worker thread ${self.threadId()} (index $workerIndex): ${e.message}")
                     e.printStackTrace()
-                } finally {
-                    if (successfullyInitialized) {
-                        synchronized(threadsLock) {
-                            threads.remove(self)
-                        }
-                        threadCount.decrementAndGet()
-                        LOGGER.info("Worker thread ${self.threadId()} (index $workerIndex) cleaned up and exiting. Active threads: ${threadCount.get()}")
-                    } else {
-                        LOGGER.warning("Worker thread ${self.threadId()} (index $workerIndex) exiting without full initialization/cleanup.")
+                    break // Exit restart loop on unexpected exceptions
+                }
+                } // End of while (workerAttempts < maxRestarts) loop
+
+                // Cleanup after all restart attempts
+                if (successfullyInitialized) {
+                    synchronized(threadsLock) {
+                        threads.remove(self)
                     }
+                    threadCount.decrementAndGet()
+                    LOGGER.info("Worker thread ${self.threadId()} (index $workerIndex) cleaned up and exiting. Active threads: ${threadCount.get()}")
+                } else {
+                    LOGGER.warning("Worker thread ${self.threadId()} (index $workerIndex) exiting without full initialization/cleanup.")
                 }
             }.start()
         }
@@ -201,9 +335,10 @@
         }
     }
 
-    fun pushToQueue(text: String) {
+    fun pushToQueue(text: String, docId: String? = null, entryPath: String? = null) {
         try {
-            queue.put(text)
+            LOGGER.fine("pushToQueue called: text length=${text.length}, docId=$docId, entryPath=$entryPath")
+            queue.put(AnnotationTask(text, docId, entryPath))
         } catch (e: InterruptedException) {
             Thread.currentThread().interrupt()
             LOGGER.warning("Interrupted while trying to push text to queue.")
@@ -213,7 +348,7 @@
     fun pushToQueue(texts: List<String>) {
         texts.forEach { text ->
             try {
-                queue.put(text)
+                queue.put(AnnotationTask(text, null, null))
             } catch (e: InterruptedException) {
                 Thread.currentThread().interrupt()
                 LOGGER.warning("Interrupted while trying to push texts to queue. Some texts may not have been added.")
@@ -224,13 +359,14 @@
 
     fun close() {
         val currentThreadCount = threadCount.get()
-        LOGGER.info("Closing worker pool with $currentThreadCount threads")
-        
+        val queueSizeBeforeEOF = queue.size
+        LOGGER.info("Closing worker pool with $currentThreadCount threads, queue size: $queueSizeBeforeEOF")
+
         // Send EOF marker for each worker - use numWorkers instead of current thread count
         // to ensure we send enough EOF markers even if some threads haven't started yet
         for (i in 0 until numWorkers) {
             try {
-                queue.put("#eof")
+                queue.put(AnnotationTask("#eof", null, null))
                 LOGGER.info("Sent EOF marker ${i+1}/$numWorkers to queue")
             } catch (e: InterruptedException) {
                 Thread.currentThread().interrupt()
@@ -239,23 +375,36 @@
             }
         }
         
+        LOGGER.info("All EOF markers sent, queue size now: ${queue.size}")
+
         if (threadCount.get() > 0) {
             waitForWorkersToFinish()
         }
     }
 
     private fun waitForWorkersToFinish() {
+        val startTime = System.currentTimeMillis()
+        var lastReportedSize = queue.size
         LOGGER.info("Waiting for queue to empty (current size: ${queue.size})...")
         while (queue.isNotEmpty()) {
             try {
                 sleep(50) // Reduced sleep time for more responsive monitoring
+                val currentSize = queue.size
+                val elapsed = (System.currentTimeMillis() - startTime) / 1000
+
+                // Report every 5 seconds or when size changes significantly
+                if (elapsed % 5 == 0L && currentSize != lastReportedSize) {
+                    LOGGER.info("Queue status: $currentSize items remaining (${elapsed}s elapsed)")
+                    lastReportedSize = currentSize
+                }
             } catch (e: InterruptedException) {
                 Thread.currentThread().interrupt()
                 LOGGER.warning("Interrupted while waiting for queue to empty. Proceeding to join threads.")
                 break
             }
         }
-        LOGGER.info("Queue is empty. Joining worker threads.")
+        val totalTime = (System.currentTimeMillis() - startTime) / 1000
+        LOGGER.info("Queue is empty after ${totalTime}s. Joining worker threads.")
 
         val threadsToJoin: List<Thread>
         synchronized(threadsLock) {
@@ -268,11 +417,11 @@
             LOGGER.info("Attempting to join ${threadsToJoin.size} thread(s) from recorded list (current active count: ${threadCount.get()}).")
             threadsToJoin.forEach { thread ->
                 try {
-                    thread.join(10000) // Increased timeout to 10 seconds
+                    thread.join(1800000) // 30 minutes timeout - allow workers time to restart and process all documents
                     if (thread.isAlive) {
-                        LOGGER.warning("Thread ${thread.threadId()} did not terminate after 10s. Interrupting.")
+                        LOGGER.warning("Thread ${thread.threadId()} did not terminate after 30 minutes. Interrupting.")
                         thread.interrupt()
-                        thread.join(2000) // Wait 2 seconds after interrupt
+                        thread.join(10000) // Wait 10 seconds after interrupt
                         if (thread.isAlive) {
                             LOGGER.severe("Thread ${thread.threadId()} failed to terminate after interrupt.")
                         }
@@ -296,6 +445,30 @@
                 }
             }
         }
+
+        // CRITICAL: Wait for all pending outputHandler invocations to complete
+        val pendingCount = pendingOutputHandlers.get()
+        if (pendingCount > 0) {
+            LOGGER.info("Waiting for $pendingCount pending outputHandler invocation(s) to complete...")
+        }
+        val startWait = System.currentTimeMillis()
+        while (pendingOutputHandlers.get() > 0) {
+            try {
+                sleep(100)
+                val elapsed = System.currentTimeMillis() - startWait
+                if (elapsed > 30000) { // 30 second timeout
+                    LOGGER.severe("Timeout waiting for ${pendingOutputHandlers.get()} pending outputHandler(s) after 30s!")
+                    break
+                }
+            } catch (e: InterruptedException) {
+                Thread.currentThread().interrupt()
+                LOGGER.warning("Interrupted while waiting for pending outputHandlers")
+                break
+            }
+        }
+        if (pendingCount > 0) {
+            LOGGER.info("All outputHandler invocations completed")
+        }
     }
 }
 
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
index a9b1333..eb9687f 100644
--- a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
@@ -317,6 +317,8 @@
     val metadata: ConcurrentHashMap<String, Array<String>> = ConcurrentHashMap()
     val extraFeatures: ConcurrentHashMap<String, MutableMap<String, String>> = ConcurrentHashMap()
     private val processedDocs = java.util.concurrent.atomic.AtomicInteger(0)
+    private val docsSentToAnnotation = java.util.concurrent.atomic.AtomicInteger(0)
+    private val docsWrittenToZip = java.util.concurrent.atomic.AtomicInteger(0)
     var taggerToolBridges: ConcurrentHashMap<Long, TaggerToolBridge?> = ConcurrentHashMap()
     var parserToolBridges: ConcurrentHashMap<Long, ParserToolBridge?> = ConcurrentHashMap()
 
@@ -345,15 +347,57 @@
     }
 
     fun korapxml2conllu(args: Array<String>) {
-        if (outputFormat == OutputFormat.KORAPXML && annotateWith.isNotEmpty()) {
-            LOGGER.severe("Shell command annotation is not yet supported with output format $outputFormat")
-            exitProcess(1)
-        }
         // Initialize shared entry executor (used inside each zip)
         entryExecutor = Executors.newFixedThreadPool(maxThreads)
 
         if (annotateWith.isNotEmpty()) {
-            annotationWorkerPool = AnnotationWorkerPool(annotateWith, maxThreads, LOGGER)
+            // Initialize ZIP output stream BEFORE creating worker pool, if needed
+            if (outputFormat == OutputFormat.KORAPXML) {
+                // Determine output filename
+                val inputZipPath = args[0] // First ZIP file
+                var targetFoundry = "base"
+                when {
+                    annotateWith.contains("spacy") -> targetFoundry = "spacy"
+                    annotateWith.contains("stanza") -> targetFoundry = "stanza"
+                    annotateWith.contains("udpipe") -> targetFoundry = "udpipe"
+                    annotateWith.contains("tree") -> targetFoundry = "tree_tagger"
+                    annotateWith.contains("marmot") -> targetFoundry = "marmot"
+                    annotateWith.contains("opennlp") -> targetFoundry = "opennlp"
+                    annotateWith.contains("corenlp") -> targetFoundry = "corenlp"
+                    else -> targetFoundry = "annotated"
+                }
+
+                val outputMorphoZipFileName = inputZipPath.replace(Regex("\\.zip$"), ".".plus(targetFoundry).plus(".zip"))
+                LOGGER.info("Initializing output ZIP: $outputMorphoZipFileName (from input: $inputZipPath, foundry: $targetFoundry)")
+
+                if (File(outputMorphoZipFileName).exists() && !overwrite) {
+                    LOGGER.severe("Output file $outputMorphoZipFileName already exists. Use --overwrite to overwrite.")
+                    exitProcess(1)
+                }
+
+                // Delete old file if it exists
+                if (File(outputMorphoZipFileName).exists()) {
+                    LOGGER.info("Deleting existing file: $outputMorphoZipFileName")
+                    File(outputMorphoZipFileName).delete()
+                }
+
+                dbFactory = DocumentBuilderFactory.newInstance()
+                dBuilder = dbFactory!!.newDocumentBuilder()
+                val fileOutputStream = FileOutputStream(outputMorphoZipFileName)
+                morphoZipOutputStream = ZipArchiveOutputStream(fileOutputStream).apply {
+                    setUseZip64(Zip64Mode.Always)
+                }
+                LOGGER.info("Initialized morphoZipOutputStream for external annotation to: $outputMorphoZipFileName")
+            }
+
+            if (outputFormat == OutputFormat.KORAPXML) {
+                // For ZIP output with external annotation, we need a custom handler
+                annotationWorkerPool = AnnotationWorkerPool(annotateWith, maxThreads, LOGGER) { annotatedConllu, task ->
+                    parseAndWriteAnnotatedConllu(annotatedConllu, task)
+                }
+            } else {
+                annotationWorkerPool = AnnotationWorkerPool(annotateWith, maxThreads, LOGGER, null)
+            }
         }
 
         var zips: Array<String> = args
@@ -425,7 +469,28 @@
 
         if (annotationWorkerPool != null) {
             LOGGER.info("closing worker pool")
+            LOGGER.info("Documents sent to annotation: ${docsSentToAnnotation.get()}")
             annotationWorkerPool?.close()
+            LOGGER.info("Documents written to ZIP: ${docsWrittenToZip.get()}")
+
+            // Close the ZIP file after worker pool is done (if using external annotation with ZIP output)
+            if (outputFormat == OutputFormat.KORAPXML && morphoZipOutputStream != null) {
+                try {
+                    morphoZipOutputStream!!.flush()
+                    morphoZipOutputStream!!.close()
+                    LOGGER.info("Closed output ZIP file after annotation processing")
+                } catch (e: Exception) {
+                    LOGGER.severe("ERROR closing ZIP file: ${e.message}")
+                    e.printStackTrace()
+                }
+            }
+
+            // Check if all documents were written
+            val sent = docsSentToAnnotation.get()
+            val written = docsWrittenToZip.get()
+            if (sent != written) {
+                LOGGER.warning("Document count mismatch! Sent to annotation: $sent, Written to ZIP: $written (missing: ${sent - written})")
+            }
         }
         // Shutdown entry executor
         entryExecutor?.shutdown()
@@ -520,6 +585,19 @@
                 }
             } else if (parserName != null) {
                 targetFoundry = parserName!!
+            } else if (annotateWith.isNotEmpty()) {
+                // Try to detect foundry from external annotation command
+                when {
+                    annotateWith.contains("spacy") -> targetFoundry = "spacy"
+                    annotateWith.contains("stanza") -> targetFoundry = "stanza"
+                    annotateWith.contains("udpipe") -> targetFoundry = "udpipe"
+                    annotateWith.contains("tree") -> targetFoundry = "tree_tagger"
+                    annotateWith.contains("marmot") -> targetFoundry = "marmot"
+                    annotateWith.contains("opennlp") -> targetFoundry = "opennlp"
+                    annotateWith.contains("corenlp") -> targetFoundry = "corenlp"
+                    else -> targetFoundry = "annotated"
+                }
+                LOGGER.info("Detected foundry '$targetFoundry' from annotation command: $annotateWith")
             }
             dbFactory = DocumentBuilderFactory.newInstance()
             dBuilder = dbFactory!!.newDocumentBuilder()
@@ -528,6 +606,7 @@
                     zipFilePath.replace(Regex("(\\.(opennlp|marmot|tree_tagger|corenlp|spacy))?\\.zip$"), ".".plus(parserName).plus(".zip"))
                 else
                     zipFilePath.replace(Regex("\\.zip$"), ".".plus(targetFoundry).plus(".zip"))
+            LOGGER.info("Output ZIP file: $outputMorphoZipFileName")
             if (File(outputMorphoZipFileName).exists() && !overwrite) {
                 LOGGER.severe("Output file $outputMorphoZipFileName already exists. Use --overwrite to overwrite.")
                 exitProcess(1)
@@ -536,6 +615,9 @@
             morphoZipOutputStream = ZipArchiveOutputStream(fileOutputStream).apply {
                 setUseZip64(Zip64Mode.Always)
             }
+            LOGGER.info("Initialized morphoZipOutputStream for $outputMorphoZipFileName")
+        } else {
+            LOGGER.info("Skipping ZIP initialization: dbFactory=${dbFactory != null}, outputFormat=$outputFormat")
         }
         if (zipFilePath.hasCorrespondingBaseZip()) {
             val relatedZips = arrayOf(zipFilePath, zipFilePath.correspondingBaseZip()!!)
@@ -550,8 +632,12 @@
                 processZipEntriesWithPool(zipFile, foundry, false)
             }
         }
-        if (outputFormat == OutputFormat.KORAPXML) {
+        // Don't close the ZIP here if using external annotation - it will be closed after worker pool finishes
+        if (outputFormat == OutputFormat.KORAPXML && annotationWorkerPool == null) {
+            LOGGER.fine("Closing output ZIP file in processZipFile (no annotation worker pool)")
             morphoZipOutputStream!!.close()
+        } else if (outputFormat == OutputFormat.KORAPXML) {
+            LOGGER.fine("NOT closing ZIP in processZipFile - will close after worker pool finishes")
         }
         logZipProgress(zipFilePath)
     }
@@ -747,19 +833,23 @@
                     }
                 }
 
-                val morphoRequired = waitForMorpho || useLemma || taggerName != null || parserName != null || outputFormat == OutputFormat.KORAPXML
+                val morphoRequired = waitForMorpho || useLemma || taggerName != null || parserName != null || (outputFormat == OutputFormat.KORAPXML && annotationWorkerPool == null)
                 // For lemma-only/lemma-based word2vec/now, we can proceed without full text
                 val textRequired = when (outputFormat) {
                     OutputFormat.WORD2VEC, OutputFormat.NOW -> !(useLemma || lemmaOnly)
                     else -> true
                 }
+
+                LOGGER.fine("Checking if ready to process $docId: texts=${texts[docId] != null}, sentences=${sentences[docId] != null}, tokens=${tokens[docId] != null}, morpho=${morpho[docId] != null}, morphoRequired=$morphoRequired, textRequired=$textRequired, annotationWorkerPool=${annotationWorkerPool != null}")
+
                 if ((texts[docId] != null || !textRequired) && sentences[docId] != null && tokens[docId] != null
                     && (!morphoRequired || morpho[docId] != null)
                     && (extractMetadataRegex.isEmpty() || metadata[docId] != null)
                 ) {
-                    // Be quiet on INFO; per-text logs only on FINE and below
-                    LOGGER.fine("Processing text: $docId in thread ${Thread.currentThread().threadId()}")
+                    LOGGER.fine("All data ready for $docId, calling processText")
                     processText(docId, foundry)
+                } else {
+                    LOGGER.fine("NOT ready to process $docId yet: textOK=${texts[docId] != null || !textRequired}, sentencesOK=${sentences[docId] != null}, tokensOK=${tokens[docId] != null}, morphoOK=${!morphoRequired || morpho[docId] != null}")
                 }
             } else if (extractMetadataRegex.isNotEmpty() && zipEntry.name.matches(Regex(".*/header\\.xml$"))) {
                 //LOGGER.info("Processing header file: " + zipEntry.name)
@@ -777,7 +867,7 @@
                 }
                 if (meta.isNotEmpty() && docId != null) {
                     metadata[docId] = meta.toTypedArray()
-                    val morphoRequired = waitForMorpho || useLemma || taggerName != null || parserName != null || outputFormat == OutputFormat.KORAPXML
+                    val morphoRequired = waitForMorpho || useLemma || taggerName != null || parserName != null || (outputFormat == OutputFormat.KORAPXML && annotationWorkerPool == null)
                     val textRequired = when (outputFormat) {
                         OutputFormat.WORD2VEC, OutputFormat.NOW -> !(useLemma || lemmaOnly)
                         else -> true
@@ -786,7 +876,7 @@
                          && (!morphoRequired || morpho[docId] != null)
                      ) {
                         // Be quiet on INFO; per-text logs only on FINE and below
-                        LOGGER.fine("Processing text (meta-ready): $docId in thread ${Thread.currentThread().threadId()}")
+                        LOGGER.info("Processing text (meta-ready): $docId in thread ${Thread.currentThread().threadId()}")
                         processText(docId, foundry)
                     }
                 }
@@ -838,7 +928,20 @@
         }
 
         if (annotationWorkerPool != null) {
-            annotationWorkerPool?.pushToQueue(output.toString())
+            if (outputFormat == OutputFormat.KORAPXML) {
+                // Store metadata in task, send clean CoNLL-U to external process
+                val entryPath = if (parserName != null)  docId.replace(Regex("[_.]"), "/").plus("/$parserName/").plus("dependency.xml")
+                else
+                    docId.replace(Regex("[_.]"), "/").plus("/$morphoFoundry/").plus("morpho.xml")
+                LOGGER.fine("Sending document $docId (${output.length} chars) to annotation worker pool for ZIP output")
+                // Pass metadata via AnnotationTask, NOT in the text itself
+                annotationWorkerPool?.pushToQueue(output.toString(), docId, entryPath + "|" + foundry)
+                docsSentToAnnotation.incrementAndGet()
+            } else {
+                LOGGER.fine("Sending document $docId (${output.length} chars) to annotation worker pool")
+                annotationWorkerPool?.pushToQueue(output.toString())
+                docsSentToAnnotation.incrementAndGet()
+            }
             // Release internal char[] early
             output.setLength(0)
         } else if (outputFormat != OutputFormat.KORAPXML) {
@@ -848,7 +951,18 @@
             // Release internal char[] early
             output.setLength(0)
         } else {
-            korapXmlOutput(foundry, docId)
+            // Direct ZIP output without external annotation
+            val entryPath = if (parserName != null)  docId.replace(Regex("[_.]"), "/").plus("/$parserName/").plus("dependency.xml")
+            else
+                docId.replace(Regex("[_.]"), "/").plus("/$morphoFoundry/").plus("morpho.xml")
+            val zipEntry = ZipArchiveEntry(entryPath)
+            zipEntry.unixMode = ZIP_ENTRY_UNIX_MODE
+            synchronized(morphoZipOutputStream!!) {
+                morphoZipOutputStream!!.putArchiveEntry(zipEntry)
+                morphoZipOutputStream!!.write(output.toString().toByteArray())
+                morphoZipOutputStream!!.closeArchiveEntry()
+            }
+            output.clear()
         }
 
 
@@ -872,20 +986,6 @@
                 logMemoryStats(count)
             }
         }
-
-        if (outputFormat == OutputFormat.KORAPXML) {
-            val entryPath = if (parserName != null)  docId.replace(Regex("[_.]"), "/").plus("/$parserName/").plus("dependency.xml")
-            else
-                docId.replace(Regex("[_.]"), "/").plus("/$morphoFoundry/").plus("morpho.xml")
-            val zipEntry = ZipArchiveEntry(entryPath)
-            zipEntry.unixMode = ZIP_ENTRY_UNIX_MODE
-            synchronized(morphoZipOutputStream!!) {
-                morphoZipOutputStream!!.putArchiveEntry(zipEntry)
-                morphoZipOutputStream!!.write(output.toString().toByteArray())
-                morphoZipOutputStream!!.closeArchiveEntry()
-            }
-            output.clear()
-        }
     }
 
     private fun getMorphoFoundry() = taggerToolBridges[Thread.currentThread().threadId()]?.foundry ?: "base"
@@ -1113,33 +1213,64 @@
                 previousSpanStart = span.from + 1
             }
             // Bestimme den Token-Text sicher
-            val tokenText: String = if (textVal != null) {
+            var tokenText: String = if (textVal != null) {
                 val safeFrom = span.from.coerceIn(0, textVal.length)
                 val safeTo = span.to.coerceIn(safeFrom, textVal.length)
                 textVal.substring(safeFrom, safeTo)
             } else "_"
 
+            // Validate and fix empty/whitespace-only tokens that cause SpaCy to crash
+            if (tokenText.isBlank()) {
+                LOGGER.fine("Replacing empty/blank token at offset ${span.from}-${span.to} in document $docId with underscore")
+                tokenText = "_"  // Replace with underscore instead of skipping
+            }
+
             if (morpho[docId]?.containsKey("${span.from}-${span.to}") == true) {
                 val mfs = morpho[docId]!!["${span.from}-${span.to}"]
-                output.append(
-                    printConlluToken(
-                        token_index,
-                        tokenText,
-                        mfs!!.lemma!!,
-                        mfs.upos!!,
-                        mfs.xpos!!,
-                        mfs.feats!!,
-                        mfs.head!!,
-                        mfs.deprel!!,
-                        mfs.deps!!,
-                        mfs.misc!!,
-                        columns
+                if (mfs != null) {
+                    // Add offset info to MISC field for external annotation with ZIP output
+                    val miscWithOffset = if (annotationWorkerPool != null && outputFormat == OutputFormat.KORAPXML) {
+                        val existing = mfs.misc ?: "_"
+                        if (existing == "_") "Offset=${span.from}-${span.to}"
+                        else "${existing}|Offset=${span.from}-${span.to}"
+                    } else mfs.misc ?: "_"
+
+                    output.append(
+                        printConlluToken(
+                            token_index,
+                            tokenText,
+                            mfs.lemma ?: "_",
+                            mfs.upos ?: "_",
+                            mfs.xpos ?: "_",
+                            mfs.feats ?: "_",
+                            mfs.head ?: "_",
+                            mfs.deprel ?: "_",
+                            mfs.deps ?: "_",
+                            miscWithOffset,
+                            columns
+                        )
                     )
-                )
+                } else {
+                    // Fallback if mfs is null
+                    val miscWithOffset = if (annotationWorkerPool != null && outputFormat == OutputFormat.KORAPXML) {
+                        "Offset=${span.from}-${span.to}"
+                    } else "_"
+
+                    output.append(
+                        printConlluToken(
+                            token_index, tokenText, misc = miscWithOffset, columns = columns
+                        )
+                    )
+                }
             } else {
+                // Add offset info for tokens without morpho data when needed
+                val miscWithOffset = if (annotationWorkerPool != null && outputFormat == OutputFormat.KORAPXML) {
+                    "Offset=${span.from}-${span.to}"
+                } else "_"
+
                 output.append(
                     printConlluToken(
-                        token_index, tokenText, columns = columns
+                        token_index, tokenText, misc = miscWithOffset, columns = columns
                     )
                 )
             }
@@ -1210,10 +1341,10 @@
         var real_token_index = 0
         var sentence_index = 0
         val output: StringBuilder = StringBuilder()
-        
+
         // Add the text sigle prefix
         output.append("@@$docId ")
-        
+
         if (texts[docId] == null) {
             // Lemma-only fallback when original text is not loaded
             tokens[docId]?.forEach { span ->
@@ -1449,6 +1580,198 @@
         var misc: String? = "_"
     )
 
+    private fun parseAndWriteAnnotatedConllu(annotatedConllu: String, task: AnnotationWorkerPool.AnnotationTask?) {
+        LOGGER.fine("parseAndWriteAnnotatedConllu called with ${annotatedConllu.length} chars, task=$task")
+
+        // Extract metadata from task
+        val docId = task?.docId
+        val entryPathAndFoundry = task?.entryPath?.split("|") ?: listOf(null, null)
+        val entryPath = entryPathAndFoundry.getOrNull(0)
+        val foundry = entryPathAndFoundry.getOrNull(1) ?: "base"
+
+        if (docId == null || entryPath == null) {
+            LOGGER.fine("Missing metadata from task! docId=$docId, entryPath=$entryPath, task=$task")
+            return
+        }
+
+        LOGGER.fine("Processing annotated document: docId=$docId, entryPath=$entryPath, foundry=$foundry")
+
+        val morphoSpans = mutableMapOf<String, MorphoSpan>()
+
+        // Parse the annotated CoNLL-U to extract morpho data
+        val lines = annotatedConllu.lines()
+        var currentStartOffsets: List<Int>? = null
+        var currentEndOffsets: List<Int>? = null
+        var tokenIndexInSentence = 0
+
+        for (line in lines) {
+            when {
+                line.startsWith("# start_offsets =") -> {
+                    // Parse start offsets from comment
+                    // Format: # start_offsets = <first_token_from> <token1_from> <token2_from> ...
+                    // The first value is duplicated, so skip it and use values from index 1 onwards
+                    val offsetsStr = line.substring("# start_offsets =".length).trim()
+                    val allOffsets = offsetsStr.split(Regex("\\s+")).mapNotNull { it.toIntOrNull() }
+                    currentStartOffsets = if (allOffsets.size > 1) allOffsets.drop(1) else allOffsets
+                    tokenIndexInSentence = 0
+                    LOGGER.fine("Found start offsets: $currentStartOffsets")
+                }
+                line.startsWith("# end_offsets =") -> {
+                    // Parse end offsets from comment
+                    // Format: # end_offsets = <sentence_end> <token1_to> <token2_to> ...
+                    // First value is sentence end, actual token ends start from index 1
+                    val offsetsStr = line.substring("# end_offsets =".length).trim()
+                    val allOffsets = offsetsStr.split(Regex("\\s+")).mapNotNull { it.toIntOrNull() }
+                    currentEndOffsets = if (allOffsets.size > 1) allOffsets.drop(1) else emptyList()
+                    LOGGER.fine("Found end offsets: $currentEndOffsets")
+                }
+                line.isEmpty() -> {
+                    // Reset for next sentence
+                    currentStartOffsets = null
+                    currentEndOffsets = null
+                    tokenIndexInSentence = 0
+                }
+                !line.startsWith("#") -> {
+                    // This is a token line
+                    val fields = line.split("\t")
+                    if (fields.size < 10) continue
+
+                    val lemma = if (fields.size > 2) fields[2] else "_"
+                    val upos = if (fields.size > 3) fields[3] else "_"
+                    val xpos = if (fields.size > 4) fields[4] else "_"
+                    val feats = if (fields.size > 5) fields[5] else "_"
+                    val head = if (fields.size > 6) fields[6] else "_"
+                    val deprel = if (fields.size > 7) fields[7] else "_"
+                    val deps = if (fields.size > 8) fields[8] else "_"
+                    val misc = if (fields.size > 9) fields[9] else "_"
+
+                    // Get offset from the comment-based offset arrays
+                    if (currentStartOffsets != null && currentEndOffsets != null &&
+                        tokenIndexInSentence < currentStartOffsets.size &&
+                        tokenIndexInSentence < currentEndOffsets.size) {
+
+                        val spanFrom = currentStartOffsets[tokenIndexInSentence]
+                        val spanTo = currentEndOffsets[tokenIndexInSentence]
+                        val spanKey = "$spanFrom-$spanTo"
+
+                        morphoSpans[spanKey] = MorphoSpan(lemma, upos, xpos, feats, head, deprel, deps, misc)
+                        LOGGER.fine("Added morpho span: $spanKey -> $lemma/$upos")
+                        tokenIndexInSentence++
+                    } else {
+                        LOGGER.fine("No offset information for token at index $tokenIndexInSentence in sentence (starts=${currentStartOffsets?.size}, ends=${currentEndOffsets?.size})")
+                    }
+                }
+            }
+        }
+
+        LOGGER.fine("Extracted ${morphoSpans.size} morpho spans for $docId")
+
+        if (morphoSpans.isEmpty()) {
+            LOGGER.warning("No morpho spans found in annotated output for $docId, skipping")
+            LOGGER.warning("Sample lines: ${lines.take(10).joinToString("\\n")}")
+            return
+        }
+
+        // Check if the data contains dependency information (non-empty head/deprel fields)
+        val hasDependencies = morphoSpans.values.any { span ->
+            span.head != null && span.head != "_" && span.deprel != null && span.deprel != "_"
+        }
+        LOGGER.fine("Document has dependencies: $hasDependencies")
+
+        if (morphoZipOutputStream == null) {
+            LOGGER.severe("morphoZipOutputStream is null! Cannot write to ZIP. This should have been initialized in processZipFile.")
+            return
+        }
+
+        if (dBuilder == null) {
+            LOGGER.warning("dBuilder is null, initializing now...")
+            dbFactory = DocumentBuilderFactory.newInstance()
+            dBuilder = dbFactory!!.newDocumentBuilder()
+        }
+
+        // Create a temporary document context for korapXmlOutput
+        // Store the morpho data temporarily and copy necessary supporting data
+        val tempDocId = "_temp_annotated_$docId"
+        morpho[tempDocId] = morphoSpans
+
+        // For dependency output, we need sentences data
+        // Create dummy sentence spans covering the entire document based on tokens
+        if (hasDependencies && morphoSpans.isNotEmpty()) {
+            // Get min and max offsets from all tokens
+            val allOffsets = morphoSpans.keys.map { key ->
+                val parts = key.split("-")
+                Pair(parts[0].toInt(), parts[1].toInt())
+            }
+            val minOffset = allOffsets.minOfOrNull { it.first } ?: 0
+            val maxOffset = allOffsets.maxOfOrNull { it.second } ?: 0
+
+            // Create a single sentence span covering all tokens
+            // This is a simplification - ideally we'd track sentence boundaries from CoNLL-U
+            sentences[tempDocId] = arrayOf(Span(minOffset, maxOffset))
+        }
+
+        LOGGER.fine("Generating KorapXML for $docId with ${morphoSpans.size} spans")
+
+        // Generate morpho.xml (always)
+        try {
+            val morphoXmlOutput = korapXmlMorphoOutput(foundry, tempDocId)
+
+            // Fix the docid attribute - replace temp prefix with actual docId
+            val fixedMorphoXml = morphoXmlOutput.toString().replace(
+                "docid=\"$tempDocId\"",
+                "docid=\"$docId\""
+            )
+
+            val morphoEntryPath = docId.replace(Regex("[_.]"), "/") + "/$foundry/morpho.xml"
+
+            val morphoZipEntry = ZipArchiveEntry(morphoEntryPath)
+            morphoZipEntry.unixMode = ZIP_ENTRY_UNIX_MODE
+            LOGGER.fine("Writing ${fixedMorphoXml.length} bytes to ZIP entry: $morphoEntryPath")
+            synchronized(morphoZipOutputStream!!) {
+                morphoZipOutputStream!!.putArchiveEntry(morphoZipEntry)
+                morphoZipOutputStream!!.write(fixedMorphoXml.toByteArray())
+                morphoZipOutputStream!!.closeArchiveEntry()
+            }
+            LOGGER.fine("Successfully wrote morpho.xml for $docId")
+            docsWrittenToZip.incrementAndGet()
+        } catch (e: Exception) {
+            LOGGER.severe("ERROR generating/writing morpho.xml: ${e.message}")
+            e.printStackTrace()
+        }
+
+        // Generate dependency.xml if dependencies are present
+        if (hasDependencies) {
+            try {
+                val dependencyXmlOutput = korapXmlDependencyOutput(foundry, tempDocId)
+
+                // Fix the docid attribute - replace temp prefix with actual docId
+                val fixedDependencyXml = dependencyXmlOutput.toString().replace(
+                    "docid=\"$tempDocId\"",
+                    "docid=\"$docId\""
+                )
+
+                val dependencyEntryPath = docId.replace(Regex("[_.]"), "/") + "/$foundry/dependency.xml"
+
+                val dependencyZipEntry = ZipArchiveEntry(dependencyEntryPath)
+                dependencyZipEntry.unixMode = ZIP_ENTRY_UNIX_MODE
+                LOGGER.fine("Writing ${fixedDependencyXml.length} bytes to ZIP entry: $dependencyEntryPath")
+                synchronized(morphoZipOutputStream!!) {
+                    morphoZipOutputStream!!.putArchiveEntry(dependencyZipEntry)
+                    morphoZipOutputStream!!.write(fixedDependencyXml.toByteArray())
+                    morphoZipOutputStream!!.closeArchiveEntry()
+                }
+                LOGGER.fine("Successfully wrote dependency.xml for $docId")
+            } catch (e: Exception) {
+                LOGGER.severe("ERROR generating/writing dependency.xml: ${e.message}")
+                e.printStackTrace()
+            }
+        }
+
+        // Clean up temporary data
+        morpho.remove(tempDocId)
+        sentences.remove(tempDocId)
+    }
+
 }
 
 fun main(args: Array<String>): Unit = exitProcess(CommandLine(KorapXmlTool()).execute(*args))
@@ -1476,3 +1799,4 @@
 object NowOutputFormat {
     const val NAME = "now"
 }
+