Added experimental support für parallel processing

Change-Id: I6c80e63a49da915510f26f2eb58fa4f5101a16db
diff --git a/Changes b/Changes
index e0477b1..3458b08 100644
--- a/Changes
+++ b/Changes
@@ -1,3 +1,6 @@
+0.10 2016-02-15
+        - Added EXPERIMENTAL support for parallel jobs.
+
 0.09 2016-02-15
         - Fixed temporary directory handling in scripts.
 	- Improved skipping for archive handling in scripts.
diff --git a/Makefile.PL b/Makefile.PL
index 7bfcb5e..73832f7 100644
--- a/Makefile.PL
+++ b/Makefile.PL
@@ -20,6 +20,7 @@
     'Try::Tiny'      => 0.21,
     'Array::IntSpan' => 2.003,
     'List::MoreUtils' => 0.33,
+    'Parallel::ForkManager' => 1.17,
     'IO::Dir::Recursive' => 0.03,
     'File::Temp'     => 0,
     'Directory::Iterator' => 0,
diff --git a/lib/KorAP/XML/Archive.pm b/lib/KorAP/XML/Archive.pm
index d8dbc27..3090155 100644
--- a/lib/KorAP/XML/Archive.pm
+++ b/lib/KorAP/XML/Archive.pm
@@ -126,6 +126,7 @@
 
 1;
 
+
 __END__
 
 =POD
diff --git a/lib/KorAP/XML/Krill.pm b/lib/KorAP/XML/Krill.pm
index c11588c..c7e8793 100644
--- a/lib/KorAP/XML/Krill.pm
+++ b/lib/KorAP/XML/Krill.pm
@@ -17,7 +17,7 @@
 #       Due to the kind of processing, processed metadata may be stored in
 #       a multiprocess cache instead.
 
-our $VERSION = '0.09';
+our $VERSION = '0.10';
 
 our @ATTR = qw/text_sigle
 	       doc_sigle
diff --git a/script/korapxml2krill_dir b/script/korapxml2krill_dir
index 6293a7d..b955dcf 100644
--- a/script/korapxml2krill_dir
+++ b/script/korapxml2krill_dir
@@ -3,12 +3,14 @@
 use warnings;
 use lib 'lib';
 use FindBin;
-use File::Temp qw/tempdir/;
+use File::Temp;
 use File::Spec::Functions qw/catfile catdir/;
 use Getopt::Long;
 use Directory::Iterator;
 use KorAP::XML::Krill;
 use KorAP::XML::Archive;
+use Benchmark qw/:hireswallclock/;
+use Parallel::ForkManager;
 
 my $local = $FindBin::Bin;
 
@@ -29,6 +31,7 @@
 # 2016/02/15
 # - Fixed temporary directory bug
 # - Improved skipping before unzipping
+# - Added EXPERIMENTAL concurrency support
 
 sub printversion {
   print "Version " . $KorAP::XML::Krill::VERSION . "\n\n";
@@ -59,6 +62,8 @@
                                   combining the foundry name with a # and the layer name.
   --primary|-p                    Output primary data or not. Defaults to true.
                                   Can be flagged using --no-primary as well.
+  --jobs|-j                       Define the number of concurrent jobs in seperated forks,
+                                  defaults to 0. This is EXPERIMENTAL!
   --human|-m                      Represent the data human friendly,
                                   while the output defaults to JSON
   --pretty|-y                     Pretty print json output
@@ -76,7 +81,9 @@
 };
 
 my ($input, $output, $text, $gzip, $log_level, @skip,
-    $token_base, $primary, @allow, $pretty, $overwrite);
+    $token_base, $primary, @allow, $pretty,
+    $overwrite);
+my $jobs = 0;
 GetOptions(
   'input|i=s'   => \$input,
   'output|o=s'  => \$output,
@@ -89,6 +96,7 @@
   'allow|a=s'   => \@allow,
   'primary|p!'  => \$primary,
   'pretty|y'    => \$pretty,
+  'jobs|j=i'    => \$jobs,
   'help|h'      => sub { printhelp },
   'version|v'   => sub { printversion }
 );
@@ -108,7 +116,10 @@
   my $anno = shift;
   my $file = get_file_name($anno);
 
