Second attempt to fix thread handling

Make the worker pool's thread bookkeeping thread-safe and its shutdown
deterministic:

- Guard the mutable threads list with a dedicated lock and track the
  active-thread count with an AtomicInteger.
- Split process I/O into dedicated writer and reader coroutines with
  cooperative cancellation and per-write IOException handling.
- Replace queue.offer() with blocking queue.put(), and send exactly one
  "#eof" marker per worker from close() instead of filling the queue's
  remaining capacity with markers.
- Join worker threads with a bounded timeout on shutdown, interrupting
  and re-joining any thread that fails to terminate in time.
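
A minimal sketch of the intended call sequence, for review context (only
command and numWorkers are visible in this diff; the real constructor may
take further parameters):

    val pool = AnnotationWorkerPool(/* command = */ "annotate.sh", /* numWorkers = */ 4)
    pool.pushToQueue(listOf("first text", "second text")) // put() blocks only if the queue is bounded
    pool.close() // enqueues one "#eof" per worker, then joins the threads with a timeout
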
Change-Id: Ib223a94645a50c63f8f063ca0ec900c0937913f9
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/AnnotationWorkerPool.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/AnnotationWorkerPool.kt
index 31e2f46..4a5f816 100644
--- a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/AnnotationWorkerPool.kt
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/AnnotationWorkerPool.kt
@@ -7,6 +7,10 @@
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicInteger
import java.util.logging.Logger
+import kotlinx.coroutines.Job
+import kotlinx.coroutines.isActive
+import kotlinx.coroutines.runBlocking
private const val BUFFER_SIZE = 10000000
@@ -19,7 +23,10 @@
) {
private val queue: BlockingQueue<String> = LinkedBlockingQueue()
private val threads = mutableListOf<Thread>()
- private var threadCount = 0
+ private val threadCount = AtomicInteger(0)
+ private val threadsLock = Any()
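+ // `threads` is a plain MutableList, so all access to it is guarded by threadsLock;
+ // the count lives in a separate AtomicInteger so it can be read without locking.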
init {
openWorkerPool()
@@ -27,101 +34,166 @@
}
private fun openWorkerPool() {
- repeat(numWorkers) {
+ repeat(numWorkers) { workerIndex ->
Thread {
+ val self = currentThread()
+ var successfullyInitialized = false
try {
- threads.add(currentThread())
- threadCount++
+ synchronized(threadsLock) {
+ threads.add(self)
+ }
+ threadCount.incrementAndGet()
+ successfullyInitialized = true
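+ // From this point on, the finally block below owns deregistration:
+ // removing this thread from `threads` and decrementing threadCount.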
+ LOGGER.info("Worker $workerIndex (thread ${self.id}) started.")
+
val process = ProcessBuilder("/bin/sh", "-c", command)
- //.directory(File("/tmp"))
.redirectOutput(ProcessBuilder.Redirect.PIPE).redirectInput(ProcessBuilder.Redirect.PIPE)
.redirectError(ProcessBuilder.Redirect.INHERIT)
- //.redirectErrorStream(true) // Merges stderr into stdout
.start()
+
if (process.outputStream == null) {
- LOGGER.severe("Worker $it failed to open pipe '$command'")
- return@Thread
+ LOGGER.severe("Worker $workerIndex (thread ${self.id}) failed to open pipe for command '$command'")
+ return@Thread // Exits thread, finally block will run
}
- process.outputStream.buffered(BUFFER_SIZE)
- process.inputStream.buffered(BUFFER_SIZE)
+ // Kotlin's use {} blocks (the try-with-resources analogue) ensure both streams are closed
+ process.outputStream.buffered(BUFFER_SIZE).use { procOutStream ->
+ process.inputStream.buffered(BUFFER_SIZE).use { procInStream ->
- val coroutineScope = CoroutineScope(Dispatchers.IO)
- var inputGotEof = false
- var readBytes = 0
- var writtenBytes = 0
+ val coroutineScope = CoroutineScope(Dispatchers.IO + Job()) // Explicit parent Job so both coroutines can be cancelled together
+ var inputGotEof = false // Specific to this worker's process interaction
- coroutineScope.launch {
- val outputStreamWriter = OutputStreamWriter(process.outputStream)
- while (true) {
- val text = queue.poll(5, TimeUnit.SECONDS)
- if (text == "#eof" || text == null) {
- outputStreamWriter.write("\n# eof\n")
- outputStreamWriter.close()
- LOGGER.info("Worker $it received eof")
- break
+ // Writer coroutine
+ coroutineScope.launch {
+ val outputStreamWriter = OutputStreamWriter(procOutStream)
+ try {
+ while (true) { // Loop until EOF is received
+ val text = queue.poll(50, TimeUnit.MILLISECONDS) // Reduced timeout for more responsiveness
+ if (text == null) { // Timeout, continue waiting for more data
+ if (!isActive) { // Cooperative cancellation; the dispatcher's threads are never interrupted directly
+ LOGGER.info("Worker $workerIndex (thread ${self.id}) writer cancelled, stopping")
+ break
+ }
+ continue
+ }
+ if (text == "#eof") {
+ try {
+ outputStreamWriter.write("\n# eof\n") // Send EOF to process
+ outputStreamWriter.flush()
+ } catch (e: IOException) {
+ // Log error, but proceed to close
+ LOGGER.warning("Worker $workerIndex (thread ${self.id}) failed to write EOF to process: ${e.message}")
+ } finally {
+ try { outputStreamWriter.close() } catch (_: IOException) {}
+ }
+ LOGGER.info("Worker $workerIndex (thread ${self.id}) sent EOF to process and writer is stopping.")
+ break // Exit while loop
+ }
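+ // Framing protocol: each queued text is written with a terminating "# eot"
+ // line; the annotation process is expected to echo "# eot" after each
+ // processed chunk and "# eof" once the whole stream is done.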
+ try {
+ outputStreamWriter.write(text + "\n# eot\n")
+ outputStreamWriter.flush()
+ } catch (e: IOException) {
+ LOGGER.severe("Worker $workerIndex (thread ${self.id}) failed to write to process: ${e.message}")
+ break // Exit the loop
+ }
+ }
+ } catch (e: Exception) {
+ LOGGER.severe("Writer coroutine in worker $workerIndex (thread ${self.id}) failed: ${e.message}")
+ }
}
+
+ // Reader coroutine
+ coroutineScope.launch {
+ val output = StringBuilder()
+ try {
+ procInStream.bufferedReader().use { reader ->
+ while (!inputGotEof) {
+ if (!isActive) { // Cooperative cancellation check; see writer coroutine above
+ LOGGER.info("Worker $workerIndex (thread ${self.id}) reader cancelled, stopping")
+ break
+ }
+ val line = reader.readLine()
+ if (line == null) {
+ if (process.isAlive) {
+ sleep(5) // Very short sleep when waiting for more output
+ continue
+ } else {
+ break
+ }
+ }
+ when (line) {
+ "# eof" -> {
+ LOGGER.info("Worker $workerIndex (thread ${self.id}) got EOF in output")
+ inputGotEof = true
+ if (output.isNotEmpty()) {
+ printOutput(output.toString()) // Print any remaining output
+ output.clear()
+ }
+ break
+ }
+ "# eot" -> {
+ printOutput(output.toString()) // Assuming printOutput is thread-safe
+ output.clear()
+ }
+ else -> {
+ output.append(line).append('\n')
+ }
+ }
+ }
+ }
+ if (output.isNotEmpty()) { // Print any remaining output
+ printOutput(output.toString())
+ }
+ } catch (e: Exception) {
+ LOGGER.severe("Reader coroutine in worker $workerIndex (thread ${self.id}) failed: ${e.message}")
+ }
+ }
+
+ // Wait for coroutines to complete
try {
- outputStreamWriter.write(text + "\n# eot\n")
- /*text.split("\n\n").forEach {
- outputStreamWriter.write(it + "\n\n")
- }*/
- outputStreamWriter.flush()
- writtenBytes += text.length
- } catch (e: IOException) {
- LOGGER.severe("Worker $it failed to write to process in thread ${currentThread().id}: ${e.message}")
- threads.remove(currentThread())
- threadCount--
- cancel()
- }
-
- }
-
- }
-
- coroutineScope.launch {
- val output = StringBuilder()
- while (!inputGotEof && process.isAlive) {
- process.inputStream.bufferedReader().useLines { lines ->
- lines.forEach { line ->
- when (line) {
- "# eof" -> {
- LOGGER.info("Worker $it got EOF in output")
- inputGotEof = true
- return@forEach }
- "# eot" -> {
- printOutput(output.toString())
- output.clear() }
- else -> { output.append(line, "\n")
- readBytes += line.length +1 }
+ runBlocking {
+ coroutineScope.coroutineContext[Job]?.children?.forEach { job ->
+ job.join()
}
}
- printOutput(output.toString())
- output.clear()
- if (!inputGotEof && process.isAlive) {
- LOGGER.info("Worker $it waiting for more output")
- sleep(10)
- }
+ LOGGER.info("Worker $workerIndex (thread ${self.id}) coroutines completed")
+ } catch (e: InterruptedException) {
+ LOGGER.info("Worker $workerIndex (thread ${self.id}) interrupted while waiting for coroutines")
+ Thread.currentThread().interrupt() // Restore interrupt status
+ } finally {
+ coroutineScope.cancel() // Ensure cleanup
}
}
-
}
- //while (!inputGotEof && process.isAlive) {
- // LOGGER.info("Worker $it waiting for EOF output to finish")
- // sleep(1000)
- // }
- //outputStreamWriter.close()
+
val exitCode = process.waitFor()
if (exitCode != 0) {
- LOGGER.warning("Worker $it exited with code $exitCode")
+ LOGGER.warning("Worker $workerIndex (thread ${self.id}) process exited with code $exitCode")
} else {
- LOGGER.info("Worker $it finished")
+ LOGGER.info("Worker $workerIndex (thread ${self.id}) process finished normally")
}
- threads.remove(currentThread())
} catch (e: IOException) {
+ LOGGER.severe("Worker $workerIndex (thread ${self.id}) failed: ${e.message}")
+ } catch (e: InterruptedException) {
+ LOGGER.info("Worker $workerIndex (thread ${self.id}) was interrupted during processing")
+ Thread.currentThread().interrupt() // Restore interrupt status
+ } catch (e: Exception) { // Catch any other unexpected exceptions during setup or process handling
+ LOGGER.severe("Unhandled exception in worker thread ${self.id} (index $workerIndex): ${e.message}")
e.printStackTrace()
- LOGGER.warning("Worker $it failed: ${e.message}")
- threads.remove(currentThread())
- threadCount--
+ } finally {
+ if (successfullyInitialized) {
+ synchronized(threadsLock) {
+ threads.remove(self)
+ }
+ threadCount.decrementAndGet()
+ LOGGER.info("Worker thread ${self.id} (index $workerIndex) cleaned up and exiting. Active threads: ${threadCount.get()}")
+ } else {
+ LOGGER.warning("Worker thread ${self.id} (index $workerIndex) exiting without full initialization/cleanup.")
+ }
}
}.start()
}
@@ -140,43 +212,102 @@
}
fun pushToQueue(text: String) {
- queue.offer(text)
+ try {
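+ // put() blocks rather than failing; the queue is currently unbounded, but
+ // this stays correct if a capacity is ever configured (offer() would drop).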
+ queue.put(text)
+ } catch (e: InterruptedException) {
+ Thread.currentThread().interrupt()
+ LOGGER.warning("Interrupted while trying to push text to queue.")
+ }
}
fun pushToQueue(texts: List<String>) {
- texts.forEach {
- queue.offer(it)
- }
- // Add an "#eof" marker for each worker to know when to stop
- repeat(queue.remainingCapacity()) {
- queue.offer("#eof")
+ texts.forEach { text ->
+ try {
+ queue.put(text)
+ } catch (e: InterruptedException) {
+ Thread.currentThread().interrupt()
+ LOGGER.warning("Interrupted while trying to push texts to queue. Some texts may not have been added.")
+ return // Exit early if interrupted
+ }
}
}
fun close() {
- var n = threadCount
- LOGGER.info("Closing worker pool with $n threads")
- while (n > 0) {
- if (queue.offer("#eof")) {
- n--
- } else {
- LOGGER.info("Queue is full, waiting for workers to process")
- sleep(100)
+ val currentThreadCount = threadCount.get()
+ LOGGER.info("Closing worker pool with $currentThreadCount threads")
+
+ // Send EOF marker for each worker - use numWorkers instead of current thread count
+ // to ensure we send enough EOF markers even if some threads haven't started yet
+ for (i in 0 until numWorkers) {
+ try {
+ queue.put("#eof")
+ LOGGER.info("Sent EOF marker ${i+1}/$numWorkers to queue")
+ } catch (e: InterruptedException) {
+ Thread.currentThread().interrupt()
+ LOGGER.warning("Interrupted while sending EOF to workers. Some workers may not shut down cleanly.")
+ break
}
}
- if (threadCount > 0)
+
+ if (threadCount.get() > 0) {
waitForWorkersToFinish()
+ }
}
private fun waitForWorkersToFinish() {
+ LOGGER.info("Waiting for queue to empty (current size: ${queue.size})...")
while (queue.isNotEmpty()) {
- sleep(100) // Wait for queue to empty
+ try {
+ sleep(50) // Reduced sleep time for more responsive monitoring
+ } catch (e: InterruptedException) {
+ Thread.currentThread().interrupt()
+ LOGGER.warning("Interrupted while waiting for queue to empty. Proceeding to join threads.")
+ break
+ }
}
- LOGGER.info("Queue is empty, waiting for workers to finish")
- // Create a copy of the threads list to avoid ConcurrentModificationException
- val threadsCopy = threads.toList()
- threadsCopy.forEach(Thread::join)
- LOGGER.info("All workers finished")
+ LOGGER.info("Queue is empty. Joining worker threads.")
+
+ val threadsToJoin: List<Thread>
+ synchronized(threadsLock) {
+ threadsToJoin = threads.toList() // Create copy while holding lock
+ }
+
+ if (threadsToJoin.isEmpty() && threadCount.get() == 0) {
+ LOGGER.info("No threads were active or recorded to join.")
+ } else {
+ LOGGER.info("Attempting to join ${threadsToJoin.size} thread(s) from recorded list (current active count: ${threadCount.get()}).")
+ threadsToJoin.forEach { thread ->
+ try {
+ thread.join(10000) // Increased timeout to 10 seconds
+ if (thread.isAlive) {
+ LOGGER.warning("Thread ${thread.id} did not terminate after 10s. Interrupting.")
+ thread.interrupt()
+ thread.join(2000) // Wait 2 seconds after interrupt
+ if (thread.isAlive) {
+ LOGGER.severe("Thread ${thread.id} failed to terminate after interrupt.")
+ }
+ }
+ } catch (e: InterruptedException) {
+ Thread.currentThread().interrupt()
+ LOGGER.warning("Interrupted while joining thread ${thread.id}. It may not have shut down cleanly.")
+ }
+ }
+ }
+
+ val finalCount = threadCount.get()
+ if (finalCount == 0) {
+ LOGGER.info("All worker threads appear to have terminated.")
+ } else {
+ LOGGER.warning("$finalCount worker thread(s) still marked as active according to counter. This might indicate an issue in thread lifecycle management.")
+ synchronized(threadsLock) {
+ if (threads.isNotEmpty()) {
+ LOGGER.warning("The internal threads list is not empty: ${threads.map { it.id }}. Forcing clear.")
+ threads.clear() // Clean up if any refs are lingering despite count issues
+ }
+ }
+ }
}
}
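
Reviewer note: the pool assumes the spawned command speaks the line-based
framing used above ("# eot" after each chunk, "# eof" at end of stream). A
minimal conforming filter, sketched in Kotlin; this identity "annotator" is
illustrative only and not part of this change:

    fun main() {
        val reader = System.`in`.bufferedReader()
        val chunk = StringBuilder()
        while (true) {
            val line = reader.readLine() ?: break
            when (line.trim()) {
                "# eof" -> { println("# eof"); break }  // end of stream: acknowledge and stop
                "# eot" -> {                            // end of one text: "annotate" and acknowledge
                    print(chunk)                        // identity transform: echo the chunk back
                    println("# eot")
                    System.out.flush()
                    chunk.setLength(0)
                }
                else -> chunk.append(line).append('\n')
            }
        }
    }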