Make annotation zip output more robust against tagger crashes
Change-Id: I9bbfdfb84a32a7303191acc2126efc3235779d2b
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/AnnotationWorkerPool.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/AnnotationWorkerPool.kt
index 75c3ed4..1f9900c 100644
--- a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/AnnotationWorkerPool.kt
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/AnnotationWorkerPool.kt
@@ -17,6 +17,7 @@
private const val MIN_BUFFER_BYTES = 256L * 1024 * 1024
private const val MAX_BUFFER_BYTES = 8L * 1024 * 1024 * 1024
private const val APPROX_BYTES_PER_BUFFER_UNIT = 2L * HIGH_WATERMARK
+private const val MAX_RETRIES_PER_TASK = 5 // Give up on a document after this many unsuccessful annotation attempts
internal fun parseKorapXmlToolXmxToBytes(spec: String?): Long? {
if (spec.isNullOrBlank()) return null
@@ -94,7 +95,8 @@
val text: String,
val docId: String?,
val entryPath: String?,
- val bufferedUnits: Int = 0
+ val bufferedUnits: Int = 0,
+ val retries: Int = 0 // how many times this specific task has been re-queued after a crash
)
init {
@@ -331,22 +333,34 @@
}
}
if (output.isNotEmpty()) { // Print any remaining output
- val task = pendingTasks.poll(500, TimeUnit.MILLISECONDS)
- if (outputHandler != null) {
- if (task == null) {
- LOGGER.fine("Worker $workerIndex: Remaining output but no task in pendingTasks queue!")
- }
- LOGGER.fine("Worker $workerIndex: Invoking outputHandler with ${output.length} chars (remaining)")
- pendingOutputHandlers.incrementAndGet()
- try {
- outputHandler.invoke(output.toString(), task)
- } finally {
- pendingOutputHandlers.decrementAndGet()
+ // Only discard remaining output when the process crashed (non-zero exit
+ // code). Annotators that don't speak the # eot/# eof protocol – like
+ // "cat > /dev/null; cat $file" or the spacy Docker image – exit cleanly
+ // with code 0 and rely on this path to deliver their output.
+ val exitCode = try { process.exitValue() } catch (_: IllegalThreadStateException) { -1 }
+ if (!inputGotEof && exitCode != 0) {
+ // Process died mid-stream with a non-zero exit code: discard the
+ // partial output buffer. Tasks in pendingTasks will be drained and
+ // re-queued by the post-waitFor() logic, so they are NOT lost.
+ LOGGER.warning("Worker $workerIndex: Discarding ${output.length} chars of partial output – process exited with code $exitCode without EOF marker; tasks will be re-queued")
+ } else {
+ val task = pendingTasks.poll(500, TimeUnit.MILLISECONDS)
+ if (outputHandler != null) {
+ if (task == null) {
+ LOGGER.fine("Worker $workerIndex: Remaining output but no task in pendingTasks queue!")
+ }
+ LOGGER.fine("Worker $workerIndex: Invoking outputHandler with ${output.length} chars (remaining)")
+ pendingOutputHandlers.incrementAndGet()
+ try {
+ outputHandler.invoke(output.toString(), task)
+ } finally {
+ pendingOutputHandlers.decrementAndGet()
+ releaseTaskBuffer(task)
+ }
+ } else {
+ printOutput(output.toString())
releaseTaskBuffer(task)
}
- } else {
- printOutput(output.toString())
- releaseTaskBuffer(task)
}
}
} catch (e: Exception) {
@@ -402,13 +416,21 @@
val remainingTasks = mutableListOf<AnnotationTask>()
pendingTasks.drainTo(remainingTasks)
if (remainingTasks.isNotEmpty()) {
- LOGGER.warning("Worker $workerIndex: Returning ${remainingTasks.size} unprocessed task(s) to queue after process failure")
- remainingTasks.forEach { task ->
- if (task.text != "#eof") { // Don't re-queue EOF markers
+ val realTasks = remainingTasks.filter { it.text != "#eof" }
+ LOGGER.warning("Worker $workerIndex: Returning ${realTasks.size} unprocessed task(s) to queue after process failure: " +
+ realTasks.joinToString(", ") { it.docId ?: "(no docId)" })
+ realTasks.forEach { task ->
+ val updated = task.copy(retries = task.retries + 1)
+ if (updated.retries > MAX_RETRIES_PER_TASK) {
+ LOGGER.warning("Worker $workerIndex: Giving up on document after $MAX_RETRIES_PER_TASK failed attempts – skipping: docId=${task.docId}")
+ releaseTaskBuffer(task)
+ } else {
+ LOGGER.warning("Worker $workerIndex: Re-queuing (attempt ${updated.retries}/$MAX_RETRIES_PER_TASK): docId=${task.docId}")
try {
- queue.put(task)
+ queue.put(updated)
} catch (e: InterruptedException) {
LOGGER.severe("Failed to return task to queue: ${e.message}")
+ releaseTaskBuffer(task)
}
}
}
@@ -527,6 +549,25 @@
var lastReportedSize = queue.size
LOGGER.info("Waiting for queue to empty (current size: ${queue.size})...")
while (queue.isNotEmpty()) {
+ // If all threads have exited but the queue is still non-empty, no one will ever
+ // drain it (e.g. a poison document caused all workers to exhaust their retries).
+ // Drain the leftovers, log the abandoned docIds, and break so the ZIP can be closed.
+ if (threadCount.get() == 0) {
+ val abandoned = mutableListOf<AnnotationTask>()
+ queue.drainTo(abandoned)
+ val realAbandoned = abandoned.filter { it.text != "#eof" && it.docId != null }
+ if (realAbandoned.isNotEmpty()) {
+ LOGGER.warning("All worker threads have exited but ${realAbandoned.size} document(s) " +
+ "could not be processed and will be absent from the output:")
+ realAbandoned.forEach { task ->
+ LOGGER.warning(" Abandoned document: docId=${task.docId}")
+ releaseTaskBuffer(task)
+ }
+ // Release buffer permits for any leftover EOF sentinels too
+ abandoned.filter { it.text == "#eof" }.forEach { releaseTaskBuffer(it) }
+ }
+ break
+ }
try {
sleep(50) // Reduced sleep time for more responsive monitoring
val currentSize = queue.size
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
index b182c38..64e5b13 100644
--- a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
@@ -1385,9 +1385,18 @@
}
LOGGER.info("Initialized morphoZipOutputStream for external annotation to: $outputMorphoZipFileName")
- // Ensure we restore System.err and remove file handler at the end of processing (shutdown hook)
+ // Ensure we restore System.err and remove file handler at the end of processing (shutdown hook).
+ // Also finalize the annotation ZIP so its central directory is written even on SIGTERM/Ctrl-C.
Runtime.getRuntime().addShutdownHook(Thread {
try {
+ val zipOut = morphoZipOutputStream
+ if (zipOut != null) {
+ LOGGER.warning("Shutdown hook: finalizing annotation output ZIP")
+ try { synchronized(zipOut) { zipOut.finish() } } catch (_: Exception) {}
+ try { zipOut.close() } catch (_: Exception) {}
+ }
+ } catch (_: Exception) {}
+ try {
LOGGER.info("Shutting down; closing per-zip log handler")
LOGGER.removeHandler(fileHandler)
fileHandler.close()
@@ -2207,9 +2216,18 @@
}
LOGGER.info("Initialized morphoZipOutputStream for $outputMorphoZipFileName")
- // Restore System.err and remove file handler on shutdown
+ // Restore System.err and remove file handler on shutdown.
+ // Also finalize the annotation ZIP so its central directory is written even on SIGTERM/Ctrl-C.
Runtime.getRuntime().addShutdownHook(Thread {
try {
+ val zipOut = morphoZipOutputStream
+ if (zipOut != null) {
+ LOGGER.warning("Shutdown hook: finalizing annotation output ZIP")
+ try { synchronized(zipOut) { zipOut.finish() } } catch (_: Exception) {}
+ try { zipOut.close() } catch (_: Exception) {}
+ }
+ } catch (_: Exception) {}
+ try {
LOGGER.info("Shutting down; closing ZIP log handler")
LOGGER.removeHandler(fileHandler)
fileHandler.close()
@@ -4221,7 +4239,9 @@
val actualFoundry = if (foundryOverride != null) {
foundryOverride!!
} else if (extractedFoundry != null) {
- LOGGER.info("Using foundry from CoNLL-U output: $extractedFoundry (was: $foundry)")
+ if (extractedFoundry != foundry) {
+ LOGGER.info("Using foundry from CoNLL-U output: $extractedFoundry (was: $foundry)")
+ }
// Update the global externalFoundry variable for consistent naming
externalFoundry = extractedFoundry
extractedFoundry
@@ -4256,8 +4276,11 @@
morphoZipEntry.unixMode = ZIP_ENTRY_UNIX_MODE
synchronized(morphoZipOutputStream!!) {
morphoZipOutputStream!!.putArchiveEntry(morphoZipEntry)
- KorapXmlFormatter.formatMorphoToStream(context, dBuilder!!, morphoZipOutputStream!!)
- morphoZipOutputStream!!.closeArchiveEntry()
+ try {
+ KorapXmlFormatter.formatMorphoToStream(context, dBuilder!!, morphoZipOutputStream!!)
+ } finally {
+ morphoZipOutputStream!!.closeArchiveEntry()
+ }
}
val written = docsWrittenToZip.incrementAndGet()
if (!quiet) progressBar?.step()
@@ -4294,8 +4317,11 @@
dependencyZipEntry.unixMode = ZIP_ENTRY_UNIX_MODE
synchronized(morphoZipOutputStream!!) {
morphoZipOutputStream!!.putArchiveEntry(dependencyZipEntry)
- KorapXmlFormatter.formatDependencyToStream(context, dBuilder!!, morphoZipOutputStream!!)
- morphoZipOutputStream!!.closeArchiveEntry()
+ try {
+ KorapXmlFormatter.formatDependencyToStream(context, dBuilder!!, morphoZipOutputStream!!)
+ } finally {
+ morphoZipOutputStream!!.closeArchiveEntry()
+ }
}
} catch (e: Exception) {
LOGGER.severe("ERROR generating/writing dependency.xml: ${e.message}")