Add option -A to pipe output through annotation tool and -T to limit threads

Change-Id: Ibb9697af1d519ac2a9211144067d10ea7dbe9d9c
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXml2Conllu.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXml2Conllu.kt
index e2fe79d..a76694c 100644
--- a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXml2Conllu.kt
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXml2Conllu.kt
@@ -1,5 +1,6 @@
 package de.ids_mannheim.korapxmltools
 
+import WorkerPool
 import org.w3c.dom.Document
 import org.w3c.dom.Element
 import org.w3c.dom.NodeList
@@ -97,6 +98,20 @@
     )
     var extractMetadataRegex: MutableList<String> = mutableListOf()
 
+    @Option(
+        names = ["--annotate-with", "-A"],
+        paramLabel = "COMMAND",
+        description = ["Pipe output through command"]
+    )
+    var annotateWith: String = "" // empty (the default) means no annotation: no worker pool is created, output is printed directly
+
+    @Option(
+        names = ["--threads", "-T"],
+        paramLabel = "THREADS",
+        description = ["Maximum number of threads to use. Default: ${"$"}{DEFAULT-VALUE}"]
+    )
+    var threads: Int = Runtime.getRuntime().availableProcessors() // sizes both the executor and the annotation worker pool
+
     override fun call(): Int {
         LOGGER.level = try {
             Level.parse(logLevel.uppercase(Locale.getDefault()))
@@ -113,18 +128,20 @@
 
     private val LOGGER: Logger = Logger.getLogger(KorapXml2Conllu::class.java.name)
 
+    private var workerPool : WorkerPool? = null // set in korapxml2conllu() only when annotateWith is non-empty
+
     fun korapxml2conllu(args: Array<String>) {
-        val executor: ExecutorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())
+        val executor: ExecutorService = Executors.newFixedThreadPool(threads)
         val texts: ConcurrentHashMap<String, String> = ConcurrentHashMap()
         val sentences: ConcurrentHashMap<String, Array<Span>> = ConcurrentHashMap()
         val tokens: ConcurrentHashMap<String, Array<Span>> = ConcurrentHashMap()
         val morpho: ConcurrentHashMap<String, MutableMap<String, MorphoSpan>> = ConcurrentHashMap()
         val fnames: ConcurrentHashMap<String, String> = ConcurrentHashMap()
 
-        if (args.isEmpty()) {
-            LOGGER.severe("Usage: KorapXml2Conllu <zipfile1> [<zipfile2> ...]")
-            return
+        if (annotateWith != "") {
+            workerPool = WorkerPool(annotateWith, threads, LOGGER)
         }
+
         var zips: Array<String> = args
         if (args.size == 1 && args[0].matches(Regex(".*\\.([^/.]+)\\.zip$"))) {
             val baseZip = args[0].replace(Regex("\\.([^/.]+)\\.zip$"), ".zip")
@@ -167,6 +184,9 @@
                 morpho
             )
         }
+        if (annotateWith != "") {
+            workerPool?.close()
+        }
     }
 
 
@@ -351,8 +371,13 @@
                 real_token_index++
             }
         }
