Fix monitor problem with split UTF8 characters
Sometimes, depending on the buffering, or length of data being received, the end of the data could be the 1st byte or so of a UTF8 character. This would cause a crash when trying to decode the raw data buffer. This switches it to only decode once a full line has been found. It also adds tests for the LogMonitor class.
This commit is contained in:
parent
74128fcd59
commit
6160d340c4
@ -74,30 +74,25 @@ class LogRequestHandler(socketserver.BaseRequestHandler):
|
|||||||
Loops until self.server.kill is True
|
Loops until self.server.kill is True
|
||||||
"""
|
"""
|
||||||
log.info("Processing logs from %s", self.client_address)
|
log.info("Processing logs from %s", self.client_address)
|
||||||
line = ""
|
data = b""
|
||||||
while True:
|
while True:
|
||||||
if self.server.kill:
|
if self.server.kill:
|
||||||
break
|
break
|
||||||
|
|
||||||
try:
|
try:
|
||||||
data = str(self.request.recv(4096), "utf8")
|
data += self.request.recv(4096)
|
||||||
|
for line in data.splitlines(keepends=True):
|
||||||
|
if line.endswith(b"\n"):
|
||||||
|
# Ignore invalid UTF8 inside lines
|
||||||
|
self.iserror(str(line[:-1], "utf8", "ignore"))
|
||||||
if self.fp:
|
if self.fp:
|
||||||
self.fp.write(data)
|
self.fp.write(str(line, "utf8", "ignore"))
|
||||||
self.fp.flush()
|
self.fp.flush()
|
||||||
|
data = b""
|
||||||
# check the data for errors and set error flag
|
|
||||||
# need to assemble it into lines so we can test for the error
|
|
||||||
# string.
|
|
||||||
while data:
|
|
||||||
more = data.split("\n", 1)
|
|
||||||
line += more[0]
|
|
||||||
if len(more) > 1:
|
|
||||||
self.iserror(line)
|
|
||||||
line = ""
|
|
||||||
data = more[1]
|
|
||||||
else:
|
else:
|
||||||
data = None
|
# Not the end of the line, keep for later
|
||||||
|
data = line
|
||||||
|
break
|
||||||
except socket.timeout:
|
except socket.timeout:
|
||||||
pass
|
pass
|
||||||
except Exception as e: # pylint: disable=broad-except
|
except Exception as e: # pylint: disable=broad-except
|
||||||
@ -106,7 +101,6 @@ class LogRequestHandler(socketserver.BaseRequestHandler):
|
|||||||
|
|
||||||
def finish(self):
|
def finish(self):
|
||||||
log.info("Shutting down log processing")
|
log.info("Shutting down log processing")
|
||||||
self.request.close()
|
|
||||||
if self.fp:
|
if self.fp:
|
||||||
self.fp.close()
|
self.fp.close()
|
||||||
|
|
||||||
@ -202,3 +196,4 @@ class LogMonitor(object):
|
|||||||
"""Force shutdown of the monitoring thread"""
|
"""Force shutdown of the monitoring thread"""
|
||||||
self.server.kill = True
|
self.server.kill = True
|
||||||
self.server_thread.join()
|
self.server_thread.join()
|
||||||
|
self.server.server_close()
|
||||||
|
69
tests/pylorax/test_monitor.py
Normal file
69
tests/pylorax/test_monitor.py
Normal file
@ -0,0 +1,69 @@
|
|||||||
|
import socket
|
||||||
|
import time
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
from pylorax.monitor import LogMonitor
|
||||||
|
|
||||||
|
class LogMonitorTest(unittest.TestCase):
|
||||||
|
def test_monitor(self):
|
||||||
|
monitor = LogMonitor(timeout=1)
|
||||||
|
try:
|
||||||
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||||||
|
s.connect((monitor.host, monitor.port))
|
||||||
|
s.sendall("Just a test string\nwith two and a half\nlines in it".encode("utf8"))
|
||||||
|
time.sleep(1)
|
||||||
|
self.assertFalse(monitor.server.log_check())
|
||||||
|
s.sendall("\nAnother line\nTraceback (Not a real traceback)\n".encode("utf8"))
|
||||||
|
time.sleep(1)
|
||||||
|
self.assertTrue(monitor.server.log_check())
|
||||||
|
self.assertEqual(monitor.server.error_line, "Traceback (Not a real traceback)")
|
||||||
|
finally:
|
||||||
|
monitor.shutdown()
|
||||||
|
|
||||||
|
def test_monitor_IGNORED(self):
|
||||||
|
monitor = LogMonitor(timeout=1)
|
||||||
|
try:
|
||||||
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||||||
|
s.connect((monitor.host, monitor.port))
|
||||||
|
s.sendall("Just a test string\nwith two and a half\nlines in it".encode("utf8"))
|
||||||
|
time.sleep(1)
|
||||||
|
self.assertFalse(monitor.server.log_check())
|
||||||
|
s.sendall("\nAnother line\nIGNORED: Traceback (Not a real traceback)\n".encode("utf8"))
|
||||||
|
time.sleep(1)
|
||||||
|
self.assertFalse(monitor.server.log_check())
|
||||||
|
self.assertEqual(monitor.server.error_line, "")
|
||||||
|
finally:
|
||||||
|
monitor.shutdown()
|
||||||
|
|
||||||
|
def test_monitor_timeout(self):
|
||||||
|
# Timeout is in minutes so to shorten the test we pass 0.1
|
||||||
|
monitor = LogMonitor(timeout=0.1)
|
||||||
|
try:
|
||||||
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||||||
|
s.connect((monitor.host, monitor.port))
|
||||||
|
s.sendall("Just a test string\nwith two and a half\nlines in it".encode("utf8"))
|
||||||
|
time.sleep(1)
|
||||||
|
self.assertFalse(monitor.server.log_check())
|
||||||
|
time.sleep(7)
|
||||||
|
self.assertTrue(monitor.server.log_check())
|
||||||
|
self.assertEqual(monitor.server.error_line, "")
|
||||||
|
finally:
|
||||||
|
monitor.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
def test_monitor_utf8(self):
|
||||||
|
## If a utf8 character spans the end of the 4096 byte buffer it will fail to
|
||||||
|
## decode. Test to make sure it is reassembled correctly.
|
||||||
|
monitor = LogMonitor(timeout=1)
|
||||||
|
try:
|
||||||
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||||||
|
s.connect((monitor.host, monitor.port))
|
||||||
|
|
||||||
|
# Simulate a UTF8 character that gets broken into parts by buffering, etc.
|
||||||
|
data = "Just a test string\nTraceback (Not a real traceback)\nWith A"
|
||||||
|
s.sendall(data.encode("utf8") + b"\xc3")
|
||||||
|
time.sleep(1)
|
||||||
|
self.assertTrue(monitor.server.log_check())
|
||||||
|
self.assertEqual(monitor.server.error_line, "Traceback (Not a real traceback)")
|
||||||
|
finally:
|
||||||
|
monitor.shutdown()
|
Loading…
Reference in New Issue
Block a user