70082ab638
pmproxy and libpcp_web fixes. Adds pcp-ss tool. Resolves: rhbz#1981222 Resolves: rhbz#1981223
923 lines
32 KiB
Diff
923 lines
32 KiB
Diff
3f5ba2218 libpcp_web: add mutex to struct webgroup protecting the context dict
|
|
107633192 src/libpcp: be more careful when calling __pmLogChangeVol()
|
|
49bdfdfff libpcp: redefine __pmLogSetTime()
|
|
5e3b792d3 libpcp_web: plug mem leak in redisMapInsert during daily log-rolling
|
|
2a00a90b0 libpcp_web/discovery: improve lock handling and scalability
|
|
|
|
commit 3f5ba221842e6a02e9fb22e23c754854271c3c9a
|
|
Author: Mark Goodwin <mgoodwin@redhat.com>
|
|
Date: Wed Jun 9 16:44:30 2021 +1000
|
|
|
|
libpcp_web: add mutex to struct webgroup protecting the context dict
|
|
|
|
Add a mutex to the local webgroups structure in libpcp_web and
|
|
use it to protect multithreaded parallel updates (dictAdd,
|
|
dictDelete) to the groups->contexts dict and the dict traversal
|
|
in the timer driven garbage collector.
|
|
|
|
Tested by qa/297 and related tests and also an updated version
|
|
of qa/1457 (which now stress tests parallel http and https/tls
|
|
pmproxy REST API calls; that update lands in a later commit).
|
|
|
|
Related: RHBZ#1947989
|
|
Resolves: https://github.com/performancecopilot/pcp/issues/1311
|
|
|
|
diff --git a/src/libpcp_web/src/webgroup.c b/src/libpcp_web/src/webgroup.c
|
|
index 08c2518ed..35f05441b 100644
|
|
--- a/src/libpcp_web/src/webgroup.c
|
|
+++ b/src/libpcp_web/src/webgroup.c
|
|
@@ -51,14 +51,20 @@ typedef struct webgroups {
|
|
uv_loop_t *events;
|
|
unsigned int active;
|
|
uv_timer_t timer;
|
|
+ uv_mutex_t mutex;
|
|
} webgroups;
|
|
|
|
static struct webgroups *
|
|
webgroups_lookup(pmWebGroupModule *module)
|
|
{
|
|
- if (module->privdata == NULL)
|
|
+ struct webgroups *groups = module->privdata;
|
|
+
|
|
+ if (module->privdata == NULL) {
|
|
module->privdata = calloc(1, sizeof(struct webgroups));
|
|
- return (struct webgroups *)module->privdata;
|
|
+ groups = (struct webgroups *)module->privdata;
|
|
+ uv_mutex_init(&groups->mutex);
|
|
+ }
|
|
+ return groups;
|
|
}
|
|
|
|
static int
|
|
@@ -94,8 +100,11 @@ webgroup_drop_context(struct context *context, struct webgroups *groups)
|
|
context->garbage = 1;
|
|
uv_timer_stop(&context->timer);
|
|
}
|
|
- if (groups)
|
|
+ if (groups) {
|
|
+ uv_mutex_lock(&groups->mutex);
|
|
dictDelete(groups->contexts, &context->randomid);
|
|
+ uv_mutex_unlock(&groups->mutex);
|
|
+ }
|
|
uv_close((uv_handle_t *)&context->timer, webgroup_release_context);
|
|
}
|
|
}
|
|
@@ -207,13 +216,16 @@ webgroup_new_context(pmWebGroupSettings *sp, dict *params,
|
|
cp->context = -1;
|
|
cp->timeout = polltime;
|
|
|
|
+ uv_mutex_lock(&groups->mutex);
|
|
if ((cp->randomid = random()) < 0 ||
|
|
dictFind(groups->contexts, &cp->randomid) != NULL) {
|
|
infofmt(*message, "random number failure on new web context");
|
|
pmwebapi_free_context(cp);
|
|
*status = -ESRCH;
|
|
+ uv_mutex_unlock(&groups->mutex);
|
|
return NULL;
|
|
}
|
|
+ uv_mutex_unlock(&groups->mutex);
|
|
cp->origin = sdscatfmt(sdsempty(), "%i", cp->randomid);
|
|
cp->name.sds = sdsdup(hostspec ? hostspec : LOCALHOST);
|
|
cp->realm = sdscatfmt(sdsempty(), "pmapi/%i", cp->randomid);
|
|
@@ -242,7 +254,9 @@ webgroup_new_context(pmWebGroupSettings *sp, dict *params,
|
|
pmwebapi_free_context(cp);
|
|
return NULL;
|
|
}
|
|
+ uv_mutex_lock(&groups->mutex);
|
|
dictAdd(groups->contexts, &cp->randomid, cp);
|
|
+ uv_mutex_unlock(&groups->mutex);
|
|
|
|
/* leave until the end because uv_timer_init makes this visible in uv_run */
|
|
handle = (uv_handle_t *)&cp->timer;
|
|
@@ -261,25 +275,34 @@ webgroup_new_context(pmWebGroupSettings *sp, dict *params,
|
|
static void
|
|
webgroup_garbage_collect(struct webgroups *groups)
|
|
{
|
|
- dictIterator *iterator = dictGetSafeIterator(groups->contexts);
|
|
+ dictIterator *iterator;
|
|
dictEntry *entry;
|
|
context_t *cp;
|
|
|
|
if (pmDebugOptions.http || pmDebugOptions.libweb)
|
|
fprintf(stderr, "%s: started\n", "webgroup_garbage_collect");
|
|
|
|
- while ((entry = dictNext(iterator)) != NULL) {
|
|
- cp = (context_t *)dictGetVal(entry);
|
|
- if (cp->garbage && cp->privdata == groups) {
|
|
- if (pmDebugOptions.http || pmDebugOptions.libweb)
|
|
- fprintf(stderr, "GC context %u (%p)\n", cp->randomid, cp);
|
|
- webgroup_drop_context(cp, groups);
|
|
+ /* do context GC if we get the lock (else don't block here) */
|
|
+ if (uv_mutex_trylock(&groups->mutex) == 0) {
|
|
+ iterator = dictGetSafeIterator(groups->contexts);
|
|
+ for (entry = dictNext(iterator); entry;) {
|
|
+ cp = (context_t *)dictGetVal(entry);
|
|
+ entry = dictNext(iterator);
|
|
+ if (cp->garbage && cp->privdata == groups) {
|
|
+ if (pmDebugOptions.http || pmDebugOptions.libweb)
|
|
+ fprintf(stderr, "GC context %u (%p)\n", cp->randomid, cp);
|
|
+ uv_mutex_unlock(&groups->mutex);
|
|
+ webgroup_drop_context(cp, groups);
|
|
+ uv_mutex_lock(&groups->mutex);
|
|
+ }
|
|
}
|
|
+ dictReleaseIterator(iterator);
|
|
+ uv_mutex_unlock(&groups->mutex);
|
|
}
|
|
- dictReleaseIterator(iterator);
|
|
|
|
/* TODO - trim maps, particularly instmap if proc metrics are not excluded */
|
|
|
|
+ /* TODO move the following to a new stats timer */
|
|
if (groups->metrics_handle) {
|
|
mmv_stats_set(groups->metrics_handle, "contextmap.size",
|
|
NULL, dictSize(contextmap));
|
|
|
|
commit 107633192326b27ae571d4d4955052b8d86222c2
|
|
Author: Ken McDonell <kenj@kenj.id.au>
|
|
Date: Fri Jul 2 16:52:48 2021 +1000
|
|
|
|
src/libpcp: be more careful when calling __pmLogChangeVol()
|
|
|
|
Mark observed a SEGV which looks like __pmLogFetch() died because
|
|
ctxp->c_archctl->ac_mfp was (unexpectedly) NULL.
|
|
|
|
See: https://github.com/performancecopilot/pcp/issues/1338
|
|
|
|
Initial guess is that a physical file was removed by concurrent
|
|
activity (like pmlogger_check or pmlogger_daily), causing
|
|
__pmLogChangeVol() to fail ... and this was not being checked for
|
|
on the __pmLogFetch() path and in a couple of other places.
|
|
|
|
modified: interp.c
|
|
modified: logutil.c
|
|
|
|
diff --git a/src/libpcp/src/interp.c b/src/libpcp/src/interp.c
|
|
index d7effbc1e..c8f6fe382 100644
|
|
--- a/src/libpcp/src/interp.c
|
|
+++ b/src/libpcp/src/interp.c
|
|
@@ -1312,7 +1312,9 @@ __pmLogFetchInterp(__pmContext *ctxp, int numpmid, pmID pmidlist[], pmResult **r
|
|
}
|
|
|
|
/* get to the last remembered place */
|
|
- __pmLogChangeVol(ctxp->c_archctl, ctxp->c_archctl->ac_vol);
|
|
+ sts = __pmLogChangeVol(ctxp->c_archctl, ctxp->c_archctl->ac_vol);
|
|
+ if (sts < 0)
|
|
+ goto all_done;
|
|
__pmFseek(ctxp->c_archctl->ac_mfp, ctxp->c_archctl->ac_offset, SEEK_SET);
|
|
|
|
seen_mark = 0; /* interested in <mark> records seen from here on */
|
|
@@ -1397,7 +1399,9 @@ __pmLogFetchInterp(__pmContext *ctxp, int numpmid, pmID pmidlist[], pmResult **r
|
|
* at least one metric requires a bound from earlier in the log ...
|
|
* position ourselves, ... and search
|
|
*/
|
|
- __pmLogChangeVol(ctxp->c_archctl, ctxp->c_archctl->ac_vol);
|
|
+ sts = __pmLogChangeVol(ctxp->c_archctl, ctxp->c_archctl->ac_vol);
|
|
+ if (sts < 0)
|
|
+ goto all_done;
|
|
__pmFseek(ctxp->c_archctl->ac_mfp, ctxp->c_archctl->ac_offset, SEEK_SET);
|
|
done = 0;
|
|
|
|
@@ -1542,7 +1546,9 @@ __pmLogFetchInterp(__pmContext *ctxp, int numpmid, pmID pmidlist[], pmResult **r
|
|
* at least one metric requires a bound from later in the log ...
|
|
* position ourselves ... and search
|
|
*/
|
|
- __pmLogChangeVol(ctxp->c_archctl, ctxp->c_archctl->ac_vol);
|
|
+ sts = __pmLogChangeVol(ctxp->c_archctl, ctxp->c_archctl->ac_vol);
|
|
+ if (sts < 0)
|
|
+ goto all_done;
|
|
__pmFseek(ctxp->c_archctl->ac_mfp, ctxp->c_archctl->ac_offset, SEEK_SET);
|
|
done = 0;
|
|
|
|
diff --git a/src/libpcp/src/logutil.c b/src/libpcp/src/logutil.c
|
|
index fe35ed422..0ef76de25 100644
|
|
--- a/src/libpcp/src/logutil.c
|
|
+++ b/src/libpcp/src/logutil.c
|
|
@@ -1992,7 +1992,10 @@ __pmLogFetch(__pmContext *ctxp, int numpmid, pmID pmidlist[], pmResult **result)
|
|
all_derived = check_all_derived(numpmid, pmidlist);
|
|
|
|
/* re-establish position */
|
|
- __pmLogChangeVol(ctxp->c_archctl, ctxp->c_archctl->ac_vol);
|
|
+ sts = __pmLogChangeVol(ctxp->c_archctl, ctxp->c_archctl->ac_vol);
|
|
+ if (sts < 0)
|
|
+ goto func_return;
|
|
+ assert(ctxp->c_archctl->ac_mfp != NULL);
|
|
__pmFseek(ctxp->c_archctl->ac_mfp,
|
|
(long)ctxp->c_archctl->ac_offset, SEEK_SET);
|
|
|
|
@@ -2489,10 +2492,12 @@ __pmLogSetTime(__pmContext *ctxp)
|
|
/* index either not available, or not useful */
|
|
if (mode == PM_MODE_FORW) {
|
|
__pmLogChangeVol(acp, lcp->l_minvol);
|
|
+ assert(acp->ac_mfp != NULL);
|
|
__pmFseek(acp->ac_mfp, (long)(sizeof(__pmLogLabel) + 2*sizeof(int)), SEEK_SET);
|
|
}
|
|
else if (mode == PM_MODE_BACK) {
|
|
__pmLogChangeVol(acp, lcp->l_maxvol);
|
|
+ assert(acp->ac_mfp != NULL);
|
|
__pmFseek(acp->ac_mfp, (long)0, SEEK_END);
|
|
}
|
|
|
|
@@ -3141,6 +3146,7 @@ LogChangeToPreviousArchive(__pmContext *ctxp)
|
|
|
|
/* Set up to scan backwards from the end of the archive. */
|
|
__pmLogChangeVol(acp, lcp->l_maxvol);
|
|
+ assert(acp->ac_mfp != NULL);
|
|
__pmFseek(acp->ac_mfp, (long)0, SEEK_END);
|
|
ctxp->c_archctl->ac_offset = __pmFtell(acp->ac_mfp);
|
|
assert(ctxp->c_archctl->ac_offset >= 0);
|
|
|
|
commit 49bdfdfff83ac165de2bdc9a40e61a56512585d8
|
|
Author: Ken McDonell <kenj@kenj.id.au>
|
|
Date: Sun Jul 4 10:07:09 2021 +1000
|
|
|
|
libpcp: redefine __pmLogSetTime()
|
|
|
|
The problem is that if physical files for the data volumes of an
|
|
archive are removed (asynchronously by someone else) while we're
|
|
trying to switch volumes then we don't handle this safely.
|
|
|
|
The previous commit 10763319 was a stop-gap to address Mark's SEGV
|
|
issue at https://github.com/performancecopilot/pcp/issues/1338 and
|
|
simply handled direct calls to __pmLogChangeVol() and ensured the
|
|
return status was checked.
|
|
|
|
I was aware, then Coverity made a lot more people aware, that this
|
|
"fix" was incomplete, specifically the calls to __pmLogChangeVol()
|
|
from within __pmLogSetTime() were not checked.
|
|
|
|
To fix the latter we have to change the return type of __pmLogSetTime() from
|
|
void to int so we can return status to indicate that __pmLogChangeVol()
|
|
has failed. And then make sure all the callers of __pmLogSetTime()
|
|
check the status returned from that function.
|
|
|
|
modified: src/libpcp/src/fetch.c
|
|
modified: src/libpcp/src/internal.h
|
|
modified: src/libpcp/src/logutil.c
|
|
|
|
Because this introduces some new -Dlog diagnostics, qa/251 needed
|
|
a bit of a make-over.
|
|
|
|
diff --git a/qa/251 b/qa/251
|
|
index 2b8a07917..f9b293e98 100755
|
|
--- a/qa/251
|
|
+++ b/qa/251
|
|
@@ -37,7 +37,7 @@ _filter()
|
|
|
|
status=1 # failure is the default!
|
|
$sudo rm -rf $tmp.* $seq.full
|
|
-trap "cd $here; rm -rf $tmp; exit \$status" 0 1 2 3 15
|
|
+trap "cd $here; rm -rf $tmp $tmp.*; exit \$status" 0 1 2 3 15
|
|
|
|
# real QA test starts here
|
|
mkdir $tmp
|
|
@@ -50,56 +50,62 @@ cd $tmp
|
|
for inst in "bin-100" "bin-100,bin-500,bin-900"
|
|
do
|
|
echo
|
|
- echo "All volumes present ... $inst ..."
|
|
- pmval -z -O $offset -D128 -t2 -a ok-mv-bar -i $inst sampledso.bin 2>err >out
|
|
- egrep 'Skip|Change' err
|
|
- _filter <out
|
|
+ echo "All volumes present ... $inst ..." | tee -a $here/$seq.full
|
|
+ pmval -z -O $offset -Dlog -t2 -a ok-mv-bar -i $inst sampledso.bin 2>$tmp.err >$tmp.out
|
|
+ cat $tmp.err >>$here/$seq.full
|
|
+ grep '^__pmLogChangeVol:' $tmp.err
|
|
+ _filter <$tmp.out
|
|
[ -f die ] && exit
|
|
|
|
echo
|
|
- echo "First volume missing ... $inst ..."
|
|
+ echo "First volume missing ... $inst ..." | tee -a $here/$seq.full
|
|
mv ok-mv-bar.0 foo.0
|
|
- pmval -z -O $offset -D128 -t2 -a ok-mv-bar -i $inst sampledso.bin 2>err >out
|
|
- egrep 'Skip|Change' err
|
|
- _filter <out
|
|
+ pmval -z -O $offset -Dlog -t2 -a ok-mv-bar -i $inst sampledso.bin 2>$tmp.err >$tmp.out
|
|
+ cat $tmp.err >>$here/$seq.full
|
|
+ grep '^__pmLogChangeVol:' $tmp.err
|
|
+ _filter <$tmp.out
|
|
[ -f die ] && exit
|
|
mv foo.0 ok-mv-bar.0
|
|
|
|
echo
|
|
- echo "Last volume missing ... $inst ..."
|
|
+ echo "Last volume missing ... $inst ..." | tee -a $here/$seq.full
|
|
mv ok-mv-bar.3 foo.3
|
|
- pmval -z -O $offset -D128 -t2 -a ok-mv-bar -i $inst sampledso.bin 2>err >out
|
|
- egrep 'Skip|Change' err
|
|
- _filter <out
|
|
+ pmval -z -O $offset -Dlog -t2 -a ok-mv-bar -i $inst sampledso.bin 2>$tmp.err >$tmp.out
|
|
+ cat $tmp.err >>$here/$seq.full
|
|
+ grep '^__pmLogChangeVol:' $tmp.err
|
|
+ _filter <$tmp.out
|
|
[ -f die ] && exit
|
|
mv foo.3 ok-mv-bar.3
|
|
|
|
echo
|
|
- echo "Second volume missing ... $inst ..."
|
|
+ echo "Second volume missing ... $inst ..." | tee -a $here/$seq.full
|
|
mv ok-mv-bar.1 foo.1
|
|
- pmval -z -O $offset -D128 -t2 -a ok-mv-bar -i $inst sampledso.bin 2>err >out
|
|
- egrep 'Skip|Change' err
|
|
- _filter <out
|
|
+ pmval -z -O $offset -Dlog -t2 -a ok-mv-bar -i $inst sampledso.bin 2>$tmp.err >$tmp.out
|
|
+ cat $tmp.err >>$here/$seq.full
|
|
+ grep '^__pmLogChangeVol:' $tmp.err
|
|
+ _filter <$tmp.out
|
|
[ -f die ] && exit
|
|
mv foo.1 ok-mv-bar.1
|
|
|
|
echo
|
|
- echo "Second last volume missing ... $inst ..."
|
|
+ echo "Second last volume missing ... $inst ..." | tee -a $here/$seq.full
|
|
mv ok-mv-bar.2 foo.2
|
|
- pmval -z -O $offset -D128 -t2 -a ok-mv-bar -i $inst sampledso.bin 2>err >out
|
|
- egrep 'Skip|Change' err
|
|
- _filter <out
|
|
+ pmval -z -O $offset -Dlog -t2 -a ok-mv-bar -i $inst sampledso.bin 2>$tmp.err >$tmp.out
|
|
+ cat $tmp.err >>$here/$seq.full
|
|
+ grep '^__pmLogChangeVol:' $tmp.err
|
|
+ _filter <$tmp.out
|
|
[ -f die ] && exit
|
|
mv foo.2 ok-mv-bar.2
|
|
|
|
echo
|
|
- echo "All volumes but second missing ... $inst ..."
|
|
+ echo "All volumes but second missing ... $inst ..." | tee -a $here/$seq.full
|
|
mv ok-mv-bar.0 foo.0
|
|
mv ok-mv-bar.2 foo.2
|
|
mv ok-mv-bar.3 foo.3
|
|
- pmval -z -O $offset -D128 -t2 -a ok-mv-bar -i $inst sampledso.bin 2>err >out
|
|
- egrep 'Skip|Change' err
|
|
- _filter <out
|
|
+ pmval -z -O $offset -Dlog -t2 -a ok-mv-bar -i $inst sampledso.bin 2>$tmp.err >$tmp.out
|
|
+ cat $tmp.err >>$here/$seq.full
|
|
+ grep '^__pmLogChangeVol:' $tmp.err
|
|
+ _filter <$tmp.out
|
|
[ -f die ] && exit
|
|
mv foo.0 ok-mv-bar.0
|
|
mv foo.2 ok-mv-bar.2
|
|
diff --git a/src/libpcp/src/fetch.c b/src/libpcp/src/fetch.c
|
|
index 5328a2807..01d5bf7fc 100644
|
|
--- a/src/libpcp/src/fetch.c
|
|
+++ b/src/libpcp/src/fetch.c
|
|
@@ -458,6 +458,7 @@ pmSetMode(int mode, const struct timeval *when, int delta)
|
|
/* assume PM_CONTEXT_ARCHIVE */
|
|
if (l_mode == PM_MODE_INTERP ||
|
|
l_mode == PM_MODE_FORW || l_mode == PM_MODE_BACK) {
|
|
+ int lsts;
|
|
if (when != NULL) {
|
|
/*
|
|
* special case of NULL for timestamp
|
|
@@ -468,7 +469,18 @@ pmSetMode(int mode, const struct timeval *when, int delta)
|
|
}
|
|
ctxp->c_mode = mode;
|
|
ctxp->c_delta = delta;
|
|
- __pmLogSetTime(ctxp);
|
|
+ lsts = __pmLogSetTime(ctxp);
|
|
+ if (lsts < 0) {
|
|
+ /*
|
|
+ * most unlikely; not much we can do here but expect
|
|
+ * PMAPI error to be returned once pmFetch calls start
|
|
+ */
|
|
+ if (pmDebugOptions.log) {
|
|
+ char errmsg[PM_MAXERRMSGLEN];
|
|
+ fprintf(stderr, "pmSetMode: __pmLogSetTime failed: %s\n",
|
|
+ pmErrStr_r(lsts, errmsg, sizeof(errmsg)));
|
|
+ }
|
|
+ }
|
|
__pmLogResetInterp(ctxp);
|
|
sts = 0;
|
|
}
|
|
diff --git a/src/libpcp/src/internal.h b/src/libpcp/src/internal.h
|
|
index 977efdcf6..fd8d6e740 100644
|
|
--- a/src/libpcp/src/internal.h
|
|
+++ b/src/libpcp/src/internal.h
|
|
@@ -407,7 +407,7 @@ extern int __pmLogGenerateMark(__pmLogCtl *, int, pmResult **) _PCP_HIDDEN;
|
|
extern int __pmLogFetchInterp(__pmContext *, int, pmID *, pmResult **) _PCP_HIDDEN;
|
|
extern int __pmGetArchiveLabel(__pmLogCtl *, pmLogLabel *) _PCP_HIDDEN;
|
|
extern pmTimeval *__pmLogStartTime(__pmArchCtl *) _PCP_HIDDEN;
|
|
-extern void __pmLogSetTime(__pmContext *) _PCP_HIDDEN;
|
|
+extern int __pmLogSetTime(__pmContext *) _PCP_HIDDEN;
|
|
extern void __pmLogResetInterp(__pmContext *) _PCP_HIDDEN;
|
|
extern void __pmArchCtlFree(__pmArchCtl *) _PCP_HIDDEN;
|
|
extern int __pmLogChangeArchive(__pmContext *, int) _PCP_HIDDEN;
|
|
diff --git a/src/libpcp/src/logutil.c b/src/libpcp/src/logutil.c
|
|
index 0ef76de25..2ea559bfe 100644
|
|
--- a/src/libpcp/src/logutil.c
|
|
+++ b/src/libpcp/src/logutil.c
|
|
@@ -1995,7 +1995,6 @@ __pmLogFetch(__pmContext *ctxp, int numpmid, pmID pmidlist[], pmResult **result)
|
|
sts = __pmLogChangeVol(ctxp->c_archctl, ctxp->c_archctl->ac_vol);
|
|
if (sts < 0)
|
|
goto func_return;
|
|
- assert(ctxp->c_archctl->ac_mfp != NULL);
|
|
__pmFseek(ctxp->c_archctl->ac_mfp,
|
|
(long)ctxp->c_archctl->ac_offset, SEEK_SET);
|
|
|
|
@@ -2010,7 +2009,9 @@ more:
|
|
* no serial access, so need to make sure we are
|
|
* starting in the correct place
|
|
*/
|
|
- __pmLogSetTime(ctxp);
|
|
+ sts = __pmLogSetTime(ctxp);
|
|
+ if (sts < 0)
|
|
+ goto func_return;
|
|
ctxp->c_archctl->ac_offset = __pmFtell(ctxp->c_archctl->ac_mfp);
|
|
ctxp->c_archctl->ac_vol = ctxp->c_archctl->ac_curvol;
|
|
/*
|
|
@@ -2299,7 +2300,7 @@ VolSkip(__pmArchCtl *acp, int mode, int j)
|
|
return PM_ERR_EOL;
|
|
}
|
|
|
|
-void
|
|
+int
|
|
__pmLogSetTime(__pmContext *ctxp)
|
|
{
|
|
__pmArchCtl *acp = ctxp->c_archctl;
|
|
@@ -2356,6 +2357,7 @@ __pmLogSetTime(__pmContext *ctxp)
|
|
if (lcp->l_numti) {
|
|
/* we have a temporal index, use it! */
|
|
int j = -1;
|
|
+ int try;
|
|
int toobig = 0;
|
|
int match = 0;
|
|
int vol;
|
|
@@ -2406,9 +2408,13 @@ __pmLogSetTime(__pmContext *ctxp)
|
|
acp->ac_serial = 1;
|
|
|
|
if (match) {
|
|
+ try = j;
|
|
j = VolSkip(acp, mode, j);
|
|
- if (j < 0)
|
|
- return;
|
|
+ if (j < 0) {
|
|
+ if (pmDebugOptions.log)
|
|
+ fprintf(stderr, "__pmLogSetTime: VolSkip mode=%d vol=%d failed #1\n", mode, try);
|
|
+ return PM_ERR_LOGFILE;
|
|
+ }
|
|
__pmFseek(acp->ac_mfp, (long)lcp->l_ti[j].ti_log, SEEK_SET);
|
|
if (mode == PM_MODE_BACK)
|
|
acp->ac_serial = 0;
|
|
@@ -2418,9 +2424,13 @@ __pmLogSetTime(__pmContext *ctxp)
|
|
}
|
|
}
|
|
else if (j < 1) {
|
|
+ try = 0;
|
|
j = VolSkip(acp, PM_MODE_FORW, 0);
|
|
- if (j < 0)
|
|
- return;
|
|
+ if (j < 0) {
|
|
+ if (pmDebugOptions.log)
|
|
+ fprintf(stderr, "__pmLogSetTime: VolSkip mode=%d vol=%d failed #2\n", PM_MODE_FORW, try);
|
|
+ return PM_ERR_LOGFILE;
|
|
+ }
|
|
__pmFseek(acp->ac_mfp, (long)lcp->l_ti[j].ti_log, SEEK_SET);
|
|
if (pmDebugOptions.log) {
|
|
fprintf(stderr, " before start ti@");
|
|
@@ -2428,9 +2438,13 @@ __pmLogSetTime(__pmContext *ctxp)
|
|
}
|
|
}
|
|
else if (j == numti) {
|
|
+ try = numti-1;
|
|
j = VolSkip(acp, PM_MODE_BACK, numti-1);
|
|
- if (j < 0)
|
|
- return;
|
|
+ if (j < 0) {
|
|
+ if (pmDebugOptions.log)
|
|
+ fprintf(stderr, "__pmLogSetTime: VolSkip mode=%d vol=%d failed #3\n", PM_MODE_BACK, try);
|
|
+ return PM_ERR_LOGFILE;
|
|
+ }
|
|
__pmFseek(acp->ac_mfp, (long)lcp->l_ti[j].ti_log, SEEK_SET);
|
|
if (mode == PM_MODE_BACK)
|
|
acp->ac_serial = 0;
|
|
@@ -2450,9 +2464,13 @@ __pmLogSetTime(__pmContext *ctxp)
|
|
t_hi = __pmTimevalSub(&lcp->l_ti[j].ti_stamp, &ctxp->c_origin);
|
|
t_lo = __pmTimevalSub(&ctxp->c_origin, &lcp->l_ti[j-1].ti_stamp);
|
|
if (t_hi <= t_lo && !toobig) {
|
|
+ try = j;
|
|
j = VolSkip(acp, mode, j);
|
|
- if (j < 0)
|
|
- return;
|
|
+ if (j < 0) {
|
|
+ if (pmDebugOptions.log)
|
|
+ fprintf(stderr, "__pmLogSetTime: VolSkip mode=%d vol=%d failed #4\n", mode, try);
|
|
+ return PM_ERR_LOGFILE;
|
|
+ }
|
|
__pmFseek(acp->ac_mfp, (long)lcp->l_ti[j].ti_log, SEEK_SET);
|
|
if (mode == PM_MODE_FORW)
|
|
acp->ac_serial = 0;
|
|
@@ -2462,9 +2480,13 @@ __pmLogSetTime(__pmContext *ctxp)
|
|
}
|
|
}
|
|
else {
|
|
+ try = j-1;
|
|
j = VolSkip(acp, mode, j-1);
|
|
- if (j < 0)
|
|
- return;
|
|
+ if (j < 0) {
|
|
+ if (pmDebugOptions.log)
|
|
+ fprintf(stderr, "__pmLogSetTime: VolSkip mode=%d vol=%d failed #5\n", mode, try);
|
|
+ return PM_ERR_LOGFILE;
|
|
+ }
|
|
__pmFseek(acp->ac_mfp, (long)lcp->l_ti[j].ti_log, SEEK_SET);
|
|
if (mode == PM_MODE_BACK)
|
|
acp->ac_serial = 0;
|
|
@@ -2490,14 +2512,37 @@ __pmLogSetTime(__pmContext *ctxp)
|
|
}
|
|
else {
|
|
/* index either not available, or not useful */
|
|
+ int j;
|
|
if (mode == PM_MODE_FORW) {
|
|
- __pmLogChangeVol(acp, lcp->l_minvol);
|
|
- assert(acp->ac_mfp != NULL);
|
|
+ for (j = lcp->l_minvol; j <= lcp->l_maxvol; j++) {
|
|
+ if (__pmLogChangeVol(acp, j) >= 0)
|
|
+ break;
|
|
+ }
|
|
+ if (j > lcp->l_maxvol) {
|
|
+ /* no volume found */
|
|
+ if (pmDebugOptions.log)
|
|
+ fprintf(stderr, " index not useful, no volume between %d...%d\n",
|
|
+ lcp->l_minvol, lcp->l_maxvol);
|
|
+ acp->ac_curvol = -1;
|
|
+ acp->ac_mfp = NULL;
|
|
+ return PM_ERR_LOGFILE;
|
|
+ }
|
|
__pmFseek(acp->ac_mfp, (long)(sizeof(__pmLogLabel) + 2*sizeof(int)), SEEK_SET);
|
|
}
|
|
else if (mode == PM_MODE_BACK) {
|
|
- __pmLogChangeVol(acp, lcp->l_maxvol);
|
|
- assert(acp->ac_mfp != NULL);
|
|
+ for (j = lcp->l_maxvol; j >= lcp->l_minvol; j--) {
|
|
+ if (__pmLogChangeVol(acp, j) >= 0)
|
|
+ break;
|
|
+ }
|
|
+ if (j < lcp->l_minvol) {
|
|
+ /* no volume found */
|
|
+ if (pmDebugOptions.log)
|
|
+ fprintf(stderr, " index not useful, no volume between %d...%d\n",
|
|
+ lcp->l_maxvol, lcp->l_minvol);
|
|
+ acp->ac_curvol = -1;
|
|
+ acp->ac_mfp = NULL;
|
|
+ return PM_ERR_LOGFILE;
|
|
+ }
|
|
__pmFseek(acp->ac_mfp, (long)0, SEEK_END);
|
|
}
|
|
|
|
@@ -2513,6 +2558,8 @@ __pmLogSetTime(__pmContext *ctxp)
|
|
acp->ac_offset = __pmFtell(acp->ac_mfp);
|
|
assert(acp->ac_offset >= 0);
|
|
acp->ac_vol = acp->ac_curvol;
|
|
+
|
|
+ return 0;
|
|
}
|
|
|
|
/* Read the label of the current archive. */
|
|
@@ -3100,6 +3147,7 @@ LogChangeToPreviousArchive(__pmContext *ctxp)
|
|
pmTimeval save_origin;
|
|
int save_mode;
|
|
int sts;
|
|
+ int j;
|
|
|
|
/*
|
|
* Check whether there is a previous archive to switch to.
|
|
@@ -3145,12 +3193,23 @@ LogChangeToPreviousArchive(__pmContext *ctxp)
|
|
}
|
|
|
|
/* Set up to scan backwards from the end of the archive. */
|
|
- __pmLogChangeVol(acp, lcp->l_maxvol);
|
|
- assert(acp->ac_mfp != NULL);
|
|
+ for (j = lcp->l_maxvol; j >= lcp->l_minvol; j--) {
|
|
+ if (__pmLogChangeVol(acp, j) >= 0)
|
|
+ break;
|
|
+ }
|
|
+ if (j < lcp->l_minvol) {
|
|
+ /* no volume found */
|
|
+ if (pmDebugOptions.log)
|
|
+ fprintf(stderr, "LogChangeToPreviousArchive: no volume between %d...%d\n",
|
|
+ lcp->l_maxvol, lcp->l_minvol);
|
|
+ acp->ac_curvol = -1;
|
|
+ acp->ac_mfp = NULL;
|
|
+ return PM_ERR_LOGFILE;
|
|
+ }
|
|
__pmFseek(acp->ac_mfp, (long)0, SEEK_END);
|
|
- ctxp->c_archctl->ac_offset = __pmFtell(acp->ac_mfp);
|
|
- assert(ctxp->c_archctl->ac_offset >= 0);
|
|
- ctxp->c_archctl->ac_vol = ctxp->c_archctl->ac_curvol;
|
|
+ acp->ac_offset = __pmFtell(acp->ac_mfp);
|
|
+ assert(acp->ac_offset >= 0);
|
|
+ acp->ac_vol = acp->ac_curvol;
|
|
|
|
/*
|
|
* Check for temporal overlap here. Do this last in case the API client
|
|
|
|
commit 5e3b792d3d8ae60f2cebbd51c37b9b0722c3b26e
|
|
Author: Mark Goodwin <mgoodwin@redhat.com>
|
|
Date: Tue Jul 6 20:09:28 2021 +1000
|
|
|
|
libpcp_web: plug mem leak in redisMapInsert during daily log-rolling
|
|
|
|
When pmlogger_daily processes daily archives, the resulting
|
|
merged archive(s) are discovered and processed by pmproxy
|
|
(if the discovery module is enabled). Since the metadata and
|
|
logvol data in each merged archive is likely to have already
|
|
been previously processed (but discovery doesn't know this),
|
|
we see a lot of dict updates for existing keys and values that
|
|
are already mapped.
|
|
|
|
Static analysis by Coverity (CID323605 Resource Leak) shows
|
|
that when redisMapInsert calls dictAdd for an existing key, the
|
|
new value field is assigned but the old value is not freed,
|
|
and so it leaks.
|
|
|
|
Related: RHBZ1975069 and https://github.com/performancecopilot/pcp/issues/1318
|
|
|
|
diff --git a/src/libpcp_web/src/maps.c b/src/libpcp_web/src/maps.c
|
|
index 013ef02d3..ce20476c9 100644
|
|
--- a/src/libpcp_web/src/maps.c
|
|
+++ b/src/libpcp_web/src/maps.c
|
|
@@ -160,6 +160,12 @@ redisMapLookup(redisMap *map, sds key)
|
|
void
|
|
redisMapInsert(redisMap *map, sds key, sds value)
|
|
{
|
|
+ redisMapEntry *entry = redisMapLookup(map, key);
|
|
+
|
|
+ if (entry) {
|
|
+ /* fix for Coverity CID323605 Resource Leak */
|
|
+ dictDelete(map, key);
|
|
+ }
|
|
dictAdd(map, key, value);
|
|
}
|
|
|
|
|
|
commit 2a00a90b0bc3aecb8465fd32aef1ddbe745b2c91
|
|
Author: Mark Goodwin <mgoodwin@redhat.com>
|
|
Date: Tue Jul 6 20:43:01 2021 +1000
|
|
|
|
libpcp_web/discovery: improve lock handling and scalability
|
|
|
|
Rework the global log-rolling lock detection with finer-grained
|
|
(per-pmlogger directory) detection, and break early in
|
|
process_metadata() and process_logvol() if a lock file is found
|
|
in the same directory as a monitored archive. This is much
|
|
more scalable since archive directories that are not locked
|
|
can continue to be processed and ingested whilst log-rolling
|
|
progresses elsewhere. Also uses much less CPU time since we
|
|
don't need a traversal of all monitored archives looking for
|
|
locks on every fs change event.
|
|
|
|
Also improve process_logvol()/process_metadata() handling for archives that
|
|
are deleted whilst being processed by the discovery module.
|
|
In conjunction with Kenj's changes in libpcp, stop processing
|
|
metadata and logvols if pmFetchArchive returns -ENOENT; the
|
|
archive has been deleted so there is no point in further ingesting
|
|
its data.
|
|
|
|
Related: RHBZ1975069
|
|
Related: https://github.com/performancecopilot/pcp/issues/1338
|
|
|
|
diff --git a/src/libpcp_web/src/discover.c b/src/libpcp_web/src/discover.c
|
|
index 991055ce5..964813f66 100644
|
|
--- a/src/libpcp_web/src/discover.c
|
|
+++ b/src/libpcp_web/src/discover.c
|
|
@@ -33,9 +33,6 @@ static char *pmDiscoverFlagsStr(pmDiscover *);
|
|
#define PM_DISCOVER_HASHTAB_SIZE 32
|
|
static pmDiscover *discover_hashtable[PM_DISCOVER_HASHTAB_SIZE];
|
|
|
|
-/* pmlogger_daily log-roll lock count */
|
|
-static int logrolling = 0;
|
|
-
|
|
/* number of archives or directories currently being monitored */
|
|
static int n_monitored = 0;
|
|
|
|
@@ -426,28 +423,6 @@ is_deleted(pmDiscover *p, struct stat *sbuf)
|
|
return ret;
|
|
}
|
|
|
|
-static int
|
|
-check_for_locks()
|
|
-{
|
|
- int i;
|
|
- pmDiscover *p;
|
|
- char sep = pmPathSeparator();
|
|
- char path[MAXNAMELEN];
|
|
-
|
|
- for (i=0; i < PM_DISCOVER_HASHTAB_SIZE; i++) {
|
|
- for (p = discover_hashtable[i]; p; p = p->next) {
|
|
- if (p->flags & PM_DISCOVER_FLAGS_DIRECTORY) {
|
|
- pmsprintf(path, sizeof(path), "%s%c%s", p->context.name, sep, "lock");
|
|
- if (access(path, F_OK) == 0)
|
|
- return 1;
|
|
- }
|
|
- }
|
|
- }
|
|
-
|
|
- /* no locks */
|
|
- return 0;
|
|
-}
|
|
-
|
|
static void
|
|
check_deleted(pmDiscover *p)
|
|
{
|
|
@@ -465,37 +440,8 @@ fs_change_callBack(uv_fs_event_t *handle, const char *filename, int events, int
|
|
pmDiscover *p;
|
|
char *s;
|
|
sds path;
|
|
- int locksfound = 0;
|
|
struct stat statbuf;
|
|
|
|
- /*
|
|
- * check if logs are currently being rolled by pmlogger_daily et al
|
|
- * in any of the directories we are tracking. For mutex, the log control
|
|
- * scripts use a 'lock' file in each directory as it is processed.
|
|
- */
|
|
- locksfound = check_for_locks();
|
|
-
|
|
- if (!logrolling && locksfound) {
|
|
- /* log-rolling has started */
|
|
- if (pmDebugOptions.discovery)
|
|
- fprintf(stderr, "%s discovery callback: log-rolling in progress\n", stamp());
|
|
- logrolling = locksfound;
|
|
- return;
|
|
- }
|
|
-
|
|
- if (logrolling && locksfound) {
|
|
- logrolling = locksfound;
|
|
- return; /* still in progress */
|
|
- }
|
|
-
|
|
- if (logrolling && !locksfound) {
|
|
- /* log-rolling is finished: check what got deleted, and then purge */
|
|
- if (pmDebugOptions.discovery)
|
|
- fprintf(stderr, "%s discovery callback: finished log-rolling\n", stamp());
|
|
- pmDiscoverTraverse(PM_DISCOVER_FLAGS_META|PM_DISCOVER_FLAGS_DATAVOL, check_deleted);
|
|
- }
|
|
- logrolling = locksfound;
|
|
-
|
|
uv_fs_event_getpath(handle, buffer, &bytes);
|
|
path = sdsnewlen(buffer, bytes);
|
|
|
|
@@ -1037,6 +983,17 @@ pmDiscoverNewSource(pmDiscover *p, int context)
|
|
pmDiscoverInvokeSourceCallBacks(p, ×tamp);
|
|
}
|
|
|
|
+static char *  /* malloc'd "<archive dir>/lock" path for p; caller frees */
|
|
+archive_dir_lock_path(pmDiscover *p)
|
|
+{
|
|
+ char path[MAXNAMELEN], lockpath[MAXNAMELEN];
|
|
+ int sep = pmPathSeparator();
|
|
+
|
|
+ pmsprintf(path, sizeof(path), "%s", p->context.name); /* always NUL-terminated (strncpy was not); dirname() may modify path */
|
|
+ pmsprintf(lockpath, sizeof(lockpath), "%s%c%s", dirname(path), sep, "lock");
|
|
+ return strndup(lockpath, sizeof(lockpath)); /* NULL on allocation failure; callers test for NULL */
|
|
+}
|
|
+
|
|
/*
|
|
* Process metadata records until EOF. That can span multiple
|
|
* callbacks if we get a partial record read.
|
|
@@ -1059,6 +1016,7 @@ process_metadata(pmDiscover *p)
|
|
__pmLogHdr hdr;
|
|
sds msg, source;
|
|
static uint32_t *buf = NULL;
|
|
+ char *lock_path;
|
|
int deleted;
|
|
struct stat sbuf;
|
|
static int buflen = 0;
|
|
@@ -1073,7 +1031,10 @@ process_metadata(pmDiscover *p)
|
|
fprintf(stderr, "process_metadata: %s in progress %s\n",
|
|
p->context.name, pmDiscoverFlagsStr(p));
|
|
pmDiscoverStatsAdd(p->module, "metadata.callbacks", NULL, 1);
|
|
+ lock_path = archive_dir_lock_path(p);
|
|
for (;;) {
|
|
+ if (lock_path && access(lock_path, F_OK) == 0)
|
|
+ break;
|
|
pmDiscoverStatsAdd(p->module, "metadata.loops", NULL, 1);
|
|
off = lseek(p->fd, 0, SEEK_CUR);
|
|
nb = read(p->fd, &hdr, sizeof(__pmLogHdr));
|
|
@@ -1240,6 +1201,9 @@ process_metadata(pmDiscover *p)
|
|
/* flag that all available metadata has now been read */
|
|
p->flags &= ~PM_DISCOVER_FLAGS_META_IN_PROGRESS;
|
|
|
|
+ if (lock_path)
|
|
+ free(lock_path);
|
|
+
|
|
if (pmDebugOptions.discovery)
|
|
fprintf(stderr, "%s: completed, partial=%d %s %s\n",
|
|
"process_metadata", partial, p->context.name, pmDiscoverFlagsStr(p));
|
|
@@ -1266,14 +1230,18 @@ static void
|
|
process_logvol(pmDiscover *p)
|
|
{
|
|
int sts;
|
|
- pmResult *r;
|
|
+ pmResult *r = NULL;
|
|
pmTimespec ts;
|
|
int oldcurvol;
|
|
__pmContext *ctxp;
|
|
__pmArchCtl *acp;
|
|
+ char *lock_path;
|
|
|
|
pmDiscoverStatsAdd(p->module, "logvol.callbacks", NULL, 1);
|
|
+ lock_path = archive_dir_lock_path(p);
|
|
for (;;) {
|
|
+ if (lock_path && access(lock_path, F_OK) == 0)
|
|
+ break;
|
|
pmDiscoverStatsAdd(p->module, "logvol.loops", NULL, 1);
|
|
pmUseContext(p->ctx);
|
|
ctxp = __pmHandleToPtr(p->ctx);
|
|
@@ -1312,6 +1280,7 @@ process_logvol(pmDiscover *p)
|
|
}
|
|
|
|
/* we are done - return and wait for another callback */
|
|
+ r = NULL;
|
|
break;
|
|
}
|
|
|
|
@@ -1328,14 +1297,15 @@ process_logvol(pmDiscover *p)
|
|
}
|
|
|
|
/*
|
|
- * TODO: persistently save current timestamp, so after being restarted,
|
|
- * pmproxy can resume where it left off for each archive.
|
|
+ * TODO (perhaps): persistently save current timestamp, so after being
|
|
+ * restarted, pmproxy can resume where it left off for each archive.
|
|
*/
|
|
ts.tv_sec = r->timestamp.tv_sec;
|
|
ts.tv_nsec = r->timestamp.tv_usec * 1000;
|
|
bump_logvol_decode_stats(p, r);
|
|
pmDiscoverInvokeValuesCallBack(p, &ts, r);
|
|
pmFreeResult(r);
|
|
+ r = NULL;
|
|
}
|
|
|
|
if (r) {
|
|
@@ -1348,6 +1318,9 @@ process_logvol(pmDiscover *p)
|
|
|
|
/* datavol is now up-to-date and at EOF */
|
|
p->flags &= ~PM_DISCOVER_FLAGS_DATAVOL_READY;
|
|
+
|
|
+ if (lock_path)
|
|
+ free(lock_path);
|
|
}
|
|
|
|
static void
|
|
@@ -1357,6 +1330,10 @@ pmDiscoverInvokeCallBacks(pmDiscover *p)
|
|
sds msg;
|
|
sds metaname;
|
|
|
|
+ check_deleted(p);
|
|
+ if (p->flags & PM_DISCOVER_FLAGS_DELETED)
|
|
+ return; /* ignore deleted archive */
|
|
+
|
|
if (p->ctx < 0) {
|
|
/*
|
|
* once off initialization on the first event
|
|
@@ -1366,16 +1343,23 @@ pmDiscoverInvokeCallBacks(pmDiscover *p)
|
|
|
|
/* create the PMAPI context (once off) */
|
|
if ((sts = pmNewContext(p->context.type, p->context.name)) < 0) {
|
|
- /*
|
|
- * Likely an early callback on a new (still empty) archive.
|
|
- * If so, just ignore the callback and don't log any scary
|
|
- * looking messages. We'll get another CB soon.
|
|
- */
|
|
- if (sts != PM_ERR_NODATA || pmDebugOptions.desperate) {
|
|
- infofmt(msg, "pmNewContext failed for %s: %s\n",
|
|
- p->context.name, pmErrStr(sts));
|
|
- moduleinfo(p->module, PMLOG_ERROR, msg, p->data);
|
|
+ if (sts == -ENOENT) {
|
|
+ /* newly deleted archive */
|
|
+ p->flags |= PM_DISCOVER_FLAGS_DELETED;
|
|
}
|
|
+ else {
|
|
+ /*
|
|
+ * Likely an early callback on a new (still empty) archive.
|
|
+ * If so, just ignore the callback and don't log any scary
|
|
+ * looking messages. We'll get another CB soon.
|
|
+ */
|
|
+ if (sts != PM_ERR_NODATA || pmDebugOptions.desperate) {
|
|
+ infofmt(msg, "pmNewContext failed for %s: %s\n",
|
|
+ p->context.name, pmErrStr(sts));
|
|
+ moduleinfo(p->module, PMLOG_ERROR, msg, p->data);
|
|
+ }
|
|
+ }
|
|
+ /* no further processing for this archive */
|
|
return;
|
|
}
|
|
pmDiscoverStatsAdd(p->module, "logvol.new_contexts", NULL, 1);
|
|
@@ -1410,8 +1394,12 @@ pmDiscoverInvokeCallBacks(pmDiscover *p)
|
|
metaname = sdsnew(p->context.name);
|
|
metaname = sdscat(metaname, ".meta");
|
|
if ((p->fd = open(metaname, O_RDONLY)) < 0) {
|
|
- infofmt(msg, "open failed for %s: %s\n", metaname, osstrerror());
|
|
- moduleinfo(p->module, PMLOG_ERROR, msg, p->data);
|
|
+ if (p->fd == -ENOENT)
|
|
+ p->flags |= PM_DISCOVER_FLAGS_DELETED;
|
|
+ else {
|
|
+ infofmt(msg, "open failed for %s: %s\n", metaname, osstrerror());
|
|
+ moduleinfo(p->module, PMLOG_ERROR, msg, p->data);
|
|
+ }
|
|
sdsfree(metaname);
|
|
return;
|
|
}
|