blob: c42ea5163c17eb3526a4f4f08d03e42f9c2b9924 [file] [log] [blame]
Akron8b990522016-07-06 16:45:57 +02001package KorAP::XML::ForkPool;
2use strict;
3use warnings;
4use Parallel::ForkManager;
Akronc11f7982017-02-21 21:20:14 +01005use v5.10;
6use Sys::Info;
7use Sys::Info::Constants qw( :device_cpu );
Akron8b990522016-07-06 16:45:57 +02008
# Construct a new fork pool.
#
# Named parameters:
#   jobs      - passed to Parallel::ForkManager->new by new_pool;
#               defaults to 0 when not given (or undefined)
#   overwrite - when false, texts whose output file exists are skipped
#   output    - stored as given
#   cache     - stored as given
#
# Returns the blessed pool object. The "iter" counter (the number of
# the text currently in process) starts at 1.
sub new {
  my ($class, %args) = @_;

  my %state = (
    jobs      => defined $args{jobs} ? $args{jobs} : 0,
    iter      => 1,
    overwrite => $args{overwrite},
    output    => $args{output},
    cache     => $args{cache},
  );

  return bless \%state, $class;
};
22
# Create a Parallel::ForkManager pool that reports every finished text.
#
# The pool is limited to $self->{jobs} processes. After each job has
# finished, one line of the form
#
#   Convert [$<pid>:<iter>/<count>] <exit code, if non-zero> <message>
#
# is printed and $self->{iter} is advanced. The "$<pid>:" part is only
# shown when jobs is greater than zero.
#
# Returns the pool object.
sub new_pool {
  my $self = shift;

  # Zero means: everything runs in the parent process
  my $pool = Parallel::ForkManager->new($self->{jobs});

  # Report per processed text
  $pool->run_on_finish(
    sub {
      my ($pid, $code) = @_;

      # The data reference handed to finish() arrives as the last argument
      my $data = $_[-1];

      # A job may end without passing a message (e.g. when it died);
      # never dereference something that is not a scalar reference.
      my $msg = (ref $data eq 'SCALAR' && defined ${$data}) ? ${$data} : '';

      my $worker = $self->{jobs} > 0 ? '$' . $pid . ':' : '';

      # Build the whole line first: "print (...) . ..." would be parsed
      # as "print(...)" and silently drop everything after the parens.
      print 'Convert [' . $worker
        . ($self->{iter}++) . '/' . $self->{count} . ']'
        . ($code ? " $code" : '')
        . ' ' . $msg . "\n";
    }
  );

  return $pool;
};
43
# Convert all text directories found below $input.
#
# Every path reported by the directory iterator that ends in
# "/data.xml" marks a text; the path without that suffix is collected
# and the iterator is pruned at that point. The collected directories
# are then handed to write_file(), one pool job per text.
#
# Unless the "overwrite" option is set, texts whose output file already
# exists are skipped (the progress counter is still advanced).
#
# NOTE(review): get_file_name() and write_file() are not defined in
# this module and Directory::Iterator is not loaded by it - presumably
# the calling script provides them; confirm.
sub process_directory {
  my $self  = shift;
  my $input = shift;

  # catfile() is not imported into this package, so use File::Spec
  require File::Spec;

  my $pool = $self->new_pool;

  print "Reading data ...\n";

  my $it = Directory::Iterator->new($input);
  my @dirs;
  my $dir;

  while (1) {
    if (!$it->is_directory && ($dir = $it->get) && $dir =~ s{/data\.xml$}{}) {
      push @dirs, $dir;
      $it->prune;
    };
    last unless $it->next;
  };

  # The total is read by the progress report set up in new_pool
  $self->{count} = scalar @dirs;

  DIRECTORY_LOOP:
  for my $text_dir (@dirs) {

    unless ($self->{overwrite}) {

      # NOTE(review): new() does not store a "gzip" option in this
      # view; it is read from the object so the module compiles under
      # strict - confirm where the flag should come from.
      my $filename = File::Spec->catfile(
        $self->{output},
        get_file_name($text_dir) . '.json' . ($self->{gzip} ? '.gz' : '')
      );

      if (-e $filename) {
        # Keep the progress counter in sync with skipped texts
        $self->{iter}++;
        print "Skip $filename\n";
        next DIRECTORY_LOOP;
      };
    };

    # Get the next fork - the parent continues with the next text
    $pool->start and next DIRECTORY_LOOP;

    my $msg = write_file($text_dir);
    $pool->finish(0, \$msg);
  };

  $pool->wait_all_children;

  # Delete cache file
  # NOTE(review): new() only stores "cache"; "cache_file" and
  # "cache_delete" are assumed to be set on the object - confirm.
  unlink($self->{cache_file})
    if $self->{cache_delete} && defined $self->{cache_file};
};
94
95
# Convert all texts contained in $archive.
#
# Parameters:
#   $archive - archive object; test_unzip, attach, list_texts,
#              split_path and extract are called on it
#   @input   - further annotation archives, attached to $archive
#
# Every text is extracted into a fresh temporary directory and handed
# to write_file(), one pool job per text. Unless the "overwrite" option
# is set, texts whose output file already exists are skipped (the
# progress counter is still advanced).
#
# Prints a message and exits the process with status 1 when
# $archive->test_unzip fails.
#
# NOTE(review): get_file_name() and write_file() are not defined in
# this module - presumably the calling script provides them; confirm.
sub process_archive {
  my $self    = shift;
  my $archive = shift;
  my @input   = @_;

  # catfile()/catdir() are not imported into this package and
  # File::Temp is not loaded at file level
  require File::Spec;
  require File::Temp;

  unless ($archive->test_unzip) {
    print "Unzip is not installed or incompatible.\n\n";
    exit(1);
  };

  # Add further annotation archives
  $archive->attach($_) foreach @input;

  print "Start processing ...\n";

  my @dirs = $archive->list_texts;

  # The total is read by the progress report set up in new_pool
  $self->{count} = scalar @dirs;

  # Create new pool
  my $pool = $self->new_pool;

  ARCHIVE_LOOP:
  for my $path (@dirs) {

    # Split path information
    my ($prefix, $corpus, $doc, $text) = $archive->split_path($path);

    unless ($self->{overwrite}) {

      # NOTE(review): new() does not store a "gzip" option in this
      # view; it is read from the object so the module compiles under
      # strict - confirm where the flag should come from.
      my $filename = File::Spec->catfile(
        $self->{output},
        get_file_name(
          File::Spec->catfile($corpus, $doc, $text)
            . '.json' . ($self->{gzip} ? '.gz' : '')
        )
      );

      if (-e $filename) {
        # Keep the progress counter in sync with skipped texts
        $self->{iter}++;
        print "Skip $filename\n";
        next ARCHIVE_LOOP;
      };
    };

    # Get the next fork - the parent continues with the next text
    $pool->start and next ARCHIVE_LOOP;

    # Create temporary directory
    my $temp = File::Temp->newdir;

    my ($code, $msg);

    # Extract from archive
    if ($archive->extract($path, $temp)) {

      # Create corpus directory
      my $input = File::Spec->catdir("$temp", $corpus);

      # Temporary directory of the text
      my $dir = File::Spec->catdir($input, $doc, $text);

      # Write file
      $msg  = write_file($dir);
      $code = 0;
    }
    else {
      $msg  = "Unable to extract " . $path . "\n";
      $code = 1;
    };

    # Release the temporary directory before the job finishes
    $temp = undef;
    $pool->finish($code, \$msg);
  };

  $pool->wait_all_children;

  # Delete cache file
  # NOTE(review): new() only stores "cache"; "cache_file" and
  # "cache_delete" are assumed to be set on the object - confirm.
  unlink($self->{cache_file})
    if $self->{cache_delete} && defined $self->{cache_file};
};
174
175
1761;