Make annotation ZIP output more robust against tagger crashes

Change-Id: I9bbfdfb84a32a7303191acc2126efc3235779d2b
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/AnnotationWorkerPool.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/AnnotationWorkerPool.kt
index 75c3ed4..1f9900c 100644
--- a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/AnnotationWorkerPool.kt
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/AnnotationWorkerPool.kt
@@ -17,6 +17,7 @@
 private const val MIN_BUFFER_BYTES = 256L * 1024 * 1024
 private const val MAX_BUFFER_BYTES = 8L * 1024 * 1024 * 1024
 private const val APPROX_BYTES_PER_BUFFER_UNIT = 2L * HIGH_WATERMARK
+private const val MAX_RETRIES_PER_TASK = 5 // Max times a task is re-queued after a worker process failure; it is skipped once its retry count would exceed this
 
 internal fun parseKorapXmlToolXmxToBytes(spec: String?): Long? {
     if (spec.isNullOrBlank()) return null
@@ -94,7 +95,8 @@
         val text: String,
         val docId: String?,
         val entryPath: String?,
-        val bufferedUnits: Int = 0
+        val bufferedUnits: Int = 0,
+        val retries: Int = 0  // how many times this specific task has been re-queued after a crash
     )
 
     init {
@@ -331,22 +333,34 @@
                                         }
                                     }
                                     if (output.isNotEmpty()) { // Print any remaining output
-                                        val task = pendingTasks.poll(500, TimeUnit.MILLISECONDS)
-                                        if (outputHandler != null) {
-                                            if (task == null) {
-                                                LOGGER.fine("Worker $workerIndex: Remaining output but no task in pendingTasks queue!")
-                                            }
-                                            LOGGER.fine("Worker $workerIndex: Invoking outputHandler with ${output.length} chars (remaining)")
-                                            pendingOutputHandlers.incrementAndGet()
-                                            try {
-                                                outputHandler.invoke(output.toString(), task)
-                                            } finally {
-                                                pendingOutputHandlers.decrementAndGet()
+                                        // Only discard remaining output when the process crashed (non-zero exit
+                                        // code). Annotators that don't speak the # eot/# eof protocol – like
+                                        // "cat > /dev/null; cat $file" or the spacy Docker image – exit cleanly
+                                        // with code 0 and rely on this path to deliver their output.
+                                        val exitCode = try { process.exitValue() } catch (_: IllegalThreadStateException) { -1 }
+                                        if (!inputGotEof && exitCode != 0) {
+                                            // Process died mid-stream with a non-zero exit code: discard the
+                                            // partial output buffer. Tasks in pendingTasks will be drained and
+                                            // re-queued by the post-waitFor() logic, so they are NOT lost.
+                                            LOGGER.warning("Worker $workerIndex: Discarding ${output.length} chars of partial output – process exited with code $exitCode without EOF marker; tasks will be re-queued")
+                                        } else {
+                                            val task = pendingTasks.poll(500, TimeUnit.MILLISECONDS)
+                                            if (outputHandler != null) {
+                                                if (task == null) {
+                                                    LOGGER.fine("Worker $workerIndex: Remaining output but no task in pendingTasks queue!")
+                                                }
+                                                LOGGER.fine("Worker $workerIndex: Invoking outputHandler with ${output.length} chars (remaining)")
+                                                pendingOutputHandlers.incrementAndGet()
+                                                try {
+                                                    outputHandler.invoke(output.toString(), task)
+                                                } finally {
+                                                    pendingOutputHandlers.decrementAndGet()
+                                                    releaseTaskBuffer(task)
+                                                }
+                                            } else {
+                                                printOutput(output.toString())
                                                 releaseTaskBuffer(task)
                                             }
-                                        } else {
-                                            printOutput(output.toString())
-                                            releaseTaskBuffer(task)
                                         }
                                     }
                                 } catch (e: Exception) {
@@ -402,13 +416,21 @@
                         val remainingTasks = mutableListOf<AnnotationTask>()
                         pendingTasks.drainTo(remainingTasks)
                         if (remainingTasks.isNotEmpty()) {
-                            LOGGER.warning("Worker $workerIndex: Returning ${remainingTasks.size} unprocessed task(s) to queue after process failure")
-                            remainingTasks.forEach { task ->
-                                if (task.text != "#eof") { // Don't re-queue EOF markers
+                            val realTasks = remainingTasks.filter { it.text != "#eof" }
+                            LOGGER.warning("Worker $workerIndex: Returning ${realTasks.size} unprocessed task(s) to queue after process failure: " +
+                                realTasks.joinToString(", ") { it.docId ?: "(no docId)" })
+                            realTasks.forEach { task ->
+                                val updated = task.copy(retries = task.retries + 1)
+                                if (updated.retries > MAX_RETRIES_PER_TASK) {
+                                    LOGGER.warning("Worker $workerIndex: Giving up on document after $MAX_RETRIES_PER_TASK failed attempts – skipping: docId=${task.docId}")
+                                    releaseTaskBuffer(task)
+                                } else {
+                                    LOGGER.warning("Worker $workerIndex: Re-queuing (attempt ${updated.retries}/$MAX_RETRIES_PER_TASK): docId=${task.docId}")
                                     try {
-                                        queue.put(task)
+                                        queue.put(updated)
                                     } catch (e: InterruptedException) {
                                         LOGGER.severe("Failed to return task to queue: ${e.message}")
+                                        releaseTaskBuffer(task)
                                     }
                                 }
                             }
@@ -527,6 +549,25 @@
         var lastReportedSize = queue.size
         LOGGER.info("Waiting for queue to empty (current size: ${queue.size})...")
         while (queue.isNotEmpty()) {
+            // If all threads have exited but the queue is still non-empty, no one will ever
+            // drain it (e.g. a poison document caused all workers to exhaust their retries).
+            // Drain the leftovers, log the abandoned docIds, and break so the ZIP can be closed.
+            if (threadCount.get() == 0) {
+                val abandoned = mutableListOf<AnnotationTask>()
+                queue.drainTo(abandoned)
+                val realAbandoned = abandoned.filter { it.text != "#eof" && it.docId != null }
+                if (realAbandoned.isNotEmpty()) {
+                    LOGGER.warning("All worker threads have exited but ${realAbandoned.size} document(s) " +
+                        "could not be processed and will be absent from the output:")
+                    realAbandoned.forEach { task ->
+                        LOGGER.warning("  Abandoned document: docId=${task.docId}")
+                        releaseTaskBuffer(task)
+                    }
+                    // Release buffer permits for any leftover EOF sentinels too
+                    abandoned.filter { it.text == "#eof" }.forEach { releaseTaskBuffer(it) }
+                }
+                break
+            }
             try {
                 sleep(50) // Reduced sleep time for more responsive monitoring
                 val currentSize = queue.size
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
index b182c38..64e5b13 100644
--- a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
@@ -1385,9 +1385,18 @@
                 }
                 LOGGER.info("Initialized morphoZipOutputStream for external annotation to: $outputMorphoZipFileName")
 
-                // Ensure we restore System.err and remove file handler at the end of processing (shutdown hook)
+                // Ensure we restore System.err and remove file handler at the end of processing (shutdown hook).
+                // Also finalize the annotation ZIP so its central directory is written even on SIGTERM/Ctrl-C.
                 Runtime.getRuntime().addShutdownHook(Thread {
                     try {
+                        val zipOut = morphoZipOutputStream
+                        if (zipOut != null) {
+                            LOGGER.warning("Shutdown hook: finalizing annotation output ZIP")
+                            try { synchronized(zipOut) { zipOut.finish() } } catch (_: Exception) {}
+                            try { zipOut.close() } catch (_: Exception) {}
+                        }
+                    } catch (_: Exception) {}
+                    try {
                         LOGGER.info("Shutting down; closing per-zip log handler")
                         LOGGER.removeHandler(fileHandler)
                         fileHandler.close()
@@ -2207,9 +2216,18 @@
             }
             LOGGER.info("Initialized morphoZipOutputStream for $outputMorphoZipFileName")
 
-            // Restore System.err and remove file handler on shutdown
+            // Restore System.err and remove file handler on shutdown.
+            // Also finalize the annotation ZIP so its central directory is written even on SIGTERM/Ctrl-C.
             Runtime.getRuntime().addShutdownHook(Thread {
                 try {
+                    val zipOut = morphoZipOutputStream
+                    if (zipOut != null) {
+                        LOGGER.warning("Shutdown hook: finalizing annotation output ZIP")
+                        try { synchronized(zipOut) { zipOut.finish() } } catch (_: Exception) {}
+                        try { zipOut.close() } catch (_: Exception) {}
+                    }
+                } catch (_: Exception) {}
+                try {
                     LOGGER.info("Shutting down; closing ZIP log handler")
                     LOGGER.removeHandler(fileHandler)
                     fileHandler.close()
@@ -4221,7 +4239,9 @@
         val actualFoundry = if (foundryOverride != null) {
             foundryOverride!!
         } else if (extractedFoundry != null) {
-            LOGGER.info("Using foundry from CoNLL-U output: $extractedFoundry (was: $foundry)")
+            if (extractedFoundry != foundry) {
+                LOGGER.info("Using foundry from CoNLL-U output: $extractedFoundry (was: $foundry)")
+            }
             // Update the global externalFoundry variable for consistent naming
             externalFoundry = extractedFoundry
             extractedFoundry
@@ -4256,8 +4276,11 @@
             morphoZipEntry.unixMode = ZIP_ENTRY_UNIX_MODE
             synchronized(morphoZipOutputStream!!) {
                 morphoZipOutputStream!!.putArchiveEntry(morphoZipEntry)
-                KorapXmlFormatter.formatMorphoToStream(context, dBuilder!!, morphoZipOutputStream!!)
-                morphoZipOutputStream!!.closeArchiveEntry()
+                try {
+                    KorapXmlFormatter.formatMorphoToStream(context, dBuilder!!, morphoZipOutputStream!!)
+                } finally {
+                    morphoZipOutputStream!!.closeArchiveEntry()
+                }
             }
             val written = docsWrittenToZip.incrementAndGet()
             if (!quiet) progressBar?.step()
@@ -4294,8 +4317,11 @@
                 dependencyZipEntry.unixMode = ZIP_ENTRY_UNIX_MODE
                 synchronized(morphoZipOutputStream!!) {
                     morphoZipOutputStream!!.putArchiveEntry(dependencyZipEntry)
-                    KorapXmlFormatter.formatDependencyToStream(context, dBuilder!!, morphoZipOutputStream!!)
-                    morphoZipOutputStream!!.closeArchiveEntry()
+                    try {
+                        KorapXmlFormatter.formatDependencyToStream(context, dBuilder!!, morphoZipOutputStream!!)
+                    } finally {
+                        morphoZipOutputStream!!.closeArchiveEntry()
+                    }
                 }
             } catch (e: Exception) {
                 LOGGER.severe("ERROR generating/writing dependency.xml: ${e.message}")