Allow -f zip output for -A external annotation
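
Previously, combining an external annotation command (-A) with
KorapXML zip output (-f zip) was rejected. Now each document is
queued as an AnnotationTask (CoNLL-U text plus docId and target zip
entry path), and an output handler parses the annotated CoNLL-U
(recovering token offsets from the "# start_offsets"/"# end_offsets"
comments) and writes morpho.xml, plus dependency.xml when dependency
columns are filled, into "<input>.<foundry>.zip". The foundry is
guessed from the command string. Workers can now also restart their
external process after crashes (up to 50 times per worker) and
re-queue unfinished tasks.

Illustrative invocation (the exact CLI entry point may differ):

  korapxmltool -A 'my-spacy-pipeline.sh' -f zip corpus.zip
  # -> corpus.spacy.zip containing .../spacy/morpho.xml entries
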
Change-Id: Ic40174a524aed909cad233e80aacebc6ebbdf970
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/AnnotationWorkerPool.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/AnnotationWorkerPool.kt
index 7a2e765..9fd108f 100644
--- a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/AnnotationWorkerPool.kt
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/AnnotationWorkerPool.kt
@@ -16,12 +16,16 @@
class AnnotationWorkerPool(
private val command: String,
private val numWorkers: Int,
- private val LOGGER: Logger
+ private val LOGGER: Logger,
+ private val outputHandler: ((String, AnnotationTask?) -> Unit)? = null
) {
- private val queue: BlockingQueue<String> = LinkedBlockingQueue()
+ private val queue: BlockingQueue<AnnotationTask> = LinkedBlockingQueue()
private val threads = mutableListOf<Thread>()
private val threadCount = AtomicInteger(0)
private val threadsLock = Any()
+ private val pendingOutputHandlers = AtomicInteger(0) // Track pending outputHandler calls
+
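+ // A queued unit of work: the CoNLL-U text plus optional routing metadata
+ // (text sigle and target zip entry path) consumed by the outputHandler.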
+ data class AnnotationTask(val text: String, val docId: String?, val entryPath: String?)
init {
openWorkerPool()
@@ -33,13 +37,24 @@
Thread {
val self = currentThread()
var successfullyInitialized = false
- try {
- synchronized(threadsLock) {
- threads.add(self)
+ var workerAttempts = 0
+ val maxRestarts = 50 // Allow up to 50 restarts per worker to handle crashes
+
+ while (workerAttempts < maxRestarts && !Thread.currentThread().isInterrupted) {
+ workerAttempts++
+ if (workerAttempts > 1) {
+ LOGGER.info("Worker $workerIndex: Restart attempt $workerAttempts")
}
- threadCount.incrementAndGet()
- successfullyInitialized = true
- LOGGER.info("Worker $workerIndex (thread ${self.threadId()}) started.")
+
+ try {
+ if (workerAttempts == 1) {
+ synchronized(threadsLock) {
+ threads.add(self)
+ }
+ threadCount.incrementAndGet()
+ successfullyInitialized = true
+ LOGGER.info("Worker $workerIndex (thread ${self.threadId()}) started.")
+ }
val process = ProcessBuilder("/bin/sh", "-c", command)
.redirectOutput(ProcessBuilder.Redirect.PIPE).redirectInput(ProcessBuilder.Redirect.PIPE)
@@ -50,6 +65,10 @@
LOGGER.severe("Worker $workerIndex (thread ${self.threadId()}) failed to open pipe for command '$command'")
- return@Thread // Exits thread, finally block will run
+ break // Exit restart loop; the cleanup after the loop will still run
}
+
+ // Declare pendingTasks here so it's accessible after process exits
+ val pendingTasks: BlockingQueue<AnnotationTask> = LinkedBlockingQueue()
+
// Using try-with-resources for streams to ensure they are closed
process.outputStream.buffered(BUFFER_SIZE).use { procOutStream ->
process.inputStream.buffered(BUFFER_SIZE).use { procInStream ->
@@ -62,15 +81,15 @@
val outputStreamWriter = OutputStreamWriter(procOutStream)
try {
while (true) { // Loop until EOF is received
- val text = queue.poll(50, TimeUnit.MILLISECONDS) // Reduced timeout for more responsiveness
- if (text == null) { // Timeout, continue waiting for more data
+ val task = queue.poll(50, TimeUnit.MILLISECONDS) // Reduced timeout for more responsiveness
+ if (task == null) { // Timeout, continue waiting for more data
if (Thread.currentThread().isInterrupted) {
LOGGER.info("Worker $workerIndex (thread ${self.threadId()}) writer interrupted, stopping")
break
}
continue
}
- if (text == "#eof") {
+ if (task.text == "#eof") {
try {
outputStreamWriter.write("\n# eof\n") // Send EOF to process
outputStreamWriter.flush()
@@ -83,9 +102,14 @@
LOGGER.info("Worker $workerIndex (thread ${self.threadId()}) sent EOF to process and writer is stopping.")
break // Exit while loop
}
+ pendingTasks.put(task)
try {
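+ // Framing: a "# eot" comment line separates tasks; the trailing blank
+ // line lets sentence-based CoNLL-U tools flush the marker through promptly.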
- outputStreamWriter.write(text + "\n# eot\n")
+ val dataToSend = task.text + "\n# eot\n\n"
+ LOGGER.fine("Worker $workerIndex: Sending ${dataToSend.length} chars to external process")
+ LOGGER.finer("Worker $workerIndex: First 500 chars of data to send:\n${dataToSend.take(500)}")
+ outputStreamWriter.write(dataToSend)
outputStreamWriter.flush()
+ LOGGER.fine("Worker $workerIndex: Data sent and flushed")
} catch (e: IOException) {
LOGGER.severe("Worker $workerIndex (thread ${self.threadId()}) failed to write to process: ${e.message}")
break // Exit the loop
@@ -99,8 +123,11 @@
// Reader coroutine
coroutineScope.launch {
val output = StringBuilder()
+ var lastLineWasEmpty = false
+ var linesRead = 0
try {
procInStream.bufferedReader().use { reader ->
+ LOGGER.fine("Worker $workerIndex: Reader started, waiting for input from external process")
while (!inputGotEof) {
if (Thread.currentThread().isInterrupted) {
LOGGER.info("Worker $workerIndex (thread ${self.threadId()}) reader interrupted, stopping")
@@ -112,31 +139,107 @@
sleep(5) // Very short sleep when waiting for more output
continue
} else {
+ LOGGER.fine("Worker $workerIndex: External process died, no more input")
break
}
}
- when (line) {
- "# eof" -> {
+ linesRead++
+ when {
+ line == "# eof" -> {
LOGGER.info("Worker $workerIndex (thread ${self.threadId()}) got EOF in output")
inputGotEof = true
if (output.isNotEmpty()) {
- printOutput(output.toString()) // Print any remaining output
+ val task = pendingTasks.poll(500, TimeUnit.MILLISECONDS)
+ if (outputHandler != null) {
+ if (task == null) {
+ LOGGER.warning("Worker $workerIndex: Got # eof but no task in pendingTasks queue!")
+ }
+ LOGGER.fine("Worker $workerIndex: Invoking outputHandler with ${output.length} chars (EOF)")
+ pendingOutputHandlers.incrementAndGet()
+ try {
+ outputHandler.invoke(output.toString(), task)
+ } finally {
+ pendingOutputHandlers.decrementAndGet()
+ }
+ } else {
+ printOutput(output.toString())
+ }
output.clear()
}
break
}
- "# eot" -> {
- printOutput(output.toString()) // Assuming printOutput is thread-safe
+ line == "# eot" -> {
+ val task = pendingTasks.poll(500, TimeUnit.MILLISECONDS)
+ if (outputHandler != null) {
+ if (task == null) {
+ LOGGER.warning("Worker $workerIndex: Got # eot but no task in pendingTasks queue!")
+ }
+ LOGGER.fine("Worker $workerIndex: Invoking outputHandler with ${output.length} chars (EOT)")
+ pendingOutputHandlers.incrementAndGet()
+ try {
+ outputHandler.invoke(output.toString(), task)
+ } finally {
+ pendingOutputHandlers.decrementAndGet()
+ }
+ } else {
+ LOGGER.fine("Worker $workerIndex: Printing output (${output.length} chars)")
+ printOutput(output.toString())
+ }
output.clear()
+ lastLineWasEmpty = false
+ }
+ line.isEmpty() -> {
+ // Empty line - potential document separator.
+ // We treat two consecutive empty lines as the end of a document
+ // (standard CoNLL-U only defines single blank lines, as sentence breaks).
+ if (lastLineWasEmpty && output.isNotEmpty()) {
+ // This is the second empty line - end of document
+ if (outputHandler != null) {
+ val task = pendingTasks.poll(500, TimeUnit.MILLISECONDS)
+ if (task == null) {
+ LOGGER.warning("Worker $workerIndex: Double empty line detected but no task in pendingTasks queue!")
+ }
+ LOGGER.fine("Worker $workerIndex: Invoking outputHandler with ${output.length} chars (double empty line)")
+ pendingOutputHandlers.incrementAndGet()
+ try {
+ outputHandler.invoke(output.toString(), task)
+ } finally {
+ pendingOutputHandlers.decrementAndGet()
+ }
+ output.clear()
+ lastLineWasEmpty = false
+ } else {
+ // For stdout mode, just add the empty line
+ output.append('\n')
+ lastLineWasEmpty = true
+ }
+ } else {
+ output.append('\n')
+ lastLineWasEmpty = true
+ }
}
else -> {
output.append(line).append('\n')
+ lastLineWasEmpty = false
}
}
}
}
if (output.isNotEmpty()) { // Print any remaining output
- printOutput(output.toString())
+ val task = pendingTasks.poll(500, TimeUnit.MILLISECONDS)
+ if (outputHandler != null) {
+ if (task == null) {
+ LOGGER.fine("Worker $workerIndex: Remaining output but no task in pendingTasks queue!")
+ }
+ LOGGER.fine("Worker $workerIndex: Invoking outputHandler with ${output.length} chars (remaining)")
+ pendingOutputHandlers.incrementAndGet()
+ try {
+ outputHandler.invoke(output.toString(), task)
+ } finally {
+ pendingOutputHandlers.decrementAndGet()
+ }
+ } else {
+ printOutput(output.toString())
+ }
}
} catch (e: Exception) {
LOGGER.severe("Reader coroutine in worker $workerIndex (thread ${self.threadId()}) failed: ${e.message}")
@@ -163,27 +266,58 @@
val exitCode = process.waitFor()
if (exitCode != 0) {
LOGGER.warning("Worker $workerIndex (thread ${self.threadId()}) process exited with code $exitCode")
+
+ // Return any pending tasks back to the queue for other workers to process
+ val remainingTasks = mutableListOf<AnnotationTask>()
+ pendingTasks.drainTo(remainingTasks)
+ if (remainingTasks.isNotEmpty()) {
+ LOGGER.warning("Worker $workerIndex: Returning ${remainingTasks.size} unprocessed task(s) to queue after process failure")
+ remainingTasks.forEach { task ->
+ if (task.text != "#eof") { // Don't re-queue EOF markers
+ try {
+ queue.put(task)
+ } catch (e: InterruptedException) {
+ LOGGER.severe("Failed to return task to queue: ${e.message}")
+ }
+ }
+ }
+ }
+
+ // Check if there are more items in the queue to process
+ if (queue.isEmpty()) {
+ LOGGER.info("Worker $workerIndex: Queue is empty after crash, exiting")
+ break // Exit the restart loop
+ } else {
+ LOGGER.info("Worker $workerIndex: Restarting to process remaining ${queue.size} items in queue")
+ continue // Restart the worker
+ }
} else {
LOGGER.info("Worker $workerIndex (thread ${self.threadId()}) process finished normally")
+ break // Normal exit
}
} catch (e: IOException) {
LOGGER.severe("Worker $workerIndex (thread ${self.threadId()}) failed: ${e.message}")
+ break // Exit restart loop on IOException
} catch (e: InterruptedException) {
LOGGER.info("Worker $workerIndex (thread ${self.threadId()}) was interrupted during processing")
Thread.currentThread().interrupt() // Restore interrupt status
+ break // Exit restart loop on interrupt
} catch (e: Exception) { // Catch any other unexpected exceptions during setup or process handling
LOGGER.severe("Unhandled exception in worker thread ${self.threadId()} (index $workerIndex): ${e.message}")
e.printStackTrace()
- } finally {
- if (successfullyInitialized) {
- synchronized(threadsLock) {
- threads.remove(self)
- }
- threadCount.decrementAndGet()
- LOGGER.info("Worker thread ${self.threadId()} (index $workerIndex) cleaned up and exiting. Active threads: ${threadCount.get()}")
- } else {
- LOGGER.warning("Worker thread ${self.threadId()} (index $workerIndex) exiting without full initialization/cleanup.")
+ break // Exit restart loop on unexpected exceptions
+ }
+ } // End of while (workerAttempts < maxRestarts) loop
+
+ // Cleanup after all restart attempts
+ if (successfullyInitialized) {
+ synchronized(threadsLock) {
+ threads.remove(self)
}
+ threadCount.decrementAndGet()
+ LOGGER.info("Worker thread ${self.threadId()} (index $workerIndex) cleaned up and exiting. Active threads: ${threadCount.get()}")
+ } else {
+ LOGGER.warning("Worker thread ${self.threadId()} (index $workerIndex) exiting without full initialization/cleanup.")
}
}.start()
}
@@ -201,9 +335,10 @@
}
}
- fun pushToQueue(text: String) {
+ fun pushToQueue(text: String, docId: String? = null, entryPath: String? = null) {
try {
- queue.put(text)
+ LOGGER.fine("pushToQueue called: text length=${text.length}, docId=$docId, entryPath=$entryPath")
+ queue.put(AnnotationTask(text, docId, entryPath))
} catch (e: InterruptedException) {
Thread.currentThread().interrupt()
LOGGER.warning("Interrupted while trying to push text to queue.")
@@ -213,7 +348,7 @@
fun pushToQueue(texts: List<String>) {
texts.forEach { text ->
try {
- queue.put(text)
+ queue.put(AnnotationTask(text, null, null))
} catch (e: InterruptedException) {
Thread.currentThread().interrupt()
LOGGER.warning("Interrupted while trying to push texts to queue. Some texts may not have been added.")
@@ -224,13 +359,14 @@
fun close() {
val currentThreadCount = threadCount.get()
- LOGGER.info("Closing worker pool with $currentThreadCount threads")
-
+ val queueSizeBeforeEOF = queue.size
+ LOGGER.info("Closing worker pool with $currentThreadCount threads, queue size: $queueSizeBeforeEOF")
+
// Send EOF marker for each worker - use numWorkers instead of current thread count
// to ensure we send enough EOF markers even if some threads haven't started yet
for (i in 0 until numWorkers) {
try {
- queue.put("#eof")
+ queue.put(AnnotationTask("#eof", null, null))
LOGGER.info("Sent EOF marker ${i+1}/$numWorkers to queue")
} catch (e: InterruptedException) {
Thread.currentThread().interrupt()
@@ -239,23 +375,36 @@
}
}
+ LOGGER.info("All EOF markers sent, queue size now: ${queue.size}")
+
if (threadCount.get() > 0) {
waitForWorkersToFinish()
}
}
private fun waitForWorkersToFinish() {
+ val startTime = System.currentTimeMillis()
+ var lastReportedSize = queue.size
LOGGER.info("Waiting for queue to empty (current size: ${queue.size})...")
while (queue.isNotEmpty()) {
try {
sleep(50) // Reduced sleep time for more responsive monitoring
+ val currentSize = queue.size
+ val elapsed = (System.currentTimeMillis() - startTime) / 1000
+
+ // Report at roughly 5-second intervals, and only when the queue size has changed
+ if (elapsed % 5 == 0L && currentSize != lastReportedSize) {
+ LOGGER.info("Queue status: $currentSize items remaining (${elapsed}s elapsed)")
+ lastReportedSize = currentSize
+ }
} catch (e: InterruptedException) {
Thread.currentThread().interrupt()
LOGGER.warning("Interrupted while waiting for queue to empty. Proceeding to join threads.")
break
}
}
- LOGGER.info("Queue is empty. Joining worker threads.")
+ val totalTime = (System.currentTimeMillis() - startTime) / 1000
+ LOGGER.info("Queue is empty after ${totalTime}s. Joining worker threads.")
val threadsToJoin: List<Thread>
synchronized(threadsLock) {
@@ -268,11 +417,11 @@
LOGGER.info("Attempting to join ${threadsToJoin.size} thread(s) from recorded list (current active count: ${threadCount.get()}).")
threadsToJoin.forEach { thread ->
try {
- thread.join(10000) // Increased timeout to 10 seconds
+ thread.join(1800000) // 30 minutes timeout - allow workers time to restart and process all documents
if (thread.isAlive) {
- LOGGER.warning("Thread ${thread.threadId()} did not terminate after 10s. Interrupting.")
+ LOGGER.warning("Thread ${thread.threadId()} did not terminate after 30 minutes. Interrupting.")
thread.interrupt()
- thread.join(2000) // Wait 2 seconds after interrupt
+ thread.join(10000) // Wait 10 seconds after interrupt
if (thread.isAlive) {
LOGGER.severe("Thread ${thread.threadId()} failed to terminate after interrupt.")
}
@@ -296,6 +445,30 @@
}
}
}
+
+ // CRITICAL: Wait for all pending outputHandler invocations to complete
+ val pendingCount = pendingOutputHandlers.get()
+ if (pendingCount > 0) {
+ LOGGER.info("Waiting for $pendingCount pending outputHandler invocation(s) to complete...")
+ }
+ val startWait = System.currentTimeMillis()
+ while (pendingOutputHandlers.get() > 0) {
+ try {
+ sleep(100)
+ val elapsed = System.currentTimeMillis() - startWait
+ if (elapsed > 30000) { // 30 second timeout
+ LOGGER.severe("Timeout waiting for ${pendingOutputHandlers.get()} pending outputHandler(s) after 30s!")
+ break
+ }
+ } catch (e: InterruptedException) {
+ Thread.currentThread().interrupt()
+ LOGGER.warning("Interrupted while waiting for pending outputHandlers")
+ break
+ }
+ }
+ if (pendingCount > 0 && pendingOutputHandlers.get() == 0) {
+ LOGGER.info("All outputHandler invocations completed")
+ }
}
}
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
index a9b1333..eb9687f 100644
--- a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
@@ -317,6 +317,8 @@
val metadata: ConcurrentHashMap<String, Array<String>> = ConcurrentHashMap()
val extraFeatures: ConcurrentHashMap<String, MutableMap<String, String>> = ConcurrentHashMap()
private val processedDocs = java.util.concurrent.atomic.AtomicInteger(0)
+ private val docsSentToAnnotation = java.util.concurrent.atomic.AtomicInteger(0)
+ private val docsWrittenToZip = java.util.concurrent.atomic.AtomicInteger(0)
var taggerToolBridges: ConcurrentHashMap<Long, TaggerToolBridge?> = ConcurrentHashMap()
var parserToolBridges: ConcurrentHashMap<Long, ParserToolBridge?> = ConcurrentHashMap()
@@ -345,15 +347,57 @@
}
fun korapxml2conllu(args: Array<String>) {
- if (outputFormat == OutputFormat.KORAPXML && annotateWith.isNotEmpty()) {
- LOGGER.severe("Shell command annotation is not yet supported with output format $outputFormat")
- exitProcess(1)
- }
// Initialize shared entry executor (used inside each zip)
entryExecutor = Executors.newFixedThreadPool(maxThreads)
if (annotateWith.isNotEmpty()) {
- annotationWorkerPool = AnnotationWorkerPool(annotateWith, maxThreads, LOGGER)
+ // Initialize ZIP output stream BEFORE creating worker pool, if needed
+ if (outputFormat == OutputFormat.KORAPXML) {
+ // Determine output filename
+ val inputZipPath = args[0] // First ZIP file
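+ // NOTE: the output name is derived from the first input ZIP only.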
+ val targetFoundry = when {
+     annotateWith.contains("spacy") -> "spacy"
+     annotateWith.contains("stanza") -> "stanza"
+     annotateWith.contains("udpipe") -> "udpipe"
+     annotateWith.contains("tree") -> "tree_tagger"
+     annotateWith.contains("marmot") -> "marmot"
+     annotateWith.contains("opennlp") -> "opennlp"
+     annotateWith.contains("corenlp") -> "corenlp"
+     else -> "annotated"
+ }
+
+ val outputMorphoZipFileName = inputZipPath.replace(Regex("\\.zip$"), ".".plus(targetFoundry).plus(".zip"))
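+ // e.g. "corpus.zip" annotated with a spacy command -> "corpus.spacy.zip"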
+ LOGGER.info("Initializing output ZIP: $outputMorphoZipFileName (from input: $inputZipPath, foundry: $targetFoundry)")
+
+ if (File(outputMorphoZipFileName).exists() && !overwrite) {
+ LOGGER.severe("Output file $outputMorphoZipFileName already exists. Use --overwrite to overwrite.")
+ exitProcess(1)
+ }
+
+ // Delete old file if it exists
+ if (File(outputMorphoZipFileName).exists()) {
+ LOGGER.info("Deleting existing file: $outputMorphoZipFileName")
+ File(outputMorphoZipFileName).delete()
+ }
+
+ dbFactory = DocumentBuilderFactory.newInstance()
+ dBuilder = dbFactory!!.newDocumentBuilder()
+ val fileOutputStream = FileOutputStream(outputMorphoZipFileName)
+ morphoZipOutputStream = ZipArchiveOutputStream(fileOutputStream).apply {
+ setUseZip64(Zip64Mode.Always)
+ }
+ LOGGER.info("Initialized morphoZipOutputStream for external annotation to: $outputMorphoZipFileName")
+ }
+
+ annotationWorkerPool = if (outputFormat == OutputFormat.KORAPXML) {
+     // For ZIP output with external annotation, we need a custom handler
+     AnnotationWorkerPool(annotateWith, maxThreads, LOGGER) { annotatedConllu, task ->
+         parseAndWriteAnnotatedConllu(annotatedConllu, task)
+     }
+ } else {
+     AnnotationWorkerPool(annotateWith, maxThreads, LOGGER, null)
+ }
}
var zips: Array<String> = args
@@ -425,7 +469,28 @@
if (annotationWorkerPool != null) {
LOGGER.info("closing worker pool")
+ LOGGER.info("Documents sent to annotation: ${docsSentToAnnotation.get()}")
annotationWorkerPool?.close()
+ LOGGER.info("Documents written to ZIP: ${docsWrittenToZip.get()}")
+
+ // Close the ZIP file after worker pool is done (if using external annotation with ZIP output)
+ if (outputFormat == OutputFormat.KORAPXML && morphoZipOutputStream != null) {
+ try {
+ morphoZipOutputStream!!.flush()
+ morphoZipOutputStream!!.close()
+ LOGGER.info("Closed output ZIP file after annotation processing")
+ } catch (e: Exception) {
+ LOGGER.severe("ERROR closing ZIP file: ${e.message}")
+ e.printStackTrace()
+ }
+ }
+
+ // Check if all documents were written
+ val sent = docsSentToAnnotation.get()
+ val written = docsWrittenToZip.get()
+ if (sent != written) {
+ LOGGER.warning("Document count mismatch! Sent to annotation: $sent, Written to ZIP: $written (missing: ${sent - written})")
+ }
}
// Shutdown entry executor
entryExecutor?.shutdown()
@@ -520,6 +585,19 @@
}
} else if (parserName != null) {
targetFoundry = parserName!!
+ } else if (annotateWith.isNotEmpty()) {
+ // Try to detect foundry from external annotation command
+ targetFoundry = when {
+     annotateWith.contains("spacy") -> "spacy"
+     annotateWith.contains("stanza") -> "stanza"
+     annotateWith.contains("udpipe") -> "udpipe"
+     annotateWith.contains("tree") -> "tree_tagger"
+     annotateWith.contains("marmot") -> "marmot"
+     annotateWith.contains("opennlp") -> "opennlp"
+     annotateWith.contains("corenlp") -> "corenlp"
+     else -> "annotated"
+ }
+ LOGGER.info("Detected foundry '$targetFoundry' from annotation command: $annotateWith")
}
dbFactory = DocumentBuilderFactory.newInstance()
dBuilder = dbFactory!!.newDocumentBuilder()
@@ -528,6 +606,7 @@
zipFilePath.replace(Regex("(\\.(opennlp|marmot|tree_tagger|corenlp|spacy))?\\.zip$"), ".".plus(parserName).plus(".zip"))
else
zipFilePath.replace(Regex("\\.zip$"), ".".plus(targetFoundry).plus(".zip"))
+ LOGGER.info("Output ZIP file: $outputMorphoZipFileName")
if (File(outputMorphoZipFileName).exists() && !overwrite) {
LOGGER.severe("Output file $outputMorphoZipFileName already exists. Use --overwrite to overwrite.")
exitProcess(1)
@@ -536,6 +615,9 @@
morphoZipOutputStream = ZipArchiveOutputStream(fileOutputStream).apply {
setUseZip64(Zip64Mode.Always)
}
+ LOGGER.info("Initialized morphoZipOutputStream for $outputMorphoZipFileName")
+ } else {
+ LOGGER.info("Skipping ZIP initialization: dbFactory=${dbFactory != null}, outputFormat=$outputFormat")
}
if (zipFilePath.hasCorrespondingBaseZip()) {
val relatedZips = arrayOf(zipFilePath, zipFilePath.correspondingBaseZip()!!)
@@ -550,8 +632,12 @@
processZipEntriesWithPool(zipFile, foundry, false)
}
}
- if (outputFormat == OutputFormat.KORAPXML) {
+ // Don't close the ZIP here if using external annotation - it will be closed after worker pool finishes
+ if (outputFormat == OutputFormat.KORAPXML && annotationWorkerPool == null) {
+ LOGGER.fine("Closing output ZIP file in processZipFile (no annotation worker pool)")
morphoZipOutputStream!!.close()
+ } else if (outputFormat == OutputFormat.KORAPXML) {
+ LOGGER.fine("NOT closing ZIP in processZipFile - will close after worker pool finishes")
}
logZipProgress(zipFilePath)
}
@@ -747,19 +833,23 @@
}
}
- val morphoRequired = waitForMorpho || useLemma || taggerName != null || parserName != null || outputFormat == OutputFormat.KORAPXML
+ val morphoRequired = waitForMorpho || useLemma || taggerName != null || parserName != null || (outputFormat == OutputFormat.KORAPXML && annotationWorkerPool == null)
// For lemma-only/lemma-based word2vec/now, we can proceed without full text
val textRequired = when (outputFormat) {
OutputFormat.WORD2VEC, OutputFormat.NOW -> !(useLemma || lemmaOnly)
else -> true
}
+
+ LOGGER.fine("Checking if ready to process $docId: texts=${texts[docId] != null}, sentences=${sentences[docId] != null}, tokens=${tokens[docId] != null}, morpho=${morpho[docId] != null}, morphoRequired=$morphoRequired, textRequired=$textRequired, annotationWorkerPool=${annotationWorkerPool != null}")
+
if ((texts[docId] != null || !textRequired) && sentences[docId] != null && tokens[docId] != null
&& (!morphoRequired || morpho[docId] != null)
&& (extractMetadataRegex.isEmpty() || metadata[docId] != null)
) {
- // Be quiet on INFO; per-text logs only on FINE and below
- LOGGER.fine("Processing text: $docId in thread ${Thread.currentThread().threadId()}")
+ LOGGER.fine("All data ready for $docId, calling processText")
processText(docId, foundry)
+ } else {
+ LOGGER.fine("NOT ready to process $docId yet: textOK=${texts[docId] != null || !textRequired}, sentencesOK=${sentences[docId] != null}, tokensOK=${tokens[docId] != null}, morphoOK=${!morphoRequired || morpho[docId] != null}")
}
} else if (extractMetadataRegex.isNotEmpty() && zipEntry.name.matches(Regex(".*/header\\.xml$"))) {
//LOGGER.info("Processing header file: " + zipEntry.name)
@@ -777,7 +867,7 @@
}
if (meta.isNotEmpty() && docId != null) {
metadata[docId] = meta.toTypedArray()
- val morphoRequired = waitForMorpho || useLemma || taggerName != null || parserName != null || outputFormat == OutputFormat.KORAPXML
+ val morphoRequired = waitForMorpho || useLemma || taggerName != null || parserName != null || (outputFormat == OutputFormat.KORAPXML && annotationWorkerPool == null)
val textRequired = when (outputFormat) {
OutputFormat.WORD2VEC, OutputFormat.NOW -> !(useLemma || lemmaOnly)
else -> true
@@ -786,7 +876,7 @@
&& (!morphoRequired || morpho[docId] != null)
) {
// Be quiet on INFO; per-text logs only on FINE and below
- LOGGER.fine("Processing text (meta-ready): $docId in thread ${Thread.currentThread().threadId()}")
+ LOGGER.info("Processing text (meta-ready): $docId in thread ${Thread.currentThread().threadId()}")
processText(docId, foundry)
}
}
@@ -838,7 +928,20 @@
}
if (annotationWorkerPool != null) {
- annotationWorkerPool?.pushToQueue(output.toString())
+ if (outputFormat == OutputFormat.KORAPXML) {
+ // Store metadata in task, send clean CoNLL-U to external process
+ val entryPath = if (parserName != null) docId.replace(Regex("[_.]"), "/").plus("/$parserName/").plus("dependency.xml")
+ else
+ docId.replace(Regex("[_.]"), "/").plus("/$morphoFoundry/").plus("morpho.xml")
+ LOGGER.fine("Sending document $docId (${output.length} chars) to annotation worker pool for ZIP output")
+ // Pass metadata via AnnotationTask, NOT in the text itself
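+ // The foundry is appended to entryPath after '|' and unpacked again in parseAndWriteAnnotatedConllu()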
+ annotationWorkerPool?.pushToQueue(output.toString(), docId, entryPath + "|" + foundry)
+ docsSentToAnnotation.incrementAndGet()
+ } else {
+ LOGGER.fine("Sending document $docId (${output.length} chars) to annotation worker pool")
+ annotationWorkerPool?.pushToQueue(output.toString())
+ docsSentToAnnotation.incrementAndGet()
+ }
// Release internal char[] early
output.setLength(0)
} else if (outputFormat != OutputFormat.KORAPXML) {
@@ -848,7 +951,18 @@
// Release internal char[] early
output.setLength(0)
} else {
- korapXmlOutput(foundry, docId)
+ // Direct ZIP output without external annotation
+ val entryPath = if (parserName != null) docId.replace(Regex("[_.]"), "/").plus("/$parserName/").plus("dependency.xml")
+ else
+ docId.replace(Regex("[_.]"), "/").plus("/$morphoFoundry/").plus("morpho.xml")
+ val zipEntry = ZipArchiveEntry(entryPath)
+ zipEntry.unixMode = ZIP_ENTRY_UNIX_MODE
+ synchronized(morphoZipOutputStream!!) {
+ morphoZipOutputStream!!.putArchiveEntry(zipEntry)
+ morphoZipOutputStream!!.write(output.toString().toByteArray())
+ morphoZipOutputStream!!.closeArchiveEntry()
+ }
+ output.clear()
}
@@ -872,20 +986,6 @@
logMemoryStats(count)
}
}
-
- if (outputFormat == OutputFormat.KORAPXML) {
- val entryPath = if (parserName != null) docId.replace(Regex("[_.]"), "/").plus("/$parserName/").plus("dependency.xml")
- else
- docId.replace(Regex("[_.]"), "/").plus("/$morphoFoundry/").plus("morpho.xml")
- val zipEntry = ZipArchiveEntry(entryPath)
- zipEntry.unixMode = ZIP_ENTRY_UNIX_MODE
- synchronized(morphoZipOutputStream!!) {
- morphoZipOutputStream!!.putArchiveEntry(zipEntry)
- morphoZipOutputStream!!.write(output.toString().toByteArray())
- morphoZipOutputStream!!.closeArchiveEntry()
- }
- output.clear()
- }
}
private fun getMorphoFoundry() = taggerToolBridges[Thread.currentThread().threadId()]?.foundry ?: "base"
@@ -1113,33 +1213,64 @@
previousSpanStart = span.from + 1
}
// Determine the token text safely
- val tokenText: String = if (textVal != null) {
+ var tokenText: String = if (textVal != null) {
val safeFrom = span.from.coerceIn(0, textVal.length)
val safeTo = span.to.coerceIn(safeFrom, textVal.length)
textVal.substring(safeFrom, safeTo)
} else "_"
+ // Validate and fix empty/whitespace-only tokens that cause SpaCy to crash
+ if (tokenText.isBlank()) {
+ LOGGER.fine("Replacing empty/blank token at offset ${span.from}-${span.to} in document $docId with underscore")
+ tokenText = "_" // Replace with underscore instead of skipping
+ }
+
if (morpho[docId]?.containsKey("${span.from}-${span.to}") == true) {
val mfs = morpho[docId]!!["${span.from}-${span.to}"]
- output.append(
- printConlluToken(
- token_index,
- tokenText,
- mfs!!.lemma!!,
- mfs.upos!!,
- mfs.xpos!!,
- mfs.feats!!,
- mfs.head!!,
- mfs.deprel!!,
- mfs.deps!!,
- mfs.misc!!,
- columns
+ if (mfs != null) {
+ // Add offset info to MISC field for external annotation with ZIP output
+ val miscWithOffset = if (annotationWorkerPool != null && outputFormat == OutputFormat.KORAPXML) {
+ val existing = mfs.misc ?: "_"
+ if (existing == "_") "Offset=${span.from}-${span.to}"
+ else "${existing}|Offset=${span.from}-${span.to}"
+ } else mfs.misc ?: "_"
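+ // e.g. MISC "SpaceAfter=No" becomes "SpaceAfter=No|Offset=15-22" (illustrative offsets)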
+
+ output.append(
+ printConlluToken(
+ token_index,
+ tokenText,
+ mfs.lemma ?: "_",
+ mfs.upos ?: "_",
+ mfs.xpos ?: "_",
+ mfs.feats ?: "_",
+ mfs.head ?: "_",
+ mfs.deprel ?: "_",
+ mfs.deps ?: "_",
+ miscWithOffset,
+ columns
+ )
)
- )
+ } else {
+ // Fallback if mfs is null
+ val miscWithOffset = if (annotationWorkerPool != null && outputFormat == OutputFormat.KORAPXML) {
+ "Offset=${span.from}-${span.to}"
+ } else "_"
+
+ output.append(
+ printConlluToken(
+ token_index, tokenText, misc = miscWithOffset, columns = columns
+ )
+ )
+ }
} else {
+ // Add offset info for tokens without morpho data when needed
+ val miscWithOffset = if (annotationWorkerPool != null && outputFormat == OutputFormat.KORAPXML) {
+ "Offset=${span.from}-${span.to}"
+ } else "_"
+
output.append(
printConlluToken(
- token_index, tokenText, columns = columns
+ token_index, tokenText, misc = miscWithOffset, columns = columns
)
)
}
@@ -1210,10 +1341,10 @@
var real_token_index = 0
var sentence_index = 0
val output: StringBuilder = StringBuilder()
-
+
// Add the text sigle prefix
output.append("@@$docId ")
-
+
if (texts[docId] == null) {
// Lemma-only fallback when original text is not loaded
tokens[docId]?.forEach { span ->
@@ -1449,6 +1580,198 @@
var misc: String? = "_"
)
+ private fun parseAndWriteAnnotatedConllu(annotatedConllu: String, task: AnnotationWorkerPool.AnnotationTask?) {
+ LOGGER.fine("parseAndWriteAnnotatedConllu called with ${annotatedConllu.length} chars, task=$task")
+
+ // Extract metadata from task
+ val docId = task?.docId
+ val entryPathAndFoundry = task?.entryPath?.split("|")
+ val entryPath = entryPathAndFoundry?.getOrNull(0)
+ val foundry = entryPathAndFoundry?.getOrNull(1) ?: "base"
+
+ if (docId == null || entryPath == null) {
+ LOGGER.fine("Missing metadata from task! docId=$docId, entryPath=$entryPath, task=$task")
+ return
+ }
+
+ LOGGER.fine("Processing annotated document: docId=$docId, entryPath=$entryPath, foundry=$foundry")
+
+ val morphoSpans = mutableMapOf<String, MorphoSpan>()
+
+ // Parse the annotated CoNLL-U to extract morpho data
+ val lines = annotatedConllu.lines()
+ var currentStartOffsets: List<Int>? = null
+ var currentEndOffsets: List<Int>? = null
+ var tokenIndexInSentence = 0
+
+ for (line in lines) {
+ when {
+ line.startsWith("# start_offsets =") -> {
+ // Parse start offsets from comment
+ // Format: # start_offsets = <first_token_from> <token1_from> <token2_from> ...
+ // The first value is duplicated, so skip it and use values from index 1 onwards
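+ // e.g. "# start_offsets = 0 0 4 9" -> three tokens starting at 0, 4, 9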
+ val offsetsStr = line.substring("# start_offsets =".length).trim()
+ val allOffsets = offsetsStr.split(Regex("\\s+")).mapNotNull { it.toIntOrNull() }
+ currentStartOffsets = if (allOffsets.size > 1) allOffsets.drop(1) else allOffsets
+ tokenIndexInSentence = 0
+ LOGGER.fine("Found start offsets: $currentStartOffsets")
+ }
+ line.startsWith("# end_offsets =") -> {
+ // Parse end offsets from comment
+ // Format: # end_offsets = <sentence_end> <token1_to> <token2_to> ...
+ // First value is sentence end, actual token ends start from index 1
+ val offsetsStr = line.substring("# end_offsets =".length).trim()
+ val allOffsets = offsetsStr.split(Regex("\\s+")).mapNotNull { it.toIntOrNull() }
+ currentEndOffsets = if (allOffsets.size > 1) allOffsets.drop(1) else emptyList()
+ LOGGER.fine("Found end offsets: $currentEndOffsets")
+ }
+ line.isEmpty() -> {
+ // Reset for next sentence
+ currentStartOffsets = null
+ currentEndOffsets = null
+ tokenIndexInSentence = 0
+ }
+ !line.startsWith("#") -> {
+ // This is a token line
+ val fields = line.split("\t")
+ if (fields.size < 10) continue
+
+ // fields.size >= 10 is guaranteed above, so direct indexing is safe
+ val lemma = fields[2]
+ val upos = fields[3]
+ val xpos = fields[4]
+ val feats = fields[5]
+ val head = fields[6]
+ val deprel = fields[7]
+ val deps = fields[8]
+ val misc = fields[9]
+
+ // Get offset from the comment-based offset arrays
+ if (currentStartOffsets != null && currentEndOffsets != null &&
+ tokenIndexInSentence < currentStartOffsets.size &&
+ tokenIndexInSentence < currentEndOffsets.size) {
+
+ val spanFrom = currentStartOffsets[tokenIndexInSentence]
+ val spanTo = currentEndOffsets[tokenIndexInSentence]
+ val spanKey = "$spanFrom-$spanTo"
+
+ morphoSpans[spanKey] = MorphoSpan(lemma, upos, xpos, feats, head, deprel, deps, misc)
+ LOGGER.fine("Added morpho span: $spanKey -> $lemma/$upos")
+ tokenIndexInSentence++
+ } else {
+ LOGGER.fine("No offset information for token at index $tokenIndexInSentence in sentence (starts=${currentStartOffsets?.size}, ends=${currentEndOffsets?.size})")
+ }
+ }
+ }
+ }
+
+ LOGGER.fine("Extracted ${morphoSpans.size} morpho spans for $docId")
+
+ if (morphoSpans.isEmpty()) {
+ LOGGER.warning("No morpho spans found in annotated output for $docId, skipping")
+ LOGGER.warning("Sample lines: ${lines.take(10).joinToString("\\n")}")
+ return
+ }
+
+ // Check if the data contains dependency information (non-empty head/deprel fields)
+ val hasDependencies = morphoSpans.values.any { span ->
+ span.head != null && span.head != "_" && span.deprel != null && span.deprel != "_"
+ }
+ LOGGER.fine("Document has dependencies: $hasDependencies")
+
+ if (morphoZipOutputStream == null) {
+ LOGGER.severe("morphoZipOutputStream is null! Cannot write to ZIP. This should have been initialized in processZipFile.")
+ return
+ }
+
+ if (dBuilder == null) {
+ LOGGER.warning("dBuilder is null, initializing now...")
+ dbFactory = DocumentBuilderFactory.newInstance()
+ dBuilder = dbFactory!!.newDocumentBuilder()
+ }
+
+ // Create a temporary document context for korapXmlOutput
+ // Store the morpho data temporarily and copy necessary supporting data
+ val tempDocId = "_temp_annotated_$docId"
+ morpho[tempDocId] = morphoSpans
+
+ // For dependency output, we need sentences data
+ // Create dummy sentence spans covering the entire document based on tokens
+ if (hasDependencies && morphoSpans.isNotEmpty()) {
+ // Get min and max offsets from all tokens
+ val allOffsets = morphoSpans.keys.map { key ->
+ val parts = key.split("-")
+ Pair(parts[0].toInt(), parts[1].toInt())
+ }
+ val minOffset = allOffsets.minOfOrNull { it.first } ?: 0
+ val maxOffset = allOffsets.maxOfOrNull { it.second } ?: 0
+
+ // Create a single sentence span covering all tokens
+ // This is a simplification - ideally we'd track sentence boundaries from CoNLL-U
+ sentences[tempDocId] = arrayOf(Span(minOffset, maxOffset))
+ }
+
+ LOGGER.fine("Generating KorapXML for $docId with ${morphoSpans.size} spans")
+
+ // Generate morpho.xml (always)
+ try {
+ val morphoXmlOutput = korapXmlMorphoOutput(foundry, tempDocId)
+
+ // Fix the docid attribute - replace temp prefix with actual docId
+ val fixedMorphoXml = morphoXmlOutput.toString().replace(
+ "docid=\"$tempDocId\"",
+ "docid=\"$docId\""
+ )
+
+ val morphoEntryPath = docId.replace(Regex("[_.]"), "/") + "/$foundry/morpho.xml"
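+ // e.g. docId "GOE_AGA.00000" with foundry "spacy" -> "GOE/AGA/00000/spacy/morpho.xml" (illustrative sigle)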
+
+ val morphoZipEntry = ZipArchiveEntry(morphoEntryPath)
+ morphoZipEntry.unixMode = ZIP_ENTRY_UNIX_MODE
+ LOGGER.fine("Writing ${fixedMorphoXml.length} bytes to ZIP entry: $morphoEntryPath")
+ synchronized(morphoZipOutputStream!!) {
+ morphoZipOutputStream!!.putArchiveEntry(morphoZipEntry)
+ morphoZipOutputStream!!.write(fixedMorphoXml.toByteArray())
+ morphoZipOutputStream!!.closeArchiveEntry()
+ }
+ LOGGER.fine("Successfully wrote morpho.xml for $docId")
+ docsWrittenToZip.incrementAndGet()
+ } catch (e: Exception) {
+ LOGGER.severe("ERROR generating/writing morpho.xml: ${e.message}")
+ e.printStackTrace()
+ }
+
+ // Generate dependency.xml if dependencies are present
+ if (hasDependencies) {
+ try {
+ val dependencyXmlOutput = korapXmlDependencyOutput(foundry, tempDocId)
+
+ // Fix the docid attribute - replace temp prefix with actual docId
+ val fixedDependencyXml = dependencyXmlOutput.toString().replace(
+ "docid=\"$tempDocId\"",
+ "docid=\"$docId\""
+ )
+
+ val dependencyEntryPath = docId.replace(Regex("[_.]"), "/") + "/$foundry/dependency.xml"
+
+ val dependencyZipEntry = ZipArchiveEntry(dependencyEntryPath)
+ dependencyZipEntry.unixMode = ZIP_ENTRY_UNIX_MODE
+ LOGGER.fine("Writing ${fixedDependencyXml.length} bytes to ZIP entry: $dependencyEntryPath")
+ synchronized(morphoZipOutputStream!!) {
+ morphoZipOutputStream!!.putArchiveEntry(dependencyZipEntry)
+ morphoZipOutputStream!!.write(fixedDependencyXml.toByteArray())
+ morphoZipOutputStream!!.closeArchiveEntry()
+ }
+ LOGGER.fine("Successfully wrote dependency.xml for $docId")
+ } catch (e: Exception) {
+ LOGGER.severe("ERROR generating/writing dependency.xml: ${e.message}")
+ e.printStackTrace()
+ }
+ }
+
+ // Clean up temporary data
+ morpho.remove(tempDocId)
+ sentences.remove(tempDocId)
+ }
+
}
fun main(args: Array<String>): Unit = exitProcess(CommandLine(KorapXmlTool()).execute(*args))
@@ -1476,3 +1799,4 @@
object NowOutputFormat {
const val NAME = "now"
}
+