import os
import re
import time
from datetime import datetime, timedelta
from io import SEEK_END
from typing import List, Optional, Tuple, Any


class HttpdErrorLog:
    """Checking the httpd error log for errors and warnings, including
    limiting checks from a recent known position forward.

    Two file positions are tracked:

    * the *start* position, from which `get_missed()` reads, and
    * the *recent* position, from which `ignore_recent()` and
      `scan_recent()` read.

    Both are initialised to the end of the log file as it exists when the
    instance is created, so entries already present are never reported.
    """

    # A log line carrying a `[module:warn]` level tag.
    RE_ERRLOG_WARN = re.compile(r'.*\[[^:]+:warn].*')
    # A log line carrying a `[module:error]` level tag.
    RE_ERRLOG_ERROR = re.compile(r'.*\[[^:]+:error].*')
    # An error/warning line with an `AHnnnn` log number; the number is
    # captured in the named group 'aplogno' used by `_lookup_lognos()`.
    RE_APLOGNO = re.compile(
        r'.*\[[^:]+:(error|warn)].* (?P<aplogno>AH\d+): .+')

    def __init__(self, path: str):
        """Create a checker for the error log at `path`.

        The file does not need to exist; in that case positions start at 0.
        """
        self._path = path
        self._ignored_matches = []
        self._ignored_lognos = set()
        # remember the file position we started with
        self._start_pos = 0
        if os.path.isfile(self._path):
            with open(self._path) as fd:
                self._start_pos = fd.seek(0, SEEK_END)
        self._recent_pos = self._start_pos
        self._reset_state()

    def __repr__(self):
        return f"HttpdErrorLog[{self._path}, errors: {' '.join(self._recent_errors)}, " \
               f"warnings: {' '.join(self._recent_warnings)}]"

    @property
    def path(self) -> str:
        """The path of the error log file."""
        return self._path

    def _reset_state(self):
        """Forget recent findings and everything caught via `ignore_recent()`."""
        self._recent_errors = []
        self._recent_warnings = []
        self._caught_errors = set()
        self._caught_warnings = set()
        self._caught_matches = set()

    def clear_log(self):
        """Truncate the log file (if it exists) and reset all positions
        and collected state. The ignore lists are left untouched.
        """
        if os.path.isfile(self.path):
            os.truncate(self.path, 0)
        self._start_pos = self._recent_pos = 0
        self._reset_state()

    def _lookup_matches(self, line: str, matches: List[str]) -> bool:
        """Return True if `line` matches (from its start) any of `matches`.

        Entries may be pattern strings or compiled patterns; `re.match()`
        accepts both.
        """
        return any(re.match(m, line) for m in matches)

    def _lookup_lognos(self, line: str, lognos: set) -> bool:
        """Return True if `line` is an error/warning line whose `AHnnnn`
        log number is contained in `lognos`.
        """
        if not lognos:
            return False
        m = self.RE_APLOGNO.match(line)
        return bool(m) and m.group('aplogno') in lognos

    def clear_ignored_matches(self):
        """Drop all regular expressions registered for ignoring."""
        self._ignored_matches = []

    def add_ignored_matches(self, matches: List[str]):
        """Compile and register regular expressions; lines matching one of
        them are skipped by `ignore_recent()` and `get_missed()`.
        """
        self._ignored_matches.extend(re.compile(m) for m in matches)

    def clear_ignored_lognos(self):
        """Drop all log numbers registered for ignoring."""
        self._ignored_lognos = set()

    def add_ignored_lognos(self, lognos: List[str]):
        """Register log numbers (e.g. 'AH01234'); error/warning lines with
        one of them are skipped by `ignore_recent()` and `get_missed()`.
        """
        self._ignored_lognos.update(lognos)

    def _is_ignored(self, line: str) -> bool:
        """Return True if `line` is covered by the registered ignore lists."""
        return (self._lookup_matches(line, self._ignored_matches)
                or self._lookup_lognos(line, self._ignored_lognos))

    def ignore_recent(self, lognos: Optional[List[str]] = None,
                      matches: Optional[List[str]] = None):
        """After a test case triggered errors/warnings on purpose, add
        those to our 'caught' list so they do not get reported as 'missed'.

        Reads the log from the recent position to its end and advances the
        recent position there.

        :param lognos: log numbers of warnings/errors to catch
        :param matches: regular expressions; any matching line is caught
        """
        # None defaults avoid sharing one mutable list between calls.
        lognos_set = set(lognos) if lognos else set()
        match_list = matches if matches is not None else []
        self._recent_errors = []
        self._recent_warnings = []
        if not os.path.isfile(self._path):
            return
        with open(self._path) as fd:
            fd.seek(self._recent_pos, os.SEEK_SET)
            for line in fd:
                if self._is_ignored(line):
                    continue
                if self._lookup_matches(line, match_list):
                    self._caught_matches.add(line)
                    continue
                if not self._lookup_lognos(line, lognos_set):
                    continue
                # RE_APLOGNO only matches warn/error lines, check warn first
                if self.RE_ERRLOG_WARN.match(line):
                    self._caught_warnings.add(line)
                elif self.RE_ERRLOG_ERROR.match(line):
                    self._caught_errors.add(line)
            # tell() is only usable once the line iteration has finished
            self._recent_pos = fd.tell()

    def get_missed(self) -> Tuple[List[str], List[str]]:
        """Return the errors and warnings logged since the start position
        that were neither ignored nor caught by `ignore_recent()`.

        Afterwards, start and recent position are moved to the end of the
        log and the caught sets are cleared.

        :return: tuple of (error lines, warning lines)
        """
        errors = []
        warnings = []
        self._recent_errors = []
        self._recent_warnings = []
        if os.path.isfile(self._path):
            with open(self._path) as fd:
                fd.seek(self._start_pos, os.SEEK_SET)
                for line in fd:
                    if self._is_ignored(line):
                        continue
                    if line in self._caught_matches:
                        continue
                    if self.RE_ERRLOG_WARN.match(line) \
                            and line not in self._caught_warnings:
                        warnings.append(line)
                        continue
                    if self.RE_ERRLOG_ERROR.match(line) \
                            and line not in self._caught_errors:
                        errors.append(line)
                        continue
                self._start_pos = self._recent_pos = fd.tell()
        self._caught_errors = set()
        self._caught_warnings = set()
        self._caught_matches = set()
        return errors, warnings

    def scan_recent(self, pattern: re.Pattern, timeout=10):
        """Poll the log from the recent position on until a line matches
        `pattern` (matched from the start of the line).

        :param pattern: compiled regular expression to look for
        :param timeout: seconds to keep polling
        :return: True when a matching line was found, False when the log
                 file does not exist
        :raises TimeoutError: if no line matched within `timeout` seconds
        """
        if not os.path.isfile(self.path):
            return False
        with open(self.path) as fd:
            end = datetime.now() + timedelta(seconds=timeout)
            while True:
                # re-read from the recent position to pick up new lines
                fd.seek(self._recent_pos, os.SEEK_SET)
                for line in fd:
                    if pattern.match(line):
                        return True
                if datetime.now() > end:
                    raise TimeoutError(f"pattern not found in error log after {timeout} seconds")
                time.sleep(.1)