import pcp-5.3.1-6.el9

This commit is contained in:
CentOS Sources 2021-11-03 10:04:42 -04:00 committed by Stepan Oksanichenko
commit b04daac761
8 changed files with 17447 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
SOURCES/pcp-5.3.1.src.tar.gz

1
.pcp.metadata Normal file
View File

@ -0,0 +1 @@
5b693868cb11c09b87880ca2d57e672a3bfd63e6 SOURCES/pcp-5.3.1.src.tar.gz

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,849 @@
commit 2bad6aef10339f000f7cb578108db5ee80bd640c
Author: Mark Goodwin <mgoodwin@redhat.com>
Date: Wed Jun 9 17:04:33 2021 +1000
pmproxy: add mutex for client req lists, fix https/tls support, QA
Add a new mutext to struct proxy and use it to protect parallel
multithreaded updates to the proxy->first client list.
Also use the same mutext to protect updates to the pending_writes
client list and avoid the doubly linked list corruption that was
causing parallel https/tls requests to get stuck spinning in
flush_secure_module(), as reported in BZ#1947989.
qa/1457 is extensively updated to test parallel http, https/tls
(and combinations of http and https/tls) RESTAPI calls. Previously
it only tested a single https/tls call.
With these changes, parallel https/tls RESTAPI requests from the
grafana-pcp datasource to pmproxy now work correctly whereas previously
pmproxy would hang/spin.
Resolves: RHBZ#1947989 - pmproxy hangs and consume 100% cpu if the
redis datasource is configured with TLS.
Related: https://github.com/performancecopilot/pcp/issues/1311
diff --git a/qa/1457 b/qa/1457
index 94969f6e0..8bf395944 100755
--- a/qa/1457
+++ b/qa/1457
@@ -2,7 +2,7 @@
# PCP QA Test No. 1457
# Exercise HTTPS access to the PMWEBAPI(3).
#
-# Copyright (c) 2019 Red Hat.
+# Copyright (c) 2019,2021 Red Hat.
#
seq=`basename $0`
@@ -138,14 +138,59 @@ else
fi
date >>$seq.full
-echo "=== checking TLS operation ===" | tee -a $seq.full
-# (-k) allows us to use self-signed (insecure) certificates, so for testing only
-# (-v) provides very detailed TLS connection information, for debugging only
-curl -k --get 2>$tmp.err \
- "https://localhost:$port/pmapi/metric?name=sample.long.ten" \
- | _filter_json
-cat $tmp.err >>$seq.full
+echo "=== checking serial http operation ===" | tee -a $seq.full
+for i in 1 2 3 4; do
+ curl -Gs "http://localhost:$port/pmapi/metric?name=sample.long.ten" 2>$tmp.err$i >$tmp.out$i
+done
+for i in 1 2 3 4; do
+echo === out$i === | tee -a $seq.full
+_filter_json < $tmp.out$i
+done
+
+date >>$seq.full
+echo "=== checking parallel http operation ===" | tee -a $seq.full
+for i in 1 2 3 4; do
+ curl -Gs "http://localhost:$port/pmapi/metric?name=sample.long.ten" 2>$tmp.err$i >$tmp.out$i & 2>/dev/null eval pid$i=$!
+done
+wait $pid1 $pid2 $pid3 $pid4
+for i in 1 2 3 4; do
+echo === out$i === | tee -a $seq.full
+_filter_json < $tmp.out$i
+done
+
+date >>$seq.full
+echo "=== checking serial https/TLS operation ===" | tee -a $seq.full
+for i in 1 2 3 4; do
+ curl -k -Gs "https://localhost:$port/pmapi/metric?name=sample.long.ten" 2>$tmp.err$i >$tmp.out$i
+done
+for i in 1 2 3 4; do
+echo === out$i === | tee -a $seq.full
+_filter_json < $tmp.out$i
+done
+
date >>$seq.full
+echo "=== checking parallel https/TLS operation ===" | tee -a $seq.full
+for i in 1 2 3 4; do
+ curl -k -Gs "https://localhost:$port/pmapi/metric?name=sample.long.ten" 2>$tmp.err$i >$tmp.out$i & 2>/dev/null eval pid$i=$!
+done
+wait $pid1 $pid2 $pid3 $pid4
+for i in 1 2 3 4; do
+echo === out$i === | tee -a $seq.full
+_filter_json < $tmp.out$i
+done
+
+date >>$seq.full
+echo "=== checking parallel mixed http and https/TLS operations ===" | tee -a $seq.full
+for i in 1 3 5 7; do
+ j=`expr $i + 1`
+ curl -k -Gs "http://localhost:$port/pmapi/metric?name=sample.long.ten" 2>$tmp.err$i >$tmp.out$i & 2>/dev/null eval pid$i=$!
+ curl -k -Gs "https://localhost:$port/pmapi/metric?name=sample.long.ten" 2>$tmp.err$j >$tmp.out$j & 2>/dev/null eval pid$j=$!
+done
+wait $pid1 $pid2 $pid3 $pid4 $pid5 $pid6 $pid7 $pid8
+for i in 1 2 3 4 5 6 7 8; do
+echo === out$i === | tee -a $seq.full
+_filter_json < $tmp.out$i
+done
echo "=== check pmproxy is running ==="
pminfo -v -h localhost@localhost:$port hinv.ncpu
@@ -156,7 +201,7 @@ else
fi
# valgrind takes awhile to shutdown too
-pmsignal $pid
+pmsignal $pid >/dev/null 2>&1
pmsleep 3.5
echo "=== valgrind stdout ===" | tee -a $seq.full
cat $tmp.valout | _filter_valgrind
@@ -164,6 +209,9 @@ cat $tmp.valout | _filter_valgrind
echo "=== valgrind stderr ===" | tee -a $seq.full
cat $tmp.valerr | _filter_pmproxy_log | _filter_port
+# final kill if it's spinning
+$sudo kill -9 $pid >/dev/null 2>&1
+
# success, all done
status=0
exit
diff --git a/qa/1457.out b/qa/1457.out
index a7b64cdc5..422176db2 100644
--- a/qa/1457.out
+++ b/qa/1457.out
@@ -1,5 +1,539 @@
QA output created by 1457
-=== checking TLS operation ===
+=== checking serial http operation ===
+=== out1 ===
+{
+ "context": "CONTEXT"
+ "metrics": [
+ {
+ "name": "sample.long.ten",
+ "series": "SERIES"
+ "pmid": "29.0.11",
+ "type": "32",
+ "sem": "instant",
+ "units": "none",
+ "labels": {
+ "agent": "sample",
+ "cluster": "zero",
+ "domainname": "DOMAINNAME"
+ "hostname": "HOSTNAME"
+ "role": "testing"
+ },
+ "text-oneline": "10 as a 32-bit integer",
+ "text-help": "10 as a 32-bit integer"
+ }
+ ]
+}
+=== out2 ===
+{
+ "context": "CONTEXT"
+ "metrics": [
+ {
+ "name": "sample.long.ten",
+ "series": "SERIES"
+ "pmid": "29.0.11",
+ "type": "32",
+ "sem": "instant",
+ "units": "none",
+ "labels": {
+ "agent": "sample",
+ "cluster": "zero",
+ "domainname": "DOMAINNAME"
+ "hostname": "HOSTNAME"
+ "role": "testing"
+ },
+ "text-oneline": "10 as a 32-bit integer",
+ "text-help": "10 as a 32-bit integer"
+ }
+ ]
+}
+=== out3 ===
+{
+ "context": "CONTEXT"
+ "metrics": [
+ {
+ "name": "sample.long.ten",
+ "series": "SERIES"
+ "pmid": "29.0.11",
+ "type": "32",
+ "sem": "instant",
+ "units": "none",
+ "labels": {
+ "agent": "sample",
+ "cluster": "zero",
+ "domainname": "DOMAINNAME"
+ "hostname": "HOSTNAME"
+ "role": "testing"
+ },
+ "text-oneline": "10 as a 32-bit integer",
+ "text-help": "10 as a 32-bit integer"
+ }
+ ]
+}
+=== out4 ===
+{
+ "context": "CONTEXT"
+ "metrics": [
+ {
+ "name": "sample.long.ten",
+ "series": "SERIES"
+ "pmid": "29.0.11",
+ "type": "32",
+ "sem": "instant",
+ "units": "none",
+ "labels": {
+ "agent": "sample",
+ "cluster": "zero",
+ "domainname": "DOMAINNAME"
+ "hostname": "HOSTNAME"
+ "role": "testing"
+ },
+ "text-oneline": "10 as a 32-bit integer",
+ "text-help": "10 as a 32-bit integer"
+ }
+ ]
+}
+=== checking parallel http operation ===
+=== out1 ===
+{
+ "context": "CONTEXT"
+ "metrics": [
+ {
+ "name": "sample.long.ten",
+ "series": "SERIES"
+ "pmid": "29.0.11",
+ "type": "32",
+ "sem": "instant",
+ "units": "none",
+ "labels": {
+ "agent": "sample",
+ "cluster": "zero",
+ "domainname": "DOMAINNAME"
+ "hostname": "HOSTNAME"
+ "role": "testing"
+ },
+ "text-oneline": "10 as a 32-bit integer",
+ "text-help": "10 as a 32-bit integer"
+ }
+ ]
+}
+=== out2 ===
+{
+ "context": "CONTEXT"
+ "metrics": [
+ {
+ "name": "sample.long.ten",
+ "series": "SERIES"
+ "pmid": "29.0.11",
+ "type": "32",
+ "sem": "instant",
+ "units": "none",
+ "labels": {
+ "agent": "sample",
+ "cluster": "zero",
+ "domainname": "DOMAINNAME"
+ "hostname": "HOSTNAME"
+ "role": "testing"
+ },
+ "text-oneline": "10 as a 32-bit integer",
+ "text-help": "10 as a 32-bit integer"
+ }
+ ]
+}
+=== out3 ===
+{
+ "context": "CONTEXT"
+ "metrics": [
+ {
+ "name": "sample.long.ten",
+ "series": "SERIES"
+ "pmid": "29.0.11",
+ "type": "32",
+ "sem": "instant",
+ "units": "none",
+ "labels": {
+ "agent": "sample",
+ "cluster": "zero",
+ "domainname": "DOMAINNAME"
+ "hostname": "HOSTNAME"
+ "role": "testing"
+ },
+ "text-oneline": "10 as a 32-bit integer",
+ "text-help": "10 as a 32-bit integer"
+ }
+ ]
+}
+=== out4 ===
+{
+ "context": "CONTEXT"
+ "metrics": [
+ {
+ "name": "sample.long.ten",
+ "series": "SERIES"
+ "pmid": "29.0.11",
+ "type": "32",
+ "sem": "instant",
+ "units": "none",
+ "labels": {
+ "agent": "sample",
+ "cluster": "zero",
+ "domainname": "DOMAINNAME"
+ "hostname": "HOSTNAME"
+ "role": "testing"
+ },
+ "text-oneline": "10 as a 32-bit integer",
+ "text-help": "10 as a 32-bit integer"
+ }
+ ]
+}
+=== checking serial https/TLS operation ===
+=== out1 ===
+{
+ "context": "CONTEXT"
+ "metrics": [
+ {
+ "name": "sample.long.ten",
+ "series": "SERIES"
+ "pmid": "29.0.11",
+ "type": "32",
+ "sem": "instant",
+ "units": "none",
+ "labels": {
+ "agent": "sample",
+ "cluster": "zero",
+ "domainname": "DOMAINNAME"
+ "hostname": "HOSTNAME"
+ "role": "testing"
+ },
+ "text-oneline": "10 as a 32-bit integer",
+ "text-help": "10 as a 32-bit integer"
+ }
+ ]
+}
+=== out2 ===
+{
+ "context": "CONTEXT"
+ "metrics": [
+ {
+ "name": "sample.long.ten",
+ "series": "SERIES"
+ "pmid": "29.0.11",
+ "type": "32",
+ "sem": "instant",
+ "units": "none",
+ "labels": {
+ "agent": "sample",
+ "cluster": "zero",
+ "domainname": "DOMAINNAME"
+ "hostname": "HOSTNAME"
+ "role": "testing"
+ },
+ "text-oneline": "10 as a 32-bit integer",
+ "text-help": "10 as a 32-bit integer"
+ }
+ ]
+}
+=== out3 ===
+{
+ "context": "CONTEXT"
+ "metrics": [
+ {
+ "name": "sample.long.ten",
+ "series": "SERIES"
+ "pmid": "29.0.11",
+ "type": "32",
+ "sem": "instant",
+ "units": "none",
+ "labels": {
+ "agent": "sample",
+ "cluster": "zero",
+ "domainname": "DOMAINNAME"
+ "hostname": "HOSTNAME"
+ "role": "testing"
+ },
+ "text-oneline": "10 as a 32-bit integer",
+ "text-help": "10 as a 32-bit integer"
+ }
+ ]
+}
+=== out4 ===
+{
+ "context": "CONTEXT"
+ "metrics": [
+ {
+ "name": "sample.long.ten",
+ "series": "SERIES"
+ "pmid": "29.0.11",
+ "type": "32",
+ "sem": "instant",
+ "units": "none",
+ "labels": {
+ "agent": "sample",
+ "cluster": "zero",
+ "domainname": "DOMAINNAME"
+ "hostname": "HOSTNAME"
+ "role": "testing"
+ },
+ "text-oneline": "10 as a 32-bit integer",
+ "text-help": "10 as a 32-bit integer"
+ }
+ ]
+}
+=== checking parallel https/TLS operation ===
+=== out1 ===
+{
+ "context": "CONTEXT"
+ "metrics": [
+ {
+ "name": "sample.long.ten",
+ "series": "SERIES"
+ "pmid": "29.0.11",
+ "type": "32",
+ "sem": "instant",
+ "units": "none",
+ "labels": {
+ "agent": "sample",
+ "cluster": "zero",
+ "domainname": "DOMAINNAME"
+ "hostname": "HOSTNAME"
+ "role": "testing"
+ },
+ "text-oneline": "10 as a 32-bit integer",
+ "text-help": "10 as a 32-bit integer"
+ }
+ ]
+}
+=== out2 ===
+{
+ "context": "CONTEXT"
+ "metrics": [
+ {
+ "name": "sample.long.ten",
+ "series": "SERIES"
+ "pmid": "29.0.11",
+ "type": "32",
+ "sem": "instant",
+ "units": "none",
+ "labels": {
+ "agent": "sample",
+ "cluster": "zero",
+ "domainname": "DOMAINNAME"
+ "hostname": "HOSTNAME"
+ "role": "testing"
+ },
+ "text-oneline": "10 as a 32-bit integer",
+ "text-help": "10 as a 32-bit integer"
+ }
+ ]
+}
+=== out3 ===
+{
+ "context": "CONTEXT"
+ "metrics": [
+ {
+ "name": "sample.long.ten",
+ "series": "SERIES"
+ "pmid": "29.0.11",
+ "type": "32",
+ "sem": "instant",
+ "units": "none",
+ "labels": {
+ "agent": "sample",
+ "cluster": "zero",
+ "domainname": "DOMAINNAME"
+ "hostname": "HOSTNAME"
+ "role": "testing"
+ },
+ "text-oneline": "10 as a 32-bit integer",
+ "text-help": "10 as a 32-bit integer"
+ }
+ ]
+}
+=== out4 ===
+{
+ "context": "CONTEXT"
+ "metrics": [
+ {
+ "name": "sample.long.ten",
+ "series": "SERIES"
+ "pmid": "29.0.11",
+ "type": "32",
+ "sem": "instant",
+ "units": "none",
+ "labels": {
+ "agent": "sample",
+ "cluster": "zero",
+ "domainname": "DOMAINNAME"
+ "hostname": "HOSTNAME"
+ "role": "testing"
+ },
+ "text-oneline": "10 as a 32-bit integer",
+ "text-help": "10 as a 32-bit integer"
+ }
+ ]
+}
+=== checking parallel mixed http and https/TLS operations ===
+=== out1 ===
+{
+ "context": "CONTEXT"
+ "metrics": [
+ {
+ "name": "sample.long.ten",
+ "series": "SERIES"
+ "pmid": "29.0.11",
+ "type": "32",
+ "sem": "instant",
+ "units": "none",
+ "labels": {
+ "agent": "sample",
+ "cluster": "zero",
+ "domainname": "DOMAINNAME"
+ "hostname": "HOSTNAME"
+ "role": "testing"
+ },
+ "text-oneline": "10 as a 32-bit integer",
+ "text-help": "10 as a 32-bit integer"
+ }
+ ]
+}
+=== out2 ===
+{
+ "context": "CONTEXT"
+ "metrics": [
+ {
+ "name": "sample.long.ten",
+ "series": "SERIES"
+ "pmid": "29.0.11",
+ "type": "32",
+ "sem": "instant",
+ "units": "none",
+ "labels": {
+ "agent": "sample",
+ "cluster": "zero",
+ "domainname": "DOMAINNAME"
+ "hostname": "HOSTNAME"
+ "role": "testing"
+ },
+ "text-oneline": "10 as a 32-bit integer",
+ "text-help": "10 as a 32-bit integer"
+ }
+ ]
+}
+=== out3 ===
+{
+ "context": "CONTEXT"
+ "metrics": [
+ {
+ "name": "sample.long.ten",
+ "series": "SERIES"
+ "pmid": "29.0.11",
+ "type": "32",
+ "sem": "instant",
+ "units": "none",
+ "labels": {
+ "agent": "sample",
+ "cluster": "zero",
+ "domainname": "DOMAINNAME"
+ "hostname": "HOSTNAME"
+ "role": "testing"
+ },
+ "text-oneline": "10 as a 32-bit integer",
+ "text-help": "10 as a 32-bit integer"
+ }
+ ]
+}
+=== out4 ===
+{
+ "context": "CONTEXT"
+ "metrics": [
+ {
+ "name": "sample.long.ten",
+ "series": "SERIES"
+ "pmid": "29.0.11",
+ "type": "32",
+ "sem": "instant",
+ "units": "none",
+ "labels": {
+ "agent": "sample",
+ "cluster": "zero",
+ "domainname": "DOMAINNAME"
+ "hostname": "HOSTNAME"
+ "role": "testing"
+ },
+ "text-oneline": "10 as a 32-bit integer",
+ "text-help": "10 as a 32-bit integer"
+ }
+ ]
+}
+=== out5 ===
+{
+ "context": "CONTEXT"
+ "metrics": [
+ {
+ "name": "sample.long.ten",
+ "series": "SERIES"
+ "pmid": "29.0.11",
+ "type": "32",
+ "sem": "instant",
+ "units": "none",
+ "labels": {
+ "agent": "sample",
+ "cluster": "zero",
+ "domainname": "DOMAINNAME"
+ "hostname": "HOSTNAME"
+ "role": "testing"
+ },
+ "text-oneline": "10 as a 32-bit integer",
+ "text-help": "10 as a 32-bit integer"
+ }
+ ]
+}
+=== out6 ===
+{
+ "context": "CONTEXT"
+ "metrics": [
+ {
+ "name": "sample.long.ten",
+ "series": "SERIES"
+ "pmid": "29.0.11",
+ "type": "32",
+ "sem": "instant",
+ "units": "none",
+ "labels": {
+ "agent": "sample",
+ "cluster": "zero",
+ "domainname": "DOMAINNAME"
+ "hostname": "HOSTNAME"
+ "role": "testing"
+ },
+ "text-oneline": "10 as a 32-bit integer",
+ "text-help": "10 as a 32-bit integer"
+ }
+ ]
+}
+=== out7 ===
+{
+ "context": "CONTEXT"
+ "metrics": [
+ {
+ "name": "sample.long.ten",
+ "series": "SERIES"
+ "pmid": "29.0.11",
+ "type": "32",
+ "sem": "instant",
+ "units": "none",
+ "labels": {
+ "agent": "sample",
+ "cluster": "zero",
+ "domainname": "DOMAINNAME"
+ "hostname": "HOSTNAME"
+ "role": "testing"
+ },
+ "text-oneline": "10 as a 32-bit integer",
+ "text-help": "10 as a 32-bit integer"
+ }
+ ]
+}
+=== out8 ===
{
"context": "CONTEXT"
"metrics": [
diff --git a/qa/group b/qa/group
index 462dffaaa..77cac788d 100644
--- a/qa/group
+++ b/qa/group
@@ -1818,7 +1818,7 @@ x11
1436 pmda.postgresql local
1437 pmda.kvm local
1455 pmlogrewrite labels pmdumplog local
-1457 pmproxy local
+1457 pmproxy libpcp_web threads secure local
1480 pmda.lmsensors local
1489 python pmrep pmimport local
1490 python local labels
diff --git a/src/pmproxy/src/secure.c b/src/pmproxy/src/secure.c
index 77265894c..072e2a085 100644
--- a/src/pmproxy/src/secure.c
+++ b/src/pmproxy/src/secure.c
@@ -16,13 +16,25 @@
#include <openssl/opensslv.h>
#include <openssl/ssl.h>
+/* called with proxy->mutex locked */
static void
remove_connection_from_queue(struct client *client)
{
+ struct proxy *proxy = client->proxy;
+
if (client->secure.pending.writes_buffer != NULL)
free(client->secure.pending.writes_buffer);
- if (client->secure.pending.prev != NULL)
- *client->secure.pending.prev = client->secure.pending.next;
+ if (client->secure.pending.prev == NULL) {
+ /* next (if any) becomes first in pending_writes list */
+ proxy->pending_writes = client->secure.pending.next;
+ if (proxy->pending_writes)
+ proxy->pending_writes->secure.pending.prev = NULL;
+ }
+ else {
+ /* link next and prev */
+ client->secure.pending.prev->secure.pending.next = client->secure.pending.next;
+ client->secure.pending.next->secure.pending.prev = client->secure.pending.prev;
+ }
memset(&client->secure.pending, 0, sizeof(client->secure.pending));
}
@@ -32,7 +44,9 @@ on_secure_client_close(struct client *client)
if (pmDebugOptions.auth || pmDebugOptions.http)
fprintf(stderr, "%s: client %p\n", "on_secure_client_close", client);
+ uv_mutex_lock(&client->proxy->mutex);
remove_connection_from_queue(client);
+ uv_mutex_unlock(&client->proxy->mutex);
/* client->read and client->write freed by SSL_free */
SSL_free(client->secure.ssl);
}
@@ -40,6 +54,8 @@ on_secure_client_close(struct client *client)
static void
maybe_flush_ssl(struct proxy *proxy, struct client *client)
{
+ struct client *c;
+
if (client->secure.pending.queued)
return;
@@ -47,13 +63,19 @@ maybe_flush_ssl(struct proxy *proxy, struct client *client)
client->secure.pending.writes_count > 0)
return;
- client->secure.pending.next = proxy->pending_writes;
- if (client->secure.pending.next != NULL)
- client->secure.pending.next->secure.pending.prev = &client->secure.pending.next;
- client->secure.pending.prev = &proxy->pending_writes;
+ uv_mutex_lock(&proxy->mutex);
+ if (proxy->pending_writes == NULL) {
+ proxy->pending_writes = client;
+ client->secure.pending.prev = client->secure.pending.next = NULL;
+ }
+ else {
+ for (c=proxy->pending_writes; c->secure.pending.next; c = c->secure.pending.next)
+ ; /**/
+ c->secure.pending.next = client;
+ client->secure.pending.prev = c;
+ }
client->secure.pending.queued = 1;
-
- proxy->pending_writes = client;
+ uv_mutex_unlock(&proxy->mutex);
}
static void
@@ -135,10 +157,12 @@ flush_ssl_buffer(struct client *client)
void
flush_secure_module(struct proxy *proxy)
{
- struct client *client, **head = &proxy->pending_writes;
+ struct client *client, **head;
size_t i, used;
int sts;
+ uv_mutex_lock(&proxy->mutex);
+ head = &proxy->pending_writes;
while ((client = *head) != NULL) {
flush_ssl_buffer(client);
@@ -188,6 +212,7 @@ flush_secure_module(struct proxy *proxy)
sizeof(uv_buf_t) * client->secure.pending.writes_count);
}
}
+ uv_mutex_unlock(&proxy->mutex);
}
void
diff --git a/src/pmproxy/src/server.c b/src/pmproxy/src/server.c
index 612d3613b..b5c5d84de 100644
--- a/src/pmproxy/src/server.c
+++ b/src/pmproxy/src/server.c
@@ -149,6 +149,7 @@ server_init(int portcount, const char *localpath)
pmGetProgname());
return NULL;
}
+ uv_mutex_init(&proxy->mutex);
count = portcount + (*localpath ? 1 : 0);
if (count) {
@@ -251,6 +252,7 @@ void
client_put(struct client *client)
{
unsigned int refcount;
+ struct proxy *proxy = client->proxy;
uv_mutex_lock(&client->mutex);
assert(client->refcount);
@@ -259,9 +261,11 @@ client_put(struct client *client)
if (refcount == 0) {
/* remove client from the doubly-linked list */
+ uv_mutex_lock(&proxy->mutex);
if (client->next != NULL)
client->next->prev = client->prev;
*client->prev = client->next;
+ uv_mutex_unlock(&proxy->mutex);
if (client->protocol & STREAM_PCP)
on_pcp_client_close(client);
@@ -514,10 +518,12 @@ on_client_connection(uv_stream_t *stream, int status)
client->proxy = proxy;
/* insert client into doubly-linked list at the head */
+ uv_mutex_lock(&proxy->mutex);
if ((client->next = proxy->first) != NULL)
proxy->first->prev = &client->next;
proxy->first = client;
client->prev = &proxy->first;
+ uv_mutex_unlock(&proxy->mutex);
status = uv_read_start((uv_stream_t *)&client->stream.u.tcp,
on_buffer_alloc, on_client_read);
diff --git a/src/pmproxy/src/server.h b/src/pmproxy/src/server.h
index f0b7a5f68..f93daeff4 100644
--- a/src/pmproxy/src/server.h
+++ b/src/pmproxy/src/server.h
@@ -118,7 +118,7 @@ typedef struct secure_client {
BIO *write;
struct {
struct client *next;
- struct client **prev;
+ struct client *prev;
unsigned int queued;
size_t writes_count;
uv_buf_t *writes_buffer;
@@ -166,6 +166,7 @@ typedef struct proxy {
struct dict *config; /* configuration dictionary */
uv_loop_t *events; /* global, async event loop */
uv_callback_t write_callbacks;
+ uv_mutex_t mutex; /* protects client lists and pending writes */
} proxy;
extern void proxylog(pmLogLevel, sds, void *);

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,922 @@
3f5ba2218 libpcp_web: add mutex to struct webgroup protecting the context dict
107633192 src/libpcp: be more careful when calling __pmLogChangeVol()
49bdfdfff libpcp: redefine __pmLogSetTime()
5e3b792d3 libpcp_web: plug mem leak in redisMapInsert during daily log-rolling
2a00a90b0 libpcp_web/discovery: improve lock handling and scalability
commit 3f5ba221842e6a02e9fb22e23c754854271c3c9a
Author: Mark Goodwin <mgoodwin@redhat.com>
Date: Wed Jun 9 16:44:30 2021 +1000
libpcp_web: add mutex to struct webgroup protecting the context dict
Add a mutex to the local webgroups structure in libpcp_web and
use it to protect multithreaded parallel updates (dictAdd,
dictDelete) to the groups->contexts dict and the dict traversal
in the timer driven garbage collector.
Tested by qa/297 and related tests and also an updated version
of qa/1457 (which now stress tests parallel http and https/tls
pmproxy RESTAPI calls .. in a later commit).
Related: RHBZ#1947989
Resolves: https://github.com/performancecopilot/pcp/issues/1311
diff --git a/src/libpcp_web/src/webgroup.c b/src/libpcp_web/src/webgroup.c
index 08c2518ed..35f05441b 100644
--- a/src/libpcp_web/src/webgroup.c
+++ b/src/libpcp_web/src/webgroup.c
@@ -51,14 +51,20 @@ typedef struct webgroups {
uv_loop_t *events;
unsigned int active;
uv_timer_t timer;
+ uv_mutex_t mutex;
} webgroups;
static struct webgroups *
webgroups_lookup(pmWebGroupModule *module)
{
- if (module->privdata == NULL)
+ struct webgroups *groups = module->privdata;
+
+ if (module->privdata == NULL) {
module->privdata = calloc(1, sizeof(struct webgroups));
- return (struct webgroups *)module->privdata;
+ groups = (struct webgroups *)module->privdata;
+ uv_mutex_init(&groups->mutex);
+ }
+ return groups;
}
static int
@@ -94,8 +100,11 @@ webgroup_drop_context(struct context *context, struct webgroups *groups)
context->garbage = 1;
uv_timer_stop(&context->timer);
}
- if (groups)
+ if (groups) {
+ uv_mutex_lock(&groups->mutex);
dictDelete(groups->contexts, &context->randomid);
+ uv_mutex_unlock(&groups->mutex);
+ }
uv_close((uv_handle_t *)&context->timer, webgroup_release_context);
}
}
@@ -207,13 +216,16 @@ webgroup_new_context(pmWebGroupSettings *sp, dict *params,
cp->context = -1;
cp->timeout = polltime;
+ uv_mutex_lock(&groups->mutex);
if ((cp->randomid = random()) < 0 ||
dictFind(groups->contexts, &cp->randomid) != NULL) {
infofmt(*message, "random number failure on new web context");
pmwebapi_free_context(cp);
*status = -ESRCH;
+ uv_mutex_unlock(&groups->mutex);
return NULL;
}
+ uv_mutex_unlock(&groups->mutex);
cp->origin = sdscatfmt(sdsempty(), "%i", cp->randomid);
cp->name.sds = sdsdup(hostspec ? hostspec : LOCALHOST);
cp->realm = sdscatfmt(sdsempty(), "pmapi/%i", cp->randomid);
@@ -242,7 +254,9 @@ webgroup_new_context(pmWebGroupSettings *sp, dict *params,
pmwebapi_free_context(cp);
return NULL;
}
+ uv_mutex_lock(&groups->mutex);
dictAdd(groups->contexts, &cp->randomid, cp);
+ uv_mutex_unlock(&groups->mutex);
/* leave until the end because uv_timer_init makes this visible in uv_run */
handle = (uv_handle_t *)&cp->timer;
@@ -261,25 +275,34 @@ webgroup_new_context(pmWebGroupSettings *sp, dict *params,
static void
webgroup_garbage_collect(struct webgroups *groups)
{
- dictIterator *iterator = dictGetSafeIterator(groups->contexts);
+ dictIterator *iterator;
dictEntry *entry;
context_t *cp;
if (pmDebugOptions.http || pmDebugOptions.libweb)
fprintf(stderr, "%s: started\n", "webgroup_garbage_collect");
- while ((entry = dictNext(iterator)) != NULL) {
- cp = (context_t *)dictGetVal(entry);
- if (cp->garbage && cp->privdata == groups) {
- if (pmDebugOptions.http || pmDebugOptions.libweb)
- fprintf(stderr, "GC context %u (%p)\n", cp->randomid, cp);
- webgroup_drop_context(cp, groups);
+ /* do context GC if we get the lock (else don't block here) */
+ if (uv_mutex_trylock(&groups->mutex) == 0) {
+ iterator = dictGetSafeIterator(groups->contexts);
+ for (entry = dictNext(iterator); entry;) {
+ cp = (context_t *)dictGetVal(entry);
+ entry = dictNext(iterator);
+ if (cp->garbage && cp->privdata == groups) {
+ if (pmDebugOptions.http || pmDebugOptions.libweb)
+ fprintf(stderr, "GC context %u (%p)\n", cp->randomid, cp);
+ uv_mutex_unlock(&groups->mutex);
+ webgroup_drop_context(cp, groups);
+ uv_mutex_lock(&groups->mutex);
+ }
}
+ dictReleaseIterator(iterator);
+ uv_mutex_unlock(&groups->mutex);
}
- dictReleaseIterator(iterator);
/* TODO - trim maps, particularly instmap if proc metrics are not excluded */
+ /* TODO move the following to a new stats timer */
if (groups->metrics_handle) {
mmv_stats_set(groups->metrics_handle, "contextmap.size",
NULL, dictSize(contextmap));
commit 107633192326b27ae571d4d4955052b8d86222c2
Author: Ken McDonell <kenj@kenj.id.au>
Date: Fri Jul 2 16:52:48 2021 +1000
src/libpcp: be more careful when calling __pmLogChangeVol()
Mark observed a SEGV which looks like __pmLogFetch() died because
ctxp->c_archctl->ac_mfp was (unexpectedly) NULL.
See: https://github.com/performancecopilot/pcp/issues/1338
Initial guess is that a physical file was removed by concurrent
activity (like pmlogger_check or pmlogger_daily), causing
__pmLogChangeVol() to fail ... and this was not being checked for
on the __pmLogFetch() path and in a couple of other places.
modified: interp.c
modified: logutil.c
diff --git a/src/libpcp/src/interp.c b/src/libpcp/src/interp.c
index d7effbc1e..c8f6fe382 100644
--- a/src/libpcp/src/interp.c
+++ b/src/libpcp/src/interp.c
@@ -1312,7 +1312,9 @@ __pmLogFetchInterp(__pmContext *ctxp, int numpmid, pmID pmidlist[], pmResult **r
}
/* get to the last remembered place */
- __pmLogChangeVol(ctxp->c_archctl, ctxp->c_archctl->ac_vol);
+ sts = __pmLogChangeVol(ctxp->c_archctl, ctxp->c_archctl->ac_vol);
+ if (sts < 0)
+ goto all_done;
__pmFseek(ctxp->c_archctl->ac_mfp, ctxp->c_archctl->ac_offset, SEEK_SET);
seen_mark = 0; /* interested in <mark> records seen from here on */
@@ -1397,7 +1399,9 @@ __pmLogFetchInterp(__pmContext *ctxp, int numpmid, pmID pmidlist[], pmResult **r
* at least one metric requires a bound from earlier in the log ...
* position ourselves, ... and search
*/
- __pmLogChangeVol(ctxp->c_archctl, ctxp->c_archctl->ac_vol);
+ sts = __pmLogChangeVol(ctxp->c_archctl, ctxp->c_archctl->ac_vol);
+ if (sts < 0)
+ goto all_done;
__pmFseek(ctxp->c_archctl->ac_mfp, ctxp->c_archctl->ac_offset, SEEK_SET);
done = 0;
@@ -1542,7 +1546,9 @@ __pmLogFetchInterp(__pmContext *ctxp, int numpmid, pmID pmidlist[], pmResult **r
* at least one metric requires a bound from later in the log ...
* position ourselves ... and search
*/
- __pmLogChangeVol(ctxp->c_archctl, ctxp->c_archctl->ac_vol);
+ sts = __pmLogChangeVol(ctxp->c_archctl, ctxp->c_archctl->ac_vol);
+ if (sts < 0)
+ goto all_done;
__pmFseek(ctxp->c_archctl->ac_mfp, ctxp->c_archctl->ac_offset, SEEK_SET);
done = 0;
diff --git a/src/libpcp/src/logutil.c b/src/libpcp/src/logutil.c
index fe35ed422..0ef76de25 100644
--- a/src/libpcp/src/logutil.c
+++ b/src/libpcp/src/logutil.c
@@ -1992,7 +1992,10 @@ __pmLogFetch(__pmContext *ctxp, int numpmid, pmID pmidlist[], pmResult **result)
all_derived = check_all_derived(numpmid, pmidlist);
/* re-establish position */
- __pmLogChangeVol(ctxp->c_archctl, ctxp->c_archctl->ac_vol);
+ sts = __pmLogChangeVol(ctxp->c_archctl, ctxp->c_archctl->ac_vol);
+ if (sts < 0)
+ goto func_return;
+ assert(ctxp->c_archctl->ac_mfp != NULL);
__pmFseek(ctxp->c_archctl->ac_mfp,
(long)ctxp->c_archctl->ac_offset, SEEK_SET);
@@ -2489,10 +2492,12 @@ __pmLogSetTime(__pmContext *ctxp)
/* index either not available, or not useful */
if (mode == PM_MODE_FORW) {
__pmLogChangeVol(acp, lcp->l_minvol);
+ assert(acp->ac_mfp != NULL);
__pmFseek(acp->ac_mfp, (long)(sizeof(__pmLogLabel) + 2*sizeof(int)), SEEK_SET);
}
else if (mode == PM_MODE_BACK) {
__pmLogChangeVol(acp, lcp->l_maxvol);
+ assert(acp->ac_mfp != NULL);
__pmFseek(acp->ac_mfp, (long)0, SEEK_END);
}
@@ -3141,6 +3146,7 @@ LogChangeToPreviousArchive(__pmContext *ctxp)
/* Set up to scan backwards from the end of the archive. */
__pmLogChangeVol(acp, lcp->l_maxvol);
+ assert(acp->ac_mfp != NULL);
__pmFseek(acp->ac_mfp, (long)0, SEEK_END);
ctxp->c_archctl->ac_offset = __pmFtell(acp->ac_mfp);
assert(ctxp->c_archctl->ac_offset >= 0);
commit 49bdfdfff83ac165de2bdc9a40e61a56512585d8
Author: Ken McDonell <kenj@kenj.id.au>
Date: Sun Jul 4 10:07:09 2021 +1000
libpcp: redefine __pmLogSetTime()
The problem is that if physical files for the data volumes of an
archive are removed (asynchronously by someone else) while we're
trying to switch volumes then we don't handle this safely.
The previous commit 10763319 was as stop-gap to address Mark's SEGV
issue at https://github.com/performancecopilot/pcp/issues/1338 and
simply handled direct calls to __pmLogChangeVol() and ensured the
return status was checked.
I was aware, then Coverity made a lot more people aware, that this
"fix" was incomplete, specifically the calls to __pmLogChangeVol()
from within __pmLogSetTime() were not checked.
To fix the latter we have to change the type of __pmLogSetTime() from
void to int so we can return status to indicate that __pmLogChangeVol()
has failed. And then make sure all the callers of __pmLogSetTime()
check the status returned from that function.
modified: src/libpcp/src/fetch.c
modified: src/libpcp/src/internal.h
modified: src/libpcp/src/logutil.c
Because this introduces some new -Dlog diagnostics, qa/251 needed
a bit of a make-over.
diff --git a/qa/251 b/qa/251
index 2b8a07917..f9b293e98 100755
--- a/qa/251
+++ b/qa/251
@@ -37,7 +37,7 @@ _filter()
status=1 # failure is the default!
$sudo rm -rf $tmp.* $seq.full
-trap "cd $here; rm -rf $tmp; exit \$status" 0 1 2 3 15
+trap "cd $here; rm -rf $tmp $tmp.*; exit \$status" 0 1 2 3 15
# real QA test starts here
mkdir $tmp
@@ -50,56 +50,62 @@ cd $tmp
for inst in "bin-100" "bin-100,bin-500,bin-900"
do
echo
- echo "All volumes present ... $inst ..."
- pmval -z -O $offset -D128 -t2 -a ok-mv-bar -i $inst sampledso.bin 2>err >out
- egrep 'Skip|Change' err
- _filter <out
+ echo "All volumes present ... $inst ..." | tee -a $here/$seq.full
+ pmval -z -O $offset -Dlog -t2 -a ok-mv-bar -i $inst sampledso.bin 2>$tmp.err >$tmp.out
+ cat $tmp.err >>$here/$seq.full
+ grep '^__pmLogChangeVol:' $tmp.err
+ _filter <$tmp.out
[ -f die ] && exit
echo
- echo "First volume missing ... $inst ..."
+ echo "First volume missing ... $inst ..." | tee -a $here/$seq.full
mv ok-mv-bar.0 foo.0
- pmval -z -O $offset -D128 -t2 -a ok-mv-bar -i $inst sampledso.bin 2>err >out
- egrep 'Skip|Change' err
- _filter <out
+ pmval -z -O $offset -Dlog -t2 -a ok-mv-bar -i $inst sampledso.bin 2>$tmp.err >$tmp.out
+ cat $tmp.err >>$here/$seq.full
+ grep '^__pmLogChangeVol:' $tmp.err
+ _filter <$tmp.out
[ -f die ] && exit
mv foo.0 ok-mv-bar.0
echo
- echo "Last volume missing ... $inst ..."
+ echo "Last volume missing ... $inst ..." | tee -a $here/$seq.full
mv ok-mv-bar.3 foo.3
- pmval -z -O $offset -D128 -t2 -a ok-mv-bar -i $inst sampledso.bin 2>err >out
- egrep 'Skip|Change' err
- _filter <out
+ pmval -z -O $offset -Dlog -t2 -a ok-mv-bar -i $inst sampledso.bin 2>$tmp.err >$tmp.out
+ cat $tmp.err >>$here/$seq.full
+ grep '^__pmLogChangeVol:' $tmp.err
+ _filter <$tmp.out
[ -f die ] && exit
mv foo.3 ok-mv-bar.3
echo
- echo "Second volume missing ... $inst ..."
+ echo "Second volume missing ... $inst ..." | tee -a $here/$seq.full
mv ok-mv-bar.1 foo.1
- pmval -z -O $offset -D128 -t2 -a ok-mv-bar -i $inst sampledso.bin 2>err >out
- egrep 'Skip|Change' err
- _filter <out
+ pmval -z -O $offset -Dlog -t2 -a ok-mv-bar -i $inst sampledso.bin 2>$tmp.err >$tmp.out
+ cat $tmp.err >>$here/$seq.full
+ grep '^__pmLogChangeVol:' $tmp.err
+ _filter <$tmp.out
[ -f die ] && exit
mv foo.1 ok-mv-bar.1
echo
- echo "Second last volume missing ... $inst ..."
+ echo "Second last volume missing ... $inst ..." | tee -a $here/$seq.full
mv ok-mv-bar.2 foo.2
- pmval -z -O $offset -D128 -t2 -a ok-mv-bar -i $inst sampledso.bin 2>err >out
- egrep 'Skip|Change' err
- _filter <out
+ pmval -z -O $offset -Dlog -t2 -a ok-mv-bar -i $inst sampledso.bin 2>$tmp.err >$tmp.out
+ cat $tmp.err >>$here/$seq.full
+ grep '^__pmLogChangeVol:' $tmp.err
+ _filter <$tmp.out
[ -f die ] && exit
mv foo.2 ok-mv-bar.2
echo
- echo "All volumes but second missing ... $inst ..."
+ echo "All volumes but second missing ... $inst ..." | tee -a $here/$seq.full
mv ok-mv-bar.0 foo.0
mv ok-mv-bar.2 foo.2
mv ok-mv-bar.3 foo.3
- pmval -z -O $offset -D128 -t2 -a ok-mv-bar -i $inst sampledso.bin 2>err >out
- egrep 'Skip|Change' err
- _filter <out
+ pmval -z -O $offset -Dlog -t2 -a ok-mv-bar -i $inst sampledso.bin 2>$tmp.err >$tmp.out
+ cat $tmp.err >>$here/$seq.full
+ grep '^__pmLogChangeVol:' $tmp.err
+ _filter <$tmp.out
[ -f die ] && exit
mv foo.0 ok-mv-bar.0
mv foo.2 ok-mv-bar.2
diff --git a/src/libpcp/src/fetch.c b/src/libpcp/src/fetch.c
index 5328a2807..01d5bf7fc 100644
--- a/src/libpcp/src/fetch.c
+++ b/src/libpcp/src/fetch.c
@@ -458,6 +458,7 @@ pmSetMode(int mode, const struct timeval *when, int delta)
/* assume PM_CONTEXT_ARCHIVE */
if (l_mode == PM_MODE_INTERP ||
l_mode == PM_MODE_FORW || l_mode == PM_MODE_BACK) {
+ int lsts;
if (when != NULL) {
/*
* special case of NULL for timestamp
@@ -468,7 +469,18 @@ pmSetMode(int mode, const struct timeval *when, int delta)
}
ctxp->c_mode = mode;
ctxp->c_delta = delta;
- __pmLogSetTime(ctxp);
+ lsts = __pmLogSetTime(ctxp);
+ if (lsts < 0) {
+ /*
+ * most unlikely; not much we can do here but expect
+ * PMAPI error to be returned once pmFetch's start
+ */
+ if (pmDebugOptions.log) {
+ char errmsg[PM_MAXERRMSGLEN];
+ fprintf(stderr, "pmSetMode: __pmLogSetTime failed: %s\n",
+ pmErrStr_r(lsts, errmsg, sizeof(errmsg)));
+ }
+ }
__pmLogResetInterp(ctxp);
sts = 0;
}
diff --git a/src/libpcp/src/internal.h b/src/libpcp/src/internal.h
index 977efdcf6..fd8d6e740 100644
--- a/src/libpcp/src/internal.h
+++ b/src/libpcp/src/internal.h
@@ -407,7 +407,7 @@ extern int __pmLogGenerateMark(__pmLogCtl *, int, pmResult **) _PCP_HIDDEN;
extern int __pmLogFetchInterp(__pmContext *, int, pmID *, pmResult **) _PCP_HIDDEN;
extern int __pmGetArchiveLabel(__pmLogCtl *, pmLogLabel *) _PCP_HIDDEN;
extern pmTimeval *__pmLogStartTime(__pmArchCtl *) _PCP_HIDDEN;
-extern void __pmLogSetTime(__pmContext *) _PCP_HIDDEN;
+extern int __pmLogSetTime(__pmContext *) _PCP_HIDDEN;
extern void __pmLogResetInterp(__pmContext *) _PCP_HIDDEN;
extern void __pmArchCtlFree(__pmArchCtl *) _PCP_HIDDEN;
extern int __pmLogChangeArchive(__pmContext *, int) _PCP_HIDDEN;
diff --git a/src/libpcp/src/logutil.c b/src/libpcp/src/logutil.c
index 0ef76de25..2ea559bfe 100644
--- a/src/libpcp/src/logutil.c
+++ b/src/libpcp/src/logutil.c
@@ -1995,7 +1995,6 @@ __pmLogFetch(__pmContext *ctxp, int numpmid, pmID pmidlist[], pmResult **result)
sts = __pmLogChangeVol(ctxp->c_archctl, ctxp->c_archctl->ac_vol);
if (sts < 0)
goto func_return;
- assert(ctxp->c_archctl->ac_mfp != NULL);
__pmFseek(ctxp->c_archctl->ac_mfp,
(long)ctxp->c_archctl->ac_offset, SEEK_SET);
@@ -2010,7 +2009,9 @@ more:
* no serial access, so need to make sure we are
* starting in the correct place
*/
- __pmLogSetTime(ctxp);
+ sts = __pmLogSetTime(ctxp);
+ if (sts < 0)
+ goto func_return;
ctxp->c_archctl->ac_offset = __pmFtell(ctxp->c_archctl->ac_mfp);
ctxp->c_archctl->ac_vol = ctxp->c_archctl->ac_curvol;
/*
@@ -2299,7 +2300,7 @@ VolSkip(__pmArchCtl *acp, int mode, int j)
return PM_ERR_EOL;
}
-void
+int
__pmLogSetTime(__pmContext *ctxp)
{
__pmArchCtl *acp = ctxp->c_archctl;
@@ -2356,6 +2357,7 @@ __pmLogSetTime(__pmContext *ctxp)
if (lcp->l_numti) {
/* we have a temporal index, use it! */
int j = -1;
+ int try;
int toobig = 0;
int match = 0;
int vol;
@@ -2406,9 +2408,13 @@ __pmLogSetTime(__pmContext *ctxp)
acp->ac_serial = 1;
if (match) {
+ try = j;
j = VolSkip(acp, mode, j);
- if (j < 0)
- return;
+ if (j < 0) {
+ if (pmDebugOptions.log)
+ fprintf(stderr, "__pmLogSetTime: VolSkip mode=%d vol=%d failed #1\n", mode, try);
+ return PM_ERR_LOGFILE;
+ }
__pmFseek(acp->ac_mfp, (long)lcp->l_ti[j].ti_log, SEEK_SET);
if (mode == PM_MODE_BACK)
acp->ac_serial = 0;
@@ -2418,9 +2424,13 @@ __pmLogSetTime(__pmContext *ctxp)
}
}
else if (j < 1) {
+ try = 0;
j = VolSkip(acp, PM_MODE_FORW, 0);
- if (j < 0)
- return;
+ if (j < 0) {
+ if (pmDebugOptions.log)
+ fprintf(stderr, "__pmLogSetTime: VolSkip mode=%d vol=%d failed #2\n", PM_MODE_FORW, try);
+ return PM_ERR_LOGFILE;
+ }
__pmFseek(acp->ac_mfp, (long)lcp->l_ti[j].ti_log, SEEK_SET);
if (pmDebugOptions.log) {
fprintf(stderr, " before start ti@");
@@ -2428,9 +2438,13 @@ __pmLogSetTime(__pmContext *ctxp)
}
}
else if (j == numti) {
+ try = numti-1;
j = VolSkip(acp, PM_MODE_BACK, numti-1);
- if (j < 0)
- return;
+ if (j < 0) {
+ if (pmDebugOptions.log)
+ fprintf(stderr, "__pmLogSetTime: VolSkip mode=%d vol=%d failed #3\n", PM_MODE_BACK, try);
+ return PM_ERR_LOGFILE;
+ }
__pmFseek(acp->ac_mfp, (long)lcp->l_ti[j].ti_log, SEEK_SET);
if (mode == PM_MODE_BACK)
acp->ac_serial = 0;
@@ -2450,9 +2464,13 @@ __pmLogSetTime(__pmContext *ctxp)
t_hi = __pmTimevalSub(&lcp->l_ti[j].ti_stamp, &ctxp->c_origin);
t_lo = __pmTimevalSub(&ctxp->c_origin, &lcp->l_ti[j-1].ti_stamp);
if (t_hi <= t_lo && !toobig) {
+ try = j;
j = VolSkip(acp, mode, j);
- if (j < 0)
- return;
+ if (j < 0) {
+ if (pmDebugOptions.log)
+ fprintf(stderr, "__pmLogSetTime: VolSkip mode=%d vol=%d failed #4\n", mode, try);
+ return PM_ERR_LOGFILE;
+ }
__pmFseek(acp->ac_mfp, (long)lcp->l_ti[j].ti_log, SEEK_SET);
if (mode == PM_MODE_FORW)
acp->ac_serial = 0;
@@ -2462,9 +2480,13 @@ __pmLogSetTime(__pmContext *ctxp)
}
}
else {
+ try = j-1;
j = VolSkip(acp, mode, j-1);
- if (j < 0)
- return;
+ if (j < 0) {
+ if (pmDebugOptions.log)
+ fprintf(stderr, "__pmLogSetTime: VolSkip mode=%d vol=%d failed #5\n", mode, try);
+ return PM_ERR_LOGFILE;
+ }
__pmFseek(acp->ac_mfp, (long)lcp->l_ti[j].ti_log, SEEK_SET);
if (mode == PM_MODE_BACK)
acp->ac_serial = 0;
@@ -2490,14 +2512,37 @@ __pmLogSetTime(__pmContext *ctxp)
}
else {
/* index either not available, or not useful */
+ int j;
if (mode == PM_MODE_FORW) {
- __pmLogChangeVol(acp, lcp->l_minvol);
- assert(acp->ac_mfp != NULL);
+ for (j = lcp->l_minvol; j <= lcp->l_maxvol; j++) {
+ if (__pmLogChangeVol(acp, j) >= 0)
+ break;
+ }
+ if (j > lcp->l_maxvol) {
+ /* no volume found */
+ if (pmDebugOptions.log)
+ fprintf(stderr, " index not useful, no volume between %d...%d\n",
+ lcp->l_minvol, lcp->l_maxvol);
+ acp->ac_curvol = -1;
+ acp->ac_mfp = NULL;
+ return PM_ERR_LOGFILE;
+ }
__pmFseek(acp->ac_mfp, (long)(sizeof(__pmLogLabel) + 2*sizeof(int)), SEEK_SET);
}
else if (mode == PM_MODE_BACK) {
- __pmLogChangeVol(acp, lcp->l_maxvol);
- assert(acp->ac_mfp != NULL);
+ for (j = lcp->l_maxvol; j >= lcp->l_minvol; j--) {
+ if (__pmLogChangeVol(acp, j) >= 0)
+ break;
+ }
+ if (j < lcp->l_minvol) {
+ /* no volume found */
+ if (pmDebugOptions.log)
+ fprintf(stderr, " index not useful, no volume between %d...%d\n",
+ lcp->l_maxvol, lcp->l_minvol);
+ acp->ac_curvol = -1;
+ acp->ac_mfp = NULL;
+ return PM_ERR_LOGFILE;
+ }
__pmFseek(acp->ac_mfp, (long)0, SEEK_END);
}
@@ -2513,6 +2558,8 @@ __pmLogSetTime(__pmContext *ctxp)
acp->ac_offset = __pmFtell(acp->ac_mfp);
assert(acp->ac_offset >= 0);
acp->ac_vol = acp->ac_curvol;
+
+ return 0;
}
/* Read the label of the current archive. */
@@ -3100,6 +3147,7 @@ LogChangeToPreviousArchive(__pmContext *ctxp)
pmTimeval save_origin;
int save_mode;
int sts;
+ int j;
/*
* Check whether there is a previous archive to switch to.
@@ -3145,12 +3193,23 @@ LogChangeToPreviousArchive(__pmContext *ctxp)
}
/* Set up to scan backwards from the end of the archive. */
- __pmLogChangeVol(acp, lcp->l_maxvol);
- assert(acp->ac_mfp != NULL);
+ for (j = lcp->l_maxvol; j >= lcp->l_minvol; j--) {
+ if (__pmLogChangeVol(acp, j) >= 0)
+ break;
+ }
+ if (j < lcp->l_minvol) {
+ /* no volume found */
+ if (pmDebugOptions.log)
+ fprintf(stderr, "LogChangeToPreviousArchive: no volume between %d...%d\n",
+ lcp->l_maxvol, lcp->l_minvol);
+ acp->ac_curvol = -1;
+ acp->ac_mfp = NULL;
+ return PM_ERR_LOGFILE;
+ }
__pmFseek(acp->ac_mfp, (long)0, SEEK_END);
- ctxp->c_archctl->ac_offset = __pmFtell(acp->ac_mfp);
- assert(ctxp->c_archctl->ac_offset >= 0);
- ctxp->c_archctl->ac_vol = ctxp->c_archctl->ac_curvol;
+ acp->ac_offset = __pmFtell(acp->ac_mfp);
+ assert(acp->ac_offset >= 0);
+ acp->ac_vol = acp->ac_curvol;
/*
* Check for temporal overlap here. Do this last in case the API client
commit 5e3b792d3d8ae60f2cebbd51c37b9b0722c3b26e
Author: Mark Goodwin <mgoodwin@redhat.com>
Date: Tue Jul 6 20:09:28 2021 +1000
libpcp_web: plug mem leak in redisMapInsert during daily log-rolling
When pmlogger_daily processes daily archives, the resulting
merged archive(s) are discovered and processed by pmproxy
(if the discovery module is enabled). Since the metadata and
logvol data in each merged archive is likely to have already
been previously processed (but discovery doesn't know this),
we see a lot of dict updates for existing keys and values that
are already mapped.
Static analysis by Coverity (CID323605 Resource Leak) shows
when redisMapInsert calls dictAdd for an existing key, the
new value field is assigned but the old value is not free'd,
and so it leaks.
Related: RHBZ1975069 and https://github.com/performancecopilot/pcp/issues/1318
diff --git a/src/libpcp_web/src/maps.c b/src/libpcp_web/src/maps.c
index 013ef02d3..ce20476c9 100644
--- a/src/libpcp_web/src/maps.c
+++ b/src/libpcp_web/src/maps.c
@@ -160,6 +160,12 @@ redisMapLookup(redisMap *map, sds key)
void
redisMapInsert(redisMap *map, sds key, sds value)
{
+ redisMapEntry *entry = redisMapLookup(map, key);
+
+ if (entry) {
+ /* fix for Coverity CID323605 Resource Leak */
+ dictDelete(map, key);
+ }
dictAdd(map, key, value);
}
commit 2a00a90b0bc3aecb8465fd32aef1ddbe745b2c91
Author: Mark Goodwin <mgoodwin@redhat.com>
Date: Tue Jul 6 20:43:01 2021 +1000
libpcp_web/discovery: improve lock handling and scalability
Rework the global log-rolling lock detection with finer grain
(per-pmlogger directory) detection, and break early in
process_meta() and process_logvol() if a lock file is found
in the same directory as a monitored archive. This is much
more scalable since archive directories that are not locked
can continue to be processed and ingested whilst log-rolling
progresses elsewhere. Also uses much less CPU time since we
don't need a traversal of all monitored archives looking for
locks on every fs change event.
Also improve process_logvol/meta handling for archives that
are deleted whilst being processed by the discovery module.
In conjunction with Kenj's changes in libpcp - stop processing
metadata and logvols if pmFetchArchive returns -ENOENT .. the
archive has been deleted so there is no point further ingesting
it's data.
Related: RHBZ1975069
Related: https://github.com/performancecopilot/pcp/issues/1338
diff --git a/src/libpcp_web/src/discover.c b/src/libpcp_web/src/discover.c
index 991055ce5..964813f66 100644
--- a/src/libpcp_web/src/discover.c
+++ b/src/libpcp_web/src/discover.c
@@ -33,9 +33,6 @@ static char *pmDiscoverFlagsStr(pmDiscover *);
#define PM_DISCOVER_HASHTAB_SIZE 32
static pmDiscover *discover_hashtable[PM_DISCOVER_HASHTAB_SIZE];
-/* pmlogger_daily log-roll lock count */
-static int logrolling = 0;
-
/* number of archives or directories currently being monitored */
static int n_monitored = 0;
@@ -426,28 +423,6 @@ is_deleted(pmDiscover *p, struct stat *sbuf)
return ret;
}
-static int
-check_for_locks()
-{
- int i;
- pmDiscover *p;
- char sep = pmPathSeparator();
- char path[MAXNAMELEN];
-
- for (i=0; i < PM_DISCOVER_HASHTAB_SIZE; i++) {
- for (p = discover_hashtable[i]; p; p = p->next) {
- if (p->flags & PM_DISCOVER_FLAGS_DIRECTORY) {
- pmsprintf(path, sizeof(path), "%s%c%s", p->context.name, sep, "lock");
- if (access(path, F_OK) == 0)
- return 1;
- }
- }
- }
-
- /* no locks */
- return 0;
-}
-
static void
check_deleted(pmDiscover *p)
{
@@ -465,37 +440,8 @@ fs_change_callBack(uv_fs_event_t *handle, const char *filename, int events, int
pmDiscover *p;
char *s;
sds path;
- int locksfound = 0;
struct stat statbuf;
- /*
- * check if logs are currently being rolled by pmlogger_daily et al
- * in any of the directories we are tracking. For mutex, the log control
- * scripts use a 'lock' file in each directory as it is processed.
- */
- locksfound = check_for_locks();
-
- if (!logrolling && locksfound) {
- /* log-rolling has started */
- if (pmDebugOptions.discovery)
- fprintf(stderr, "%s discovery callback: log-rolling in progress\n", stamp());
- logrolling = locksfound;
- return;
- }
-
- if (logrolling && locksfound) {
- logrolling = locksfound;
- return; /* still in progress */
- }
-
- if (logrolling && !locksfound) {
- /* log-rolling is finished: check what got deleted, and then purge */
- if (pmDebugOptions.discovery)
- fprintf(stderr, "%s discovery callback: finished log-rolling\n", stamp());
- pmDiscoverTraverse(PM_DISCOVER_FLAGS_META|PM_DISCOVER_FLAGS_DATAVOL, check_deleted);
- }
- logrolling = locksfound;
-
uv_fs_event_getpath(handle, buffer, &bytes);
path = sdsnewlen(buffer, bytes);
@@ -1037,6 +983,17 @@ pmDiscoverNewSource(pmDiscover *p, int context)
pmDiscoverInvokeSourceCallBacks(p, &timestamp);
}
+static char *
+archive_dir_lock_path(pmDiscover *p)
+{
+ char path[MAXNAMELEN], lockpath[MAXNAMELEN];
+ int sep = pmPathSeparator();
+
+ strncpy(path, p->context.name, sizeof(path)-1);
+ pmsprintf(lockpath, sizeof(lockpath), "%s%c%s", dirname(path), sep, "lock");
+ return strndup(lockpath, sizeof(lockpath));
+}
+
/*
* Process metadata records until EOF. That can span multiple
* callbacks if we get a partial record read.
@@ -1059,6 +1016,7 @@ process_metadata(pmDiscover *p)
__pmLogHdr hdr;
sds msg, source;
static uint32_t *buf = NULL;
+ char *lock_path;
int deleted;
struct stat sbuf;
static int buflen = 0;
@@ -1073,7 +1031,10 @@ process_metadata(pmDiscover *p)
fprintf(stderr, "process_metadata: %s in progress %s\n",
p->context.name, pmDiscoverFlagsStr(p));
pmDiscoverStatsAdd(p->module, "metadata.callbacks", NULL, 1);
+ lock_path = archive_dir_lock_path(p);
for (;;) {
+ if (lock_path && access(lock_path, F_OK) == 0)
+ break;
pmDiscoverStatsAdd(p->module, "metadata.loops", NULL, 1);
off = lseek(p->fd, 0, SEEK_CUR);
nb = read(p->fd, &hdr, sizeof(__pmLogHdr));
@@ -1240,6 +1201,9 @@ process_metadata(pmDiscover *p)
/* flag that all available metadata has now been read */
p->flags &= ~PM_DISCOVER_FLAGS_META_IN_PROGRESS;
+ if (lock_path)
+ free(lock_path);
+
if (pmDebugOptions.discovery)
fprintf(stderr, "%s: completed, partial=%d %s %s\n",
"process_metadata", partial, p->context.name, pmDiscoverFlagsStr(p));
@@ -1266,14 +1230,18 @@ static void
process_logvol(pmDiscover *p)
{
int sts;
- pmResult *r;
+ pmResult *r = NULL;
pmTimespec ts;
int oldcurvol;
__pmContext *ctxp;
__pmArchCtl *acp;
+ char *lock_path;
pmDiscoverStatsAdd(p->module, "logvol.callbacks", NULL, 1);
+ lock_path = archive_dir_lock_path(p);
for (;;) {
+ if (lock_path && access(lock_path, F_OK) == 0)
+ break;
pmDiscoverStatsAdd(p->module, "logvol.loops", NULL, 1);
pmUseContext(p->ctx);
ctxp = __pmHandleToPtr(p->ctx);
@@ -1312,6 +1280,7 @@ process_logvol(pmDiscover *p)
}
/* we are done - return and wait for another callback */
+ r = NULL;
break;
}
@@ -1328,14 +1297,15 @@ process_logvol(pmDiscover *p)
}
/*
- * TODO: persistently save current timestamp, so after being restarted,
- * pmproxy can resume where it left off for each archive.
+ * TODO (perhaps): persistently save current timestamp, so after being
+ * restarted, pmproxy can resume where it left off for each archive.
*/
ts.tv_sec = r->timestamp.tv_sec;
ts.tv_nsec = r->timestamp.tv_usec * 1000;
bump_logvol_decode_stats(p, r);
pmDiscoverInvokeValuesCallBack(p, &ts, r);
pmFreeResult(r);
+ r = NULL;
}
if (r) {
@@ -1348,6 +1318,9 @@ process_logvol(pmDiscover *p)
/* datavol is now up-to-date and at EOF */
p->flags &= ~PM_DISCOVER_FLAGS_DATAVOL_READY;
+
+ if (lock_path)
+ free(lock_path);
}
static void
@@ -1357,6 +1330,10 @@ pmDiscoverInvokeCallBacks(pmDiscover *p)
sds msg;
sds metaname;
+ check_deleted(p);
+ if (p->flags & PM_DISCOVER_FLAGS_DELETED)
+ return; /* ignore deleted archive */
+
if (p->ctx < 0) {
/*
* once off initialization on the first event
@@ -1366,16 +1343,23 @@ pmDiscoverInvokeCallBacks(pmDiscover *p)
/* create the PMAPI context (once off) */
if ((sts = pmNewContext(p->context.type, p->context.name)) < 0) {
- /*
- * Likely an early callback on a new (still empty) archive.
- * If so, just ignore the callback and don't log any scary
- * looking messages. We'll get another CB soon.
- */
- if (sts != PM_ERR_NODATA || pmDebugOptions.desperate) {
- infofmt(msg, "pmNewContext failed for %s: %s\n",
- p->context.name, pmErrStr(sts));
- moduleinfo(p->module, PMLOG_ERROR, msg, p->data);
+ if (sts == -ENOENT) {
+ /* newly deleted archive */
+ p->flags |= PM_DISCOVER_FLAGS_DELETED;
}
+ else {
+ /*
+ * Likely an early callback on a new (still empty) archive.
+ * If so, just ignore the callback and don't log any scary
+ * looking messages. We'll get another CB soon.
+ */
+ if (sts != PM_ERR_NODATA || pmDebugOptions.desperate) {
+ infofmt(msg, "pmNewContext failed for %s: %s\n",
+ p->context.name, pmErrStr(sts));
+ moduleinfo(p->module, PMLOG_ERROR, msg, p->data);
+ }
+ }
+ /* no further processing for this archive */
return;
}
pmDiscoverStatsAdd(p->module, "logvol.new_contexts", NULL, 1);
@@ -1410,8 +1394,12 @@ pmDiscoverInvokeCallBacks(pmDiscover *p)
metaname = sdsnew(p->context.name);
metaname = sdscat(metaname, ".meta");
if ((p->fd = open(metaname, O_RDONLY)) < 0) {
- infofmt(msg, "open failed for %s: %s\n", metaname, osstrerror());
- moduleinfo(p->module, PMLOG_ERROR, msg, p->data);
+ if (p->fd == -ENOENT)
+ p->flags |= PM_DISCOVER_FLAGS_DELETED;
+ else {
+ infofmt(msg, "open failed for %s: %s\n", metaname, osstrerror());
+ moduleinfo(p->module, PMLOG_ERROR, msg, p->data);
+ }
sdsfree(metaname);
return;
}

3988
SPECS/pcp.spec Normal file

File diff suppressed because it is too large Load Diff