-  my $call = 'perl ' . $local . '/korapxml2krill -i ' . $anno . ' -o ' . $output . '/' . $file . '.json';
+  # TODO: This should be done directly with a data structure! KorAP::XML::Wrap
+
+  my $call = 'perl ' . $local . '/korapxml2krill -i ' .
+    $anno . ' -o ' . $output . '/' . $file . '.json';
   $call .= '.gz -z' if $gzip;
   $call .= ' -m' if $text;
   $call .= ' -w' if $overwrite;
@@ -118,16 +129,37 @@
   $call .= ' -y ' . $pretty if $pretty;
   $call .= ' -a ' . $_ foreach @allow;
   $call .= ' -s ' . $_ foreach @skip;
-  print "$file ";
   system($call);
-  print "\n";
+  return "$file";
 };
 
+# Zero means: everything runs in the parent process
+my $pool = Parallel::ForkManager->new($jobs);
+
+my $count = 0;
+my $iter = 0;
+
+# Report on fork message
+$pool->run_on_finish (
+  sub {
+    my ($pid, $code) = shift;
+    my $data = pop;
+    print 'Convert ['. ($jobs > 0 ? "$pid:" : '') .
+      ($iter++) . "/$count]" .
+	($code ? " $code" : '') .
+	  " $$data\n";
+  }
+);
+
+my $t;
+print "Reading data ...\n";
+
 # Input is a directory
 if (-d $input) {
   my $it = Directory::Iterator->new($input);
   my @dirs;
   my $dir;
+
   while (1) {
     if (!$it->is_directory && ($dir = $it->get) && $dir =~ s{/data\.xml$}{}) {
       push @dirs, $dir;
@@ -136,10 +168,32 @@
     last unless $it->next;
   };
 
-  my $count = scalar @dirs;
+  print "Start processing ...\n";
+  $t = Benchmark->new;
+  $count = scalar @dirs;
+
+ DIRECTORY_LOOP:
   for (my $i = 0; $i < $count; $i++) {
-    print 'Convert [' . ($i + 1) . "/$count] ";
-    write_file($dirs[$i]);
+
+    unless ($overwrite) {
+      my $filename = catfile(
+	$output,
+	get_file_name($dirs[$i]) . '.json' . ($gzip ? '.gz' : '')
+      );
+
+      if (-e $filename) {
+	$iter++;
+	print "Skip $filename\n";
+	next;
+      };
+    };
+
+    # Get the next fork
+    my $pid = $pool->start and next DIRECTORY_LOOP;
+    my $msg;
+
+    $msg = write_file($dirs[$i]);
+    $pool->finish(0, \$msg);
   };
 }
 
@@ -155,29 +209,38 @@
     exit(1);
   };
 
+  print "Start processing ...\n";
+  $t = Benchmark->new;
   my @dirs = $archive->list_texts;
-  my $count = scalar @dirs;
+  $count = scalar @dirs;
+
+ ARCHIVE_LOOP:
   for (my $i = 0; $i < $count; $i++) {
-    print 'Convert [' . ($i + 1) . "/$count] ";
 
     # Split path information
     my ($prefix, $corpus, $doc, $text) = $archive->split_path($dirs[$i]);
 
     unless ($overwrite) {
-
       my $filename = catfile(
 	$output,
 	get_file_name(catdir($doc, $text)) . '.json' . ($gzip ? '.gz' : '')
       );
+
       if (-e $filename) {
+	$iter++;
 	print "Skip $filename\n";
 	next;
       };
     };
 
+    # Get the next fork
+    my $pid = $pool->start and next ARCHIVE_LOOP;
+
     # Create temporary file
     my $temp = File::Temp->newdir;
 
+    my $msg;
+
     # Extract from archive
     if ($archive->extract($dirs[$i], $temp)) {
 
@@ -188,13 +251,17 @@
       my $dir = catdir($input, $doc, $text);
 
       # Write file
-      write_file($dir);
+      $msg = write_file($dir);
+
+      $temp = undef;
+      $pool->finish(0, \$msg);
     }
     else {
-      print "Unable to extract " . $dirs[$i] . "\n";
-    };
 
-    $temp = undef;
+      $temp = undef;
+      $msg = "Unable to extract " . $dirs[$i] . "\n";
+      $pool->finish(1, \$msg);
+    };
   };
 }
 
@@ -202,5 +269,8 @@
   print "Input is neither a directory nor an archive.\n\n";
 };
 
+$pool->wait_all_children;
+
+print timestr(timediff(Benchmark->new, $t))."\n\n";
 
 __END__