Added experimental support für parallel processing
Change-Id: I6c80e63a49da915510f26f2eb58fa4f5101a16db
diff --git a/Changes b/Changes
index e0477b1..3458b08 100644
--- a/Changes
+++ b/Changes
@@ -1,3 +1,6 @@
+0.10 2016-02-15
+ - Added EXPERIMENTAL support for parallel jobs.
+
0.09 2016-02-15
- Fixed temporary directory handling in scripts.
- Improved skipping for archive handling in scripts.
diff --git a/Makefile.PL b/Makefile.PL
index 7bfcb5e..73832f7 100644
--- a/Makefile.PL
+++ b/Makefile.PL
@@ -20,6 +20,7 @@
'Try::Tiny' => 0.21,
'Array::IntSpan' => 2.003,
'List::MoreUtils' => 0.33,
+ 'Parallel::ForkManager' => 1.17,
'IO::Dir::Recursive' => 0.03,
'File::Temp' => 0,
'Directory::Iterator' => 0,
diff --git a/lib/KorAP/XML/Archive.pm b/lib/KorAP/XML/Archive.pm
index d8dbc27..3090155 100644
--- a/lib/KorAP/XML/Archive.pm
+++ b/lib/KorAP/XML/Archive.pm
@@ -126,6 +126,7 @@
1;
+
__END__
=POD
diff --git a/lib/KorAP/XML/Krill.pm b/lib/KorAP/XML/Krill.pm
index c11588c..c7e8793 100644
--- a/lib/KorAP/XML/Krill.pm
+++ b/lib/KorAP/XML/Krill.pm
@@ -17,7 +17,7 @@
# Due to the kind of processing, processed metadata may be stored in
# a multiprocess cache instead.
-our $VERSION = '0.09';
+our $VERSION = '0.10';
our @ATTR = qw/text_sigle
doc_sigle
diff --git a/script/korapxml2krill_dir b/script/korapxml2krill_dir
index 6293a7d..b955dcf 100644
--- a/script/korapxml2krill_dir
+++ b/script/korapxml2krill_dir
@@ -3,12 +3,14 @@
use warnings;
use lib 'lib';
use FindBin;
-use File::Temp qw/tempdir/;
+use File::Temp;
use File::Spec::Functions qw/catfile catdir/;
use Getopt::Long;
use Directory::Iterator;
use KorAP::XML::Krill;
use KorAP::XML::Archive;
+use Benchmark qw/:hireswallclock/;
+use Parallel::ForkManager;
my $local = $FindBin::Bin;
@@ -29,6 +31,7 @@
# 2016/02/15
# - Fixed temporary directory bug
# - Improved skipping before unzipping
+# - Added EXPERIMENTAL concurrency support
sub printversion {
print "Version " . $KorAP::XML::Krill::VERSION . "\n\n";
@@ -59,6 +62,8 @@
combining the foundry name with a # and the layer name.
--primary|-p Output primary data or not. Defaults to true.
Can be flagged using --no-primary as well.
+ --jobs|-j Define the number of concurrent jobs in seperated forks,
+ defaults to 0. This is EXPERIMENTAL!
--human|-m Represent the data human friendly,
while the output defaults to JSON
--pretty|-y Pretty print json output
@@ -76,7 +81,9 @@
};
my ($input, $output, $text, $gzip, $log_level, @skip,
- $token_base, $primary, @allow, $pretty, $overwrite);
+ $token_base, $primary, @allow, $pretty,
+ $overwrite);
+my $jobs = 0;
GetOptions(
'input|i=s' => \$input,
'output|o=s' => \$output,
@@ -89,6 +96,7 @@
'allow|a=s' => \@allow,
'primary|p!' => \$primary,
'pretty|y' => \$pretty,
+ 'jobs|j=i' => \$jobs,
'help|h' => sub { printhelp },
'version|v' => sub { printversion }
);
@@ -108,7 +116,10 @@
my $anno = shift;
my $file = get_file_name($anno);
- my $call = 'perl ' . $local . '/korapxml2krill -i ' . $anno . ' -o ' . $output . '/' . $file . '.json';
+ # TODO: This should be done directly with a data structure! KorAP::XML::Wrap
+
+ my $call = 'perl ' . $local . '/korapxml2krill -i ' .
+ $anno . ' -o ' . $output . '/' . $file . '.json';
$call .= '.gz -z' if $gzip;
$call .= ' -m' if $text;
$call .= ' -w' if $overwrite;
@@ -118,16 +129,37 @@
$call .= ' -y ' . $pretty if $pretty;
$call .= ' -a ' . $_ foreach @allow;
$call .= ' -s ' . $_ foreach @skip;
- print "$file ";
system($call);
- print "\n";
+ return "$file";
};
+# Zero means: everything runs in the parent process
+my $pool = Parallel::ForkManager->new($jobs);
+
+my $count = 0;
+my $iter = 0;
+
+# Report on fork message
+$pool->run_on_finish (
+ sub {
+ my ($pid, $code) = shift;
+ my $data = pop;
+ print 'Convert ['. ($jobs > 0 ? "$pid:" : '') .
+ ($iter++) . "/$count]" .
+ ($code ? " $code" : '') .
+ " $$data\n";
+ }
+);
+
+my $t;
+print "Reading data ...\n";
+
# Input is a directory
if (-d $input) {
my $it = Directory::Iterator->new($input);
my @dirs;
my $dir;
+
while (1) {
if (!$it->is_directory && ($dir = $it->get) && $dir =~ s{/data\.xml$}{}) {
push @dirs, $dir;
@@ -136,10 +168,32 @@
last unless $it->next;
};
- my $count = scalar @dirs;
+ print "Start processing ...\n";
+ $t = Benchmark->new;
+ $count = scalar @dirs;
+
+ DIRECTORY_LOOP:
for (my $i = 0; $i < $count; $i++) {
- print 'Convert [' . ($i + 1) . "/$count] ";
- write_file($dirs[$i]);
+
+ unless ($overwrite) {
+ my $filename = catfile(
+ $output,
+ get_file_name($dirs[$i]) . '.json' . ($gzip ? '.gz' : '')
+ );
+
+ if (-e $filename) {
+ $iter++;
+ print "Skip $filename\n";
+ next;
+ };
+ };
+
+ # Get the next fork
+ my $pid = $pool->start and next DIRECTORY_LOOP;
+ my $msg;
+
+ $msg = write_file($dirs[$i]);
+ $pool->finish(0, \$msg);
};
}
@@ -155,29 +209,38 @@
exit(1);
};
+ print "Start processing ...\n";
+ $t = Benchmark->new;
my @dirs = $archive->list_texts;
- my $count = scalar @dirs;
+ $count = scalar @dirs;
+
+ ARCHIVE_LOOP:
for (my $i = 0; $i < $count; $i++) {
- print 'Convert [' . ($i + 1) . "/$count] ";
# Split path information
my ($prefix, $corpus, $doc, $text) = $archive->split_path($dirs[$i]);
unless ($overwrite) {
-
my $filename = catfile(
$output,
get_file_name(catdir($doc, $text)) . '.json' . ($gzip ? '.gz' : '')
);
+
if (-e $filename) {
+ $iter++;
print "Skip $filename\n";
next;
};
};
+ # Get the next fork
+ my $pid = $pool->start and next ARCHIVE_LOOP;
+
# Create temporary file
my $temp = File::Temp->newdir;
+ my $msg;
+
# Extract from archive
if ($archive->extract($dirs[$i], $temp)) {
@@ -188,13 +251,17 @@
my $dir = catdir($input, $doc, $text);
# Write file
- write_file($dir);
+ $msg = write_file($dir);
+
+ $temp = undef;
+ $pool->finish(0, \$msg);
}
else {
- print "Unable to extract " . $dirs[$i] . "\n";
- };
- $temp = undef;
+ $temp = undef;
+ $msg = "Unable to extract " . $dirs[$i] . "\n";
+ $pool->finish(1, \$msg);
+ };
};
}
@@ -202,5 +269,8 @@
print "Input is neither a directory nor an archive.\n\n";
};
+$pool->wait_all_children;
+
+print timestr(timediff(Benchmark->new, $t))."\n\n";
__END__