From 538bd9b42dabf1145cf7ab77f32347d516b01e88 Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Tue, 12 May 2020 18:56:34 +0200 Subject: [PATCH] journald: rework pid change handling Let's introduce an explicit line ending marker for line endings due to pid change. Let's also make sure we don't get confused with buffer management. Fixes: #15654 (cherry picked from commit 45ba1ea5e9264d385fa565328fe957ef1d78caa1) Resolves: #2029426 --- src/journal/journald-stream.c | 99 +++++++++++++++++++++++------------ 1 file changed, 66 insertions(+), 33 deletions(-) diff --git a/src/journal/journald-stream.c b/src/journal/journald-stream.c index ab1a855943..5be5b0939c 100644 --- a/src/journal/journald-stream.c +++ b/src/journal/journald-stream.c @@ -54,6 +54,7 @@ typedef enum LineBreak { LINE_BREAK_NUL, LINE_BREAK_LINE_MAX, LINE_BREAK_EOF, + LINE_BREAK_PID_CHANGE, _LINE_BREAK_MAX, _LINE_BREAK_INVALID = -1, } LineBreak; @@ -310,6 +311,7 @@ static int stdout_stream_log( [LINE_BREAK_NUL] = "_LINE_BREAK=nul", [LINE_BREAK_LINE_MAX] = "_LINE_BREAK=line-max", [LINE_BREAK_EOF] = "_LINE_BREAK=eof", + [LINE_BREAK_PID_CHANGE] = "_LINE_BREAK=pid-change", }; const char *c = line_break_field_table[line_break]; @@ -431,21 +433,43 @@ static int stdout_stream_line(StdoutStream *s, char *p, LineBreak line_break) { assert_not_reached("Unknown stream state"); } -static int stdout_stream_scan(StdoutStream *s, bool force_flush) { - char *p; - size_t remaining; +static int stdout_stream_found( + StdoutStream *s, + char *p, + size_t l, + LineBreak line_break) { + + char saved; int r; assert(s); + assert(p); + + /* Let's NUL terminate the specified buffer for this call, and revert back afterwards */ + saved = p[l]; + p[l] = 0; + r = stdout_stream_line(s, p, line_break); + p[l] = saved; - p = s->buffer; - remaining = s->length; + return r; +} + +static int stdout_stream_scan( + StdoutStream *s, + char *p, + size_t remaining, + LineBreak force_flush, + size_t *ret_consumed) { - /* XXX: This function does nothing if (s->length == 0) */ + size_t consumed = 0; + int r; + + assert(s); + assert(p); for (;;) { LineBreak line_break; - size_t skip; + size_t skip, found; char *end1, *end2; end1 = memchr(p, '\n', remaining); @@ -453,43 +477,40 @@ static int stdout_stream_scan(StdoutStream *s, bool force_flush) { if (end2) { /* We found a NUL terminator */ - skip = end2 - p + 1; + found = end2 - p; + skip = found + 1; line_break = LINE_BREAK_NUL; } else if (end1) { /* We found a \n terminator */ - *end1 = 0; - skip = end1 - p + 1; + found = end1 - p; + skip = found + 1; line_break = LINE_BREAK_NEWLINE; } else if (remaining >= s->server->line_max) { /* Force a line break after the maximum line length */ - *(p + s->server->line_max) = 0; - skip = remaining; + found = skip = s->server->line_max; line_break = LINE_BREAK_LINE_MAX; } else break; - r = stdout_stream_line(s, p, line_break); + r = stdout_stream_found(s, p, found, line_break); if (r < 0) return r; - remaining -= skip; p += skip; + consumed += skip; + remaining -= skip; } - if (force_flush && remaining > 0) { - p[remaining] = 0; - r = stdout_stream_line(s, p, LINE_BREAK_EOF); + if (force_flush >= 0 && remaining > 0) { + r = stdout_stream_found(s, p, remaining, force_flush); if (r < 0) return r; - p += remaining; - remaining = 0; + consumed += remaining; } - if (p > s->buffer) { - memmove(s->buffer, p, remaining); - s->length = remaining; - } + if (ret_consumed) + *ret_consumed = consumed; return 0; } @@ -497,11 +518,12 @@ static int stdout_stream_scan(StdoutStream *s, bool force_flush) { static int stdout_stream_process(sd_event_source *es, int fd, uint32_t revents, void *userdata) { uint8_t buf[CMSG_SPACE(sizeof(struct ucred))]; StdoutStream *s = userdata; + size_t limit, consumed; struct ucred *ucred = NULL; struct cmsghdr *cmsg; struct iovec iovec; - size_t limit; ssize_t l; + char *p; int r; struct msghdr msghdr = { @@ -529,7 +551,7 @@ static int stdout_stream_process(sd_event_source *es, int fd, uint32_t revents, /* Try to make use of the allocated buffer in full, but never read more than the configured line size. Also, * always leave room for a terminating NUL we might need to add. */ limit = MIN(s->allocated - 1, s->server->line_max); - + assert(s->length <= limit); iovec = IOVEC_MAKE(s->buffer + s->length, limit - s->length); l = recvmsg(s->fd, &msghdr, MSG_DONTWAIT|MSG_CMSG_CLOEXEC); @@ -543,7 +565,7 @@ static int stdout_stream_process(sd_event_source *es, int fd, uint32_t revents, cmsg_close_all(&msghdr); if (l == 0) { - stdout_stream_scan(s, true); + (void) stdout_stream_scan(s, s->buffer, s->length, /* force_flush = */ LINE_BREAK_EOF, NULL); goto terminate; } @@ -562,22 +584,33 @@ static int stdout_stream_process(sd_event_source *es, int fd, uint32_t revents, * in the meantime. */ if (ucred && ucred->pid != s->ucred.pid) { - /* force out any previously half-written lines from a - * different process, before we switch to the new ucred - * structure for everything we just added */ - r = stdout_stream_scan(s, true); + /* Force out any previously half-written lines from a different process, before we switch to + * the new ucred structure for everything we just added */ + r = stdout_stream_scan(s, s->buffer, s->length, /* force_flush = */ LINE_BREAK_PID_CHANGE, NULL); if (r < 0) goto terminate; - s->ucred = *ucred; s->context = client_context_release(s->server, s->context); + + p = s->buffer + s->length; + } else { + p = s->buffer; + l += s->length; } - s->length += l; - r = stdout_stream_scan(s, false); + /* Always copy in the new credentials */ + if (ucred) + s->ucred = *ucred; + + r = stdout_stream_scan(s, p, l, _LINE_BREAK_INVALID, &consumed); if (r < 0) goto terminate; + /* Move what wasn't consumed to the front of the buffer */ + assert(consumed <= (size_t) l); + s->length = l - consumed; + memmove(s->buffer, p + consumed, s->length); + return 1; terminate: