blob: bc39587c40b6e0cb07b5a90d1b079b3e90a8a539 [file] [log] [blame]
Akron5cf5fca2017-10-09 19:01:47 +02001package Krawfish::Compile::Node;
Akrone5166b02017-12-05 21:21:25 +01002use Array::Queue::Priority;
3use Krawfish::Util::Heap;
4use Krawfish::Log;
Akron15f1bc92017-12-06 13:41:07 +01005use Role::Tiny::With;
Akron83236072017-06-21 15:54:16 +02006use strict;
7use warnings;
8
Akron15f1bc92017-12-06 13:41:07 +01009with 'Krawfish::Compile';
Akron83236072017-06-21 15:54:16 +020010
# This will sort the incoming results using a heap
# and the sort criteria.
# The priority queue will have n entries for n channels.
# When the list is full, the top entry is taken and the
# next entry of the channel of the top entry is enqueued.
#
# This may be less efficient than a dynamic
# mergesort, but for the moment, it's way simpler.
Akronc84f00c2017-12-03 17:24:21 +010019
# TODO:
#   Add a timeout! Just in case ...!

# TODO:
#   Merge warnings, errors, messages!

# TODO:
#   Introduce max_rank_ref!

# TODO:
#   This may use a concurrent priority queue

# May be renamed to
# - Krawfish::MultiSegment::*
# - Krawfish::MultiNodes::*
35
36
Akron405cba02018-03-02 20:18:30 +010037use constant DEBUG => 0;
Akrone5166b02017-12-05 21:21:25 +010038
39
# Constructor
#
# Takes a flat list of key/value pairs that become the fields
# of the object. This file reads the fields 'top_k' (here and
# in next/compile) and 'queries' (in all iterating methods).
#
# Returns the blessed node object. The priority queue is only
# created (under 'prio') if 'top_k' is true.
sub new {
  my ($class, %param) = @_;

  # The match position always starts at the beginning
  # and there is no current match yet
  my $self = bless {
    %param,
    pos   => 0,
    match => undef
  }, $class;

  # Without top_k there is nothing to sort - no queue required
  return $self unless $self->{top_k};

  # The queue holds [match, channel] pairs and orders
  # them by comparing the sort criteria of the matches
  $self->{prio} = Array::Queue::Priority->new(
    sort_cb => sub {
      my ($left, $right) = @_;

      # Criteria of both matches
      my $left_crit  = $left->[0]->sorted_by;
      my $right_crit = $right->[0]->sorted_by;

      # If the criterion is not defined on any level,
      # the entry is below any set entry
      return $left_crit->compare($right_crit);
    }
  );

  return $self;
}
73
Akrona588d072017-10-13 14:45:34 +020074
# Get maximum frequency
#
# Sums up the max_freq values of all queries of the node.
# Returns the sum, which is 0 if the node has no queries.
sub max_freq {
  my $self = shift;

  # Start with 0, so an empty query list results
  # in a number instead of undef
  my $max_freq = 0;

  foreach my $query (@{$self->{queries}}) {
    $max_freq += $query->max_freq;
  }

  return $max_freq;
}
84
Akron4a6fbed2017-12-08 12:54:43 +010085
# Initially fill up the heap
#
# Does its work only on the first call (guarded by the 'init'
# counter). Without a priority queue, every query is finalized.
# With a priority queue, the first match of every query is
# added to the queue together with the index of the query;
# queries that have no match are removed from the query list.
sub _init {
  my $self = shift;

  # Already initialized
  return if $self->{init}++;

  print_log('cmp_node', 'Initialize node response') if DEBUG;

  # The priority queue only exists if sorting is required
  my $queue = $self->{prio};

  my $count   = scalar @{$self->{queries}};
  my $channel = 0;

  # Iterate over all segments - either for grouping
  # or (in case of sorting) to enqueue the first matches
  #
  # TODO:
  #   This needs to be done in parallel, as the initial
  #   querying (+ sorting) can take quite a lot of time!
  while ($channel < $count) {

    # Get query from segment
    my $query = $self->{queries}->[$channel];

    # Do grouping - search through all results
    if (!$queue) {
      print_log('cmp_node', "Finalize query at channel $channel") if DEBUG;

      $query->finalize;
      $channel++;
    }

    # There is a next item from the segment
    elsif ($query->next) {
      print_log('cmp_node', "Init query at channel $channel") if DEBUG;

      # Enqueue and remember the segment/channel
      # TODO: enqueue
      $queue->add([$query->current_match, $channel]);

      print_log('cmp_node', "Added match " . $query->current_match->to_string) if DEBUG;

      $channel++;
    }

    # No next item - remove segment from query processing
    else {
      print_log('cmp_node', "Remove query at channel $channel") if DEBUG;

      # The list is shortened, so the same index
      # addresses the following query afterwards
      splice(@{$self->{queries}}, $channel, 1);
      $count--;
    }
  }

  return unless $queue;

  # Resize the priority queue
  # $queue->size($count);

  if (DEBUG) {
    print_log(
      'cmp_node',
      'Array: ' . join(',', map { $_->[0]->to_string } @{$queue->queue})
    );
  }
}
170
171
# Get next match
#
# Moves to the next match in sort order. Returns 1 if there
# is a match, that can be retrieved using current_match().
# Returns nothing if there is no priority queue, if top_k
# matches were already delivered, or if the queue is empty.
# In all these cases the current match is reset, so
# current_match() will not return a stale match.
sub next {
  my $self = shift;

  $self->_init;

  # There is no next
  if (!$self->{prio} || $self->{pos} > $self->{top_k} - 1) {

    # Do not keep the last delivered match
    $self->{match} = undef;
    return;
  }

  # Get next match from list
  # TODO: dequeue
  my $entry = $self->{prio}->remove;

  # No more entries
  unless ($entry) {

    # Prevent further requests
    $self->{pos} = $self->{top_k} + 1;
    $self->{match} = undef;
    return;
  }

  # Set match
  $self->{match} = $entry->[0];

  # Get channel - the second field of the entry
  # is the index of the originating query
  my $channel = $self->{queries}->[$entry->[1]];

  # If the channel has more entries to come,
  # add them to the priority queue
  if ($channel->next) {
    $self->{prio}->add([$channel->current_match, $entry->[1]]);
  }

  if (DEBUG) {
    print_log(
      'cmp_node',
      'Array: ' . join(',', map { $_->[0]->to_string } @{$self->{prio}->queue})
    );
  }

  $self->{pos}++;
  return 1;
}
216
217
# Return current match
#
# Returns the value of the 'match' field, which is set
# by next() and is undef initially.
sub current_match {
  my $self = shift;
  return $self->{match};
}
222
223
# Get merged result match
#
# Takes up to top_k entries from the priority queue, adds their
# matches to the result, refills the queue from the query each
# entry came from, and finally merges the aggregations.
# Returns the result object.
#
# TODO:
#   May not be necessary
sub compile {
  my $self = shift;

  $self->_init;

  my $result = $self->result;

  if (DEBUG) {
    print_log('cmp_node','Compile result');
  }

  my $queue     = $self->{prio};
  my $remaining = $self->{top_k};

  # Get next match from list
  # TODO: dequeue
  while ($remaining-- > 0) {

    # Stop if there are no more entries
    my $top = $queue->remove or last;

    # An entry consists of the match and the
    # index of the query it was taken from
    my ($match, $channel) = @$top;

    $result->add_match($match);

    # If the channel has more entries to come,
    # add them to the priority queue
    my $query = $self->{queries}->[$channel];
    $queue->add([$query->current_match, $channel]) if $query->next;
  }

  # Because all queries were sorted on a first pass,
  # there is no need to next() to the end for aggregation

  # Merge all aggregation
  $self->aggregate;

  return $result;
}
268
Akrone5166b02017-12-05 21:21:25 +0100269
# Group data
#
# Merges the groups of all queries that consume the role
# Krawfish::Compile::Segment::Group into the result.
# If the result already has a group, nothing is merged.
# Returns the result object.
sub group {
  my $self = shift;

  $self->_init;

  print_log('cmp_node', 'Group data') if DEBUG;

  my $result = $self->result;

  # Groups already collected - the debug message is bound to
  # the very same check that triggers the early return
  if ($result->group) {
    print_log('cmp_node', 'Group is already done') if DEBUG;
    return $result;
  }

  # Iterate over all queries
  foreach my $seg_q (@{$self->{queries}}) {

    # Check for compilation role
    next unless Role::Tiny::does_role($seg_q, 'Krawfish::Compile::Segment::Group');

    print_log('cmp_node', 'Add result from ' . ref($seg_q)) if DEBUG;

    # Fetch the group of the segment query
    my $group = $seg_q->group;

    if (DEBUG) {

      # NOTE: 'use' takes effect at compile time,
      # independent of the value of DEBUG
      use Data::Dumper;
      print_log('cmp_node', 'Merge result: ' . ref($group) . ':' . $group->to_string);
    }

    # Merge group
    $result->merge_group($group);

    print_log('cmp_node', 'Groups merged') if DEBUG;
  }

  return $result;
}
317
318
# Get aggregation data only
#
# Merges the aggregations of all queries that consume the role
# Krawfish::Compile::Segment into the result. If the result
# already contains aggregations, nothing is merged.
# Returns the result object.
sub aggregate {
  my $self = shift;

  $self->_init;

  print_log('cmp_node', 'Aggregate data') if DEBUG;

  my $result = $self->result;

  # Aggregation already collected
  if (@{$result->{aggregation}}) {
    print_log('cmp_node', 'Aggregation is already done') if DEBUG;
    return $result;
  }

  # Iterate over all queries
  foreach my $query (@{$self->{queries}}) {

    # Ignore queries without the compilation role
    next unless Role::Tiny::does_role($query, 'Krawfish::Compile::Segment');

    print_log('cmp_node', 'Add result from ' . ref($query)) if DEBUG;

    # Fetch the aggregation of the segment query ...
    my $aggregation = $query->aggregate;

    if (DEBUG) {
      use Data::Dumper;
      print_log('cmp_node', 'Merge result ' . $aggregation->to_string);
    }

    # ... and merge it into the result
    $result->merge_aggregation($aggregation);

    print_log('cmp_node', 'Result merged') if DEBUG;
  }

  return $result;
}
363
364
# Prefetch final results
#
# Not implemented yet: the body only consists of the '...'
# statement, which throws an "Unimplemented" exception
# when the method is called.
sub prefetch {
  # TODO:
  #   In case there are enrich methods,
  #   it can be beneficial to enrich the first x matches before
  #   the cluster resends the request to the nodes and the segments.
  #   So - the node may send the enrich request to the segments
  #   after the last successful current_match with a certain number
  #   of matches to prefetch. When this is done, the segments
  #   can go through the top X results and prefetch snippets
  #   etc. while the node and the cluster are busy merging the result.
  ...
};
378
379
# Stringification
#
# Returns the string 'node(...)', wrapping the serializations
# of all queries joined by ';'. The optional $id parameter is
# passed on to the to_string method of every query.
sub to_string {
  my ($self, $id) = @_;

  # Serialize every query
  my @serialized = map { $_->to_string($id) } @{$self->{queries}};

  return 'node(' . join(';', @serialized) . ')';
}
387
388
Akron83236072017-06-21 15:54:16 +02003891;
Akrone5166b02017-12-05 21:21:25 +0100390
391
392__END__