Use binary mode to tail the file
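In Python 3, f.seek() on a file opened in text mode doesn't work the way it does in Python 2 or C, because text is now Unicode rather than bytes. Change read_tail to open the file in binary mode and to take Unicode into account by skipping to the next line, so decoding never starts in the middle of a character. Also add tests for it.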
This commit is contained in:
parent
90626f97b6
commit
0708302464
@ -1,7 +1,7 @@
|
|||||||
#
|
#
|
||||||
# sysutils.py
|
# sysutils.py
|
||||||
#
|
#
|
||||||
# Copyright (C) 2009-2015 Red Hat, Inc.
|
# Copyright (C) 2009-2019 Red Hat, Inc.
|
||||||
#
|
#
|
||||||
# This program is free software; you can redistribute it and/or modify
|
# This program is free software; you can redistribute it and/or modify
|
||||||
# it under the terms of the GNU General Public License as published by
|
# it under the terms of the GNU General Public License as published by
|
||||||
@ -133,13 +133,28 @@ def flatconfig(filename):
|
|||||||
|
|
||||||
def read_tail(path, size):
|
def read_tail(path, size):
|
||||||
"""Read up to `size` kibibytes from the end of a file"""
|
"""Read up to `size` kibibytes from the end of a file"""
|
||||||
with open(path, "r") as f:
|
|
||||||
|
# NOTE: In py3 text files are unicode, not bytes so we have to open it as bytes
|
||||||
|
with open(path, "rb") as f:
|
||||||
|
return _read_file_end(f, size)
|
||||||
|
|
||||||
|
def _read_file_end(f, size):
|
||||||
|
"""Read the end of a file
|
||||||
|
|
||||||
|
This skips to the next line to avoid starting in the middle of a unicode character.
|
||||||
|
And returns "" in the case of a UnicodeDecodeError
|
||||||
|
"""
|
||||||
f.seek(0, 2)
|
f.seek(0, 2)
|
||||||
end = f.tell()
|
end = f.tell()
|
||||||
if end < 1024 * size:
|
if end < 1024 * size:
|
||||||
f.seek(0, 0)
|
f.seek(0, 0)
|
||||||
else:
|
else:
|
||||||
f.seek(end - (1024 * size))
|
f.seek(end - (1024 * size))
|
||||||
# Find the start of the next line and return the rest
|
data = f.read()
|
||||||
f.readline()
|
try:
|
||||||
return f.read()
|
# Find the first newline in the block
|
||||||
|
newline = min(1+data.find(b'\n'), len(data))
|
||||||
|
text = data[newline:].decode("UTF-8")
|
||||||
|
except UnicodeDecodeError:
|
||||||
|
return ""
|
||||||
|
return text
|
||||||
|
@ -14,11 +14,13 @@
|
|||||||
# You should have received a copy of the GNU General Public License
|
# You should have received a copy of the GNU General Public License
|
||||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
#
|
#
|
||||||
|
import io
|
||||||
import unittest
|
import unittest
|
||||||
import tempfile
|
import tempfile
|
||||||
import os
|
import os
|
||||||
|
|
||||||
from pylorax.sysutils import joinpaths, touch, replace, chown_, chmod_, remove, linktree
|
from pylorax.sysutils import joinpaths, touch, replace, chown_, chmod_, remove, linktree
|
||||||
|
from pylorax.sysutils import _read_file_end
|
||||||
|
|
||||||
class SysUtilsTest(unittest.TestCase):
|
class SysUtilsTest(unittest.TestCase):
|
||||||
def joinpaths_test(self):
|
def joinpaths_test(self):
|
||||||
@ -72,3 +74,73 @@ class SysUtilsTest(unittest.TestCase):
|
|||||||
linktree(os.path.join(tdname, "one"), os.path.join(tdname, "copy"))
|
linktree(os.path.join(tdname, "one"), os.path.join(tdname, "copy"))
|
||||||
|
|
||||||
self.assertTrue(os.path.exists(os.path.join(tdname, "copy", "two", "three", "lorax-link-test-file")))
|
self.assertTrue(os.path.exists(os.path.join(tdname, "copy", "two", "three", "lorax-link-test-file")))
|
||||||
|
|
||||||
|
def _generate_lines(self, unicode=False):
|
||||||
|
# helper to generate several KiB of lines of text
|
||||||
|
bio = io.BytesIO()
|
||||||
|
for i in range(0,1024):
|
||||||
|
if not unicode:
|
||||||
|
bio.write(b"Here is another line to test. It is line #%d\n" % i)
|
||||||
|
else:
|
||||||
|
bio.write(b"Here is \xc3\xa0n\xc3\xb2ther line t\xc3\xb2 test. It is line #%d\n" % i)
|
||||||
|
bio.seek(0)
|
||||||
|
return bio
|
||||||
|
|
||||||
|
def read_file_end_test(self):
|
||||||
|
"""Test reading from the end of a file"""
|
||||||
|
self.maxDiff = None
|
||||||
|
|
||||||
|
# file of just lines
|
||||||
|
f = self._generate_lines()
|
||||||
|
|
||||||
|
# Grab the end of the 'file' to compare with, starting at the next line (hard-coded)
|
||||||
|
f.seek(-987, 2)
|
||||||
|
result = f.read().decode("utf-8")
|
||||||
|
f.seek(0)
|
||||||
|
self.assertEqual(_read_file_end(f, 1), result)
|
||||||
|
|
||||||
|
# file of lines with no final \n, chop off the trailing \n
|
||||||
|
f.seek(-1,2)
|
||||||
|
f.truncate()
|
||||||
|
f.seek(0)
|
||||||
|
self.assertEqual(_read_file_end(f, 1), result[:-1])
|
||||||
|
|
||||||
|
# short file, truncate it at 1023 characters
|
||||||
|
f.seek(1023)
|
||||||
|
f.truncate()
|
||||||
|
# Grab the end of the file, starting at the next line (hard-coded)
|
||||||
|
f.seek(44)
|
||||||
|
result = f.read().decode("utf-8")
|
||||||
|
f.seek(0)
|
||||||
|
self.assertEqual(_read_file_end(f, 1), result)
|
||||||
|
|
||||||
|
# short file with no line endings
|
||||||
|
f.seek(43)
|
||||||
|
f.truncate()
|
||||||
|
# Grab the whole file
|
||||||
|
f.seek(0)
|
||||||
|
result = f.read().decode("utf-8")
|
||||||
|
f.seek(0)
|
||||||
|
self.assertEqual(_read_file_end(f, 1), result)
|
||||||
|
|
||||||
|
# file with unicode in it
|
||||||
|
f = self._generate_lines(unicode=True)
|
||||||
|
|
||||||
|
# Grab the end of the 'file' to compare with, starting at the next line (hard-coded)
|
||||||
|
f.seek(-1000, 2)
|
||||||
|
result = f.read().decode("utf-8")
|
||||||
|
f.seek(0)
|
||||||
|
self.assertEqual(_read_file_end(f, 1), result)
|
||||||
|
|
||||||
|
# file with unicode right on block boundary, so that a decode of it would fail if it didn't
|
||||||
|
# move to the next line.
|
||||||
|
f.seek(-1000, 2)
|
||||||
|
result = f.read().decode("utf-8")
|
||||||
|
f.seek(-1025, 2)
|
||||||
|
f.write(b"\xc3\xb2")
|
||||||
|
f.seek(0)
|
||||||
|
self.assertEqual(_read_file_end(f, 1), result)
|
||||||
|
|
||||||
|
# Test for UnicodeDecodeError returning an empty string
|
||||||
|
f = io.BytesIO(b"\xff\xff\xffHere is a string with invalid unicode in it.")
|
||||||
|
self.assertEqual(_read_file_end(f, 1), "")
|
||||||
|
Loading…
Reference in New Issue
Block a user