daza | a805f3a | 2021-02-09 11:10:15 +0100 | [diff] [blame] | 1 | from collections import defaultdict, OrderedDict |
| 2 | import re |
| 3 | |
| 4 | # CoNLL-U Format - https://universaldependencies.org/format.html |
| 5 | |
| 6 | |
def get_token_type(type_str):
    """Map a token-format name to the token class that parses it.

    Args:
        type_str: One of "CoNLL09_Token", "RNNTagger_Token",
            "CoNLLUP_Token" or "TigerNew_Token".

    Returns:
        The class object (not an instance) named by ``type_str``.

    Raises:
        NotImplementedError: If ``type_str`` is none of the names above.
    """
    supported = ("CoNLL09_Token", "RNNTagger_Token", "CoNLLUP_Token", "TigerNew_Token")
    # Reject unknown names up front so the table lookup below cannot miss.
    if type_str not in supported:
        raise NotImplementedError(f"I don't know what to do with {type_str} token type!")
    token_classes = {
        "CoNLL09_Token": CoNLL09_Token,
        "RNNTagger_Token": RNNTagger_Token,
        "CoNLLUP_Token": CoNLLUP_Token,
        "TigerNew_Token": TigerNew_Token,
    }
    return token_classes[type_str]
| 18 | |
| 19 | |
class TigerNew_Token():
    """Token built from a whitespace-separated ``FORM XPOS`` line.

    Only the word form and the POS tag come from the input; every other
    CoNLL-U column is filled with the ``"_"`` placeholder.
    """

    def __init__(self, raw_line, word_ix):
        """Parse ``raw_line``; ``word_ix`` is the 0-based index in the sentence."""
        fields = raw_line.split()  # [FORM, XPOS]
        self.info = fields
        self.position = word_ix  # 0-based position in sentence
        self.id = word_ix + 1  # 1-based ID as in the CoNLL file
        self.word, self.pos_tag = fields[0], fields[1]
        # Columns this input format does not provide.
        self.lemma = self.pos_universal = self.detail_tag = "_"
        self.head = self.dep_tag = self.blank = self.auto_score = "_"

    def get_info(self):
        """Return the ten CoNLL-U column values of this token as strings."""
        columns = [str(self.id), self.word, self.lemma, self.pos_universal]
        columns += [self.pos_tag, self.detail_tag, str(self.head)]
        columns += [self.dep_tag, self.blank, self.auto_score]
        return columns

    def get_conllU_line(self, separator="\t"):
        """Return the token as one CoNLL-U line joined by ``separator``."""
        return separator.join(self.get_info())
| 43 | |
| 44 | |
class RNNTagger_Token():
    """Token built from a whitespace-separated ``FORM XPOS.FEATS LEMMA`` line.

    The second column carries the POS tag and its morphological features
    joined by dots (e.g. ``NN.Gen.Sg.Fem``); it is split into ``pos_tag``
    and a ``|``-joined ``detail_tag``. Columns the input does not provide
    are set to the ``"_"`` placeholder.
    """

    def __init__(self, raw_line, word_ix):
        """Parse ``raw_line``; ``word_ix`` is the 0-based index in the sentence."""
        info = raw_line.split()  # [FORM, XPOS.FEATS, LEMMA]
        self.info = info
        self.id = word_ix + 1  # 1-based ID as in the CoNLL file
        self.position = word_ix  # 0-based position in sentence
        self.word = info[0]
        self.lemma = info[2]
        self.pos_universal = "_"
        self.pos_tag, self.detail_tag = self._process_tag(info[1])  # 'NN.Gen.Sg.Fem'
        self.head = "_"
        self.dep_tag = "_"
        self.blank = "_"
        self.auto_score = "_"

    def _process_tag(self, tag):
        """Split a dotted tag into ``(pos_tag, features)``.

        ``"NN.Gen.Sg.Fem"`` becomes ``("NN", "Gen|Sg|Fem")``. A tag with no
        non-empty feature part after the first dot is returned unchanged
        with ``"_"`` as features. This covers plain tags such as ``"KON"``
        and tags that merely end in a dot such as ``"$."``, which would
        otherwise produce an empty FEATS column.
        """
        pos, _, rest = tag.partition(".")
        # Drop empty pieces so neither output column can ever be empty.
        feats = [part for part in rest.split(".") if part]
        if not pos or not feats:
            return tag, "_"
        return pos, "|".join(feats)

    def get_info(self):
        """Return the ten CoNLL-U column values of this token as strings."""
        return [str(self.id), self.word, self.lemma, self.pos_universal, self.pos_tag, self.detail_tag,
                str(self.head), self.dep_tag, self.blank, self.auto_score]

    def get_conllU_line(self, separator="\t"):
        """Return the token as one CoNLL-U line joined by ``separator``."""
        return separator.join(self.get_info())
