totalngrams: sanitize worker node pool
diff --git a/src/main/java/org/ids_mannheim/Worker.java b/src/main/java/org/ids_mannheim/Worker.java
index 1578a98..e1d052b 100644
--- a/src/main/java/org/ids_mannheim/Worker.java
+++ b/src/main/java/org/ids_mannheim/Worker.java
@@ -49,8 +49,8 @@
String fname = fnames.get(index);
File current_file = new File(fname);
long file_size = current_file.length();
- int poolIndex = index % pool.size;
- logger.info(String.format("%5d/%5d %-10s %-10s", index, fnames.size(), pool.getHost(poolIndex), current_file.getName()));
+ int poolIndex = pool.getNextFree();
+ logger.info(String.format("Started %d/%d %s %s", index, fnames.size(), pool.getHost(poolIndex), current_file.getName()));
String[] cmd = {
"/bin/sh",
"-c",
@@ -78,6 +78,7 @@
//noinspection ConstantConditions
slidingWindowQueue.add(strings[1]);
}
+ pool.markFree(poolIndex);
if (texts > 0) {
logger.info(pool.getHost(poolIndex)+" finished " + fname + " with "+texts+ " texts");
etaPrinter.update(file_size);
@@ -90,7 +91,6 @@
} else {
logger.severe ("Giving up " + fname);
index = queue.take();
- sleep(1000);
}
}
}
diff --git a/src/main/java/org/ids_mannheim/WorkerNodePool.java b/src/main/java/org/ids_mannheim/WorkerNodePool.java
index 255809d..3d9485e 100644
--- a/src/main/java/org/ids_mannheim/WorkerNodePool.java
+++ b/src/main/java/org/ids_mannheim/WorkerNodePool.java
@@ -2,11 +2,16 @@
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.IntStream;
public class WorkerNodePool {
- ArrayList<String> execPool = new ArrayList<String>();
- ArrayList<String> hostPool = new ArrayList<String>();
+ CopyOnWriteArrayList<String> execPool = new CopyOnWriteArrayList<>();
+ CopyOnWriteArrayList<String> hostPool = new CopyOnWriteArrayList<>();
+ CopyOnWriteArrayList<Boolean> busy = new CopyOnWriteArrayList<>();
+
public final int size;
public String getExec(int i) {
@@ -18,13 +23,31 @@
}
private String hostToExec(String host) {
- if(host.equals("") || host.matches("local")) {
+ if (host.equals("") || host.matches("local")) {
return "nice -5 ";
} else {
return " ssh " + host + " nice -5 ";
}
}
+ public void markFree(int i) {
+ busy.set(i, false);
+ }
+
+ public synchronized int getNextFree() throws InterruptedException {
+ while (true) {
+ OptionalInt result = IntStream.range(0, size).filter(i -> !busy.get(i)).findFirst();
+ if (result.isPresent()) {
+ int i = result.getAsInt();
+ busy.set(i, true);
+ return i;
+ } else {
+ TotalNGram.logger.warning("No free worker node. This should not happen.");
+ Thread.sleep(1000);
+ }
+ }
+ }
+
public WorkerNodePool(String description) {
Arrays.stream(description.split(", *")).forEachOrdered(s -> {
String[] single = s.split("\\s*[*]\\s*");
@@ -44,6 +67,7 @@
IntStream.range(0, procs).forEach(u -> {
hostPool.add(host);
execPool.add(hostToExec(host));
+ busy.add(false);
});
});