f4d39b1b7f
resolves: rhbz#1743537
753 lines
23 KiB
Diff
753 lines
23 KiB
Diff
From 78976a9bc0592317be2e41b7f3703803f7971e1e Mon Sep 17 00:00:00 2001
|
|
From: Jiri Vymazal <jvymazal@redhat.com>
|
|
Date: Wed, 25 Sep 2019 15:18:57 +0200
|
|
Subject: [PATCH] plugin code restructuring, added remote option
|
|
|
|
Decomposed ReadJournal() a bit, also now coupling journald
|
|
variables in one struct, added few warning messages and debug
|
|
prints to help with bug hunts in future, also got rid of two
|
|
needless journald calls. WorkAroundJournalBug now deprecated.
|
|
Added option to pull journald records from outside local machine.
|
|
---
|
|
plugins/imjournal/imjournal.c | 368 ++++++++++++++++++++--------------
|
|
1 file changed, 214 insertions(+), 154 deletions(-)
|
|
|
|
diff --git a/plugins/imjournal/imjournal.c b/plugins/imjournal/imjournal.c
|
|
index 2e27922b51..8008f159c4 100644
|
|
--- a/plugins/imjournal/imjournal.c
|
|
+++ b/plugins/imjournal/imjournal.c
|
|
@@ -81,8 +81,9 @@ static struct configSettings_s {
|
|
int iDfltFacility;
|
|
int bUseJnlPID;
|
|
char *usePid;
|
|
- int bWorkAroundJournalBug;
|
|
+ int bWorkAroundJournalBug; /* deprecated, left for backwards compatibility only */
|
|
int bFsync;
|
|
+ int bRemote;
|
|
} cs;
|
|
|
|
static rsRetVal facilityHdlr(uchar **pp, void *pVal);
|
|
@@ -100,7 +101,8 @@ static struct cnfparamdescr modpdescr[] = {
|
|
{ "usepidfromsystem", eCmdHdlrBinary, 0 },
|
|
{ "usepid", eCmdHdlrString, 0 },
|
|
{ "workaroundjournalbug", eCmdHdlrBinary, 0 },
|
|
- { "fsync", eCmdHdlrBinary, 0 }
|
|
+ { "fsync", eCmdHdlrBinary, 0 },
|
|
+ { "remote", eCmdHdlrBinary, 0 }
|
|
};
|
|
static struct cnfparamblk modpblk =
|
|
{ CNFPARAMBLK_VERSION,
|
|
@@ -120,8 +122,6 @@ static prop_t *pLocalHostIP = NULL; /* a pseudo-constant propterty for 127.0.0.1
|
|
static const char *pidFieldName; /* read-only after startup */
|
|
static int bPidFallBack;
|
|
static ratelimit_t *ratelimiter = NULL;
|
|
-static sd_journal *j;
|
|
-static sbool reloaded = 0;
|
|
static struct {
|
|
statsobj_t *stats;
|
|
STATSCOUNTER_DEF(ctrSubmitted, mutCtrSubmitted)
|
|
@@ -134,34 +134,58 @@ static struct {
|
|
uint64 ratelimitDiscardedInInterval;
|
|
uint64 diskUsageBytes;
|
|
} statsCounter;
|
|
-static char *last_cursor = NULL;
|
|
+struct journalContext_s { /* structure encapsulating all the journald_API-related stuff */
|
|
+ sd_journal *j; /* main object encapsulating journal for us, has to be used in every sd_journal*() call */
|
|
+ sbool reloaded; /* we have reloaded journal after detecting rotation */
|
|
+ sbool atHead; /* true if we are at start of journal (no seek was done) */
|
|
+ char *cursor; /* should point to last valid journald entry we processed */
|
|
+};
|
|
+static struct journalContext_s journalContext = {NULL, 0, 1, NULL};
|
|
|
|
#define J_PROCESS_PERIOD 1024 /* Call sd_journal_process() every 1,024 records */
|
|
|
|
-static rsRetVal persistJournalState(int trySave);
|
|
+static rsRetVal persistJournalState(void);
|
|
static rsRetVal loadJournalState(void);
|
|
|
|
static rsRetVal openJournal(void) {
|
|
int r;
|
|
DEFiRet;
|
|
|
|
- if ((r = sd_journal_open(&j, SD_JOURNAL_LOCAL_ONLY)) < 0) {
|
|
- LogError(-r, RS_RET_IO_ERROR, "imjournal: sd_journal_open() failed");
|
|
- iRet = RS_RET_IO_ERROR;
|
|
+ if (journalContext.j) {
|
|
+ LogMsg(0, RS_RET_OK_WARN, LOG_WARNING, "imjournal: opening journal when already opened.\n");
|
|
}
|
|
- if ((r = sd_journal_get_fd(j)) < 0) {
|
|
- LogError(-r, RS_RET_IO_ERROR, "imjournal: sd_journal_get_fd() failed");
|
|
+ if ((r = sd_journal_open(&journalContext.j, cs.bRemote? 0 : SD_JOURNAL_LOCAL_ONLY)) < 0) {
|
|
+ LogError(-r, RS_RET_IO_ERROR, "imjournal: sd_journal_open() failed");
|
|
iRet = RS_RET_IO_ERROR;
|
|
}
|
|
+ journalContext.atHead = 1;
|
|
RETiRet;
|
|
}
|
|
|
|
/* trySave shoulod only be true if there is no journald error preceeding this call */
|
|
-static void closeJournal(int trySave) {
|
|
- if (cs.stateFile) { /* can't persist without a state file */
|
|
- persistJournalState(trySave);
|
|
+static void closeJournal(void) {
|
|
+ if (!journalContext.j) {
|
|
+ LogMsg(0, RS_RET_OK_WARN, LOG_WARNING, "imjournal: closing NULL journal.\n");
|
|
}
|
|
- sd_journal_close(j);
|
|
+ sd_journal_close(journalContext.j);
|
|
+ journalContext.j = NULL; /* setting to NULL here as journald API will not do that for us... */
|
|
+}
|
|
+
|
|
+static int journalGetData(const char *field, const void **data, size_t *length)
|
|
+{
|
|
+ int ret;
|
|
+
|
|
+ ret = sd_journal_get_data(journalContext.j, field, data, length);
|
|
+ if (ret == -EADDRNOTAVAIL) {
|
|
+ LogError(-ret, RS_RET_ERR, "imjournal: Tried to get data without a 'next' call.\n");
|
|
+ if ((ret = sd_journal_next(journalContext.j)) < 0) {
|
|
+ LogError(-ret, RS_RET_ERR, "imjournal: sd_journal_next() failed\n");
|
|
+ } else {
|
|
+ ret = sd_journal_get_data(journalContext.j, field, data, length);
|
|
+ }
|
|
+ }
|
|
+
|
|
+ return ret;
|
|
}
|
|
|
|
|
|
@@ -223,12 +247,84 @@ sanitizeValue(const char *in, size_t len, char **out)
|
|
}
|
|
|
|
|
|
+/* Read JSON part of single journald message and return it as JSON object
|
|
+ */
|
|
+static rsRetVal
|
|
+readJSONfromJournalMsg(struct fjson_object **json)
|
|
+{
|
|
+ DEFiRet;
|
|
+ const void *get;
|
|
+ const void *equal_sign;
|
|
+ struct fjson_object *jval;
|
|
+ size_t l;
|
|
+ long prefixlen = 0;
|
|
+
|
|
+ CHKmalloc(*json = fjson_object_new_object());
|
|
+
|
|
+ SD_JOURNAL_FOREACH_DATA(journalContext.j, get, l) {
|
|
+ char *data;
|
|
+ char *name;
|
|
+
|
|
+ /* locate equal sign, this is always present */
|
|
+ equal_sign = memchr(get, '=', l);
|
|
+
|
|
+ /* ... but we know better than to trust the specs */
|
|
+ if (equal_sign == NULL) {
|
|
+ LogError(0, RS_RET_ERR, "SD_JOURNAL_FOREACH_DATA()"
|
|
+ "returned a malformed field (has no '='): '%s'", (char*)get);
|
|
+ continue; /* skip the entry */
|
|
+ }
|
|
+
|
|
+ /* get length of journal data prefix */
|
|
+ prefixlen = ((char *)equal_sign - (char *)get);
|
|
+
|
|
+ CHKmalloc(name = strndup(get, prefixlen));
|
|
+
|
|
+ prefixlen++; /* remove '=' */
|
|
+
|
|
+ CHKiRet_Hdlr(sanitizeValue(((const char *)get) + prefixlen, l - prefixlen, &data)) {
|
|
+ free (name);
|
|
+ FINALIZE;
|
|
+ }
|
|
+
|
|
+ /* and save them to json object */
|
|
+ jval = fjson_object_new_string((char *)data);
|
|
+ fjson_object_object_add(*json, name, jval);
|
|
+ free (data);
|
|
+ free (name);
|
|
+ }
|
|
+finalize_it:
|
|
+ RETiRet;
|
|
+}
|
|
+
|
|
+
|
|
+/* Try to obtain current journald cursor and save it to journalContext struct.
|
|
+ */
|
|
+static rsRetVal
|
|
+updateJournalCursor(void)
|
|
+{
|
|
+ DEFiRet;
|
|
+ char *c = NULL;
|
|
+ int r;
|
|
+
|
|
+ if ((r = sd_journal_get_cursor(journalContext.j, &c)) < 0) {
|
|
+ LogError(-r, RS_RET_ERR, "imjournal: Could not get journald cursor!\n");
|
|
+ ABORT_FINALIZE(RS_RET_ERR);
|
|
+ }
|
|
+ /* save journal cursor (at this point we can be sure it is valid) */
|
|
+ free(journalContext.cursor);
|
|
+ journalContext.cursor = c;
|
|
+finalize_it:
|
|
+ RETiRet;
|
|
+}
|
|
+
|
|
+
|
|
/* enqueue the the journal message into the message queue.
|
|
* The provided msg string is not freed - thus must be done
|
|
* by the caller.
|
|
*/
|
|
static rsRetVal
|
|
-enqMsg(uchar *msg, uchar *pszTag, int iFacility, int iSeverity, struct timeval *tp, struct json_object *json,
|
|
+enqMsg(uchar *msg, uchar *pszTag, int iFacility, int iSeverity, struct timeval *tp, struct fjson_object *json,
|
|
int sharedJsonProperties)
|
|
{
|
|
struct syslogTime st;
|
|
@@ -267,15 +363,17 @@ int sharedJsonProperties)
|
|
STATSCOUNTER_INC(statsCounter.ctrSubmitted, statsCounter.mutCtrSubmitted);
|
|
|
|
finalize_it:
|
|
- if (iRet == RS_RET_DISCARDMSG)
|
|
+ if (iRet == RS_RET_DISCARDMSG) {
|
|
STATSCOUNTER_INC(statsCounter.ctrDiscarded, statsCounter.mutCtrDiscarded);
|
|
+ } else if (iRet != RS_RET_OK) {
|
|
+ LogError(0, RS_RET_ERR, "imjournal: error during enqMsg().\n");
|
|
+ }
|
|
|
|
RETiRet;
|
|
}
|
|
|
|
|
|
-/* Read journal log while data are available, each read() reads one
|
|
- * record of printk buffer.
|
|
+/* Read journal log while data are available, each read() reads one journald record.
|
|
*/
|
|
static rsRetVal
|
|
readjournal(void)
|
|
@@ -285,39 +383,32 @@ readjournal(void)
|
|
struct timeval tv;
|
|
uint64_t timestamp;
|
|
|
|
- struct json_object *json = NULL;
|
|
+ struct fjson_object *json = NULL;
|
|
int r;
|
|
|
|
/* Information from messages */
|
|
char *message = NULL;
|
|
char *sys_iden;
|
|
char *sys_iden_help = NULL;
|
|
- char *c = NULL;
|
|
|
|
const void *get;
|
|
const void *pidget;
|
|
size_t length;
|
|
size_t pidlength;
|
|
|
|
- const void *equal_sign;
|
|
- struct json_object *jval;
|
|
- size_t l;
|
|
-
|
|
- long prefixlen = 0;
|
|
-
|
|
int severity = cs.iDfltSeverity;
|
|
int facility = cs.iDfltFacility;
|
|
|
|
/* Get message text */
|
|
- if (sd_journal_get_data(j, "MESSAGE", &get, &length) < 0) {
|
|
- message = strdup("");
|
|
+ if (journalGetData("MESSAGE", &get, &length) < 0) {
|
|
+ CHKmalloc(message = strdup(""));
|
|
} else {
|
|
CHKiRet(sanitizeValue(((const char *)get) + 8, length - 8, &message));
|
|
}
|
|
STATSCOUNTER_INC(statsCounter.ctrRead, statsCounter.mutCtrRead);
|
|
|
|
/* Get message severity ("priority" in journald's terminology) */
|
|
- if (sd_journal_get_data(j, "PRIORITY", &get, &length) >= 0) {
|
|
+ if (journalGetData("PRIORITY", &get, &length) >= 0) {
|
|
if (length == 10) {
|
|
severity = ((char *)get)[9] - '0';
|
|
if (severity < 0 || 7 < severity) {
|
|
@@ -332,7 +423,7 @@ readjournal(void)
|
|
}
|
|
|
|
/* Get syslog facility */
|
|
- if (sd_journal_get_data(j, "SYSLOG_FACILITY", &get, &length) >= 0) {
|
|
+ if (journalGetData("SYSLOG_FACILITY", &get, &length) >= 0) {
|
|
// Note: the journal frequently contains invalid facilities!
|
|
if (length == 17 || length == 18) {
|
|
facility = ((char *)get)[16] - '0';
|
|
@@ -352,14 +443,14 @@ readjournal(void)
|
|
}
|
|
|
|
/* Get message identifier, client pid and add ':' */
|
|
- if (sd_journal_get_data(j, "SYSLOG_IDENTIFIER", &get, &length) >= 0) {
|
|
+ if (journalGetData("SYSLOG_IDENTIFIER", &get, &length) >= 0) {
|
|
CHKiRet(sanitizeValue(((const char *)get) + 18, length - 18, &sys_iden));
|
|
} else {
|
|
CHKmalloc(sys_iden = strdup("journal"));
|
|
}
|
|
|
|
/* trying to get PID, default is "SYSLOG_PID" property */
|
|
- if (sd_journal_get_data(j, pidFieldName, &pidget, &pidlength) >= 0) {
|
|
+ if (journalGetData(pidFieldName, &pidget, &pidlength) >= 0) {
|
|
char *sys_pid;
|
|
int val_ofs;
|
|
|
|
@@ -372,7 +463,7 @@ readjournal(void)
|
|
free (sys_pid);
|
|
} else {
|
|
/* this is fallback, "SYSLOG_PID" doesn't exist so trying to get "_PID" property */
|
|
- if (bPidFallBack && sd_journal_get_data(j, "_PID", &pidget, &pidlength) >= 0) {
|
|
+ if (bPidFallBack && journalGetData("_PID", &pidget, &pidlength) >= 0) {
|
|
char *sys_pid;
|
|
int val_ofs;
|
|
|
|
@@ -396,55 +487,15 @@ readjournal(void)
|
|
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
|
|
}
|
|
|
|
- json = json_object_new_object();
|
|
-
|
|
- SD_JOURNAL_FOREACH_DATA(j, get, l) {
|
|
- char *data;
|
|
- char *name;
|
|
-
|
|
- /* locate equal sign, this is always present */
|
|
- equal_sign = memchr(get, '=', l);
|
|
-
|
|
- /* ... but we know better than to trust the specs */
|
|
- if (equal_sign == NULL) {
|
|
- LogError(0, RS_RET_ERR, "SD_JOURNAL_FOREACH_DATA()"
|
|
- "returned a malformed field (has no '='): '%s'", (char*)get);
|
|
- continue; /* skip the entry */
|
|
- }
|
|
-
|
|
- /* get length of journal data prefix */
|
|
- prefixlen = ((char *)equal_sign - (char *)get);
|
|
-
|
|
- name = strndup(get, prefixlen);
|
|
- CHKmalloc(name);
|
|
-
|
|
- prefixlen++; /* remove '=' */
|
|
-
|
|
- CHKiRet_Hdlr(sanitizeValue(((const char *)get) + prefixlen, l - prefixlen, &data)) {
|
|
- free (name);
|
|
- FINALIZE;
|
|
- }
|
|
-
|
|
- /* and save them to json object */
|
|
- jval = json_object_new_string((char *)data);
|
|
- json_object_object_add(json, name, jval);
|
|
- free (data);
|
|
- free (name);
|
|
- }
|
|
+ CHKiRet(readJSONfromJournalMsg(&json));
|
|
|
|
/* calculate timestamp */
|
|
- if (sd_journal_get_realtime_usec(j, ×tamp) >= 0) {
|
|
+ if (sd_journal_get_realtime_usec(journalContext.j, ×tamp) >= 0) {
|
|
tv.tv_sec = timestamp / 1000000;
|
|
tv.tv_usec = timestamp % 1000000;
|
|
}
|
|
|
|
- if (cs.bWorkAroundJournalBug) {
|
|
- /* save journal cursor (at this point we can be sure it is valid) */
|
|
- if (!sd_journal_get_cursor(j, &c)) {
|
|
- free(last_cursor);
|
|
- last_cursor = c;
|
|
- }
|
|
- }
|
|
+ iRet = updateJournalCursor();
|
|
|
|
/* submit message */
|
|
enqMsg((uchar *)message, (uchar *) sys_iden_help, facility, severity, &tv, json, 0);
|
|
@@ -456,32 +507,22 @@ readjournal(void)
|
|
}
|
|
|
|
|
|
-/* This function gets journal cursor and saves it into state file.
|
|
- * If WorkAroundJournalBug option is turned on it does use cursor saved previously.
|
|
- * If it is false and if "trySave" is false it skips altogether.
|
|
+/* This function saves journal cursor into state file.
|
|
+ * It must be checked that stateFile is configured prior to calling this.
|
|
*/
|
|
static rsRetVal
|
|
-persistJournalState(int trySave)
|
|
+persistJournalState(void)
|
|
{
|
|
DEFiRet;
|
|
FILE *sf = NULL; /* state file */
|
|
char tmp_sf[MAXFNAME];
|
|
size_t n;
|
|
|
|
- if (cs.bWorkAroundJournalBug) {
|
|
- /* first check that we have valid cursor */
|
|
- if (!last_cursor) {
|
|
- ABORT_FINALIZE(RS_RET_OK);
|
|
- }
|
|
- } else if (trySave) {
|
|
- int ret;
|
|
- free(last_cursor);
|
|
- if ((ret = sd_journal_get_cursor(j, &last_cursor))) {
|
|
- LogError(-ret, RS_RET_ERR, "imjournal: sd_journal_get_cursor() failed");
|
|
- last_cursor = NULL;
|
|
- ABORT_FINALIZE(RS_RET_ERR);
|
|
- }
|
|
- } else { /* not trying to get cursor out of invalid journal state */
|
|
+ DBGPRINTF("Persisting journal position, cursor: %s, at head? %d\n",
|
|
+ journalContext.cursor, journalContext.atHead);
|
|
+
|
|
+ /* first check that we have valid cursor */
|
|
+ if (!journalContext.cursor) {
|
|
ABORT_FINALIZE(RS_RET_OK);
|
|
}
|
|
|
|
@@ -500,7 +541,7 @@ persistJournalState(int trySave)
|
|
ABORT_FINALIZE(RS_RET_FOPEN_FAILURE);
|
|
}
|
|
|
|
- if(fputs(last_cursor, sf) == EOF) {
|
|
+ if(fputs(journalContext.cursor, sf) == EOF) {
|
|
LogError(errno, RS_RET_IO_ERROR, "imjournal: failed to save cursor to: '%s'", tmp_sf);
|
|
ABORT_FINALIZE(RS_RET_IO_ERROR);
|
|
}
|
|
@@ -541,43 +582,59 @@ persistJournalState(int trySave)
|
|
|
|
static rsRetVal skipOldMessages(void);
|
|
|
|
-#define POLL_TIMEOUT 900000 /* timeout for poll is 900ms */
|
|
-
|
|
static rsRetVal
|
|
-pollJournal(void)
|
|
+handleRotation(void)
|
|
{
|
|
DEFiRet;
|
|
- int err;
|
|
+ int r;
|
|
|
|
- err = sd_journal_wait(j, POLL_TIMEOUT);
|
|
- if (err == SD_JOURNAL_INVALIDATE && !reloaded) {
|
|
- STATSCOUNTER_INC(statsCounter.ctrRotations, statsCounter.mutCtrRotations);
|
|
- closeJournal(0);
|
|
+ LogMsg(0, RS_RET_OK, LOG_NOTICE, "imjournal: journal files changed, reloading...\n");
|
|
+ STATSCOUNTER_INC(statsCounter.ctrRotations, statsCounter.mutCtrRotations);
|
|
+ closeJournal();
|
|
|
|
- iRet = openJournal();
|
|
- if (iRet != RS_RET_OK) {
|
|
- ABORT_FINALIZE(RS_RET_ERR);
|
|
- }
|
|
+ iRet = openJournal();
|
|
+ if (iRet != RS_RET_OK) {
|
|
+ ABORT_FINALIZE(RS_RET_ERR);
|
|
+ }
|
|
|
|
- /* If we have locally saved cursor there is no need to read it from state file */
|
|
- if (cs.bWorkAroundJournalBug && last_cursor)
|
|
- {
|
|
- if (sd_journal_seek_cursor(j, last_cursor) != 0) {
|
|
- LogError(0, RS_RET_ERR, "imjournal: "
|
|
- "couldn't seek to cursor `%s'\n", last_cursor);
|
|
- iRet = RS_RET_ERR;
|
|
- }
|
|
- /* Need to advance because cursor points at last processed message */
|
|
- sd_journal_next(j);
|
|
+ /* If we have locally saved cursor there is no need to read it from state file */
|
|
+ if (journalContext.cursor)
|
|
+ {
|
|
+ if (sd_journal_seek_cursor(journalContext.j, journalContext.cursor) != 0) {
|
|
+ LogError(0, RS_RET_ERR, "imjournal: "
|
|
+ "couldn't seek to cursor `%s'\n", journalContext.cursor);
|
|
+ iRet = RS_RET_ERR;
|
|
}
|
|
- else if (cs.stateFile) {
|
|
- iRet = loadJournalState();
|
|
+ journalContext.atHead = 0;
|
|
+ /* Need to advance because cursor points at last processed message */
|
|
+ if ((r = sd_journal_next(journalContext.j)) < 0) {
|
|
+ LogError(-r, RS_RET_ERR, "imjournal: sd_journal_next() failed");
|
|
+ iRet = RS_RET_ERR;
|
|
}
|
|
- LogMsg(0, RS_RET_OK, LOG_NOTICE, "imjournal: journal reloaded...");
|
|
- reloaded = 1;
|
|
+ }
|
|
+ else if (cs.stateFile) {
|
|
+ iRet = loadJournalState();
|
|
+ }
|
|
+ journalContext.reloaded = 1;
|
|
+
|
|
+finalize_it:
|
|
+ RETiRet;
|
|
+}
|
|
+
|
|
+#define POLL_TIMEOUT 900000 /* timeout for poll is 900ms */
|
|
+
|
|
+static rsRetVal
|
|
+pollJournal(void)
|
|
+{
|
|
+ DEFiRet;
|
|
+ int err;
|
|
+
|
|
+ err = sd_journal_wait(journalContext.j, POLL_TIMEOUT);
|
|
+ if (err == SD_JOURNAL_INVALIDATE && !journalContext.reloaded) {
|
|
+ CHKiRet(handleRotation());
|
|
}
|
|
else {
|
|
- reloaded = 0;
|
|
+ journalContext.reloaded = 0;
|
|
}
|
|
|
|
finalize_it:
|
|
@@ -591,12 +648,13 @@ skipOldMessages(void)
|
|
int r;
|
|
DEFiRet;
|
|
|
|
- if ((r = sd_journal_seek_tail(j)) < 0) {
|
|
+ if ((r = sd_journal_seek_tail(journalContext.j)) < 0) {
|
|
LogError(-r, RS_RET_ERR,
|
|
"imjournal: sd_journal_seek_tail() failed");
|
|
ABORT_FINALIZE(RS_RET_ERR);
|
|
}
|
|
- if ((r = sd_journal_previous(j)) < 0) {
|
|
+ journalContext.atHead = 0;
|
|
+ if ((r = sd_journal_previous(journalContext.j)) < 0) {
|
|
LogError(-r, RS_RET_ERR,
|
|
"imjournal: sd_journal_previous() failed");
|
|
ABORT_FINALIZE(RS_RET_ERR);
|
|
@@ -615,6 +673,9 @@ loadJournalState(void)
|
|
int r;
|
|
FILE *r_sf;
|
|
|
|
+ DBGPRINTF("Loading journal position, at head? %d, reloaded? %d\n",
|
|
+ journalContext.atHead, journalContext.reloaded);
|
|
+
|
|
if (cs.stateFile[0] != '/') {
|
|
char *new_stateFile;
|
|
if (-1 == asprintf(&new_stateFile, "%s/%s", (char *)glbl.GetWorkDir(), cs.stateFile)) {
|
|
@@ -639,13 +700,14 @@ loadJournalState(void)
|
|
if ((r_sf = fopen(cs.stateFile, "rb")) != NULL) {
|
|
char readCursor[128 + 1];
|
|
if (fscanf(r_sf, "%128s\n", readCursor) != EOF) {
|
|
- if (sd_journal_seek_cursor(j, readCursor) != 0) {
|
|
+ if (sd_journal_seek_cursor(journalContext.j, readCursor) != 0) {
|
|
LogError(0, RS_RET_ERR, "imjournal: "
|
|
"couldn't seek to cursor `%s'\n", readCursor);
|
|
iRet = RS_RET_ERR;
|
|
} else {
|
|
+ journalContext.atHead = 0;
|
|
char * tmp_cursor = NULL;
|
|
- sd_journal_next(j);
|
|
+ sd_journal_next(journalContext.j);
|
|
/*
|
|
* This is resolving the situation when system is after reboot and boot_id
|
|
* doesn't match so cursor pointing into "future".
|
|
@@ -658,14 +720,15 @@ loadJournalState(void)
|
|
* but if cursor has been intentionally compromised it could stop logging even
|
|
* with persistent journal.
|
|
* */
|
|
- if ((r = sd_journal_get_cursor(j, &tmp_cursor)) < 0) {
|
|
+ if ((r = sd_journal_get_cursor(journalContext.j, &tmp_cursor)) < 0) {
|
|
LogError(-r, RS_RET_IO_ERROR, "imjournal: "
|
|
"loaded invalid cursor, seeking to the head of journal\n");
|
|
- if ((r = sd_journal_seek_head(j)) < 0) {
|
|
+ if ((r = sd_journal_seek_head(journalContext.j)) < 0) {
|
|
LogError(-r, RS_RET_ERR, "imjournal: "
|
|
"sd_journal_seek_head() failed, when cursor is invalid\n");
|
|
iRet = RS_RET_ERR;
|
|
}
|
|
+ journalContext.atHead = 1;
|
|
}
|
|
free(tmp_cursor);
|
|
}
|
|
@@ -680,18 +743,15 @@ loadJournalState(void)
|
|
if (iRet != RS_RET_OK && cs.bIgnoreNonValidStatefile) {
|
|
/* ignore state file errors */
|
|
iRet = RS_RET_OK;
|
|
- LogError(0, NO_ERRCODE, "imjournal: ignoring invalid state file %s",
|
|
- cs.stateFile);
|
|
+ LogError(0, NO_ERRCODE, "imjournal: ignoring invalid state file %s", cs.stateFile);
|
|
if (cs.bIgnorePrevious) {
|
|
skipOldMessages();
|
|
}
|
|
}
|
|
} else {
|
|
- LogError(0, RS_RET_FOPEN_FAILURE, "imjournal: "
|
|
- "open on state file `%s' failed\n", cs.stateFile);
|
|
+ LogError(0, RS_RET_FOPEN_FAILURE, "imjournal: open on state file `%s' failed\n", cs.stateFile);
|
|
if (cs.bIgnorePrevious) {
|
|
- /* Seek to the very end of the journal and ignore all
|
|
- * older messages. */
|
|
+ /* Seek to the very end of the journal and ignore all older messages. */
|
|
skipOldMessages();
|
|
}
|
|
}
|
|
@@ -704,7 +764,7 @@ static void
|
|
tryRecover(void) {
|
|
LogMsg(0, RS_RET_OK, LOG_INFO, "imjournal: trying to recover from journal error");
|
|
STATSCOUNTER_INC(statsCounter.ctrRecoveryAttempts, statsCounter.mutCtrRecoveryAttempts);
|
|
- closeJournal(0);
|
|
+ closeJournal();
|
|
srSleep(10, 0); // do not hammer machine with too-frequent retries
|
|
openJournal();
|
|
}
|
|
@@ -723,8 +783,7 @@ CODESTARTrunInput
|
|
/* Load our position in the journal from the state file. */
|
|
CHKiRet(loadJournalState());
|
|
} else if (cs.bIgnorePrevious) {
|
|
- /* Seek to the very end of the journal and ignore all
|
|
- * older messages. */
|
|
+ /* Seek to the very end of the journal and ignore all older messages. */
|
|
skipOldMessages();
|
|
}
|
|
|
|
@@ -758,7 +817,7 @@ CODESTARTrunInput
|
|
while (glbl.GetGlobalInputTermState() == 0) {
|
|
int r;
|
|
|
|
- r = sd_journal_next(j);
|
|
+ r = sd_journal_next(journalContext.j);
|
|
if (r < 0) {
|
|
LogError(-r, RS_RET_ERR, "imjournal: sd_journal_next() failed");
|
|
tryRecover();
|
|
@@ -766,8 +825,12 @@ CODESTARTrunInput
|
|
}
|
|
|
|
if (r == 0) {
|
|
+ if (journalContext.atHead) {
|
|
+ LogMsg(0, RS_RET_OK, LOG_WARNING, "imjournal: "
|
|
+ "Journal indicates no msgs when positioned at head.\n");
|
|
+ }
|
|
/* No new messages, wait for activity. */
|
|
- if (pollJournal() != RS_RET_OK && !reloaded) {
|
|
+ if (pollJournal() != RS_RET_OK && !journalContext.reloaded) {
|
|
tryRecover();
|
|
}
|
|
continue;
|
|
@@ -776,7 +839,7 @@ CODESTARTrunInput
|
|
/*
|
|
* update journal disk usage before reading the new message.
|
|
*/
|
|
- const int e = sd_journal_get_usage(j, (uint64_t *)&statsCounter.diskUsageBytes);
|
|
+ const int e = sd_journal_get_usage(journalContext.j, (uint64_t *)&statsCounter.diskUsageBytes);
|
|
if (e < 0) {
|
|
LogError(-e, RS_RET_ERR, "imjournal: sd_get_usage() failed");
|
|
}
|
|
@@ -787,21 +850,12 @@ CODESTARTrunInput
|
|
}
|
|
|
|
count++;
|
|
-
|
|
- if ((count % J_PROCESS_PERIOD) == 0) {
|
|
- /* Give the journal a periodic chance to detect rotated journal files to be cleaned up. */
|
|
- r = sd_journal_process(j);
|
|
- if (r < 0) {
|
|
- LogError(-r, RS_RET_ERR, "imjournal: sd_journal_process() failed");
|
|
- tryRecover();
|
|
- continue;
|
|
- }
|
|
- }
|
|
+ journalContext.atHead = 0;
|
|
|
|
if (cs.stateFile) { /* can't persist without a state file */
|
|
/* TODO: This could use some finer metric. */
|
|
if ((count % cs.iPersistStateInterval) == 0) {
|
|
- persistJournalState(1);
|
|
+ persistJournalState();
|
|
}
|
|
}
|
|
}
|
|
@@ -825,6 +879,7 @@ CODESTARTbeginCnfLoad
|
|
cs.usePid = NULL;
|
|
cs.bWorkAroundJournalBug = 1;
|
|
cs.bFsync = 0;
|
|
+ cs.bRemote = 0;
|
|
ENDbeginCnfLoad
|
|
|
|
|
|
@@ -881,7 +936,7 @@ BEGINfreeCnf
|
|
CODESTARTfreeCnf
|
|
free(cs.stateFile);
|
|
free(cs.usePid);
|
|
- free(last_cursor);
|
|
+ free(journalContext.cursor);
|
|
statsobj.Destruct(&(statsCounter.stats));
|
|
ENDfreeCnf
|
|
|
|
@@ -894,7 +949,10 @@ ENDwillRun
|
|
/* close journal */
|
|
BEGINafterRun
|
|
CODESTARTafterRun
|
|
- closeJournal(1);
|
|
+ if (cs.stateFile) { /* can't persist without a state file */
|
|
+ persistJournalState();
|
|
+ }
|
|
+ closeJournal();
|
|
ratelimitDestruct(ratelimiter);
|
|
ENDafterRun
|
|
|
|
@@ -966,6 +1024,8 @@ CODESTARTsetModCnf
|
|
cs.bWorkAroundJournalBug = (int) pvals[i].val.d.n;
|
|
} else if (!strcmp(modpblk.descr[i].name, "fsync")) {
|
|
cs.bFsync = (int) pvals[i].val.d.n;
|
|
+ } else if (!strcmp(modpblk.descr[i].name, "remote")) {
|
|
+ cs.bRemote = (int) pvals[i].val.d.n;
|
|
} else {
|
|
dbgprintf("imjournal: program error, non-handled "
|
|
"param '%s' in beginCnfLoad\n", modpblk.descr[i].name);
|
|
diff --git a/plugins/imjournal/imjournal.c b/plugins/imjournal/imjournal.c
|
|
index 8008f159c4..18e4d25ebb 100644
|
|
--- a/plugins/imjournal/imjournal.c
|
|
+++ b/plugins/imjournal/imjournal.c
|
|
@@ -3,7 +3,7 @@
|
|
* To test under Linux:
|
|
* emmit log message into systemd journal
|
|
*
|
|
- * Copyright (C) 2008-2017 Adiscon GmbH
|
|
+ * Copyright (C) 2008-2019 Adiscon GmbH
|
|
*
|
|
* This file is part of rsyslog.
|
|
*
|
|
@@ -676,16 +676,6 @@ loadJournalState(void)
|
|
DBGPRINTF("Loading journal position, at head? %d, reloaded? %d\n",
|
|
journalContext.atHead, journalContext.reloaded);
|
|
|
|
- if (cs.stateFile[0] != '/') {
|
|
- char *new_stateFile;
|
|
- if (-1 == asprintf(&new_stateFile, "%s/%s", (char *)glbl.GetWorkDir(), cs.stateFile)) {
|
|
- LogError(0, RS_RET_OUT_OF_MEMORY, "imjournal: asprintf failed\n");
|
|
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
|
|
- }
|
|
- free (cs.stateFile);
|
|
- cs.stateFile = new_stateFile;
|
|
- }
|
|
-
|
|
/* if state file not exists (on very first run), skip */
|
|
if (access(cs.stateFile, F_OK|R_OK) == -1 && errno == ENOENT) {
|
|
if (cs.bIgnorePrevious) {
|
|
@@ -885,6 +875,17 @@ ENDbeginCnfLoad
|
|
|
|
BEGINendCnfLoad
|
|
CODESTARTendCnfLoad
|
|
+ /* bad trick to handle old and new style config all in old-style var */
|
|
+ if(cs.stateFile != NULL && cs.stateFile[0] != '/') {
|
|
+ char *new_stateFile;
|
|
+ if (-1 == asprintf(&new_stateFile, "%s/%s", (char *)glbl.GetWorkDir(), cs.stateFile)) {
|
|
+ LogError(0, RS_RET_OUT_OF_MEMORY, "imjournal: asprintf failed\n");
|
|
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
|
|
+ }
|
|
+ free (cs.stateFile);
|
|
+ cs.stateFile = new_stateFile;
|
|
+ }
|
|
+finalize_it:
|
|
ENDendCnfLoad
|
|
|
|
|
|
@@ -1032,7 +1033,6 @@ CODESTARTsetModCnf
|
|
}
|
|
}
|
|
|
|
-
|
|
finalize_it:
|
|
if (pvals != NULL)
|
|
cnfparamvalsDestruct(pvals, &modpblk);
|