| 72 | |
| 73 | |
class CoNLLUP_Token():
    """Token built from a ten-column line.

    Column order: ID, FORM, LEMMA, UPOS, XPOS, FEATS, HEAD, DEPREL, DEPS, MISC,
    e.g. ``11 Prügel Prügel NN NN _ _ _ _ 1.000000``.
    """

    def __init__(self, raw_line, word_ix):
        """Parse ``raw_line``; ``word_ix`` is the 0-based index in the sentence."""
        columns = raw_line.split()
        self.info = columns
        self.position = word_ix  # 0-based position in sentence
        self.id = columns[0]  # 1-based ID as in the CoNLL file
        self.word = columns[1]
        self.lemma = columns[2]
        self.pos_universal = columns[3]
        # XPOS may look like 'XPOS=NE|Case=Nom|Gender=Masc|Number=Sing'.
        # TODO: Reuse MorphInfo in the self.detail_tag
        self.pos_tag = self._process_tag(columns[4])
        self.detail_tag = columns[5]
        self.head = columns[6]
        self.dep_tag = columns[7]
        self.blank = columns[8]  # ???
        self.auto_score = columns[9]

    def _process_tag(self, tag):
        """Return the bare POS tag from an XPOS column value.

        A value without ``|`` (including ``"_"``) is returned unchanged.
        Otherwise the value of the first ``key=value`` pair is returned;
        per the original note this combined form only occurs for Turku.
        """
        if "|" not in tag:
            return tag
        first_pair = tag.split("|")[0]
        return first_pair.split("=")[1]

    def get_info(self):
        """Return the ten CoNLL-U column values of this token as strings."""
        front = [str(self.id), self.word, self.lemma, self.pos_universal, self.pos_tag]
        back = [self.detail_tag, str(self.head), self.dep_tag, self.blank, self.auto_score]
        return front + back

    def get_conllU_line(self, separator="\t"):
        """Return the token as one CoNLL-U line joined by ``separator``."""
        return separator.join(self.get_info())
| 106 | |
| 107 | |
| 108 | |
class CoNLL09_Token():
    """Token built from a whitespace-separated CoNLL-2009 line.

    Example input columns:
    ``1 Frau Frau Frau NN NN _ nom|sg|fem 5 5 CJ CJ _ _ AM-DIS _``

    Columns read: 0 (id), 1 (form), 2 (lemma), 4 (POS), 8 (head),
    10 (dependency label), 12 (predicate flag), 13 (predicate sense) and
    14 onwards (argument labels). ``detail_tag`` is always ``"_"``; the
    feature column of the input is not read.
    """

    def __init__(self, raw_line, word_ix):
        """Parse ``raw_line``; ``word_ix`` is the 0-based index in the sentence."""
        cols = raw_line.split()
        self.info = cols
        self.position = word_ix  # 0-based position in sentence
        self.id = cols[0]  # 1-based ID as in the CoNLL file
        self.word = cols[1]
        self.lemma = cols[2]
        self.pos_tag = cols[4]
        self.pos_universal = "_"  # _convert_to_universal(self.pos_tag, self.lemma)
        self.head = cols[8]
        self.dep_tag = cols[10]
        self.detail_tag = "_"
        self.is_pred = cols[12] == "Y"
        if not self.is_pred:
            self.pred_sense = None
            self.pred_sense_id = ""
        else:
            self.pred_sense = cols[13].strip("[]")
            self.pred_sense_id = str(self.position) + "##" + self.pred_sense
        # Slicing past the end yields [], so no length check is needed.
        self.labels = cols[14:]

    def get_conllU_line(self, separator="\t"):
        """Return the token as a ten-column CoNLL-U line.

        Layout: [ID, FORM, LEMMA, UPOS, XPOS, FEATS, HEAD, DEPREL, DEPS, MISC];
        DEPS and MISC are always ``"_"``.
        """
        fields = [
            str(self.id),
            self.word,
            self.lemma,
            self.pos_universal,
            self.pos_tag,
            self.detail_tag,
            self.head,
            self.dep_tag,
            "_",
            "_",
        ]
        return separator.join(fields)

    def get_conll09_line(self, delim="\t"):
        """Return the token as a CoNLL-2009 line.

        Target layout, e.g.:
        ``10 fall fall fall VB VB _ _ 8 8 VC VC Y fall.01 _ _ _ _ _``
        Lemma, POS, head and dependency label are each written twice.
        """
        if self.is_pred:
            pred_flag, sense = "Y", self.pred_sense
        else:
            pred_flag, sense = "_", "_"
        fixed = [
            self.id, self.word,
            self.lemma, self.lemma,
            self.pos_tag, self.pos_tag,
            "_", self.detail_tag,
            self.head, self.head,
            self.dep_tag, self.dep_tag,
            pred_flag, sense,
        ]
        return delim.join(fixed + self.labels)
| 151 | |
| 152 | |
| 153 | |
| 154 | ################################# GETTING SENTENCE ANNOTATIONS #################################### |
class AnnotatedSentence():
    """One sentence: its token objects plus the metadata lines that preceded it."""

    def __init__(self):
        """Create an empty sentence with no tokens and no metadata."""
        self.tokens = []
        self.metadata = []

    def get_words(self):
        """Return the ``word`` of every token, in sentence order."""
        return [token.word for token in self.tokens]

    def get_sentence(self):
        """Return the token words joined by single spaces."""
        return " ".join(token.word for token in self.tokens)

    def get_pos_tags(self, universal=False):
        """Return one POS tag per token.

        Reads ``pos_universal`` when ``universal`` is truthy, ``pos_tag``
        otherwise.
        """
        attribute = "pos_universal" if universal else "pos_tag"
        return [getattr(token, attribute) for token in self.tokens]
| 171 | |
| 172 | |
def get_annotation(raw_lines, raw_meta, token_class):
    """Build an ``AnnotatedSentence`` from raw token and metadata lines.

    Args:
        raw_lines: Token lines of one sentence, in order.
        raw_meta: Metadata lines belonging to the sentence; newline
            characters are stripped from both ends of each.
        token_class: Callable invoked as ``token_class(line, position)``
            with the 0-based position of the line within ``raw_lines``.

    Returns:
        The populated ``AnnotatedSentence``.
    """
    sentence = AnnotatedSentence()
    sentence.metadata = [meta.strip("\n") for meta in raw_meta]
    sentence.tokens.extend(
        token_class(line, position) for position, line in enumerate(raw_lines)
    )
    return sentence
