blob: a53c6e4281fc0f2c7777d5b7f934282c1e5c88c9 [file] [log] [blame]
package Krawfish::Result::Node;
use strict;
use warnings;
# Create a class for result aggregation on the node level.
# TODO:
# Add a timeout! Just in case ...!
sub new {
my $class = shift;
my $self = bless {
aggregation => [], # Array of aggregation methods
# The sorting criteria for the buffer
sorting => [],
query => undef,
indexes => [],
data => undef,
groups => {}, # Store all groups
}, $class;
# Building a merge sorting buffer
$self->{buffer} = Krawfish::Result::Node::SortingBuffer->new(
scalar @{$self->{indexes}}, # Size of the buffer
$self->{sorting} # Criteria for sorting
);
return $self;
};
# Overwrite process_head and pass to deeper query
sub process_head {
my ($self, $head) = @_;
$_[0]->{query}->process_head($head);
return;
};
sub buffer {
return $_[0]->{buffer};
};
# Open all channels and send the query
# Initially this will return all aggregate data and,
# in case it is a group query, all groups.
sub open {
my $self = shift;
foreach my $index (@{$self->{indexes}}) {
next if $index->is_closed;
# Send query to all channels in parallel
# Note if one index is not available
$index->send(
$self->{query} => sub {
# Get the initial aggregation data -
# aggregate in new data hash
my ($aggregates, $groups) = @_;
$self->aggregation($aggregates);
$self->grouping($groups);
});
};
return 1;
};
# Go to next position
sub next {
my $self = shift;
# Get next element from buffer
$self->{current} = $self->buffer->shift;
unless ($self->{current}) {
$self->{current} = $self->_next_current or return;
};
return 1;
};
# Fill the buffer with the next matches,
# sorted by the criterion.
# close indexes no longer needed.
sub _next_current {
my $self = shift;
# TODO:
# this does not work for indexes with sorted items like:
#
# 1. 1 8 9
# 2. 2 4 5 <- here, the 4 is before 5!
# 3. 5 6 8
#
foreach my $index (@{$self->{indexes}}) {
next if $index->is_closed;
# remember index to get current_match
# TODO:
# Probably use next_current
my $current = $index->next_current;
# No more matches from this index
unless ($current) {
# Close the index
$index->close;
next;
};
$self->buffer->push($index, $current);
};
return $self->buffer->pop;
};
# Close all indexes still open
sub close_all {
my $self = shift;
# Iterate over all indexes
foreach my $index (@{$self->{indexes}}) {
# Index already closed
next if $index->is_closed;
# Close index
$index->close();
};
# Remove all indexes
$self->{indexes} = [];
};
# Get current
sub current {
my $self = shift;
# Get current field if available
my $current = $self->{current}->[1] or return;
# TODO:
# Rewrite criterion to criterion strings!
};
# Get current match (including criterion string
sub current_match {
my $self = shift;
# Get current field if available
my $current_index = $self->{current}->[0] or return;
# This should never haben, because it calls a closed index
return if $current_index->is_closed;
# Return the match
return $current_index->current_match;
};
# Aggregate all data using the aggregation mechanisms
sub aggregate {
my ($self, $data) = @_;
foreach my $aggr (@{$self->{aggregation}}) {
$aggr->aggregate($data);
};
return 1;
};
1;