import sys
import time
import datetime
import os
import re
class LogMonitor:
DATE_REGEX = re.compile(r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}')
INC_MINUTE = datetime.timedelta(minutes=1)
def __init__(self, filename):
self._fname = filename
self._log = self._read_log() # generator
self._curr_line = ''
def _read_log(self):
"""Basically tail -f"""
with open(self._fname) as logfile:
logfile.seek(0, os.SEEK_END)
while True:
new_line = logfile.readline()
if new_line:
self._curr_line = new_line
yield self._curr_line
else:
time.sleep(0.1)
def _find_error_has_error(self):
"""
Search for ERROR keyword in log line. Returns boolean result of search
"""result"""
return 'ERROR' in self._curr_line
def _parse_datetime(self):
"""Parse string to datetime"""
date_string = self.DATE_REGEX.search(self._curr_line).group(0)
return datetime.datetime.strptime(date_string, "%Y-%m-%d %H:%M:%S")
def increase_base_time(self, line_time):
"""Return a datetime that is line_time plus one minute"""
return line_time + self.INC_MINUTE
def run(self):
"""
Read from a log file.
For each line check if log line time is later than base comparison time.
If so, update the base time and set error count to 0.
Check for errors in line. Increment error count if any are found.
Check the total error count. If it's greater than five, restart the logged
process.
Check if process has been restarted before. If so, kill the logged process.
"""
count = 0
base_time = datetime.datetime.min
restarted = False
for line in self._log:
line_time = self._parse_datetime()
if line_time > base_time:
base_time = self.increase_base_time(line_time)
count = 0
elif self._find_error_has_error():
count += 1
if count >= 5:
if restarted:
print("Kill the process") # A placeholder, sorry
else:
print("Restart the process") # Also a placeholder, sorry
count = 0
restarted = True
if __name__ == "__main__":
monitor = LogMonitor(sys.argv[1])
monitor.run()
import sys
import time
import datetime
import os
import re
class LogMonitor:
DATE_REGEX = re.compile(r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}')
INC_MINUTE = datetime.timedelta(minutes=1)
def __init__(self, filename):
self._fname = filename
self._log = self._read_log() # generator
self._curr_line = ''
def _read_log(self):
"""Basically tail -f"""
with open(self._fname) as logfile:
logfile.seek(0, os.SEEK_END)
while True:
new_line = logfile.readline()
if new_line:
self._curr_line = new_line
yield self._curr_line
else:
time.sleep(0.1)
def _find_error(self):
"""
Search for ERROR keyword in log line. Returns boolean result of search
"""
return 'ERROR' in self._curr_line
def _parse_datetime(self):
"""Parse string to datetime"""
date_string = self.DATE_REGEX.search(self._curr_line).group(0)
return datetime.datetime.strptime(date_string, "%Y-%m-%d %H:%M:%S")
def increase_base_time(self, line_time):
"""Return a datetime that is line_time plus one minute"""
return line_time + self.INC_MINUTE
def run(self):
"""
Read from a log file.
For each line check if log line time is later than base comparison time.
If so, update the base time and set error count to 0.
Check for errors in line. Increment error count if any are found.
Check the total error count. If it's greater than five, restart the logged
process.
Check if process has been restarted before. If so, kill the logged process.
"""
count = 0
base_time = datetime.datetime.min
restarted = False
for line in self._log:
line_time = self._parse_datetime()
if line_time > base_time:
base_time = self.increase_base_time(line_time)
count = 0
elif self._find_error():
count += 1
if count >= 5:
if restarted:
print("Kill the process") # A placeholder, sorry
else:
print("Restart the process") # Also a placeholder, sorry
count = 0
restarted = True
if __name__ == "__main__":
monitor = LogMonitor(sys.argv[1])
monitor.run()
import sys
import time
import datetime
import os
import re
class LogMonitor:
DATE_REGEX = re.compile(r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}')
INC_MINUTE = datetime.timedelta(minutes=1)
def __init__(self, filename):
self._fname = filename
self._log = self._read_log() # generator
self._curr_line = ''
def _read_log(self):
"""Basically tail -f"""
with open(self._fname) as logfile:
logfile.seek(0, os.SEEK_END)
while True:
new_line = logfile.readline()
if new_line:
self._curr_line = new_line
yield self._curr_line
else:
time.sleep(0.1)
def _has_error(self):
"""
Search for ERROR keyword in log line. Returns boolean result"""
return 'ERROR' in self._curr_line
def _parse_datetime(self):
"""Parse string to datetime"""
date_string = self.DATE_REGEX.search(self._curr_line).group(0)
return datetime.datetime.strptime(date_string, "%Y-%m-%d %H:%M:%S")
def increase_base_time(self, line_time):
"""Return a datetime that is line_time plus one minute"""
return line_time + self.INC_MINUTE
def run(self):
"""
Read from a log file.
For each line check if log line time is later than base comparison time.
If so, update the base time and set error count to 0.
Check for errors in line. Increment error count if any are found.
Check the total error count. If it's greater than five, restart the logged
process.
Check if process has been restarted before. If so, kill the logged process.
"""
count = 0
base_time = datetime.datetime.min
restarted = False
for line in self._log:
line_time = self._parse_datetime()
if line_time > base_time:
base_time = self.increase_base_time(line_time)
count = 0
elif self._has_error():
count += 1
if count >= 5:
if restarted:
print("Kill the process") # A placeholder, sorry
else:
print("Restart the process") # Also a placeholder, sorry
count = 0
restarted = True
if __name__ == "__main__":
monitor = LogMonitor(sys.argv[1])
monitor.run()
increase_base_timefunction.datetime.timedelta(minutes=1)is easily extracted to another constant:INC_MINUTE = datetime.timedelta(minutes=1)find_error(line, base_time, line_time)function.
Besides of simplifyingERRORpattern search (as @Graipher mentioned) the function has another issue as it introduces an unnecessary coupling/dependency on two external identifiersbase_timeandline_time- whereas the sufficient function's responsibility is "check ifERRORoccurs within the current log line" (returning boolean result)
That dependency should be eliminated.
To "beat" that, the crucial conditional withinmonitorfunction that calls thefind functionfind_errorfunction can be restructured to present a natural order of mutually exclusive conditions:... if line_time > base_time: base_time = self.increase_base_time(line_time) count = 0 elif self.find_error(): count += 1
import sys
import time
import datetime
import os
import re
class LogMonitor:
DATE_REGEX = re.compile(r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}')
INC_MINUTE = datetime.timedelta(minutes=1)
def __init__(self, filename):
self._fname = filename
self._log = self._read_log() # generator
self._curr_line = ''
def _read_log(self):
"""
Basically"""Basically tail -f
"""f"""
with open(self._fname) as logfile:
logfile.seek(0, os.SEEK_END)
while True:
new_line = logfile.readline()
if new_line:
self._curr_line = new_line
yield self._curr_line
else:
time.sleep(0.1)
def _find_error(self):
"""
Search for ERROR keyword in log line. Returns boolean result of search
"""
return 'ERROR' in self._curr_line
def _parse_datetime(self):
"""
Parse"""Parse string to datetime
"""datetime"""
date_string = self.DATE_REGEX.search(self._curr_line).group(0)
return datetime.datetime.strptime(date_string, "%Y-%m-%d %H:%M:%S")
def increase_base_time(self, line_time):
"""
Return"""Return a datetime that is line_time plus one minute
"""minute"""
return line_time + self.INC_MINUTE
def run(self):
"""
Read from a log file.
For each line check if log line time is later than base comparison time.
If so, update the base time and set error count to 0.
Check for errors in line. Increment error count if any are found.
Check the total error count. If it's greater than five, restart the logged
process.
Check if process has been restarted before. If so, kill the logged process.
"""
count = 0
base_time = datetime.datetime.min
restarted = False
for line in self._log:
line_time = self.parse_datetime_parse_datetime()
if line_time > base_time:
base_time = self.increase_base_time(line_time)
count = 0
elif self.find_error_find_error():
count += 1
if count >= 5:
if restarted:
print("Kill the process") # A placeholder, sorry
else:
print("Restart the process") # Also a placeholder, sorry
count = 0
restarted = True
if __name__ == "__main__":
monitor = LogMonitor(sys.argv[1])
monitor.run()
increase_base_timefunction.datetime.timedelta(minutes=1)is easily extracted to another constant:INC_MINUTE = datetime.timedelta(minutes=1)find_error(line, base_time, line_time)function.
Besides of simplifyingERRORpattern search (as @Graipher mentioned) the function has another issue as it introduces an unnecessary coupling/dependency on two external identifiersbase_timeandline_time- whereas the sufficient function's responsibility is "check ifERRORoccurs within the current log line" (returning boolean result)
That dependency should be eliminated.
To "beat" that, the crucial conditional withinmonitorfunction that calls thefind functioncan be restructured to present a natural order of mutually exclusive conditions:... if line_time > base_time: base_time = self.increase_base_time(line_time) count = 0 elif self.find_error(): count += 1
import sys
import time
import datetime
import os
import re
class LogMonitor:
DATE_REGEX = re.compile(r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}')
INC_MINUTE = datetime.timedelta(minutes=1)
def __init__(self, filename):
self._fname = filename
self._log = self._read_log() # generator
self._curr_line = ''
def _read_log(self):
"""
Basically tail -f
"""
with open(self._fname) as logfile:
logfile.seek(0, os.SEEK_END)
while True:
new_line = logfile.readline()
if new_line:
self._curr_line = new_line
yield self._curr_line
else:
time.sleep(0.1)
def _find_error(self):
"""
Search for ERROR keyword in log line. Returns boolean result of search
"""
return 'ERROR' in self._curr_line
def _parse_datetime(self):
"""
Parse string to datetime
"""
date_string = self.DATE_REGEX.search(self._curr_line).group(0)
return datetime.datetime.strptime(date_string, "%Y-%m-%d %H:%M:%S")
def increase_base_time(self, line_time):
"""
Return a datetime that is line_time plus one minute
"""
return line_time + self.INC_MINUTE
def run(self):
"""
Read from a log file.
For each line check if log line time is later than base comparison time.
If so, update the base time and set error count to 0.
Check for errors in line. Increment error count if any are found.
Check the total error count. If it's greater than five, restart the logged
process.
Check if process has been restarted before. If so, kill the logged process.
"""
count = 0
base_time = datetime.datetime.min
restarted = False
for line in self._log:
line_time = self.parse_datetime()
if line_time > base_time:
base_time = self.increase_base_time(line_time)
count = 0
elif self.find_error():
count += 1
if count >= 5:
if restarted:
print("Kill the process") # A placeholder, sorry
else:
print("Restart the process") # Also a placeholder, sorry
count = 0
restarted = True
if __name__ == "__main__":
monitor = LogMonitor(sys.argv[1])
monitor.run()
increase_base_timefunction.datetime.timedelta(minutes=1)is easily extracted to another constant:INC_MINUTE = datetime.timedelta(minutes=1)find_error(line, base_time, line_time)function.
Besides of simplifyingERRORpattern search (as @Graipher mentioned) the function has another issue as it introduces an unnecessary coupling/dependency on two external identifiersbase_timeandline_time- whereas the sufficient function's responsibility is "check ifERRORoccurs within the current log line" (returning boolean result)
That dependency should be eliminated.
To "beat" that, the crucial conditional withinmonitorfunction that calls thefind_errorfunction can be restructured to present a natural order of mutually exclusive conditions:... if line_time > base_time: base_time = self.increase_base_time(line_time) count = 0 elif self.find_error(): count += 1
import sys
import time
import datetime
import os
import re
class LogMonitor:
DATE_REGEX = re.compile(r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}')
INC_MINUTE = datetime.timedelta(minutes=1)
def __init__(self, filename):
self._fname = filename
self._log = self._read_log() # generator
self._curr_line = ''
def _read_log(self):
"""Basically tail -f"""
with open(self._fname) as logfile:
logfile.seek(0, os.SEEK_END)
while True:
new_line = logfile.readline()
if new_line:
self._curr_line = new_line
yield self._curr_line
else:
time.sleep(0.1)
def _find_error(self):
"""
Search for ERROR keyword in log line. Returns boolean result of search
"""
return 'ERROR' in self._curr_line
def _parse_datetime(self):
"""Parse string to datetime"""
date_string = self.DATE_REGEX.search(self._curr_line).group(0)
return datetime.datetime.strptime(date_string, "%Y-%m-%d %H:%M:%S")
def increase_base_time(self, line_time):
"""Return a datetime that is line_time plus one minute"""
return line_time + self.INC_MINUTE
def run(self):
"""
Read from a log file.
For each line check if log line time is later than base comparison time.
If so, update the base time and set error count to 0.
Check for errors in line. Increment error count if any are found.
Check the total error count. If it's greater than five, restart the logged
process.
Check if process has been restarted before. If so, kill the logged process.
"""
count = 0
base_time = datetime.datetime.min
restarted = False
for line in self._log:
line_time = self._parse_datetime()
if line_time > base_time:
base_time = self.increase_base_time(line_time)
count = 0
elif self._find_error():
count += 1
if count >= 5:
if restarted:
print("Kill the process") # A placeholder, sorry
else:
print("Restart the process") # Also a placeholder, sorry
count = 0
restarted = True
if __name__ == "__main__":
monitor = LogMonitor(sys.argv[1])
monitor.run()
... about making this class based, but it really doesn't make sense to do it ...
I'd not agree with that, because:
- same arguments
filename/fandlineare passed around across multiple functions - with custom class approach the things can be initialized/defined at once (on class definition or instantiation phase)
- with OOP approach I'm confident that functions like
parse_time(line)orfind_error(line, ...)are not passed with undesirable or "tricky"lineargument, as the line being currently read can be encapsulated.
List of issues (with support of OOP approach):
- pattern
date_regex = r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}'.
Instead of generating regex pattern on each call ofparse_timefunction - we'll just make it a classLogMonitorconstant calledDATE_REGEX.
Furthermore, we can precompile the regex pattern withre.compliefunction:
using
re.compile()and saving the resulting regular expression object for reuse is more efficient when the expression will be used several times in a single program
DATE_REGEX = re.compile(r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}')
increase_base_timefunction.datetime.timedelta(minutes=1)is easily extracted to another constant:INC_MINUTE = datetime.timedelta(minutes=1)find_error(line, base_time, line_time)function.
Besides of simplifyingERRORpattern search (as @Graipher mentioned) the function has another issue as it introduces an unnecessary coupling/dependency on two external identifiersbase_timeandline_time- whereas the sufficient function's responsibility is "check ifERRORoccurs within the current log line" (returning boolean result)
That dependency should be eliminated.
To "beat" that, the crucial conditional withinmonitorfunction that calls thefind functioncan be restructured to present a natural order of mutually exclusive conditions:... if line_time > base_time: base_time = self.increase_base_time(line_time) count = 0 elif self.find_error(): count += 1
In below OOP implementation the input filename is passed at once to LogMonitor constructor as LogMonitor(sys.argv[1]), the functions are logically renamed, the needed state/behavior is encapsulated, the above mentioned OOP benefits/arguments included.
import sys
import time
import datetime
import os
import re
class LogMonitor:
DATE_REGEX = re.compile(r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}')
INC_MINUTE = datetime.timedelta(minutes=1)
def __init__(self, filename):
self._fname = filename
self._log = self._read_log() # generator
self._curr_line = ''
def _read_log(self):
"""
Basically tail -f
"""
with open(self._fname) as logfile:
logfile.seek(0, os.SEEK_END)
while True:
new_line = logfile.readline()
if new_line:
self._curr_line = new_line
yield self._curr_line
else:
time.sleep(0.1)
def _find_error(self):
"""
Search for ERROR keyword in log line. Returns boolean result of search
"""
return 'ERROR' in self._curr_line
def _parse_datetime(self):
"""
Parse string to datetime
"""
date_string = self.DATE_REGEX.search(self._curr_line).group(0)
return datetime.datetime.strptime(date_string, "%Y-%m-%d %H:%M:%S")
def increase_base_time(self, line_time):
"""
Return a datetime that is line_time plus one minute
"""
return line_time + self.INC_MINUTE
def run(self):
"""
Read from a log file.
For each line check if log line time is later than base comparison time.
If so, update the base time and set error count to 0.
Check for errors in line. Increment error count if any are found.
Check the total error count. If it's greater than five, restart the logged
process.
Check if process has been restarted before. If so, kill the logged process.
"""
count = 0
base_time = datetime.datetime.min
restarted = False
for line in self._log:
line_time = self.parse_datetime()
if line_time > base_time:
base_time = self.increase_base_time(line_time)
count = 0
elif self.find_error():
count += 1
if count >= 5:
if restarted:
print("Kill the process") # A placeholder, sorry
else:
print("Restart the process") # Also a placeholder, sorry
count = 0
restarted = True
if __name__ == "__main__":
monitor = LogMonitor(sys.argv[1])
monitor.run()