| 183 | |
| 184 | |
def read_conll(line_generator, chunk_size, token_class=CoNLLUP_Token, comment_str="###C:", our_foundry="spacy"):
    """Read up to ``chunk_size`` sentences from an iterable of lines.

    Lines starting with ``comment_str`` are metadata. Their ``foundry = …``
    value is replaced by ``our_foundry`` and the tail of their
    ``filename = …`` path by ``<our_foundry>/morpho.xml``, and they are
    attached to the next sentence. A line containing only whitespace ends
    the current sentence. Every such line produces a sentence, so
    consecutive blank lines yield sentences without tokens.

    Token lines still buffered when the input is exhausted are returned as
    a final sentence, so input without a trailing blank line does not lose
    its last sentence.

    Args:
        line_generator: Iterable of text lines; it is consumed only as far
            as needed, so it can be passed again to read the next chunk.
        chunk_size: Stop after this many sentences; values <= 0 read
            everything.
        token_class: Token class handed to ``get_annotation``.
        comment_str: Prefix that marks a metadata line.
        our_foundry: Foundry name written into the metadata lines.

    Returns:
        Tuple ``(annotated_sentences, n_sents)``.
    """
    # Patterns are kept exactly as before; compiled once instead of per line.
    foundry_re = re.compile(r'(foundry\s*=\s*).*')
    filename_re = re.compile(r'(filename\s*=\s* .[^/]*/[^/]+/[^/]+/).*')

    # Function replacements insert our_foundry literally. A template such as
    # r"\1" + our_foundry is mis-parsed when the name starts with a digit
    # (read as a different group number) or contains a backslash.
    def _set_foundry(match):
        return match.group(1) + our_foundry

    def _set_filename(match):
        return match.group(1) + our_foundry + "/morpho.xml"

    n_sents = 0
    annotated_sentences, buffer_meta, buffer_lst = [], [], []
    for line in line_generator:
        if line.startswith(comment_str):
            line = foundry_re.sub(_set_foundry, line)
            line = filename_re.sub(_set_filename, line)
            buffer_meta.append(line)
            continue
        if line.split():
            buffer_lst.append(line)
            continue
        # Blank line: close the current sentence.
        annotated_sentences.append(get_annotation(buffer_lst, buffer_meta, token_class))
        n_sents += 1
        buffer_lst, buffer_meta = [], []
        if chunk_size > 0 and n_sents == chunk_size:
            break
    # After a break the buffers are empty, so this fires only at end of
    # input when the last sentence had no terminating blank line.
    if buffer_lst:
        annotated_sentences.append(get_annotation(buffer_lst, buffer_meta, token_class))
        n_sents += 1
    # logger.info("Read {} Sentences!".format(n_sents))
    return annotated_sentences, n_sents
| 204 | |
| 205 | |
def read_conll_generator(filepath, token_class=CoNLLUP_Token, sent_sep=None, comment_str="###C:"):
    """Yield the sentences of a CoNLL-style file one at a time.

    A sentence ends at a line containing only whitespace or, when
    ``sent_sep`` is given, at a line containing ``sent_sep`` (that line is
    not added to the sentence). Lines starting with ``comment_str`` are
    skipped and not collected, so the ``metadata`` of every yielded
    sentence is empty.

    The file is read lazily line by line. Token lines still buffered at end
    of file are yielded as a final sentence, so a file without a trailing
    blank line does not lose its last sentence.

    Args:
        filepath: Path of the file to read.
        token_class: Token class handed to ``get_annotation``.
        sent_sep: Optional substring that marks a sentence boundary line.
        comment_str: Prefix that marks a comment line.

    Yields:
        One annotated sentence (as returned by ``get_annotation``) per
        sentence boundary.
    """
    buffer_meta, buffer_lst = [], []
    sentence_finished = False
    with open(filepath) as f:
        # Iterate the file object directly instead of f.readlines() so the
        # whole file is never held in memory at once.
        for line in f:
            if sent_sep and sent_sep in line:
                sentence_finished = True
            if line.startswith(comment_str):
                continue
            if line.split() and not sentence_finished:
                buffer_lst.append(line)
            else:
                ann = get_annotation(buffer_lst, buffer_meta, token_class)
                buffer_lst, buffer_meta = [], []
                sentence_finished = False
                yield ann
        # Flush a last sentence that was not followed by a boundary line.
        if buffer_lst:
            yield get_annotation(buffer_lst, buffer_meta, token_class)