Markopy
Utilizing Markov Models for brute-force attacks
evaluate.py

from abc import abstractmethod
import re
import allogate as logging
import statistics
import os
from copy import copy
import glob


class Evaluator:
    """!
    @brief Abstract class to evaluate and score integrity/validity
    @belongsto Python::Markopy::Evaluation
    """
    def __init__(self, filename: str) -> None:
        """!
        @brief default constructor for evaluator
        @param filename filename to evaluate. Can be a glob pattern
        """
        self.filename = filename
        self.checks = []
        self.TEST_PASS_SYMBOL = b"\xe2\x9c\x85".decode()  # check mark
        self.TEST_FAIL_SYMBOL = b"\xe2\x9d\x8c".decode()  # cross mark
        self.all_checks_passed = True
        self.files = []
        if "*" in filename:
            # expand patterns such as "models/*.mdl" into a file list
            self.files = glob.glob(filename)
        else:
            self.files.append(filename)

    def evaluate(self) -> bool:
        "! @brief base evaluation function"
        for file in self.files:
            self._evaluate(file)

        # collect every check_* method so concrete evaluators can run them by name
        self.check_funcs = [func for func in dir(self) if callable(getattr(self, func)) and func.startswith("check_")]

    @abstractmethod
    def _evaluate(self, file) -> list:
        """!
        @brief internal evaluation function for a single file
        @param file filename to evaluate
        """
        if not os.path.isfile(file):
            logging.pprint(f"Given file {file} is not a valid filename")
            return False
        else:
            with open(file, "rb") as f:
                return f.read().split(b"\n")

    def success(self, checkname):
        """!
        @brief pass a test
        @param checkname text to display with the check
        """
        self.checks.append((checkname, self.TEST_PASS_SYMBOL))

    def fail(self, checkname):
        """!
        @brief fail a test
        @param checkname text to display with the check
        """
        self.all_checks_passed = False
        self.checks.append((checkname, self.TEST_FAIL_SYMBOL))

    def finalize(self):
        "! @brief finalize an evaluation and print checks"
        print("\n################ Checks ################ ")
        for test in self.checks:
            logging.pprint(f"{test[0]:30}:{test[1]} ")
        print("\n")
        self.checks = []
        return self.all_checks_passed

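# Concrete evaluators plug into the base class through three hooks: they call
# super().evaluate() so check_funcs is populated with every check_* method,
# record results with success()/fail(), and print the summary with finalize().
# A minimal sketch of that contract (illustrative only; LineCountEvaluator,
# check_nonempty and "wordlist.txt" are hypothetical names, not part of Markopy):
#
#   class LineCountEvaluator(Evaluator):
#       def evaluate(self):
#           super().evaluate()                  # populates self.check_funcs
#           for file in self.files:
#               self.lines = super()._evaluate(file)
#               for check in self.check_funcs:  # runs check_nonempty below
#                   getattr(self, check)()
#               self.finalize()
#
#       def check_nonempty(self):
#           "! @brief hypothetical check: file has at least one line"
#           if self.lines and len(self.lines) > 1:
#               self.success("File is not empty")
#           else:
#               self.fail("File is not empty")
#
#   LineCountEvaluator("wordlist.txt").evaluate()
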
class ModelEvaluator(Evaluator):
    """!
    @brief evaluate a model
    @belongsto Python::Markopy::Evaluation
    @extends Python::Markopy::Evaluation::Evaluator
    """
    def __init__(self, filename: str) -> None:
        "! @brief default constructor"
        super().__init__(filename)

    def evaluate(self):
        "! @brief evaluate a model"
        logging.VERBOSITY = 2
        logging.SHOW_STACK_THRESHOLD = 3
        super().evaluate()
        for file in self.files:
            logging.pprint(f"Model: {file.split('/')[-1]}: ", 2)
            edges = super()._evaluate(file)
            if not edges:
                continue
            self.lnodes = {}
            self.rnodes = {}
            self.ews = []
            self.edge_count = len(edges)
            for edge in edges:
                if edge == b'':
                    self.edge_count -= 1
                    continue
                try:
                    # an exported edge looks like b"a,1337,b": a single-byte left node,
                    # an integer edge weight, and a single-byte right node
                    e = edge.split(b',')
                    self.ews.append(int(edge[2:-2:1]))
                    if e[0] not in self.lnodes:
                        self.lnodes[e[0]] = 1
                    else:
                        self.lnodes[e[0]] += 1
                    if e[-1] not in self.rnodes:
                        self.rnodes[e[-1]] = 1
                    else:
                        self.rnodes[e[-1]] += 1
                except Exception as e:
                    print(e)
                    logging.pprint("Model file is corrupted.", 0)
                    continue

            self.lnode_count = len(self.lnodes)
            self.rnode_count = len(self.rnodes)
            logging.pprint(f"total edges: {self.edge_count}", 1)
            logging.pprint(f"unique left nodes: {self.lnode_count}", 1)
            logging.pprint(f"unique right nodes: {self.rnode_count}", 1)

            for check in self.check_funcs:
                try:
                    self.__getattribute__(check)()
                except Exception as e:
                    print(e)
                    self.fail(f"Exception in {check}")
            self.finalize()

    def check_dangling(self):
        "! @brief check if model has dangling nodes"
        if self.lnode_count == self.rnode_count:
            self.success("No dangling nodes")
        else:
            logging.pprint("Dangling nodes found, lnodes and rnodes do not match", 0)
            self.fail("No dangling nodes")

    def check_structure(self):
        "! @brief check model structure for validity"
        if (self.lnode_count - 1) * (self.rnode_count - 1) + 2 * (self.lnode_count - 1):
            self.success("Model structure")
        else:
            logging.pprint("Model did not satisfy structural integrity check (lnode_count-1) * (rnode_count-1) + 2*(lnode_count-1)", 0)
            self.fail("Model structure")

    def check_stdev(self):
        "! @brief check the model's standard deviation between edge weights"
        mean = sum(self.ews) / len(self.ews)
        variance = sum([((x - mean) ** 2) for x in self.ews]) / len(self.ews)
        res = variance ** 0.5
        self.stdev = res
        if res == 0:
            logging.pprint("Model seems to be untrained", 0)
            self.fail("Model has any training")
        else:
            self.success("Model has any training")
        if res < 3000:
            logging.pprint("Model is not adequately trained. Might result in inadequate results", 1)
            self.fail("Model has training")
            self.fail(f"Model training score: {round(self.stdev, 2)}")
        else:
            self.success("Model has training")
            self.success(f"Model training score: {round(self.stdev)}")

    def check_min(self):
        "! @brief check 0 edge weights distribution"
        count = 0
        for ew in self.ews:
            if ew == 0:
                count += 1
        if count > self.rnode_count * 0.8:
            self.fail("Too many 0 edges")
            logging.pprint("0 weighted edges are dangerous and may halt the model.", 0)
        else:
            self.success("0 edges below threshold")

    def check_bottom10(self):
        "! @brief check minimum 10% of the edges"
        sample = self.ews[int(self.edge_count * 0.1)]
        #print(f"10per: {sample}")
        avg = sum(self.ews) / len(self.ews)
        #print(f"avg: {avg}")
        med = statistics.median(self.ews)
        #print(f"med: {med}")

    def check_lean(self):
        "! @brief check which way the model is leaning: left or right"
        sample = self.ews[int(self.edge_count * 0.1)]
        avg = sum(self.ews) / len(self.ews)
        med = statistics.median(self.ews)

        if med * 10 < sample:
            logging.pprint("Median is too left leaning and might indicate high entropy")
            self.fail("Median too left leaning")
        else:
            self.success("Median in expected ratio")

        if sample * 5 > avg:
            logging.pprint("Least probable 10% too close to average, might indicate inadequate training")
            self.fail("Bad bottom 10%")
        else:
            self.success("Good bottom 10%")

    def check_distrib(self):
        "! @deprecated"
        sorted_ews = copy(self.ews)
        sorted_ews.sort(reverse=True)
        ratio1 = sorted_ews[0] / sorted_ews[int(self.edge_count / 2)]
        ratio2 = sorted_ews[int(self.edge_count / 2)] / sorted_ews[int(self.edge_count * 0.1)]
        #print(ratio1)
        #print(ratio2)

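# A usage sketch for the model evaluator (the path is a placeholder for a model
# exported by Markopy, not a file shipped with the project). Glob patterns are
# expanded by the base constructor, so several models can be scored in one run:
#
#   ModelEvaluator("models/*.mdl").evaluate()
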
class CorpusEvaluator(Evaluator):
    """!
    @brief evaluate a corpus
    @belongsto Python::Markopy::Evaluation
    @extends Python::Markopy::Evaluation::Evaluator
    """
    def __init__(self, filename: str) -> None:
        """!
        @brief default constructor
        @param filename filename or pattern to check
        """
        super().__init__(filename)

    def evaluate(self):
        "! @brief evaluate a corpus. Might take a long time"
        logging.pprint("WARNING: This takes a while with larger corpus files", 2)
        logging.VERBOSITY = 2
        logging.SHOW_STACK_THRESHOLD = 3
        super().evaluate()
        for file in self.files:
            delimiter = ''
            total_weight = 0
            max_weight = 0
            total_chars = 0
            lines_count = 0
            bDelimiterConflict = False
            logging.pprint(f"Corpus: {file.split('/')[-1]}: ", 2)
            with open(file, "rb") as corpus:
                for line in corpus:
                    lines_count += 1
                    # corpus lines are expected as "<weight><delimiter><string>"
                    match = re.match(r"([0-9]+)(.)(.*)\n", line.decode()).groups()
                    if delimiter and delimiter != match[1]:
                        bDelimiterConflict = True
                    elif not delimiter:
                        delimiter = match[1]
                        logging.pprint(f"Delimiter is: {delimiter.encode()}")
                    total_weight += int(match[0])
                    total_chars += len(match[2])
                    if int(match[0]) > max_weight:
                        max_weight = int(match[0])

            if bDelimiterConflict:
                self.fail("Incorrect delimiter found")
            else:
                self.success("No structural conflicts")

            logging.pprint(f"Total number of lines: {lines_count}")
            logging.pprint(f"Sum of all string weights: {total_weight}")
            logging.pprint(f"Character total: {total_chars}")
            logging.pprint(f"Average length: {total_chars/lines_count}")
            logging.pprint(f"Average weight: {total_weight/lines_count}")

            self.finalize()

    def _evaluate(self, file) -> list:
        """!
        @brief evaluate a single file. File contents are not read here because the corpus is consumed line by line in evaluate().
        @param file corpus filename to evaluate
        """
        if not os.path.isfile(file):
            logging.pprint(f"Given file {file} is not a valid filename")
            return False
        else:
            return True

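# A usage sketch for the corpus evaluator. Each corpus line is expected in the
# "<weight><delimiter><string>" form matched by the regex above, for example a
# tab-delimited line b"1337\tpassword\n". The filename is a placeholder:
#
#   CorpusEvaluator("corpus/passwords.txt").evaluate()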