# TODO: write a client to make multiple requests to the server!
import subprocess, json, time
import requests, glob, logging
import os.path, sys
from CoNLL_Annotation import get_annotation, CoNLLUP_Token

DEREKO_DIR = "/export/netapp/kupietz/N-GRAMM-STUDIE/conllu/"

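# Overview of this script as written: the main block below walks DEREKO_DIR,
# gunzips each *.conllu.gz file, rewrites CoNLL-U comment lines ("# ..." -> "###C: ..."),
# reads the result in chunks of CHUNK_SIZE sentences, POSTs each chunk to the parser
# server on http://localhost:7689/, and writes every response to
# <file-basename>.parsed.<chunk-index>.conllu next to the input file.
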
def get_filenames(data_dir):
    filenames = []
    for filepath in glob.iglob(f'{data_dir}/*.conllu.gz', recursive=False):
        filenames.append(filepath)
    return sorted(filenames)


def expand_file(f):
    # Expand the .gz file (keep the original; write the uncompressed copy next to it)
    fname = f[:-3]
    if not os.path.isfile(fname):
        p = subprocess.call(f"gunzip -c {f} > {fname}", shell=True)
        if p == 0:
            logger.info(f"Successfully uncompressed file {f}")
        else:
            logger.error(f"Couldn't expand file {f}")
            raise Exception(f"gunzip failed for {f}")
    else:
        logger.info(f"File {fname} is already uncompressed. Skipping this step...")

    # Substitute the commentary lines in the expanded file ("# ..." -> "###C: ...")
    # so that read_conll() below can tell metadata apart from token lines
    fixed_filename = f"{fname}.fixed"
    with open(fixed_filename, "w") as fixed_file:
        p = subprocess.call(f"sed 's/^# /###C: /g' {fname}", shell=True, stdout=fixed_file)
    if p == 0:
        logger.info(f"Successfully fixed comments on file {fname}")
    else:
        logger.error(f"Something went wrong when substituting commentaries in {fname}")
        raise Exception(f"sed comment substitution failed for {fname}")
    return fixed_filename


def _file_generator(file_path):
    with open(file_path, "r") as data_file:
        logger.info("Reading instances from lines in file at: %s", file_path)
        for line in data_file:
            # NOTE: blank lines still contain "\n" and are yielded as well;
            # read_conll() relies on them as sentence boundaries
            if not line: continue
            yield line


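# read_conll() consumes the output of expand_file(): metadata lines start with
# "###C:", token lines are whitespace-separated CoNLL-U columns, and an empty
# line closes a sentence. It stops after chunk_size sentences and returns the
# annotations built so far plus the number of sentences actually read.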
def read_conll(line_generator, chunk_size):
    n_sents = 0
    annotated_sentences, buffer_meta, buffer_lst = [], [], []
    for i, line in enumerate(line_generator):
        if n_sents == chunk_size: break
        if line.startswith("###C:"):
            buffer_meta.append(line)
            continue
        if len(line.split()) > 0:
            buffer_lst.append(line)
        else:
            # Empty line = end of sentence: build the annotation and reset the buffers
            ann = get_annotation(buffer_lst, buffer_meta)
            n_sents += 1
            buffer_lst, buffer_meta = [], []
            annotated_sentences.append(ann)
    # logger.info("Read {} Sentences!".format(n_sents))
    return annotated_sentences, n_sents


def get_file_chunk(line_generator, chunk_size):
    file_has_next = True
    chunk, n_sents = read_conll(line_generator, chunk_size)
    if n_sents == 0: file_has_next = False
    # Re-serialize the chunk as CoNLL-U text: metadata lines, token lines, blank line per sentence
    raw_text = ""
    for anno in chunk:
        raw_text += "\n".join(anno.metadata) + "\n"
        for tok in anno.tokens:
            raw_text += tok.get_conllU_line() + "\n"
        raw_text += "\n"
    return raw_text, file_has_next, n_sents


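# turku_parse_file() assumes a parser server (named after the Turku neural parser
# pipeline) is already listening on http://localhost:7689/ and answers a POST of raw
# CoNLL-U text with the parsed CoNLL-U as plain text. Adjust the URL if the server
# runs elsewhere.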
def turku_parse_file(raw_text, filename, chunk_ix):
    f = filename.split(".")[0]
    out_file_str = f"{f}.parsed.{chunk_ix}.conllu"
    # For each chunk make a request to the server and write the parse it returns
    logger.info(f"Sending Request {chunk_ix} to Parser Server...")
    response = requests.post("http://localhost:7689/", data=raw_text.encode('utf-8'))
    response_to_file(response.text, out_file_str)


def response_to_file(response_str, fname):
    with open(fname, "w") as fout:
        fout.write(response_str)


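# Main entry point. Assumes DEREKO_DIR contains *.conllu.gz files and that the
# parser server is already running on localhost:7689 before this script starts.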
if __name__ == "__main__":
    # =====================================================================================
    #     LOGGING INFO ...
    # =====================================================================================
    logger = logging.getLogger(__name__)
    console_hdlr = logging.StreamHandler(sys.stdout)
    file_hdlr = logging.FileHandler(filename="ParseTests.log")
    logging.basicConfig(level=logging.INFO, handlers=[console_hdlr, file_hdlr])
    logger.info("Start Logging")

    CHUNK_SIZE = 20000
    logger.info(f"Chunking files into requests of {CHUNK_SIZE} sentences")

    # =====================================================================================
    #     PROCESS (PARSE) ALL FILES FOUND ...
    # =====================================================================================
    conll_files = get_filenames(DEREKO_DIR)[:1]  # For development purposes only process the first file
    # conll_files = ["tutorial_examples/mini_test_raw.conllu.gz"]
    logger.info(f"Files to process: {conll_files}")

    for f in conll_files:
        start = time.time()
        text_filename = expand_file(f)
        line_generator = _file_generator(text_filename)
        total_processed_sents = 0
        file_has_next, chunk_ix = True, 0  # Reset per file so every file is fully processed and its chunks start at 0
        while file_has_next:
            raw_text, file_has_next, n_sents = get_file_chunk(line_generator, chunk_size=CHUNK_SIZE)
            total_processed_sents += n_sents
            if len(raw_text) > 0:
                turku_parse_file(raw_text, text_filename, chunk_ix)
            now = time.time()
            elapsed = (now - start)
            logger.info(f"Time Elapsed: {elapsed}. Processed {total_processed_sents}. [{total_processed_sents/elapsed} Sents/sec]\n")  # Toks/Sec???
            chunk_ix += 1
        end = time.time()
        logger.info(f"Processing File {f} took {(end - start)} seconds!")