w2v-server.pl: make knn-search multi-threaded
diff --git a/w2v-server.pl b/w2v-server.pl
index 8c7d048..e37ef3b 100644
--- a/w2v-server.pl
+++ b/w2v-server.pl
@@ -23,6 +23,7 @@
push(@lists, get_neighbours(encode("iso-8859-1", $w), $no_nbs));
}
}
+ $word =~ s/ *\| */ | /g;
$c->render(template=>"index", word=>$word, no_nbs=>$no_nbs, no_iterations => $no_iterations, lists=> \@lists);
};
@@ -39,18 +40,35 @@
#include <malloc.h>
#include <stdlib.h> //strlen
#include <sys/mman.h>
+#include <pthread.h>
#define max_size 2000
#define max_w 50
#define MAX_NEIGHBOURS 1000
#define MAX_WORDS -1
+#define MAX_THREADS 100
//the thread function
void *connection_handler(void *);
-
+
+typedef struct { // result handed back by one _get_neighbours worker thread
+ long long *index; // neighbour indices (the worker's malloc'd besti array)
+ float *dist; // dist values parallel to index (the worker's malloc'd bestd array)
+ unsigned int length; // number of entries; the worker sets this to N
+} knn;
+
+
+typedef struct { // arguments passed to one _get_neighbours worker thread
+ char *token; // query string; get_neighbours passes its st1 argument here
+ int N; // number of neighbours the worker should collect
+ unsigned long from; // first index of the worker's search loop (inclusive)
+ unsigned long upto; // end of the worker's search loop (exclusive: runs while c < upto)
+} knnpars;
+
float *M;
char *vocab;
long long words, size;
+int num_threads=20;
int init_net(char *file_name) {
FILE *f, *binvecs, *binwords;
@@ -73,8 +91,8 @@
if(MAX_WORDS > 0 && words > MAX_WORDS) words = MAX_WORDS;
fscanf(f, "%lld", &size);
if( (binvecs_fd = open(binvecs_fname, O_RDONLY)) >= 0 && (binwords_fd = open(binwords_fname, O_RDONLY)) >= 0) {
- M = mmap(0, sizeof(float) * (long long)words * (long long)size, PROT_READ, MAP_PRIVATE, binvecs_fd, 0);
- vocab = mmap(0, sizeof(char) * (long long)words * max_w, PROT_READ, MAP_PRIVATE, binwords_fd, 0);
+ M = mmap(0, sizeof(float) * (long long)words * (long long)size, PROT_READ, MAP_SHARED, binvecs_fd, 0);
+ vocab = mmap(0, sizeof(char) * (long long)words * max_w, PROT_READ, MAP_SHARED, binwords_fd, 0);
if (M == MAP_FAILED || vocab == MAP_FAILED) {
close(binvecs_fd);
close(binwords_fd);
@@ -113,20 +131,24 @@
return 0;
}
-SV *get_neighbours(char *st1, int N) {
- char *bestw[MAX_NEIGHBOURS];
+
+void *_get_neighbours(knnpars *pars) {
+ char *st1 = pars->token;
+ int N = pars->N;
+ unsigned long from = pars -> from;
+ unsigned long upto = pars -> upto;
char file_name[max_size], st[100][max_size];
- float dist, len, bestd[MAX_NEIGHBOURS], vec[max_size];
- long long a, b, c, d, cn, bi[100], besti[MAX_NEIGHBOURS];
+ float dist, len, *bestd, vec[max_size];
+ long long a, b, c, d, cn, bi[100], *besti;
char ch;
+ knn *nbs = NULL;
- if(N>MAX_NEIGHBOURS) N=MAX_NEIGHBOURS;
+ besti = malloc(N * sizeof(long long));
+ bestd = malloc(N * sizeof(float));
- for (a = 0; a < MAX_NEIGHBOURS; a++) bestw[a] = (char *)malloc(max_size * sizeof(char));
- float worstbest=0;
+ float worstbest=-1;
for (a = 0; a < N; a++) bestd[a] = 0;
- for (a = 0; a < N; a++) bestw[a][0] = 0;
a = 0;
cn = 0;
b = 0;
@@ -155,7 +177,7 @@
break;
}
}
- if (b == -1 && cn <= 0) {
+ if (b == -1) {
N = 0;
goto end;
}
@@ -169,8 +191,7 @@
len = sqrt(len);
for (a = 0; a < size; a++) vec[a] /= len;
for (a = 0; a < N; a++) bestd[a] = -1;
- for (a = 0; a < N; a++) bestw[a][0] = 0;
- for (c = 0; c < words; c++) {
+ for (c = from; c < upto; c++) {
a = 0;
// do not skip taget word
// for (b = 0; b < cn; b++) if (bi[b] == c) a = 1;
@@ -193,25 +214,81 @@
}
}
+ nbs = malloc(sizeof(knn));
+ nbs->index = besti;
+ nbs->dist = bestd;
+ nbs->length = N;
end:
- a=0;
- AV* array = newAV();
- for (a = 0; a < N; a++) {
- strcpy(bestw[a], &vocab[besti[a] * max_w]);
- HV* hash = newHV();
- hv_store(hash, "word", strlen("word"), newSVpvf(bestw[a], 0), 0);
- hv_store(hash, "dist", strlen("dist"), newSVnv(bestd[a]), 0);
- hv_store(hash, "rank", strlen("rank"), newSVuv(besti[a]), 0);
- AV *vector = newAV();
- for (b = 0; b < size; b++) {
- av_push(vector, newSVnv(M[b + besti[a] * size]));
- }
- hv_store(hash, "vector", strlen("vector"), newRV_noinc((SV*)vector), 0);
- av_push(array, newRV_noinc((SV*)hash));
- }
- return newRV_noinc((SV*)array);
+ pthread_exit(nbs);
}
+/* Return the N nearest neighbours of st1 as a reference to an array of
+ * {word, dist, rank, vector} hashes. The vocabulary is cut into contiguous
+ * slices, one _get_neighbours thread searches each slice, and the per-slice
+ * top-N lists are merged so that larger dist values come first.
+ * N is capped at MAX_NEIGHBOURS. The returned array is empty when N < 1 or
+ * when the first worker delivers no result (NULL). */
+SV *get_neighbours(char *st1, int N) {
+  float bestd[MAX_NEIGHBOURS];
+  long long besti[MAX_NEIGHBOURS], a, b, c, d, slice;
+  knn *nbs[MAX_THREADS] = {0};
+  knnpars pars[MAX_THREADS];
+  pthread_t pt[MAX_THREADS];      /* on the stack, so there is nothing to free */
+  int started[MAX_THREADS] = {0}; /* 1 for every thread that was actually created */
+  int nt = num_threads;
+  void *res;
+  AV *array = newAV();
+
+  /* nbs, pars and pt hold MAX_THREADS entries, so clamp the worker count. */
+  if (nt < 1) nt = 1;
+  if (nt > MAX_THREADS) nt = MAX_THREADS;
+  if (N > MAX_NEIGHBOURS) N = MAX_NEIGHBOURS;
+  if (N < 1) return newRV_noinc((SV*)array);
+
+  slice = words / nt;
+  for (a = 0; a < nt; a++) {
+    pars[a].token = st1;
+    pars[a].N = N;
+    pars[a].from = a * slice;
+    /* The last slice also takes the remainder of words / nt; otherwise the
+     * final words % nt vocabulary entries would never be searched. */
+    pars[a].upto = (a == nt - 1) ? words : (a + 1) * slice;
+    started[a] = pthread_create(&pt[a], NULL, (void *(*)(void *))_get_neighbours, &pars[a]) == 0;
+  }
+  /* pthread_join writes a void *, so go through res; a worker that could not
+   * be started or joined simply leaves its nbs slot NULL. */
+  for (a = 0; a < nt; a++) {
+    if (started[a] && pthread_join(pt[a], &res) == 0) nbs[a] = res;
+  }
+
+  if (nbs[0]) {
+    /* Seed the merged list with the first slice's result. */
+    for (b = 0; b < N; b++) {
+      besti[b] = nbs[0]->index[b];
+      bestd[b] = nbs[0]->dist[b];
+    }
+    /* Insert each candidate of the remaining slices at the first position it
+     * beats, shifting the tail down by one and dropping the last entry. */
+    for (a = 1; a < nt; a++) {
+      if (!nbs[a]) continue;
+      for (b = 0; b < N; b++) {
+        for (c = 0; c < N; c++) {
+          if (nbs[a]->dist[b] > bestd[c]) {
+            for (d = N - 1; d > c; d--) {
+              bestd[d] = bestd[d-1];
+              besti[d] = besti[d-1];
+            }
+            besti[c] = nbs[a]->index[b];
+            bestd[c] = nbs[a]->dist[b];
+            break;
+          }
+        }
+      }
+    }
+
+    for (a = 0; a < N; a++) {
+      HV *hash = newHV();
+      AV *vector = newAV();
+      /* newSVpv copies the word as plain data; newSVpvf would interpret the
+       * vocabulary entry as a format string. */
+      hv_store(hash, "word", strlen("word"), newSVpv(&vocab[besti[a] * max_w], 0), 0);
+      hv_store(hash, "dist", strlen("dist"), newSVnv(bestd[a]), 0);
+      hv_store(hash, "rank", strlen("rank"), newSVuv(besti[a]), 0);
+      for (b = 0; b < size; b++) {
+        av_push(vector, newSVnv(M[b + besti[a] * size]));
+      }
+      hv_store(hash, "vector", strlen("vector"), newRV_noinc((SV*)vector), 0);
+      av_push(array, newRV_noinc((SV*)hash));
+    }
+  }
+
+  /* Each worker malloc's its result struct and both arrays; release them here. */
+  for (a = 0; a < nt; a++) {
+    if (!nbs[a]) continue;
+    free(nbs[a]->index);
+    free(nbs[a]->dist);
+    free(nbs[a]);
+  }
+  return newRV_noinc((SV*)array);
+}
__DATA__