Second attempt to fix thread handling

Change-Id: Ib223a94645a50c63f8f063ca0ec900c0937913f9
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/AnnotationWorkerPool.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/AnnotationWorkerPool.kt
index 31e2f46..4a5f816 100644
--- a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/AnnotationWorkerPool.kt
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/AnnotationWorkerPool.kt
@@ -7,6 +7,7 @@
 import java.util.concurrent.BlockingQueue
 import java.util.concurrent.LinkedBlockingQueue
 import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicInteger
 import java.util.logging.Logger
 
 private const val BUFFER_SIZE = 10000000
@@ -19,7 +20,8 @@
 ) {
     private val queue: BlockingQueue<String> = LinkedBlockingQueue()
     private val threads = mutableListOf<Thread>()
-    private var threadCount = 0
+    private val threadCount = AtomicInteger(0)
+    private val threadsLock = Any()
 
     init {
         openWorkerPool()
@@ -27,101 +29,161 @@
     }
 
     private fun openWorkerPool() {
-        repeat(numWorkers) {
+        repeat(numWorkers) { workerIndex ->
             Thread {
+                val self = currentThread()
+                var successfullyInitialized = false
                 try {
-                    threads.add(currentThread())
-                    threadCount++
+                    synchronized(threadsLock) {
+                        threads.add(self)
+                    }
+                    threadCount.incrementAndGet()
+                    successfullyInitialized = true
+                    LOGGER.info("Worker $workerIndex (thread ${self.id}) started.")
+
                     val process = ProcessBuilder("/bin/sh", "-c", command)
-                        //.directory(File("/tmp"))
                         .redirectOutput(ProcessBuilder.Redirect.PIPE).redirectInput(ProcessBuilder.Redirect.PIPE)
                         .redirectError(ProcessBuilder.Redirect.INHERIT)
-                        //.redirectErrorStream(true) // Merges stderr into stdout
                         .start()
+
                     if (process.outputStream == null) {
-                        LOGGER.severe("Worker $it failed to open pipe '$command'")
-                        return@Thread
+                        LOGGER.severe("Worker $workerIndex (thread ${self.id}) failed to open pipe for command '$command'")
+                        return@Thread // Exits thread, finally block will run
                     }
-                    process.outputStream.buffered(BUFFER_SIZE)
-                    process.inputStream.buffered(BUFFER_SIZE)
+                    // Kotlin's use { } closes both process streams when the block exits, even on exceptions
+                    process.outputStream.buffered(BUFFER_SIZE).use { procOutStream ->
+                        process.inputStream.buffered(BUFFER_SIZE).use { procInStream ->
 
-                    val coroutineScope = CoroutineScope(Dispatchers.IO)
-                    var inputGotEof = false
-                    var readBytes = 0
-                    var writtenBytes = 0
+                            val coroutineScope = CoroutineScope(Dispatchers.IO + Job()) // Explicit Job: its children are joined and then cancelled below
+                            var inputGotEof = false // Specific to this worker's process interaction
 
-                    coroutineScope.launch {
-                        val outputStreamWriter = OutputStreamWriter(process.outputStream)
-                        while (true) {
-                            val text = queue.poll(5, TimeUnit.SECONDS)
-                            if (text == "#eof" || text == null) {
-                                outputStreamWriter.write("\n# eof\n")
-                                outputStreamWriter.close()
-                                LOGGER.info("Worker $it received eof")
-                                break
+                            // Writer coroutine
+                            coroutineScope.launch {
+                                val outputStreamWriter = OutputStreamWriter(procOutStream)
+                                try {
+                                    while (true) { // Loop until EOF is received
+                                        val text = queue.poll(50, TimeUnit.MILLISECONDS) // Reduced timeout for more responsiveness
+                                        if (text == null) { // Poll timed out with nothing queued; NOTE(review): the interrupt check below runs on the Dispatchers.IO thread, not on worker thread `self`
+                                            if (Thread.currentThread().isInterrupted) {
+                                                LOGGER.info("Worker $workerIndex (thread ${self.id}) writer interrupted, stopping")
+                                                break
+                                            }
+                                            continue
+                                        }
+                                        if (text == "#eof") {
+                                            try {
+                                                outputStreamWriter.write("\n# eof\n") // Send EOF to process
+                                                outputStreamWriter.flush()
+                                            } catch (e: IOException) {
+                                                // Log error, but proceed to close
+                                                LOGGER.warning("Worker $workerIndex (thread ${self.id}) failed to write EOF to process: ${e.message}")
+                                            } finally {
+                                                try { outputStreamWriter.close() } catch (_: IOException) {}
+                                            }
+                                            LOGGER.info("Worker $workerIndex (thread ${self.id}) sent EOF to process and writer is stopping.")
+                                            break // Exit while loop
+                                        }
+                                        try {
+                                            outputStreamWriter.write(text + "\n# eot\n")
+                                            outputStreamWriter.flush()
+                                        } catch (e: IOException) {
+                                            LOGGER.severe("Worker $workerIndex (thread ${self.id}) failed to write to process: ${e.message}")
+                                            break // Exit the loop
+                                        }
+                                    }
+                                } catch (e: Exception) {
+                                    LOGGER.severe("Writer coroutine in worker $workerIndex (thread ${self.id}) failed: ${e.message}")
+                                }
                             }
+
+                            // Reader coroutine
+                            coroutineScope.launch {
+                                val output = StringBuilder()
+                                try {
+                                    procInStream.bufferedReader().use { reader ->
+                                        while (!inputGotEof) {
+                                            if (Thread.currentThread().isInterrupted) {
+                                                LOGGER.info("Worker $workerIndex (thread ${self.id}) reader interrupted, stopping")
+                                                break
+                                            }
+                                            val line = reader.readLine()
+                                            if (line == null) {
+                                                if (process.isAlive) {
+                                                    sleep(5) // readLine() returned null (end of stream) while the process is still alive; pause briefly before retrying
+                                                    continue
+                                                } else {
+                                                    break
+                                                }
+                                            }
+                                            when (line) {
+                                                "# eof" -> {
+                                                    LOGGER.info("Worker $workerIndex (thread ${self.id}) got EOF in output")
+                                                    inputGotEof = true
+                                                    if (output.isNotEmpty()) {
+                                                        printOutput(output.toString()) // Print any remaining output
+                                                        output.clear()
+                                                    }
+                                                    break
+                                                }
+                                                "# eot" -> {
+                                                    printOutput(output.toString()) // Assuming printOutput is thread-safe
+                                                    output.clear()
+                                                }
+                                                else -> {
+                                                    output.append(line).append('\n')
+                                                }
+                                            }
+                                        }
+                                    }
+                                    if (output.isNotEmpty()) { // Print any remaining output
+                                        printOutput(output.toString())
+                                    }
+                                } catch (e: Exception) {
+                                    LOGGER.severe("Reader coroutine in worker $workerIndex (thread ${self.id}) failed: ${e.message}")
+                                }
+                            }
+
+                            // Wait for coroutines to complete
                             try {
-                                outputStreamWriter.write(text + "\n# eot\n")
-                                /*text.split("\n\n").forEach {
-                                  outputStreamWriter.write(it + "\n\n")
-                                }*/
-                                outputStreamWriter.flush()
-                                writtenBytes += text.length
-                            } catch (e: IOException) {
-                                LOGGER.severe("Worker $it failed to write to process in thread ${currentThread().id}: ${e.message}")
-                                threads.remove(currentThread())
-                                threadCount--
-                                cancel()
-                            }
-
-                        }
-
-                    }
-
-                    coroutineScope.launch {
-                        val output = StringBuilder()
-                        while (!inputGotEof && process.isAlive) {
-                            process.inputStream.bufferedReader().useLines { lines ->
-                                lines.forEach { line ->
-                                    when (line) {
-                                        "# eof" -> {
-                                            LOGGER.info("Worker $it got EOF in output")
-                                            inputGotEof = true
-                                            return@forEach }
-                                        "# eot" -> {
-                                            printOutput(output.toString())
-                                            output.clear() }
-                                        else -> { output.append(line, "\n")
-                                            readBytes += line.length +1 }
+                                runBlocking {
+                                    coroutineScope.coroutineContext[Job]?.children?.forEach { job ->
+                                        job.join()
                                     }
                                 }
-                                printOutput(output.toString())
-                                output.clear()
-                                if (!inputGotEof && process.isAlive) {
-                                    LOGGER.info("Worker $it waiting for more output")
-                                    sleep(10)
-                                }
+                                LOGGER.info("Worker $workerIndex (thread ${self.id}) coroutines completed")
+                            } catch (e: InterruptedException) {
+                                LOGGER.info("Worker $workerIndex (thread ${self.id}) interrupted while waiting for coroutines")
+                                Thread.currentThread().interrupt() // Restore interrupt status
+                            } finally {
+                                coroutineScope.cancel() // Ensure cleanup
                             }
                         }
-
                     }
-                    //while (!inputGotEof && process.isAlive) {
-                    //    LOGGER.info("Worker $it waiting for EOF output to finish")
-                    //    sleep(1000)
-                   // }
-                    //outputStreamWriter.close()
+
                     val exitCode = process.waitFor()
                     if (exitCode != 0) {
-                        LOGGER.warning("Worker $it exited with code $exitCode")
+                        LOGGER.warning("Worker $workerIndex (thread ${self.id}) process exited with code $exitCode")
                     } else {
-                        LOGGER.info("Worker $it finished")
+                        LOGGER.info("Worker $workerIndex (thread ${self.id}) process finished normally")
                     }
-                    threads.remove(currentThread())
                 } catch (e: IOException) {
+                    LOGGER.severe("Worker $workerIndex (thread ${self.id}) failed: ${e.message}")
+                } catch (e: InterruptedException) {
+                    LOGGER.info("Worker $workerIndex (thread ${self.id}) was interrupted during processing")
+                    Thread.currentThread().interrupt() // Restore interrupt status
+                } catch (e: Exception) { // Catch any other unexpected exceptions during setup or process handling
+                    LOGGER.severe("Unhandled exception in worker thread ${self.id} (index $workerIndex): ${e.message}")
                     e.printStackTrace()
-                    LOGGER.warning("Worker $it failed: ${e.message}")
-                    threads.remove(currentThread())
-                    threadCount--
+                } finally {
+                    if (successfullyInitialized) {
+                        synchronized(threadsLock) {
+                            threads.remove(self)
+                        }
+                        threadCount.decrementAndGet()
+                        LOGGER.info("Worker thread ${self.id} (index $workerIndex) cleaned up and exiting. Active threads: ${threadCount.get()}")
+                    } else {
+                        LOGGER.warning("Worker thread ${self.id} (index $workerIndex) exiting without full initialization/cleanup.")
+                    }
                 }
             }.start()
         }
@@ -140,43 +202,100 @@
     }
 
     fun pushToQueue(text: String) {
-        queue.offer(text)
+        try {
+            queue.put(text)
+        } catch (e: InterruptedException) {
+            Thread.currentThread().interrupt()
+            LOGGER.warning("Interrupted while trying to push text to queue.")
+        }
     }
 
     fun pushToQueue(texts: List<String>) {
-        texts.forEach {
-            queue.offer(it)
-        }
-        // Add an "#eof" marker for each worker to know when to stop
-        repeat(queue.remainingCapacity()) {
-            queue.offer("#eof")
+        texts.forEach { text ->
+            try {
+                queue.put(text)
+            } catch (e: InterruptedException) {
+                Thread.currentThread().interrupt()
+                LOGGER.warning("Interrupted while trying to push texts to queue. Some texts may not have been added.")
+                return // Exit early if interrupted
+            }
         }
     }
 
     fun close() {
-        var n = threadCount
-        LOGGER.info("Closing worker pool with $n threads")
-        while (n > 0) {
-            if (queue.offer("#eof")) {
-                n--
-            } else {
-                LOGGER.info("Queue is full, waiting for workers to process")
-                sleep(100)
+        val currentThreadCount = threadCount.get()
+        LOGGER.info("Closing worker pool with $currentThreadCount threads")
+        
+        // Send EOF marker for each worker - use numWorkers instead of current thread count
+        // to ensure we send enough EOF markers even if some threads haven't started yet
+        for (i in 0 until numWorkers) {
+            try {
+                queue.put("#eof")
+                LOGGER.info("Sent EOF marker ${i+1}/$numWorkers to queue")
+            } catch (e: InterruptedException) {
+                Thread.currentThread().interrupt()
+                LOGGER.warning("Interrupted while sending EOF to workers. Some workers may not shut down cleanly.")
+                break
             }
         }
-        if (threadCount > 0)
+        
+        if (threadCount.get() > 0) {
             waitForWorkersToFinish()
+        }
     }
 
     private fun waitForWorkersToFinish() {
+        LOGGER.info("Waiting for queue to empty (current size: ${queue.size})...")
         while (queue.isNotEmpty()) {
-            sleep(100) // Wait for queue to empty
+            try {
+                sleep(50) // Reduced sleep time for more responsive monitoring
+            } catch (e: InterruptedException) {
+                Thread.currentThread().interrupt()
+                LOGGER.warning("Interrupted while waiting for queue to empty. Proceeding to join threads.")
+                break
+            }
         }
-        LOGGER.info("Queue is empty, waiting for workers to finish")
-        // Create a copy of the threads list to avoid ConcurrentModificationException
-        val threadsCopy = threads.toList()
-        threadsCopy.forEach(Thread::join)
-        LOGGER.info("All workers finished")
+        LOGGER.info("Queue is empty. Joining worker threads.")
+
+        val threadsToJoin: List<Thread>
+        synchronized(threadsLock) {
+            threadsToJoin = threads.toList() // Create copy while holding lock
+        }
+
+        if (threadsToJoin.isEmpty() && threadCount.get() == 0) {
+            LOGGER.info("No threads were active or recorded to join.")
+        } else {
+            LOGGER.info("Attempting to join ${threadsToJoin.size} thread(s) from recorded list (current active count: ${threadCount.get()}).")
+            threadsToJoin.forEach { thread ->
+                try {
+                    thread.join(10000) // Increased timeout to 10 seconds
+                    if (thread.isAlive) {
+                        LOGGER.warning("Thread ${thread.id} did not terminate after 10s. Interrupting.")
+                        thread.interrupt()
+                        thread.join(2000) // Wait 2 seconds after interrupt
+                        if (thread.isAlive) {
+                            LOGGER.severe("Thread ${thread.id} failed to terminate after interrupt.")
+                        }
+                    }
+                } catch (e: InterruptedException) {
+                    Thread.currentThread().interrupt()
+                    LOGGER.warning("Interrupted while joining thread ${thread.id}. It may not have shut down cleanly.")
+                }
+            }
+        }
+        
+        val finalCount = threadCount.get()
+        if (finalCount == 0) {
+            LOGGER.info("All worker threads appear to have terminated.")
+        } else {
+            LOGGER.warning("$finalCount worker thread(s) still marked as active according to counter. This might indicate an issue in thread lifecycle management.")
+            synchronized(threadsLock) {
+                if (threads.isNotEmpty()) {
+                    LOGGER.warning("The internal threads list is not empty: ${threads.map { it.id }}. Forcing clear.")
+                    threads.clear() // Clean up if any refs are lingering despite count issues
+                }
+            }
+        }
     }
 }