Added experimental support für parallel processing
Change-Id: I6c80e63a49da915510f26f2eb58fa4f5101a16db
diff --git a/script/korapxml2krill_dir b/script/korapxml2krill_dir
index 6293a7d..b955dcf 100644
--- a/script/korapxml2krill_dir
+++ b/script/korapxml2krill_dir
@@ -3,12 +3,14 @@
use warnings;
use lib 'lib';
use FindBin;
-use File::Temp qw/tempdir/;
+use File::Temp;
use File::Spec::Functions qw/catfile catdir/;
use Getopt::Long;
use Directory::Iterator;
use KorAP::XML::Krill;
use KorAP::XML::Archive;
+use Benchmark qw/:hireswallclock/;
+use Parallel::ForkManager;
my $local = $FindBin::Bin;
@@ -29,6 +31,7 @@
# 2016/02/15
# - Fixed temporary directory bug
# - Improved skipping before unzipping
+# - Added EXPERIMENTAL concurrency support
sub printversion {
print "Version " . $KorAP::XML::Krill::VERSION . "\n\n";
@@ -59,6 +62,8 @@
combining the foundry name with a # and the layer name.
--primary|-p Output primary data or not. Defaults to true.
Can be flagged using --no-primary as well.
+ --jobs|-j Define the number of concurrent jobs in seperated forks,
+ defaults to 0. This is EXPERIMENTAL!
--human|-m Represent the data human friendly,
while the output defaults to JSON
--pretty|-y Pretty print json output
@@ -76,7 +81,9 @@
};
my ($input, $output, $text, $gzip, $log_level, @skip,
- $token_base, $primary, @allow, $pretty, $overwrite);
+ $token_base, $primary, @allow, $pretty,
+ $overwrite);
+my $jobs = 0;
GetOptions(
'input|i=s' => \$input,
'output|o=s' => \$output,
@@ -89,6 +96,7 @@
'allow|a=s' => \@allow,
'primary|p!' => \$primary,
'pretty|y' => \$pretty,
+ 'jobs|j=i' => \$jobs,
'help|h' => sub { printhelp },
'version|v' => sub { printversion }
);
@@ -108,7 +116,10 @@
my $anno = shift;
my $file = get_file_name($anno);
- my $call = 'perl ' . $local . '/korapxml2krill -i ' . $anno . ' -o ' . $output . '/' . $file . '.json';
+ # TODO: This should be done directly with a data structure! KorAP::XML::Wrap
+
+ my $call = 'perl ' . $local . '/korapxml2krill -i ' .
+ $anno . ' -o ' . $output . '/' . $file . '.json';
$call .= '.gz -z' if $gzip;
$call .= ' -m' if $text;
$call .= ' -w' if $overwrite;
@@ -118,16 +129,37 @@
$call .= ' -y ' . $pretty if $pretty;
$call .= ' -a ' . $_ foreach @allow;
$call .= ' -s ' . $_ foreach @skip;
- print "$file ";
system($call);
- print "\n";
+ return "$file";
};
+# Zero means: everything runs in the parent process
+my $pool = Parallel::ForkManager->new($jobs);
+
+my $count = 0;
+my $iter = 0;
+
+# Report on fork message
+$pool->run_on_finish (
+ sub {
+ my ($pid, $code) = shift;
+ my $data = pop;
+ print 'Convert ['. ($jobs > 0 ? "$pid:" : '') .
+ ($iter++) . "/$count]" .
+ ($code ? " $code" : '') .
+ " $$data\n";
+ }
+);
+
+my $t;
+print "Reading data ...\n";
+
# Input is a directory
if (-d $input) {
my $it = Directory::Iterator->new($input);
my @dirs;
my $dir;
+
while (1) {
if (!$it->is_directory && ($dir = $it->get) && $dir =~ s{/data\.xml$}{}) {
push @dirs, $dir;
@@ -136,10 +168,32 @@
last unless $it->next;
};
- my $count = scalar @dirs;
+ print "Start processing ...\n";
+ $t = Benchmark->new;
+ $count = scalar @dirs;
+
+ DIRECTORY_LOOP:
for (my $i = 0; $i < $count; $i++) {
- print 'Convert [' . ($i + 1) . "/$count] ";
- write_file($dirs[$i]);
+
+ unless ($overwrite) {
+ my $filename = catfile(
+ $output,
+ get_file_name($dirs[$i]) . '.json' . ($gzip ? '.gz' : '')
+ );
+
+ if (-e $filename) {
+ $iter++;
+ print "Skip $filename\n";
+ next;
+ };
+ };
+
+ # Get the next fork
+ my $pid = $pool->start and next DIRECTORY_LOOP;
+ my $msg;
+
+ $msg = write_file($dirs[$i]);
+ $pool->finish(0, \$msg);
};
}
@@ -155,29 +209,38 @@
exit(1);
};
+ print "Start processing ...\n";
+ $t = Benchmark->new;
my @dirs = $archive->list_texts;
- my $count = scalar @dirs;
+ $count = scalar @dirs;
+
+ ARCHIVE_LOOP:
for (my $i = 0; $i < $count; $i++) {
- print 'Convert [' . ($i + 1) . "/$count] ";
# Split path information
my ($prefix, $corpus, $doc, $text) = $archive->split_path($dirs[$i]);
unless ($overwrite) {
-
my $filename = catfile(
$output,
get_file_name(catdir($doc, $text)) . '.json' . ($gzip ? '.gz' : '')
);
+
if (-e $filename) {
+ $iter++;
print "Skip $filename\n";
next;
};
};
+ # Get the next fork
+ my $pid = $pool->start and next ARCHIVE_LOOP;
+
# Create temporary file
my $temp = File::Temp->newdir;
+ my $msg;
+
# Extract from archive
if ($archive->extract($dirs[$i], $temp)) {
@@ -188,13 +251,17 @@
my $dir = catdir($input, $doc, $text);
# Write file
- write_file($dir);
+ $msg = write_file($dir);
+
+ $temp = undef;
+ $pool->finish(0, \$msg);
}
else {
- print "Unable to extract " . $dirs[$i] . "\n";
- };
- $temp = undef;
+ $temp = undef;
+ $msg = "Unable to extract " . $dirs[$i] . "\n";
+ $pool->finish(1, \$msg);
+ };
};
}
@@ -202,5 +269,8 @@
print "Input is neither a directory nor an archive.\n\n";
};
+$pool->wait_all_children;
+
+print timestr(timediff(Benchmark->new, $t))."\n\n";
__END__