Nodes can now be distributed to several servers
diff --git a/src/main/java/de/ids_mannheim/korap/KorapNode.java b/src/main/java/de/ids_mannheim/korap/KorapNode.java
index 7227085..24ff746 100644
--- a/src/main/java/de/ids_mannheim/korap/KorapNode.java
+++ b/src/main/java/de/ids_mannheim/korap/KorapNode.java
@@ -1,5 +1,8 @@
package de.ids_mannheim.korap;
+import java.util.*;
+import java.io.*;
+
import org.glassfish.grizzly.http.server.HttpServer;
import org.glassfish.jersey.grizzly2.httpserver.GrizzlyHttpServerFactory;
import org.glassfish.jersey.server.ResourceConfig;
@@ -7,13 +10,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
import java.net.URI;
+import java.beans.PropertyVetoException;
import de.ids_mannheim.korap.KorapIndex;
import org.apache.lucene.store.MMapDirectory;
+import com.mchange.v2.c3p0.*;
/**
* Standalone REST-Service for the Lucene Search Backend.
@@ -23,36 +26,59 @@
public class KorapNode {
// Base URI the Grizzly HTTP server will listen on
- public static final String BASE_URI = "http://localhost:8080/";
+ public static String BASE_URI = "http://localhost:8080/";
// Logger
private final static Logger log = LoggerFactory.getLogger(KorapNode.class);
// Index
private static KorapIndex index;
+ private static ComboPooledDataSource cpds;
+ private static String path;
+ private static String name = "unknown";
- /*
- Todo: Use korap.config for paths to indexDirectory
- */
- private static String path =
- new String("/home/ndiewald/Repositories/korap/KorAP-modules/KorAP-lucene-index/sandbox/index");
+ private static String dbUser, dbPwd;
- private static String name = "tanja";
-
+ private static String dbClass = "org.sqlite.JDBC";
+ private static String dbURL = "jdbc:sqlite:";
/*
* Todo: Add shutdown hook,
+ * Then also close cdps.close();
* see: https://10.0.10.12/trac/korap/browser/KorAP-modules/KorAP-REST/src/main/java/de/ids_mannheim/korap/web/Application.java
* https://10.0.10.12/trac/korap/browser/KorAP-modules/KorAP-REST/src/main/java/de/ids_mannheim/korap/web/ShutdownHook.java
*/
-
/**
* Starts Grizzly HTTP server exposing JAX-RS resources defined in this application.
* @return Grizzly HTTP server.
*/
public static HttpServer startServer() {
+ // Load configuration
+ try {
+ InputStream file = new FileInputStream(
+ KorapNode.class.getClassLoader().getResource("server.properties").getFile()
+ );
+ Properties prop = new Properties();
+ prop.load(file);
+
+ // Node properties
+ path = prop.getProperty("lucene.indexDir", path);
+ name = prop.getProperty("lucene.node.name", name);
+ BASE_URI = prop.getProperty("lucene.node.baseURI", BASE_URI);
+
+ // Database properties
+ dbUser = prop.getProperty("lucene.db.user", dbUser);
+ dbPwd = prop.getProperty("lucene.db.pwd", dbPwd);
+ dbClass = prop.getProperty("lucene.db.class", dbClass);
+ dbURL = prop.getProperty("lucene.db.jdbcURL", dbURL);
+
+ }
+ catch (IOException e) {
+ log.error(e.getLocalizedMessage());
+ };
+
// create a resource config that scans for JAX-RS resources and providers
// in de.ids_mannheim.korap.server package
final ResourceConfig rc =
@@ -97,6 +123,32 @@
return name;
};
+ public static String getListener () {
+ return BASE_URI;
+ };
+
+ public static ComboPooledDataSource getDBPool () {
+ if (cpds != null)
+ return cpds;
+
+ try {
+ cpds = new ComboPooledDataSource();
+ cpds.setDriverClass(dbClass);
+ cpds.setJdbcUrl(dbURL);
+ if (dbUser != null)
+ cpds.setUser(dbUser);
+ if (dbPwd != null)
+ cpds.setPassword(dbPwd);
+ cpds.setMaxStatements(100);
+ return cpds;
+ }
+ catch (PropertyVetoException e) {
+ log.error(e.getLocalizedMessage());
+ };
+ return null;
+ };
+
+
// Get Index
public static KorapIndex getIndex () {
if (index != null)
diff --git a/src/main/java/de/ids_mannheim/korap/index/collector/MatchCollectorDB.java b/src/main/java/de/ids_mannheim/korap/index/collector/MatchCollectorDB.java
index 0dbe73c..62bc4c7 100644
--- a/src/main/java/de/ids_mannheim/korap/index/collector/MatchCollectorDB.java
+++ b/src/main/java/de/ids_mannheim/korap/index/collector/MatchCollectorDB.java
@@ -1,4 +1,5 @@
package de.ids_mannheim.korap.index.collector;
+import de.ids_mannheim.korap.KorapNode;
import de.ids_mannheim.korap.KorapMatch;
import de.ids_mannheim.korap.index.MatchCollector;
import com.fasterxml.jackson.annotation.*;
@@ -8,18 +9,25 @@
import java.sql.SQLException;
import java.util.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class MatchCollectorDB extends MatchCollector {
+ // Logger
+ private final static Logger log = LoggerFactory.getLogger(KorapNode.class);
+
/*
Todo: In case there are multiple threads searching,
the list should be synchrinized Collections.synchronizedList()
*/
private String databaseType;
private List matchCollector;
- private int bufferSize;
- private int docCollect;
+ private int bufferSize, docCollect;
private String resultID;
+ // private Connection connection;
+ private DataSource pool;
private Connection connection;
private PreparedStatement prepared;
@@ -36,12 +44,13 @@
* Add matches till the bufferSize exceeds - then commit to the database.
*/
public void add (int UID, int matchCount) {
+ if (this.docCollect == bufferSize)
+ this.commit();
+
this.incrTotalResultDocs(1);
this.incrTotalResults(matchCount);
this.matchCollector.add(new int[]{UID, matchCount});
-
- if (this.docCollect++ > bufferSize)
- this.commit();
+ this.docCollect++;
};
@JsonIgnore
@@ -55,10 +64,9 @@
};
@JsonIgnore
- public void openConnection (String type, DataSource ds) throws SQLException {
+ public void setDBPool (String type, DataSource ds) throws SQLException {
this.setDatabaseType(type);
- this.connection = ds.getConnection();
- this.connection.setAutoCommit(false);
+ this.pool = ds;
// Create prepared statement for multiple requests
@@ -73,20 +81,93 @@
};
+ /* TODO: Ensure the commit was successful! */
public void commit () {
+ if (this.pool == null)
+ return;
- /*
- */
- this.matchCollector.clear();
- this.docCollect = 0;
+ try {
+ // This should be heavily optimized! It's aweful!
+ /*
+ * ARGHHHHHHH!
+ */
+
+ if (this.connection == null)
+ this.connection = this.pool.getConnection();
+
+ // TODO: Create a BEGIN ... COMMIT Transaction
+ // connection.setAutoCommit(true);
+
+ StringBuilder sb = new StringBuilder();
+ sb.append("INSERT INTO ");
+ sb.append(this.resultID);
+ sb.append(" (text_id, match_count) ");
+
+ // SQLite insertion idiom
+ if (this.getDatabaseType().equals("sqlite")) {
+ for (int i = 1; i < this.docCollect; i++) {
+ sb.append("SELECT ?, ? UNION ");
+ }
+ if (this.docCollect == 1)
+ sb.append("VALUES (?, ?)");
+ else
+ sb.append("SELECT ?, ?");
+ }
+
+ // MySQL insertion idiom
+ else if (this.getDatabaseType().equals("mysql")) {
+ sb.append(" VALUES ");
+ for (int i = 1; i < this.docCollect; i++) {
+ sb.append("(?,?),");
+ };
+ sb.append("(?,?)");
+ }
+ else {
+ log.error("Unsupported Database type");
+ return;
+ };
+
+ // System.err.println(sb.toString());
+
+ PreparedStatement prep = connection.prepareStatement(sb.toString());
+
+ int i = 1;
+ ListIterator li = this.matchCollector.listIterator();
+ while (li.hasNext()) {
+ int[] v = (int[]) li.next();
+ // System.err.println("Has " + i + ":" + v[0]);
+ prep.setInt(i++, v[0]);
+ // System.err.println("Has " + i + ":" + v[1]);
+ prep.setInt(i++, v[1]);
+ // System.err.println("-");
+ };
+
+ // System.err.println(sb.toString());
+
+ prep.addBatch();
+ prep.executeBatch();
+ // connection.setAutoCommit(false);
+ // connection.close();
+ this.matchCollector.clear();
+ this.docCollect = 0;
+ }
+ catch (SQLException e) {
+ this.matchCollector.clear();
+ this.docCollect = 0;
+ System.err.println("Error: " + e.getLocalizedMessage());
+ log.error(e.getLocalizedMessage());
+ };
+ return;
};
public void close () {
this.commit();
+ /*
try {
this.connection.close();
}
catch (SQLException e) {
};
+ */
};
};
diff --git a/src/main/java/de/ids_mannheim/korap/server/KorapResponse.java b/src/main/java/de/ids_mannheim/korap/server/KorapResponse.java
index d1edf4b..6e2d967 100644
--- a/src/main/java/de/ids_mannheim/korap/server/KorapResponse.java
+++ b/src/main/java/de/ids_mannheim/korap/server/KorapResponse.java
@@ -20,9 +20,9 @@
public class KorapResponse {
ObjectMapper mapper = new ObjectMapper();
- private String errstr, msg, version, node;
+ private String errstr, msg, version, node, listener;
private int err, unstaged;
- private int totalResults = 0;
+ private int totalResults;
private String benchmark;
public KorapResponse (String node, String version) {
@@ -132,6 +132,14 @@
return this.benchmark;
};
+ public KorapResponse setListener (String listener) {
+ this.listener = listener;
+ return this;
+ };
+
+ public String getListener () {
+ return this.listener;
+ }
// Serialize
public String toJSON () {
diff --git a/src/main/java/de/ids_mannheim/korap/server/Resource.java b/src/main/java/de/ids_mannheim/korap/server/Resource.java
index 818b6c3..ba8d3e9 100644
--- a/src/main/java/de/ids_mannheim/korap/server/Resource.java
+++ b/src/main/java/de/ids_mannheim/korap/server/Resource.java
@@ -29,6 +29,8 @@
import de.ids_mannheim.korap.server.KorapResponse;
import de.ids_mannheim.korap.index.FieldDocument;
import de.ids_mannheim.korap.util.QueryException;
+import de.ids_mannheim.korap.index.MatchCollector;
+import de.ids_mannheim.korap.index.collector.MatchCollectorDB;
import java.util.List;
import java.util.regex.Pattern;
@@ -36,7 +38,10 @@
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
-import com.mchange.v2.c3p0.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
/*
http://www.vogella.com/tutorials/REST/article.html
@@ -56,6 +61,8 @@
static Pattern p = Pattern.compile("\\s*(?i:false|null)\\s*");
private String version;
+ // Logger
+ private final static Logger log = LoggerFactory.getLogger(KorapNode.class);
private static boolean isNull (String value) {
if (value == null)
@@ -68,6 +75,17 @@
return false;
};
+ @GET
+ @Path("/info")
+ @Produces(MediaType.APPLICATION_JSON)
+ public String info () {
+ KorapIndex index = KorapNode.getIndex();
+ KorapResponse kresp = new KorapResponse(KorapNode.getName(), index.getVersion());
+ kresp.setListener(KorapNode.getListener());
+ return kresp.setMsg("Up and running!").toJSON();
+ };
+
+
/**
* Add new documents to the index
*
@@ -208,11 +226,13 @@
*
* @param text_id
*/
- @POST
+ @PUT
@Path("/collect/{resultID}")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
- public String collect (String json, @Context UriInfo uri) {
+ public String collect (String json,
+ @PathParam("resultID") String resultID,
+ @Context UriInfo uri) {
// Get index
KorapIndex index = KorapNode.getIndex();
@@ -224,44 +244,27 @@
index.getVersion()
).setError(601, "Unable to find index").toJSON();
-
- return "";
-
// Get the database
- // Create a database based matchcollector
+ try {
+ MatchCollectorDB mc = new MatchCollectorDB(1000, "Res_" + resultID);
+ mc.setDBPool("mysql", KorapNode.getDBPool());
+ // TODO: Only search in self documents (REPLICATION FTW!)
- // TODO: Only search in self documents (REPLICATION FTW!)
+ KorapSearch ks = new KorapSearch(json);
+ MatchCollector result = index.collect(ks, mc);
- //MatchCollector result = index.collect(KorapCollection, KorapSearch, MatchCollector);
- /*
-
- // Build Collection based on a list of uids
- List<String> uids = qp.get("uid");
- KorapCollection kc = new KorapCollection();
- kc.filterUIDs(uids.toArray(new String[uids.size()]));
-
- // TODO: RESTRICT COLLECTION TO ONLY RESPECT SELF DOCS (REPLICATION)
-
- // Override old collection
- ks.setCollection(kc);
-
- // Only return the first match per text
- ks.setItemsPerResource(1);
-
- return ks.run(index).toJSON();
- };
- KorapResult kr = new KorapResult();
- kr.setNode(KorapNode.getName());
- kr.setError(610, "No UUIDs given");
- return kr.toJSON();
+ result.setNode(KorapNode.getName());
+ return result.toJSON();
+ }
+ catch (SQLException e) {
+ log.error(e.getLocalizedMessage());
};
return new KorapResponse(
KorapNode.getName(),
index.getVersion()
- ).setError(601, "Unable to find index").toJSON();
-*/
+ ).setError("Unable to connect to database").toJSON();
};
diff --git a/src/test/java/de/ids_mannheim/korap/server/TestDatabase.java b/src/test/java/de/ids_mannheim/korap/server/TestDatabase.java
index 80276cf..ec06631 100644
--- a/src/test/java/de/ids_mannheim/korap/server/TestDatabase.java
+++ b/src/test/java/de/ids_mannheim/korap/server/TestDatabase.java
@@ -15,6 +15,7 @@
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.junit.Ignore;
import static org.junit.Assert.assertEquals;
public class TestDatabase {
@@ -63,17 +64,18 @@
rs.close();
};
- @Test
+ /*
+ * The following tests don't work well with in-memory dbs and
+ * temporary dbs - should be improved
+ */
+
+ @Ignore
public void TestDatabasePool () throws Exception {
ComboPooledDataSource cpds = new ComboPooledDataSource();
// Connect to a temporary file instead of a in-memory file
cpds.setDriverClass("org.sqlite.JDBC");
- cpds.setJdbcUrl("jdbc:sqlite:");
- // cpds.setUser(xxx);
- // cpds.setPassword(xxx);
+ cpds.setJdbcUrl("jdbc:sqlite:hui");
cpds.setMaxStatements(100);
- // cpds.autoCommitOnClose(true);
- // cpds.aquireRetryDelay(100);
// This is part of the match collector
this.conn = cpds.getConnection();
@@ -81,7 +83,7 @@
stat.executeUpdate(
"CREATE TABLE IF NOT EXISTS result_a (text_id INTEGER, match_count INTEGER);"
);
- conn.setAutoCommit(false);
+ // conn.setAutoCommit(false);
PreparedStatement prep = this.conn.prepareStatement(
"INSERT INTO result_a VALUES (?, ?);"
);
@@ -99,12 +101,10 @@
rs.close();
- this.conn.close();
-
+ // this.conn.close();
MatchCollectorDB mc = new MatchCollectorDB(2000, "result_a");
- mc.openConnection("sqlite", cpds);
-
+ mc.setDBPool("sqlite", cpds);
mc.add(9, 5000);
mc.add(12, 6785);
@@ -112,14 +112,51 @@
mc.close();
- cpds.close();
-
/*
this.stat = this.conn.createStatement();
stat.executeUpdate("CREATE TABLE IF NOT EXISTS result_a (text_id INTEGER, match_count INTEGER);");
*/
};
+ @Ignore
+ public void TestDatabasePoolConnector () throws Exception {
+ ComboPooledDataSource cpds = new ComboPooledDataSource();
+ // Connect to a temporary file instead of a in-memory file
+ cpds.setDriverClass("org.sqlite.JDBC");
+ cpds.setJdbcUrl("jdbc:sqlite:hui");
+ cpds.setMaxStatements(100);
+
+ // This is part of the match collector
+ conn = cpds.getConnection();
+ stat = conn.createStatement();
+ // conn.setAutoCommit(true);
+ stat.executeUpdate(
+ "CREATE TABLE matchXYZ (text_id INTEGER, match_count INTEGER);"
+ );
+
+ MatchCollectorDB mc = new MatchCollectorDB(3, "matchXYZ");
+ mc.setDBPool("sqlite", cpds);
+
+ mc.add(9, 5000);
+ mc.add(12, 6785);
+ mc.add(39, 56576);
+ // First commit
+
+ mc.add(45, 5000);
+ mc.add(67, 6785);
+ mc.add(81, 56576);
+ // Second commit
+
+ mc.add(94, 456);
+ mc.close();
+ // Final commit
+
+ // conn = cpds.getConnection();
+ stat = conn.createStatement();
+ ResultSet rs = stat.executeQuery("SELECT count('*') AS num FROM matchXYZ;");
+ rs.next();
+ assertEquals(7, rs.getInt("num"));
+ };
@Test
public void TestMatchCollectorDB () throws Exception {