-        synchronized(System.out) {
-            println(output.toString())
+
+        if (annotateWith != "") {
+            workerPool?.pushToQueue(output.append("\n# eot\n").toString())
+        } else {
+            synchronized(System.out) {
+                println(output.toString())
+            }
         }
 
         arrayOf(tokens, texts, sentences, morpho, fname).forEach { map ->
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/WorkerPool.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/WorkerPool.kt
new file mode 100644
index 0000000..25c2e1d
--- /dev/null
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/WorkerPool.kt
@@ -0,0 +1,127 @@
+import java.io.*
+import java.lang.Thread.sleep
+import java.util.concurrent.BlockingQueue
+import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.TimeUnit
+import java.util.logging.Logger
+
+/**
+ * Pool of worker threads; each one pipes queued texts through its own `/bin/sh -c command`
+ * process and prints what the process writes back. `# eot` ends a text, `#eof` ends the input.
+ */
+class WorkerPool(private val command: String, private val numWorkers: Int, private val LOGGER: Logger) {
+    private val queue: BlockingQueue<String> = LinkedBlockingQueue()
+    private val threads = mutableListOf<Thread>()
+
+    init {
+        openWorkerPool()
+        LOGGER.info("Worker pool opened")
+    }
+
+    /** Creates and starts [numWorkers] worker threads. */
+    private fun openWorkerPool() {
+        repeat(numWorkers) { workerId ->
+            val worker = Thread { runWorker(workerId) }
+            // Registered here, before start(): the list is not thread-safe and close() joins every entry.
+            threads.add(worker)
+            worker.start()
+        }
+    }
+
+    /** Body of one worker thread: feeds a [command] process until the end-of-input marker arrives. */
+    private fun runWorker(workerId: Int) {
+        try {
+            val process = ProcessBuilder("/bin/sh", "-c", command)
+                .redirectOutput(ProcessBuilder.Redirect.PIPE)
+                .redirectInput(ProcessBuilder.Redirect.PIPE)
+                .redirectError(ProcessBuilder.Redirect.INHERIT)
+                .start()
+            val processInput = process.outputStream.bufferedWriter(Charsets.UTF_8)
+            // One UTF-8 reader per process, so a multi-byte character split across reads still decodes.
+            val processOutput = process.inputStream.bufferedReader(Charsets.UTF_8)
+            val output = StringBuilder()
+            while (true) {
+                // null means nothing was queued for 5 seconds; it is treated like "#eof".
+                val text = queue.poll(5, TimeUnit.SECONDS)
+                if (text == "#eof" || text == null) {
+                    processInput.write("\n# eof\n")
+                    processInput.flush()
+                    LOGGER.info("Worker $workerId received eof")
+                    break
+                }
+                // Write sentence by sentence and read back what is available after each one.
+                text.split("\n\n").forEach { sentence ->
+                    processInput.write(sentence + "\n\n")
+                    processInput.flush()
+                    readAndPrintAvailable(processOutput, output)
+                }
+            }
+            process.outputStream.close()
+            while (process.isAlive && output.indexOf("# eof\n") == -1) {
+                readAndPrintAvailable(processOutput, output)
+            }
+            LOGGER.info("Worker $workerId got eof in output")
+            output.append(processOutput.readText())
+            synchronized(System.out) { print(output.replace(Regex("\\s*\n# eof\n\\s*"), "")) }
+            process.inputStream.close()
+        } catch (e: IOException) {
+            e.printStackTrace()
+        }
+    }
+
+    /** Appends what [reader] can deliver right now to [output] and prints the texts completed by `# eot`. */
+    private fun readAndPrintAvailable(reader: Reader, output: StringBuilder) {
+        if (!reader.ready()) {
+            sleep(1)
+            return
+        }
+        val chunk = CharArray(8192)
+        while (reader.ready()) {
+            val count = reader.read(chunk)
+            if (count < 0) break
+            output.append(String(chunk, 0, count))
+        }
+        val eotOffset = output.lastIndexOf("# eot\n")
+        if (eotOffset > -1) {
+            synchronized(System.out) { print(output.substring(0, eotOffset).replace(Regex("\n# eot\n\\s*"), "")) }
+            output.delete(0, eotOffset + 6)
+        }
+    }
+
+    /** Queues a single text for annotation. */
+    fun pushToQueue(text: String) {
+        queue.offer(text)
+    }
+
+    /** Queues all [texts], followed by one `#eof` marker per worker. */
+    fun pushToQueue(texts: List<String>) {
+        texts.forEach { queue.offer(it) }
+        // remainingCapacity() is Int.MAX_VALUE for this unbounded queue, so count workers instead.
+        repeat(numWorkers) { queue.offer("#eof") }
+    }
+
+    /** Sends each worker an `#eof` marker and blocks until all worker threads have finished. */
+    fun close() {
+        repeat(numWorkers) { queue.offer("#eof") }
+        waitForWorkersToFinish()
+    }
+
+    /** Waits for the queue to drain, then joins the worker threads. */
+    private fun waitForWorkersToFinish() {
+        // Without a live worker nothing drains the queue any more, so stop waiting for it.
+        while (queue.isNotEmpty() && threads.any { it.isAlive }) sleep(100)
+        threads.forEach(Thread::join)
+    }
+}
+
+/** Manual demo: sends six words through three `cat` workers, then closes the pool. */
+fun main() {
+    val pool = WorkerPool(
+        command = "cat",
+        numWorkers = 3,
+        LOGGER = Logger.getLogger("WorkerPool")
+    )
+    // The list overload also queues "#eof" markers; close() then waits for the workers.
+    pool.pushToQueue(listOf("The", "World", "This", "Is", "A", "Test"))
+    pool.close()
+}
diff --git a/app/src/test/kotlin/de/ids_mannheim/korapxmltools/KorapXml2ConlluTest.kt b/app/src/test/kotlin/de/ids_mannheim/korapxmltools/KorapXml2ConlluTest.kt
index 86088c1..c252fd4 100644
--- a/app/src/test/kotlin/de/ids_mannheim/korapxmltools/KorapXml2ConlluTest.kt
+++ b/app/src/test/kotlin/de/ids_mannheim/korapxmltools/KorapXml2ConlluTest.kt
@@ -9,6 +9,7 @@
 import kotlin.test.assertContains
 import org.junit.Ignore
 import kotlin.test.assertFalse
+import kotlin.test.assertTrue
 
 class KorapXml2ConlluTest {
     private val outContent = ByteArrayOutputStream(10000000)
@@ -153,6 +154,18 @@
         )
     }
 
+    /** Output piped through `sed` via -A must contain the substituted text and at least 61511 line breaks. */
+    @Test
+    fun canAnnotate() {
+        val args = arrayOf("-A", "sed -e 's/u/x/g'", loadResource("wdf19.zip").path)
+        debug(args)
+        val annotated = outContent.toString()
+        assertContains(annotated, "axtomatiqxe")
+        assertTrue("Annotated CoNLL-U should have at least as many lines as the original") {
+            annotated.count { it == '\n' } >= 61511
+        }
+    }
+
     @Ignore("for some reason not working")
     fun canConvertMorphoFeatureAnnotations() {
         val args = arrayOf(goe, goeMarmot)