import json
import logging
import os.path
import subprocess

import requests

from lib.CoNLL_Annotation import read_conll, read_conll_generator

logger = logging.getLogger(__name__)

def list_to_file(my_list, out_path):
    with open(out_path, "w") as out:
        for item_str in my_list:
            out.write(f"{item_str}\n")

def counter_to_file(my_counter, out_path):
    # Expects an iterable of (key, count) pairs with tuple-of-string keys,
    # e.g. the output of Counter.most_common() or Counter.items().
    with open(out_path, "w") as out:
        for item, count in my_counter:
            item_str = "\t".join(item)
            out.write(f"{item_str}\t{count}\n")

def dict_to_file(my_dict, out_path):
    with open(out_path, "w", encoding='utf8') as out:
        json.dump(my_dict, fp=out, ensure_ascii=False)

def file_to_dict(file_path):
    with open(file_path) as f:
        return json.load(f)
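
# Round-trip sketch (hypothetical path): dict_to_file and file_to_dict are
# inverses for any JSON-serializable dict:
#   dict_to_file({"Haus": 3}, "vocab.json")
#   assert file_to_dict("vocab.json") == {"Haus": 3}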

def write_conll_file(conll_objs, out_path):
    with open(out_path, "w", encoding='utf8') as out:
        for obj in conll_objs:
            for tok in obj.tokens:
                out.write(tok.get_conllU_line() + "\n")
            out.write("\n")

def file_generator(file_path):
    with open(file_path, "r") as data_file:
        logger.info("Reading instances from lines in file at: %s", file_path)
        for line in data_file:
            if not line: continue
            yield line

def get_file_annos_chunk(line_generator, chunk_size, token_class, comment_str="###C:", our_foundry="spacy"):
    file_has_next = True
    chunk, n_sents = read_conll(line_generator, chunk_size, token_class, comment_str=comment_str, our_foundry=our_foundry)
    if n_sents == 0: file_has_next = False
    return chunk, file_has_next

def get_file_text_chunk(line_generator, chunk_size, token_class, comment_str="###C:"):
    """Same as get_file_annos_chunk, but directly returns (text, labels) pairs."""
    file_has_next = True
    chunk, n_sents = read_conll(line_generator, chunk_size, token_class, comment_str=comment_str)
    if n_sents == 0: file_has_next = False
    sents, gld, meta = [], [], []
    for anno in chunk:
        # Metadata is collected here but not returned to the caller
        if len(anno.metadata) > 0: meta.append("\n".join(anno.metadata))
        sents.append(anno.get_sentence())
        gld.append(anno.get_pos_tags())
    return sents, gld, file_has_next
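
# Chunked-reading sketch (hypothetical path; MyToken stands in for whatever
# token class read_conll expects):
#   gen = file_generator("corpus.conllu")
#   has_next = True
#   while has_next:
#       sents, gld, has_next = get_file_text_chunk(gen, 1000, MyToken)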

def get_file_chunk(line_generator, chunk_size, token_class, comment_str="###C:"):
    file_has_next = True
    chunk, n_sents = read_conll(line_generator, chunk_size, token_class, comment_str=comment_str)
    if n_sents < chunk_size: file_has_next = False
    raw_text = ""
    for anno in chunk:
        if len(anno.metadata) > 0:
            raw_text += "\n".join(anno.metadata) + "\n"
        else:
            raw_text += "\n"
        for tok in anno.tokens:
            raw_text += tok.get_conllU_line() + "\n"
        raw_text += "\n"
    return raw_text, file_has_next, n_sents

def turku_parse_file(raw_text, filename, chunk_ix):
    out_file_str = f"{filename}.parsed.{chunk_ix}.conllu"
    # Send the chunk to the parser server and write the parsed response to a file
    logger.info(f"Sending Request {chunk_ix} to Parser Server...")
    response = requests.post("http://localhost:7689/", data=raw_text.encode('utf-8'))
    response_to_file(response.text, out_file_str)
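
# Pipeline sketch (assumes a Turku-style parser server listening on
# localhost:7689; "corpus.txt" and MyToken are hypothetical):
#   gen = file_generator("corpus.txt")
#   has_next, ix = True, 0
#   while has_next:
#       raw_text, has_next, _ = get_file_chunk(gen, 1000, MyToken)
#       turku_parse_file(raw_text, "corpus.txt", ix)
#       ix += 1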

def response_to_file(response_str, fname):
    with open(fname, "w") as fout:
        fout.write(response_str)

def expand_file(f, substitute_comment=False):
    # Expand the .gz file
    fname = f[:-3]  # strip the ".gz" suffix
    if not os.path.isfile(fname):
        p = subprocess.call(f"gunzip -c {f} > {fname}", shell=True)
        if p == 0:
            logger.info("Successfully uncompressed file")
        else:
            logger.info(f"Couldn't expand file {f}")
            raise Exception(f"Couldn't expand file {f}")
    else:
        logger.info(f"File {fname} is already uncompressed. Skipping this step...")
    # Substitute the comment lines in the expanded file
    if substitute_comment:
        fixed_filename = f"{fname}.fixed"
        with open(fixed_filename, "w") as fixed_out:
            p = subprocess.call(f"sed 's/^# /###C: /g' {fname}", shell=True, stdout=fixed_out)
        if p == 0:
            logger.info("Successfully fixed comments on file")
        else:
            logger.info("Something went wrong when substituting commentaries")
            raise Exception("Something went wrong when substituting commentaries")
        return fixed_filename
    else:
        return fname
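
# Usage sketch (hypothetical file): uncompress a gzipped treebank and rewrite
# its "# " comment lines to the "###C: " prefix that read_conll expects:
#   conll_path = expand_file("corpus.conllu.gz", substitute_comment=True)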