Add option -A to pipe output through annotation tool
Change-Id: Ibb9697af1d519ac2a9211144067d10ea7dbe9d9c
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXml2Conllu.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXml2Conllu.kt
index e2fe79d..a76694c 100644
--- a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXml2Conllu.kt
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXml2Conllu.kt
@@ -1,5 +1,6 @@
package de.ids_mannheim.korapxmltools
+import WorkerPool
import org.w3c.dom.Document
import org.w3c.dom.Element
import org.w3c.dom.NodeList
@@ -97,6 +98,20 @@
)
var extractMetadataRegex: MutableList<String> = mutableListOf()
+ @Option(
+ names = ["--annotate-with", "-A"],
+ paramLabel = "COMMAND",
+ description = ["Pipe output through command"]
+ )
+ var annotateWith: String = ""
+
+ @Option(
+ names = ["--threads", "-T"],
+ paramLabel = "THREADS",
+ description = ["Maximum number of threads to use. Default: ${"$"}{DEFAULT-VALUE}"]
+ )
+ var threads: Int = Runtime.getRuntime().availableProcessors()
+
override fun call(): Int {
LOGGER.level = try {
Level.parse(logLevel.uppercase(Locale.getDefault()))
@@ -113,18 +128,20 @@
private val LOGGER: Logger = Logger.getLogger(KorapXml2Conllu::class.java.name)
+ private var workerPool : WorkerPool? = null
+
fun korapxml2conllu(args: Array<String>) {
- val executor: ExecutorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())
+ val executor: ExecutorService = Executors.newFixedThreadPool(threads)
val texts: ConcurrentHashMap<String, String> = ConcurrentHashMap()
val sentences: ConcurrentHashMap<String, Array<Span>> = ConcurrentHashMap()
val tokens: ConcurrentHashMap<String, Array<Span>> = ConcurrentHashMap()
val morpho: ConcurrentHashMap<String, MutableMap<String, MorphoSpan>> = ConcurrentHashMap()
val fnames: ConcurrentHashMap<String, String> = ConcurrentHashMap()
- if (args.isEmpty()) {
- LOGGER.severe("Usage: KorapXml2Conllu <zipfile1> [<zipfile2> ...]")
- return
+ if (annotateWith != "") {
+ workerPool = WorkerPool(annotateWith, threads, LOGGER)
}
+
var zips: Array<String> = args
if (args.size == 1 && args[0].matches(Regex(".*\\.([^/.]+)\\.zip$"))) {
val baseZip = args[0].replace(Regex("\\.([^/.]+)\\.zip$"), ".zip")
@@ -167,6 +184,9 @@
morpho
)
}
+ if (annotateWith != "") {
+ workerPool?.close()
+ }
}
@@ -351,8 +371,13 @@
real_token_index++
}
}
- synchronized(System.out) {
- println(output.toString())
+
+ if (annotateWith != "") {
+ workerPool?.pushToQueue(output.append("\n# eot\n").toString())
+ } else {
+ synchronized(System.out) {
+ println(output.toString())
+ }
}
arrayOf(tokens, texts, sentences, morpho, fname).forEach { map ->
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/WorkerPool.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/WorkerPool.kt
new file mode 100644
index 0000000..25c2e1d
--- /dev/null
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/WorkerPool.kt
@@ -0,0 +1,127 @@
+import java.io.*
+import java.lang.Thread.sleep
+import java.util.concurrent.BlockingQueue
+import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.TimeUnit
+import java.util.logging.Logger
+
+class WorkerPool(private val command: String, private val numWorkers: Int, private val LOGGER: Logger) {
+ private val queue: BlockingQueue<String> = LinkedBlockingQueue()
+ private val threads = mutableListOf<Thread>()
+ init {
+ openWorkerPool()
+ LOGGER.info("Worker pool opened")
+ }
+
+ private fun openWorkerPool() {
+ repeat(numWorkers) {
+ Thread {
+ try {
+ threads.add(Thread.currentThread())
+ val process = ProcessBuilder("/bin/sh", "-c", command)
+ //.directory(File("/tmp"))
+ .redirectOutput(ProcessBuilder.Redirect.PIPE)
+ .redirectInput(ProcessBuilder.Redirect.PIPE)
+ .redirectError(ProcessBuilder.Redirect.INHERIT)
+ //.redirectErrorStream(true) // Merges stderr into stdout
+ .start()
+ process.outputStream.buffered(1000000)
+ process.inputStream.buffered(1000000)
+ val outputStreamWriter = process.outputStream.bufferedWriter(Charsets.UTF_8)
+ val output = StringBuilder()
+
+ while (true) {
+ val text = queue.poll(5, TimeUnit.SECONDS)
+ if (text == "#eof" || text == null) {
+ outputStreamWriter.write("\n# eof\n")
+ outputStreamWriter.flush()
+ LOGGER.info("Worker $it received eof")
+ break
+ }
+
+ text.split(Regex("\n\n")).forEach {
+ outputStreamWriter.write(it + "\n\n")
+ outputStreamWriter.flush()
+ readAndPrintAvailable(process, output)
+ }
+ }
+
+ process.outputStream.close()
+ while(process.isAlive && output.indexOf("# eof\n") == -1) {
+ readAndPrintAvailable(process, output)
+ }
+ LOGGER.info("Worker $it got eof in output")
+ output.append(process.inputStream.bufferedReader(Charsets.UTF_8).readText())
+ synchronized(System.out) {
+ print(output.replace(Regex("\\s*\n# eof\n\\s*"), ""))
+ }
+
+ process.inputStream.close()
+
+ } catch (e: IOException) {
+ e.printStackTrace()
+ }
+
+ }.start()
+
+
+ }
+ }
+
+ private fun readAndPrintAvailable(process: Process, output: StringBuilder) {
+ if (process.inputStream.available() > 0) {
+ val readBytes = ByteArray(process.inputStream.available())
+ process.inputStream.read(readBytes)
+ output.append(String(readBytes))
+ val eotOffset = output.lastIndexOf("# eot\n")
+ if (eotOffset > -1) {
+ synchronized(System.out) {
+ print(output.substring(0, eotOffset).replace(Regex("\n# eot\n\\s*"), ""))
+ }
+ output.delete(0, eotOffset + 6)
+ }
+ } else {
+ sleep(1)
+ }
+ }
+
+ fun pushToQueue(text: String) {
+ queue.offer(text)
+ }
+
+ fun pushToQueue(texts: List<String>) {
+ texts.forEach {
+ queue.offer(it)
+ }
+ // Add an "#eof" marker for each worker to know when to stop
+ repeat(queue.remainingCapacity()) {
+ queue.offer("#eof")
+ }
+ }
+
+ fun close() {
+ repeat(numWorkers) {
+ queue.offer("#eof")
+ }
+ waitForWorkersToFinish()
+ }
+
+ private fun waitForWorkersToFinish() {
+ while (queue.isNotEmpty()) {
+ sleep(100) // Wait for queue to empty
+ }
+ threads.forEach(Thread::join)
+ }
+}
+
+fun main() {
+ val command = "cat"
+ val numWorkers = 3
+ val workerPool = WorkerPool(command, numWorkers, Logger.getLogger("WorkerPool") )
+
+ val texts = listOf("The", "World", "This", "Is", "A", "Test")
+
+ workerPool.pushToQueue(texts)
+
+ workerPool.close()
+}
diff --git a/app/src/test/kotlin/de/ids_mannheim/korapxmltools/KorapXml2ConlluTest.kt b/app/src/test/kotlin/de/ids_mannheim/korapxmltools/KorapXml2ConlluTest.kt
index 86088c1..c252fd4 100644
--- a/app/src/test/kotlin/de/ids_mannheim/korapxmltools/KorapXml2ConlluTest.kt
+++ b/app/src/test/kotlin/de/ids_mannheim/korapxmltools/KorapXml2ConlluTest.kt
@@ -9,6 +9,7 @@
import kotlin.test.assertContains
import org.junit.Ignore
import kotlin.test.assertFalse
+import kotlin.test.assertTrue
class KorapXml2ConlluTest {
private val outContent = ByteArrayOutputStream(10000000)
@@ -153,6 +154,18 @@
)
}
+ @Test
+ fun canAnnotate() {
+ val args = arrayOf("-A", "sed -e 's/u/x/g'", loadResource("wdf19.zip").path)
+ debug(args)
+ assertContains(
+ outContent.toString(),
+ "axtomatiqxe"
+ )
+ assertTrue("Annotated CoNLL-U should have at least as many lines as the original",
+ { outContent.toString().count { it == '\n'} >= 61511 })
+ }
+
@Ignore("for some reason not working")
fun canConvertMorphoFeatureAnnotations() {
val args = arrayOf(goe, goeMarmot)