systemtap/SOURCES/pr29108.patch

1846 lines
65 KiB
Diff

commit bf95ad72c984c9e68d12707c4d34dbe6bc1f89f2
gpg: Signature made Sat 12 Aug 2023 02:49:06 PM EDT
gpg: using RSA key 5D38116FA4D3A7CC77E378D37E83610126DCC2E8
gpg: Good signature from "Frank Ch. Eigler <fche@elastic.org>" [full]
Author: Aliaksandr Valialkin <valyala@gmail.com>
Date: Thu Jul 27 18:52:37 2023 -0400
runtime/staprun: import gheap routines
BSD-2-Clause gift from the Aliaksandr Valialkin:
https://github.com/valyala/gheap
diff --git a/staprun/gheap.h b/staprun/gheap.h
new file mode 100644
index 000000000..4af4b29ed
--- /dev/null
+++ b/staprun/gheap.h
@@ -0,0 +1,561 @@
+#ifndef GHEAP_H
+#define GHEAP_H
+
+/*
+ * Generalized heap implementation for C99.
+ *
+ * Don't forget passing -DNDEBUG option to the compiler when creating optimized
+ * builds. This significantly speeds up gheap code by removing debug assertions.
+ *
+ * Author: Aliaksandr Valialkin <valyala@gmail.com>.
+ */
+/*
+Copyright (c) 2011 Aliaksandr Valialkin <valyala@gmail.com>
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions
+are met:
+1. Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+2. Redistributions in binary form must reproduce the above copyright
+ notice, this list of conditions and the following disclaimer in the
+ documentation and/or other materials provided with the distribution.
+
+THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+SUCH DAMAGE.
+*/
+
+
+
+/*******************************************************************************
+ * Interface.
+ ******************************************************************************/
+
+#include <stddef.h> /* for size_t */
+#include <stdint.h> /* for SIZE_MAX */
+
+/*
+ * Less comparer must return non-zero value if a < b.
+ * ctx is the gheap_ctx->less_comparer_ctx.
+ * Otherwise it must return 0.
+ */
+typedef int (*gheap_less_comparer_t)(const void *ctx, const void *a,
+ const void *b);
+
+/*
+ * Moves the item from src to dst.
+ */
+typedef void (*gheap_item_mover_t)(void *dst, const void *src);
+
+/*
+ * Gheap context.
+ * This context must be passed to every gheap function.
+ */
+struct gheap_ctx
+{
+ /*
+ * How much children each heap item can have.
+ */
+ size_t fanout;
+
+ /*
+ * A chunk is a tuple containing fanout items arranged sequentially in memory.
+ * A page is a subheap containing page_chunks chunks arranged sequentially
+ * in memory.
+ * The number of chunks in a page is an arbitrary integer greater than 0.
+ */
+ size_t page_chunks;
+
+ /*
+ * The size of each item in bytes.
+ */
+ size_t item_size;
+
+ gheap_less_comparer_t less_comparer;
+ const void *less_comparer_ctx;
+
+ gheap_item_mover_t item_mover;
+};
+
+/*
+ * Returns parent index for the given child index.
+ * Child index must be greater than 0.
+ * Returns 0 if the parent is root.
+ */
+static inline size_t gheap_get_parent_index(const struct gheap_ctx *ctx,
+ size_t u);
+
+/*
+ * Returns the index of the first child for the given parent index.
+ * Parent index must be less than SIZE_MAX.
+ * Returns SIZE_MAX if the index of the first child for the given parent
+ * cannot fit size_t.
+ */
+static inline size_t gheap_get_child_index(const struct gheap_ctx *ctx,
+ size_t u);
+
+/*
+ * Returns a pointer to the first non-heap item using less_comparer
+ * for items' comparison.
+ * Returns the index of the first non-heap item.
+ * Returns heap_size if base points to valid max heap with the given size.
+ */
+static inline size_t gheap_is_heap_until(const struct gheap_ctx *ctx,
+ const void *base, size_t heap_size);
+
+/*
+ * Returns non-zero if base points to valid max heap. Returns zero otherwise.
+ * Uses less_comparer for items' comparison.
+ */
+static inline int gheap_is_heap(const struct gheap_ctx *ctx,
+ const void *base, size_t heap_size);
+
+/*
+ * Makes max heap from items base[0] ... base[heap_size-1].
+ * Uses less_comparer for items' comparison.
+ */
+static inline void gheap_make_heap(const struct gheap_ctx *ctx,
+ void *base, size_t heap_size);
+
+/*
+ * Pushes the item base[heap_size-1] into max heap base[0] ... base[heap_size-2]
+ * Uses less_comparer for items' comparison.
+ */
+static inline void gheap_push_heap(const struct gheap_ctx *ctx,
+ void *base, size_t heap_size);
+
+/*
+ * Pops the maximum item from max heap base[0] ... base[heap_size-1] into
+ * base[heap_size-1].
+ * Uses less_comparer for items' comparison.
+ */
+static inline void gheap_pop_heap(const struct gheap_ctx *ctx,
+ void *base, size_t heap_size);
+
+/*
+ * Sorts items in place of max heap in ascending order.
+ * Uses less_comparer for items' comparison.
+ */
+static inline void gheap_sort_heap(const struct gheap_ctx *ctx,
+ void *base, size_t heap_size);
+
+/*
+ * Swaps the item outside the heap with the maximum item inside
+ * the heap and restores heap invariant.
+ */
+static inline void gheap_swap_max_item(const struct gheap_ctx *const ctx,
+ void *const base, const size_t heap_size, void *item);
+
+/*
+ * Restores max heap invariant after item's value has been increased,
+ * i.e. less_comparer(old_item, new_item) != 0.
+ */
+static inline void gheap_restore_heap_after_item_increase(
+ const struct gheap_ctx *ctx,
+ void *base, size_t heap_size, size_t modified_item_index);
+
+/*
+ * Restores max heap invariant after item's value has been decreased,
+ * i.e. less_comparer(new_item, old_item) != 0.
+ */
+static inline void gheap_restore_heap_after_item_decrease(
+ const struct gheap_ctx *ctx,
+ void *base, size_t heap_size, size_t modified_item_index);
+
+/*
+ * Removes the given item from the heap and puts it into base[heap_size-1].
+ * Uses less_comparer for items' comparison.
+ */
+static inline void gheap_remove_from_heap(const struct gheap_ctx *ctx,
+ void *base, size_t heap_size, size_t item_index);
+
+/*******************************************************************************
+ * Implementation.
+ *
+ * Define all functions inline, so compiler will be able optimizing out common
+ * args (fanout, page_chunks, item_size, less_comparer and item_mover),
+ * which are usually constants, using contant folding optimization
+ * ( http://en.wikipedia.org/wiki/Constant_folding ).
+ *****************************************************************************/
+
+#include <assert.h> /* for assert */
+#include <stddef.h> /* for size_t */
+#include <stdint.h> /* for uintptr_t, SIZE_MAX and UINTPTR_MAX */
+
+static inline size_t gheap_get_parent_index(const struct gheap_ctx *const ctx,
+ size_t u)
+{
+ assert(u > 0);
+
+ const size_t fanout = ctx->fanout;
+ const size_t page_chunks = ctx->page_chunks;
+
+ --u;
+ if (page_chunks == 1) {
+ return u / fanout;
+ }
+
+ if (u < fanout) {
+ /* Parent is root. */
+ return 0;
+ }
+
+ assert(page_chunks <= SIZE_MAX / fanout);
+ const size_t page_size = fanout * page_chunks;
+ size_t v = u % page_size;
+ if (v >= fanout) {
+ /* Fast path. Parent is on the same page as the child. */
+ return u - v + v / fanout;
+ }
+
+ /* Slow path. Parent is on another page. */
+ v = u / page_size - 1;
+ const size_t page_leaves = (fanout - 1) * page_chunks + 1;
+ u = v / page_leaves + 1;
+ return u * page_size + v % page_leaves - page_leaves + 1;
+}
+
+static inline size_t gheap_get_child_index(const struct gheap_ctx *const ctx,
+ size_t u)
+{
+ assert(u < SIZE_MAX);
+
+ const size_t fanout = ctx->fanout;
+ const size_t page_chunks = ctx->page_chunks;
+
+ if (page_chunks == 1) {
+ if (u > (SIZE_MAX - 1) / fanout) {
+ /* Child overflow. */
+ return SIZE_MAX;
+ }
+ return u * fanout + 1;
+ }
+
+ if (u == 0) {
+ /* Root's child is always 1. */
+ return 1;
+ }
+
+ assert(page_chunks <= SIZE_MAX / fanout);
+ const size_t page_size = fanout * page_chunks;
+ --u;
+ size_t v = u % page_size + 1;
+ if (v < page_size / fanout) {
+ /* Fast path. Child is on the same page as the parent. */
+ v *= fanout - 1;
+ if (u > SIZE_MAX - 2 - v) {
+ /* Child overflow. */
+ return SIZE_MAX;
+ }
+ return u + v + 2;
+ }
+
+ /* Slow path. Child is on another page. */
+ const size_t page_leaves = (fanout - 1) * page_chunks + 1;
+ v += (u / page_size + 1) * page_leaves - page_size;
+ if (v > (SIZE_MAX - 1) / page_size) {
+ /* Child overflow. */
+ return SIZE_MAX;
+ }
+ return v * page_size + 1;
+}
+
+/* Returns a pointer to base[index]. */
+static inline void *_gheap_get_item_ptr(const struct gheap_ctx *const ctx,
+ const void *const base, const size_t index)
+{
+ const size_t item_size = ctx->item_size;
+
+ assert(index <= SIZE_MAX / item_size);
+
+ const size_t offset = item_size * index;
+ assert((uintptr_t)base <= UINTPTR_MAX - offset);
+
+ return ((char *)base) + offset;
+}
+
+/*
+ * Sifts the item up in the given sub-heap with the given root_index
+ * starting from the hole_index.
+ */
+static inline void _gheap_sift_up(const struct gheap_ctx *const ctx,
+ void *const base, const size_t root_index, size_t hole_index,
+ const void *const item)
+{
+ assert(hole_index >= root_index);
+
+ const gheap_less_comparer_t less_comparer = ctx->less_comparer;
+ const void *const less_comparer_ctx = ctx->less_comparer_ctx;
+ const gheap_item_mover_t item_mover = ctx->item_mover;
+
+ while (hole_index > root_index) {
+ const size_t parent_index = gheap_get_parent_index(ctx, hole_index);
+ assert(parent_index >= root_index);
+ const void *const parent = _gheap_get_item_ptr(ctx, base, parent_index);
+ if (!less_comparer(less_comparer_ctx, parent, item)) {
+ break;
+ }
+ item_mover(_gheap_get_item_ptr(ctx, base, hole_index),
+ parent);
+ hole_index = parent_index;
+ }
+
+ item_mover(_gheap_get_item_ptr(ctx, base, hole_index), item);
+}
+
+/*
+ * Moves the max child into the given hole and returns index
+ * of the new hole.
+ */
+static inline size_t _gheap_move_up_max_child(const struct gheap_ctx *const ctx,
+ void *const base, const size_t children_count,
+ const size_t hole_index, const size_t child_index)
+{
+ assert(children_count > 0);
+ assert(children_count <= ctx->fanout);
+ assert(child_index == gheap_get_child_index(ctx, hole_index));
+
+ const gheap_less_comparer_t less_comparer = ctx->less_comparer;
+ const void *const less_comparer_ctx = ctx->less_comparer_ctx;
+ const gheap_item_mover_t item_mover = ctx->item_mover;
+
+ size_t max_child_index = child_index;
+ for (size_t i = 1; i < children_count; ++i) {
+ if (!less_comparer(less_comparer_ctx,
+ _gheap_get_item_ptr(ctx, base, child_index + i),
+ _gheap_get_item_ptr(ctx, base, max_child_index))) {
+ max_child_index = child_index + i;
+ }
+ }
+ item_mover(_gheap_get_item_ptr(ctx, base, hole_index),
+ _gheap_get_item_ptr(ctx, base, max_child_index));
+ return max_child_index;
+}
+
+/*
+ * Sifts the given item down in the heap of the given size starting
+ * from the hole_index.
+ */
+static inline void _gheap_sift_down(const struct gheap_ctx *const ctx,
+ void *const base, const size_t heap_size, size_t hole_index,
+ const void *const item)
+{
+ assert(heap_size > 0);
+ assert(hole_index < heap_size);
+
+ const size_t fanout = ctx->fanout;
+
+ const size_t root_index = hole_index;
+ const size_t last_full_index = heap_size - (heap_size - 1) % fanout;
+ while (1) {
+ const size_t child_index = gheap_get_child_index(ctx, hole_index);
+ if (child_index >= last_full_index) {
+ if (child_index < heap_size) {
+ assert(child_index == last_full_index);
+ hole_index = _gheap_move_up_max_child(ctx, base,
+ heap_size - child_index, hole_index, child_index);
+ }
+ break;
+ }
+ assert(heap_size - child_index >= fanout);
+ hole_index = _gheap_move_up_max_child(ctx, base, fanout, hole_index,
+ child_index);
+ }
+ _gheap_sift_up(ctx, base, root_index, hole_index, item);
+}
+
+/*
+ * Pops the maximum item from the heap [base[0] ... base[heap_size-1]]
+ * into base[heap_size].
+ */
+static inline void _gheap_pop_max_item(const struct gheap_ctx *const ctx,
+ void *const base, const size_t heap_size)
+{
+ void *const hole = _gheap_get_item_ptr(ctx, base, heap_size);
+ gheap_swap_max_item(ctx, base, heap_size, hole);
+}
+
+static inline size_t gheap_is_heap_until(const struct gheap_ctx *const ctx,
+ const void *const base, const size_t heap_size)
+{
+ const gheap_less_comparer_t less_comparer = ctx->less_comparer;
+ const void *const less_comparer_ctx = ctx->less_comparer_ctx;
+
+ for (size_t u = 1; u < heap_size; ++u) {
+ const size_t v = gheap_get_parent_index(ctx, u);
+ const void *const a = _gheap_get_item_ptr(ctx, base, v);
+ const void *const b = _gheap_get_item_ptr(ctx, base, u);
+ if (less_comparer(less_comparer_ctx, a, b)) {
+ return u;
+ }
+ }
+ return heap_size;
+}
+
+static inline int gheap_is_heap(const struct gheap_ctx *const ctx,
+ const void *const base, const size_t heap_size)
+{
+ return (gheap_is_heap_until(ctx, base, heap_size) == heap_size);
+}
+
+static inline void gheap_make_heap(const struct gheap_ctx *const ctx,
+ void *const base, const size_t heap_size)
+{
+ const size_t fanout = ctx->fanout;
+ const size_t page_chunks = ctx->page_chunks;
+ const size_t item_size = ctx->item_size;
+ const gheap_item_mover_t item_mover = ctx->item_mover;
+
+ if (heap_size > 1) {
+ /* Skip leaf nodes without children. This is easy to do for non-paged heap,
+ * i.e. when page_chunks = 1, but it is difficult for paged heaps.
+ * So leaf nodes in paged heaps are visited anyway.
+ */
+ size_t i = (page_chunks == 1) ? ((heap_size - 2) / fanout) :
+ (heap_size - 2);
+ do {
+ char tmp[item_size];
+ item_mover(tmp, _gheap_get_item_ptr(ctx, base, i));
+ _gheap_sift_down(ctx, base, heap_size, i, tmp);
+ } while (i-- > 0);
+ }
+
+ assert(gheap_is_heap(ctx, base, heap_size));
+}
+
+static inline void gheap_push_heap(const struct gheap_ctx *const ctx,
+ void *const base, const size_t heap_size)
+{
+ assert(heap_size > 0);
+ assert(gheap_is_heap(ctx, base, heap_size - 1));
+
+ const size_t item_size = ctx->item_size;
+ const gheap_item_mover_t item_mover = ctx->item_mover;
+
+ if (heap_size > 1) {
+ const size_t u = heap_size - 1;
+ char tmp[item_size];
+ item_mover(tmp, _gheap_get_item_ptr(ctx, base, u));
+ _gheap_sift_up(ctx, base, 0, u, tmp);
+ }
+
+ assert(gheap_is_heap(ctx, base, heap_size));
+}
+
+static inline void gheap_pop_heap(const struct gheap_ctx *const ctx,
+ void *const base, const size_t heap_size)
+{
+ assert(heap_size > 0);
+ assert(gheap_is_heap(ctx, base, heap_size));
+
+ if (heap_size > 1) {
+ _gheap_pop_max_item(ctx, base, heap_size - 1);
+ }
+
+ assert(gheap_is_heap(ctx, base, heap_size - 1));
+}
+
+static inline void gheap_sort_heap(const struct gheap_ctx *const ctx,
+ void *const base, const size_t heap_size)
+{
+ for (size_t i = heap_size; i > 1; --i) {
+ _gheap_pop_max_item(ctx, base, i - 1);
+ }
+}
+
+static inline void gheap_swap_max_item(const struct gheap_ctx *const ctx,
+ void *const base, const size_t heap_size, void *item)
+{
+ assert(heap_size > 0);
+ assert(gheap_is_heap(ctx, base, heap_size));
+
+ const size_t item_size = ctx->item_size;
+ const gheap_item_mover_t item_mover = ctx->item_mover;
+
+ char tmp[item_size];
+ item_mover(tmp, item);
+ item_mover(item, base);
+ _gheap_sift_down(ctx, base, heap_size, 0, tmp);
+
+ assert(gheap_is_heap(ctx, base, heap_size));
+}
+
+static inline void gheap_restore_heap_after_item_increase(
+ const struct gheap_ctx *const ctx,
+ void *const base, const size_t heap_size, size_t modified_item_index)
+{
+ assert(heap_size > 0);
+ assert(modified_item_index < heap_size);
+ assert(gheap_is_heap(ctx, base, modified_item_index));
+
+ const size_t item_size = ctx->item_size;
+ const gheap_item_mover_t item_mover = ctx->item_mover;
+
+ if (modified_item_index > 0) {
+ char tmp[item_size];
+ item_mover(tmp, _gheap_get_item_ptr(ctx, base, modified_item_index));
+ _gheap_sift_up(ctx, base, 0, modified_item_index, tmp);
+ }
+
+ assert(gheap_is_heap(ctx, base, heap_size));
+ (void)heap_size;
+}
+
+static inline void gheap_restore_heap_after_item_decrease(
+ const struct gheap_ctx *const ctx,
+ void *const base, const size_t heap_size, size_t modified_item_index)
+{
+ assert(heap_size > 0);
+ assert(modified_item_index < heap_size);
+ assert(gheap_is_heap(ctx, base, modified_item_index));
+
+ const size_t item_size = ctx->item_size;
+ const gheap_item_mover_t item_mover = ctx->item_mover;
+
+ char tmp[item_size];
+ item_mover(tmp, _gheap_get_item_ptr(ctx, base, modified_item_index));
+ _gheap_sift_down(ctx, base, heap_size, modified_item_index, tmp);
+
+ assert(gheap_is_heap(ctx, base, heap_size));
+}
+
+static inline void gheap_remove_from_heap(const struct gheap_ctx *const ctx,
+ void *const base, const size_t heap_size, size_t item_index)
+{
+ assert(heap_size > 0);
+ assert(item_index < heap_size);
+ assert(gheap_is_heap(ctx, base, heap_size));
+
+ const size_t item_size = ctx->item_size;
+ const gheap_less_comparer_t less_comparer = ctx->less_comparer;
+ const void *const less_comparer_ctx = ctx->less_comparer_ctx;
+ const gheap_item_mover_t item_mover = ctx->item_mover;
+
+ const size_t new_heap_size = heap_size - 1;
+ if (item_index < new_heap_size) {
+ char tmp[item_size];
+ void *const hole = _gheap_get_item_ptr(ctx, base, new_heap_size);
+ item_mover(tmp, hole);
+ item_mover(hole, _gheap_get_item_ptr(ctx, base, item_index));
+ if (less_comparer(less_comparer_ctx, tmp, hole)) {
+ _gheap_sift_down(ctx, base, new_heap_size, item_index, tmp);
+ }
+ else {
+ _gheap_sift_up(ctx, base, 0, item_index, tmp);
+ }
+ }
+
+ assert(gheap_is_heap(ctx, base, new_heap_size));
+}
+
+#endif
commit 5b39471380a238469c8fc18136f12600e5e9aec7
gpg: Signature made Sat 12 Aug 2023 02:49:21 PM EDT
gpg: using RSA key 5D38116FA4D3A7CC77E378D37E83610126DCC2E8
gpg: Good signature from "Frank Ch. Eigler <fche@elastic.org>" [full]
Author: Frank Ch. Eigler <fche@redhat.com>
Date: Mon Jul 31 14:06:57 2023 -0400
PR29108 / BZ2095359: rewrite staprun serializer logic
Logic in commit cd48874296e00 (2021, PR28449) fixed broken cross-cpu
message ordering that followed previous transport concurrency fixes,
but imposed a lot of userspace synchronization delays upon the threads
who were supposed to drain messages from the kernel relayfs streams as
fast as possible. This has led to unnecessarily lossy output overall.
New code uses a new many-writers single-reader data structure, a mutex
protected heap. All the per-cpu readers copy & pump messages into
that heap as rapidly as possible, sorted by the generally monotonic
sequence number. The reader is signalled via a condition variable and
time to print & release messages in sequence number order. It also
handles lost messages (jumps in the sequence numbers) by waiting a while
to let the stragglers come in.
The kernel-user messages now also include a framing sequence to allow
the per-cpu readers to resynchronize to the message boundaries, in
case some sort of buffer overflow or something else occurs. It
reports how many bytes and/or messages were skipped in order to
resynchronize. It does so in a lot less lossy way than previous code,
which just tried to flush everything then-currently available, hoping
that it'd match message boundaries.
Unfortunately, this means that the user-kernel message ABI has
changed! Previous-version staprun instances won't work with the new
modules, nor will current-version staprun with old modules. This flag
day is enforced by changing the numbers of the various ctl message
numbers, so old/new kernel/user combinations will generate errors
rather than quasi-successful staprun startup.
New code also dramatically simplifies the use of signals in staprun
(or rather stapio). Gone is the signal thread, a lot of the
masking/blocking/waiting. Instead a single basic signal handler just
increments globals when signals of various kinds arrive, and all the
per-cpu etc. threads poll those globals periodically. This includes
logic needed for -S (output file rotation on SIGUSR2) as well as
flight recorder (-L / -A) modes.
The reader_timeout_ms value (-T) in both bulk/serialized mode for all
ppoll timeouts, to prevent those threads from sleeping indefinitely,
now that they won't be bothered by signals.
diff --git a/configure b/configure
index 974cc2c81..1ff5580b4 100755
--- a/configure
+++ b/configure
@@ -12694,6 +12694,14 @@ printf "%s\n" "$as_me: WARNING: cannot find librpmio" >&2;}
fi
fi
+ac_fn_c_check_header_compile "$LINENO" "stdatomic.h" "ac_cv_header_stdatomic_h" "$ac_includes_default"
+if test "x$ac_cv_header_stdatomic_h" = xyes
+then :
+ printf "%s\n" "#define HAVE_STDATOMIC_H 1" >>confdefs.h
+
+fi
+
+
for ac_header in rpm/rpmcrypto.h
do :
ac_fn_c_check_header_compile "$LINENO" "rpm/rpmcrypto.h" "ac_cv_header_rpm_rpmcrypto_h" "$ac_includes_default"
diff --git a/configure.ac b/configure.ac
index 3f184f862..e9176b725 100644
--- a/configure.ac
+++ b/configure.ac
@@ -490,6 +490,8 @@ if test "$with_rpm" != "no"; then
fi
fi
+AC_CHECK_HEADERS([stdatomic.h])
+
dnl Look for rpmcrypto.h
AC_CHECK_HEADERS([rpm/rpmcrypto.h], [
AC_DEFINE([HAVE_RPMCRYPTO_H],[1],[have rpmcrypto_h])
diff --git a/man/stap.1.in b/man/stap.1.in
index 4e1f0a537..c1a81fef3 100644
--- a/man/stap.1.in
+++ b/man/stap.1.in
@@ -388,7 +388,7 @@ With \-o option, run staprun in background as a daemon and show its pid.
Sets the maximum size of output file and the maximum number of output files.
If the size of output file will exceed
.B size
-, systemtap switches output file to the next file. And if the number of
+megabytes, systemtap switches output file to the next file. And if the number of
output files exceed
.B N
, systemtap removes the oldest output file. You can omit the second argument.
diff --git a/runtime/print_flush.c b/runtime/print_flush.c
index 35677b225..4141f95b9 100644
--- a/runtime/print_flush.c
+++ b/runtime/print_flush.c
@@ -43,6 +43,7 @@ static void __stp_print_flush(struct _stp_log *log)
if (likely(entry && bytes_reserved > hlen)) {
/* copy new _stp_trace_ header */
struct _stp_trace t = {
+ .magic = STAP_TRACE_MAGIC,
.sequence = _stp_seq_inc(),
.pdu_len = len
};
diff --git a/runtime/transport/control.c b/runtime/transport/control.c
index 3d7333403..d0a8bdf53 100644
--- a/runtime/transport/control.c
+++ b/runtime/transport/control.c
@@ -57,7 +57,7 @@ static ssize_t _stp_ctl_write_cmd(struct file *file, const char __user *buf, siz
#if defined(DEBUG_TRANS) && (DEBUG_TRANS >= 2)
if (type < STP_MAX_CMD)
- dbug_trans2("Got %s. euid=%ld, len=%d\n", _stp_command_name[type],
+ dbug_trans2("Got %s. euid=%ld, len=%d\n", _stp_command_name[min(type,STP_MAX_CMD)] ?: "?",
(long)euid, (int)count);
#endif
@@ -211,7 +211,9 @@ out:
#if defined(DEBUG_TRANS) && (DEBUG_TRANS >= 2)
if (type < STP_MAX_CMD)
- dbug_trans2("Completed %s (rc=%d)\n", _stp_command_name[type], rc);
+ dbug_trans2("Completed %s (rc=%d)\n",
+ _stp_command_name[min(type,STP_MAX_CMD)] ?: "?",
+ rc);
#endif
return rc;
}
diff --git a/runtime/transport/transport_msgs.h b/runtime/transport/transport_msgs.h
index 9e0081c80..e3aa995b1 100644
--- a/runtime/transport/transport_msgs.h
+++ b/runtime/transport/transport_msgs.h
@@ -1,7 +1,7 @@
/* -*- linux-c -*-
* transport_msgs.h - messages exchanged between module and userspace
*
- * Copyright (C) Red Hat Inc, 2006-2011
+ * Copyright (C) Red Hat Inc, 2006-2023
*
* This file is part of systemtap, and is free software. You can
* redistribute it and/or modify it under the terms of the GNU General
@@ -19,7 +19,9 @@
#define STP_TZ_NAME_LEN 64
#define STP_REMOTE_URI_LEN 128
+#define STAP_TRACE_MAGIC "\xF0\x9F\xA9\xBA" /* unicode stethoscope 🩺 in UTF-8 */
struct _stp_trace {
+ char magic[4]; /* framing helper */
uint32_t sequence; /* event number */
uint32_t pdu_len; /* length of data after this trace */
};
@@ -30,7 +32,7 @@ enum
/** stapio sends a STP_START after recieving a STP_TRANSPORT from
the module. The module sends STP_START back with result of call
systemtap_module_init() which will install all initial probes. */
- STP_START,
+ STP_START = 0x50, // renumbered in version 5.0 to force incompatibility
/** stapio sends STP_EXIT to signal it wants to stop the module
itself or in response to receiving a STP_REQUEST_EXIT.
The module sends STP_EXIT once _stp_clean_and_exit has been
@@ -87,16 +89,21 @@ enum
/** Send by staprun to notify module of remote identity, if any.
Only send once at startup. */
STP_REMOTE_ID,
+ /** Placeholder, it was mistakenly labeled STP_MAX_CMD */
+ STP_MAX_CMD_PLACEHOLDER,
+ /** Sent by stapio after having recevied STP_TRANSPORT. Notifies
+ the module of the target namespaces pid.*/
+ STP_NAMESPACES_PID,
+
+ /** INSERT NEW MESSAGE TYPES HERE */
+
/** Max number of message types, sanity check only. */
STP_MAX_CMD,
- /** Sent by stapio after having recevied STP_TRANSPORT. Notifies
- the module of the target namespaces pid.*/
- STP_NAMESPACES_PID
};
#ifdef DEBUG_TRANS
-static const char *_stp_command_name[] = {
- "STP_START",
+static const char *_stp_command_name[STP_MAX_CMD] = {
+ [STP_START]="STP_START",
"STP_EXIT",
"STP_OOB_DATA",
"STP_SYSTEM",
@@ -113,7 +120,9 @@ static const char *_stp_command_name[] = {
"STP_TZINFO",
"STP_PRIVILEGE_CREDENTIALS",
"STP_REMOTE_ID",
- "STP_NAMESPACES_PID",
+ "STP_MAX_CMD_PLACEHOLDER",
+ "STP_NAMESPACE_PID",
+ [STP_MAX_CMD]="?" /* in control.c, STP_MAX_CMD represents unknown message numbers/names */
};
#endif /* DEBUG_TRANS */
diff --git a/staprun/common.c b/staprun/common.c
index 3d23d7319..f8d618e24 100644
--- a/staprun/common.c
+++ b/staprun/common.c
@@ -115,7 +115,7 @@ void parse_args(int argc, char **argv)
target_pid = 0;
target_namespaces_pid = 0;
buffer_size = 0;
- reader_timeout_ms = 0;
+ reader_timeout_ms = 200;
target_cmd = NULL;
outfile_name = NULL;
rename_mod = 0;
diff --git a/staprun/mainloop.c b/staprun/mainloop.c
index 4af21e950..c507fc069 100644
--- a/staprun/mainloop.c
+++ b/staprun/mainloop.c
@@ -7,7 +7,7 @@
* Public License (GPL); either version 2, or (at your option) any
* later version.
*
- * Copyright (C) 2005-2021 Red Hat Inc.
+ * Copyright (C) 2005-2023 Red Hat Inc.
*/
#include "staprun.h"
@@ -23,31 +23,9 @@
/* globals */
int ncpus;
-static int pending_interrupts = 0;
+static volatile sig_atomic_t pending_interrupts = 0; // tells stp_main_loop to trigger STP_EXIT message to kernel
static int target_pid_failed_p = 0;
-/* Setup by setup_main_signals, used by signal_thread to notify the
- main thread of interruptable events. */
-static pthread_t main_thread;
-
-static void set_nonblocking_std_fds(void)
-{
- int fd;
- for (fd = 1; fd < 3; fd++) {
- /* NB: writing to stderr/stdout blockingly in signal handler is
- * dangerous since it may prevent the stap process from quitting
- * gracefully on receiving SIGTERM/etc signals when the stderr/stdout
- * write buffer is full. PR23891 */
- int flags = fcntl(fd, F_GETFL);
- if (flags == -1)
- continue;
-
- if (flags & O_NONBLOCK)
- continue;
-
- (void) fcntl(fd, F_SETFL, flags | O_NONBLOCK);
- }
-}
static void set_blocking_std_fds(void)
{
@@ -77,43 +55,16 @@ static void my_exit(int rc)
_exit(rc);
}
-static void *signal_thread(void *arg)
-{
- sigset_t *s = (sigset_t *) arg;
- int signum = 0;
- while (1) {
- if (sigwait(s, &signum) < 0) {
- _perr("sigwait");
- continue;
- }
+
+static void interrupt_handler(int signum)
+{
if (signum == SIGQUIT) {
load_only = 1; /* flag for stp_main_loop */
- pending_interrupts ++;
- } else if (signum == SIGINT || signum == SIGHUP || signum == SIGTERM
- || signum == SIGPIPE)
- {
- pending_interrupts ++;
}
- if (pending_interrupts > 2) {
- set_nonblocking_std_fds();
- pthread_kill (main_thread, SIGURG);
- }
- dbug(2, "sigproc %d (%s)\n", signum, strsignal(signum));
- }
- /* Notify main thread (interrupts select). */
- pthread_kill (main_thread, SIGURG);
- return NULL;
+ pending_interrupts ++;
}
-static void urg_proc(int signum)
-{
- /* This handler is just notified from the signal_thread
- whenever an interruptable condition is detected. The
- handler itself doesn't do anything. But this will
- result select to detect an EINTR event. */
- dbug(2, "urg_proc %d (%s)\n", signum, strsignal(signum));
-}
static void chld_proc(int signum)
{
@@ -143,9 +94,9 @@ static void chld_proc(int signum)
(void) rc; /* XXX: notused */
}
+
static void setup_main_signals(void)
{
- pthread_t tid;
struct sigaction sa;
sigset_t *s = malloc(sizeof(*s));
if (!s) {
@@ -153,25 +104,11 @@ static void setup_main_signals(void)
exit(1);
}
- /* The main thread will only handle SIGCHLD and SIGURG.
- SIGURG is send from the signal thread in case the interrupt
- flag is set. This will then interrupt any select call. */
- main_thread = pthread_self();
- sigfillset(s);
- pthread_sigmask(SIG_SETMASK, s, NULL);
-
memset(&sa, 0, sizeof(sa));
/* select will report EINTR even when SA_RESTART is set. */
sa.sa_flags = SA_RESTART;
sigfillset(&sa.sa_mask);
- /* Ignore all these events on the main thread. */
- sa.sa_handler = SIG_IGN;
- sigaction(SIGINT, &sa, NULL);
- sigaction(SIGTERM, &sa, NULL);
- sigaction(SIGHUP, &sa, NULL);
- sigaction(SIGQUIT, &sa, NULL);
-
/* This is to notify when our child process (-c) ends. */
sa.sa_handler = chld_proc;
sigaction(SIGCHLD, &sa, NULL);
@@ -182,26 +119,21 @@ static void setup_main_signals(void)
sigaction(SIGWINCH, &sa, NULL);
}
- /* This signal handler is notified from the signal_thread
- whenever a interruptable event is detected. It will
- result in an EINTR event for select or sleep. */
- sa.sa_handler = urg_proc;
- sigaction(SIGURG, &sa, NULL);
-
- /* Everything else is handled on a special signal_thread. */
- sigemptyset(s);
- sigaddset(s, SIGINT);
- sigaddset(s, SIGTERM);
- sigaddset(s, SIGHUP);
- sigaddset(s, SIGQUIT);
- sigaddset(s, SIGPIPE);
- pthread_sigmask(SIG_SETMASK, s, NULL);
- if (pthread_create(&tid, NULL, signal_thread, s) < 0) {
- _perr(_("failed to create thread"));
- exit(1);
- }
+ // listen to these signals via general interrupt handler in whatever thread
+ memset(&sa, 0, sizeof(sa));
+ sa.sa_flags = SA_RESTART;
+ sigfillset(&sa.sa_mask);
+
+ sa.sa_handler = interrupt_handler;
+ sigaction(SIGINT, &sa, NULL);
+ sigaction(SIGTERM, &sa, NULL);
+ sigaction(SIGHUP, &sa, NULL);
+ sigaction(SIGQUIT, &sa, NULL);
+
+ // Formerly, we had a signal catching thread.
}
+
/**
* system_cmd() executes system commands in response
* to an STP_SYSTEM message from the module. These
diff --git a/staprun/relay.c b/staprun/relay.c
index dea1d5ae9..08850b246 100644
--- a/staprun/relay.c
+++ b/staprun/relay.c
@@ -7,30 +7,32 @@
* Public License (GPL); either version 2, or (at your option) any
* later version.
*
- * Copyright (C) 2007-2013 Red Hat Inc.
+ * Copyright (C) 2007-2023 Red Hat Inc.
*/
#include "staprun.h"
+#include <string.h>
+#ifdef HAVE_STDATOMIC_H
+#include <stdatomic.h>
+#endif
+#define NDEBUG
+#include "gheap.h"
+
int out_fd[MAX_NR_CPUS];
int monitor_end = 0;
static pthread_t reader[MAX_NR_CPUS];
-static int relay_fd[MAX_NR_CPUS];
+static int relay_fd[MAX_NR_CPUS]; // fd to kernel per-cpu relayfs
static int avail_cpus[MAX_NR_CPUS];
-static int switch_file[MAX_NR_CPUS];
-static pthread_mutex_t mutex[MAX_NR_CPUS];
+static volatile sig_atomic_t sigusr2_count; // number of SIGUSR2's received by process
+static int sigusr2_processed[MAX_NR_CPUS]; // each thread's count of processed SIGUSR2's
static int bulkmode = 0;
-static volatile int stop_threads = 0;
+static volatile int stop_threads = 0; // set during relayfs_close to signal threads to die
static time_t *time_backlog[MAX_NR_CPUS];
static int backlog_order=0;
#define BACKLOG_MASK ((1 << backlog_order) - 1)
#define MONITORLINELENGTH 4096
-/* tracking message sequence #s for cross-cpu merging */
-static uint32_t last_sequence_number;
-static pthread_mutex_t last_sequence_number_mutex = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t last_sequence_number_changed = PTHREAD_COND_INITIALIZER;
-
#ifdef NEED_PPOLL
int ppoll(struct pollfd *fds, nfds_t nfds,
const struct timespec *timeout, const sigset_t *sigmask)
@@ -123,18 +125,375 @@ static int switch_outfile(int cpu, int *fnum)
return 0;
}
+
+
+/* In serialized (non-bulk) output mode, ndividual messages that have
+ been received from the kernel per-cpu relays are stored in an central
+ serializing data structure - in this case, a heap. They are ordered
+ by message sequence number. An additional thread (serializer_thread)
+ scans & sequences the output. */
+struct serialized_message {
+ union {
+ struct _stp_trace bufhdr;
+ char bufhdr_raw[sizeof(struct _stp_trace)];
+ };
+ time_t received; // timestamp when this message was enqueued
+ char *buf; // malloc()'d size >= rounded(bufhdr.pdu_len)
+};
+static struct serialized_message* buffer_heap = NULL; // the heap
+
+// NB: we control memory via realloc(), gheap just manipulates entries in place
+static unsigned buffer_heap_size = 0; // used number of entries
+static unsigned buffer_heap_alloc = 0; // allocation length, always >= buffer_heap_size
+static unsigned last_sequence_number = 0; // last processed sequential message number
+
+#ifdef HAVE_STDATOMIC_H
+static atomic_ulong lost_message_count = 0; // how many sequence numbers we know we missed
+static atomic_ulong lost_byte_count = 0; // how many bytes were skipped during resync
+#else
+static unsigned long lost_message_count = 0; // how many sequence numbers we know we missed
+static unsigned long lost_byte_count = 0; // how many bytes were skipped during resync
+#endif
+
+// concurrency control for the buffer_heap
+static pthread_cond_t buffer_heap_cond = PTHREAD_COND_INITIALIZER;
+static pthread_mutex_t buffer_heap_mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_t serializer_thread; // ! bulkmode only
+
+
+static void buffer_heap_mover (void *const dest, const void *const src)
+{
+ memmove (dest, src, sizeof(struct serialized_message));
+}
+
+// NB: since we want to sort messages into an INCREASING heap sequence,
+// we reverse the normal comparison operator. gheap_pop_heap() should
+// therefore return the SMALLEST element.
+static int buffer_heap_comparer (const void *const ctx, const void *a, const void *b)
+{
+ (void) ctx;
+ uint32_t aa = ((struct serialized_message *)a)->bufhdr.sequence;
+ uint32_t bb = ((struct serialized_message *)b)->bufhdr.sequence;
+ return (aa > bb);
+}
+
+static const struct gheap_ctx buffer_heap_ctx = {
+ .item_size = sizeof(struct serialized_message),
+ .less_comparer = buffer_heap_comparer,
+ .item_mover = buffer_heap_mover,
+ .page_chunks = 16, // arbitrary
+ .fanout = 2 // standard binary heap
+};
+
+
+#define MAX_MESSAGE_LENGTH (128*1024) /* maximum likely length of a single pdu */
+
+
+
+/* Thread that reads per-cpu messages, and stuffs complete ones into
+ dynamically allocated serialized_message nodes in a binary tree. */
+static void* reader_thread_serialmode (void *data)
+{
+ int rc, cpu = (int)(long)data;
+ struct pollfd pollfd;
+ sigset_t sigs;
+ cpu_set_t cpu_mask;
+
+ sigemptyset(&sigs);
+ sigaddset(&sigs,SIGUSR2);
+ pthread_sigmask(SIG_BLOCK, &sigs, NULL);
+
+ sigfillset(&sigs);
+ sigdelset(&sigs,SIGUSR2);
+
+ CPU_ZERO(&cpu_mask);
+ CPU_SET(cpu, &cpu_mask);
+ if( sched_setaffinity( 0, sizeof(cpu_mask), &cpu_mask ) < 0 )
+ _perr("sched_setaffinity");
+
+ pollfd.fd = relay_fd[cpu];
+ pollfd.events = POLLIN;
+
+ while (! stop_threads) {
+ // read a message header
+ struct serialized_message message;
+
+ /* 200ms, close to human level of "instant" */
+ struct timespec tim, *timeout = &tim;
+ timeout->tv_sec = reader_timeout_ms / 1000;
+ timeout->tv_nsec = (reader_timeout_ms - timeout->tv_sec * 1000) * 1000000;
+
+ rc = ppoll(&pollfd, 1, timeout, &sigs);
+ if (rc < 0) {
+ dbug(3, "cpu=%d poll=%d errno=%d\n", cpu, rc, errno);
+ if (errno == EINTR) {
+ if (stop_threads)
+ break;
+ } else {
+ _perr("poll error");
+ goto error_out;
+ }
+ }
+
+ // set the timestamp
+ message.received = time(NULL);
+
+ /* Read the header. */
+ rc = read(relay_fd[cpu], &message.bufhdr, sizeof(message.bufhdr));
+ if (rc <= 0) /* seen during normal shutdown or error */
+ continue;
+ if (rc != sizeof(message.bufhdr)) {
+ lost_byte_count += rc;
+ continue;
+ }
+
+ /* Validate the magic value. In case of mismatch,
+ keep on reading & shifting the header, one byte at
+ a time, until we get a match. */
+ while (! stop_threads && memcmp(message.bufhdr.magic, STAP_TRACE_MAGIC, 4)) {
+ lost_byte_count ++;
+ memmove(& message.bufhdr_raw[0],
+ & message.bufhdr_raw[1],
+ sizeof(message.bufhdr_raw)-1);
+ rc = read(relay_fd[cpu],
+ &message.bufhdr_raw[sizeof(message.bufhdr_raw)-1],
+ 1);
+ if (rc <= 0) /* seen during normal shutdown or error */
+ break;
+ }
+
+ /* Validate it slightly. Because of lost messages, we might be getting
+ not a proper _stp_trace struct but the interior of some piece of
+ trace text message. XXX: validate bufhdr.sequence a little bit too? */
+ if (message.bufhdr.pdu_len == 0 ||
+ message.bufhdr.pdu_len > MAX_MESSAGE_LENGTH) {
+ lost_byte_count += sizeof(message.bufhdr);
+ continue;
+ }
+
+ // Allocate the pdu body
+ message.buf = malloc(message.bufhdr.pdu_len);
+ if (message.buf == NULL)
+ {
+ lost_byte_count += message.bufhdr.pdu_len;
+ continue;
+ }
+
+ /* Read the message, perhaps in pieces (such as if crossing
+ * relayfs subbuf boundaries). */
+ size_t bufread = 0;
+ while (bufread < message.bufhdr.pdu_len) {
+ rc = read(relay_fd[cpu], message.buf+bufread, message.bufhdr.pdu_len-bufread);
+ if (rc <= 0) {
+ lost_byte_count += message.bufhdr.pdu_len-bufread;
+ break; /* still process it; hope we can resync at next packet. */
+ }
+ bufread += rc;
+ }
+
+ // plop the message into the buffer_heap
+ pthread_mutex_lock(& buffer_heap_mutex);
+ if (message.bufhdr.sequence < last_sequence_number) {
+ // whoa! is this some old message that we've assumed lost?
+ // or are we wrapping around the uint_32 sequence numbers?
+ _perr("unexpected sequence=%u", message.bufhdr.sequence);
+ }
+
+ // is it large enough? if not, realloc
+ if (buffer_heap_alloc - buffer_heap_size == 0) { // full
+ unsigned new_buffer_heap_alloc = (buffer_heap_alloc + 1) * 1.5;
+ struct serialized_message *new_buffer_heap =
+ realloc(buffer_heap,
+ new_buffer_heap_alloc * sizeof(struct serialized_message));
+ if (new_buffer_heap == NULL) {
+ _perr("out of memory while enlarging buffer heap");
+ free (message.buf);
+ lost_message_count ++;
+ pthread_mutex_unlock(& buffer_heap_mutex);
+ continue;
+ }
+ buffer_heap = new_buffer_heap;
+ buffer_heap_alloc = new_buffer_heap_alloc;
+ }
+ // plop copy of message struct into slot at end of heap
+ buffer_heap[buffer_heap_size++] = message;
+ // push it into heap
+ gheap_push_heap(&buffer_heap_ctx,
+ buffer_heap,
+ buffer_heap_size);
+ // and c'est tout
+ pthread_mutex_unlock(& buffer_heap_mutex);
+ pthread_cond_broadcast (& buffer_heap_cond);
+ dbug(3, "thread %d received seq=%u\n", cpu, message.bufhdr.sequence);
+ }
+
+ dbug(3, "exiting thread for cpu %d\n", cpu);
+ return NULL;
+
+error_out:
+ /* Signal the main thread that we need to quit */
+ kill(getpid(), SIGTERM);
+ dbug(2, "exiting thread for cpu %d after error\n", cpu);
+
+ return NULL;
+}
+
+
+// Print and free buffer of given serialized message.
+static void print_serialized_message (struct serialized_message *msg)
+{
+ // check if file switching is necessary, as per staprun -S
+
+ // NB: unlike reader_thread_bulkmode(), we don't need to use
+ // mutexes to protect switch_file[] or such, because we're the
+ // ONLY thread doing output.
+ unsigned cpu = 0; // arbitrary
+ static ssize_t wsize = 0; // how many bytes we've written into the serialized file so far
+ static int fnum = 0; // which file number we're using
+
+ if ((fsize_max && (wsize > fsize_max)) ||
+ (sigusr2_count > sigusr2_processed[cpu])) {
+ dbug(2, "switching output file wsize=%ld fsize_max=%ld sigusr2 %d > %d\n",
+ wsize, fsize_max, sigusr2_count, sigusr2_processed[cpu]);
+ sigusr2_processed[cpu] = sigusr2_count;
+ if (switch_outfile(cpu, &fnum) < 0) {
+ perr("unable to switch output file");
+ // but continue
+ }
+ wsize = 0;
+ }
+
+
+ // write loop ... could block if e.g. the output disk is slow
+ // or the user hits a ^S (XOFF) on the tty
+ ssize_t sent = 0;
+ do {
+ ssize_t ret = write (out_fd[avail_cpus[0]],
+ msg->buf+sent, msg->bufhdr.pdu_len-sent);
+ if (ret <= 0) {
+ perr("error writing output");
+ break;
+ }
+ sent += ret;
+ } while ((unsigned)sent < msg->bufhdr.pdu_len);
+ wsize += sent;
+
+ // free the associated buffer
+ free (msg->buf);
+ msg->buf = NULL;
+}
+
+
+/* Thread that checks on the heap of messages, and pumps them out to
+ the designated output fd in sequence. It waits, but only a little
+ while, if it has only fresher messages than it's expecting. It
+ exits upon a global stop_threads indication.
+*/
+static void* reader_thread_serializer (void *data) {
+ (void) data;
+ while (! stop_threads) {
+ /* timeout 0-1 seconds; this is the maximum extra time that
+ stapio will be waiting after a ^C */
+ struct timespec ts = {.tv_sec=time(NULL)+1, .tv_nsec=0};
+ int rc;
+ pthread_mutex_lock(& buffer_heap_mutex);
+ rc = pthread_cond_timedwait (& buffer_heap_cond,
+ & buffer_heap_mutex,
+ & ts);
+
+ dbug(3, "serializer cond wait rc=%d heapsize=%u\n", rc, buffer_heap_size);
+ time_t now = time(NULL);
+ unsigned processed = 0;
+ while (buffer_heap_size > 0) { // consume as much as possible
+ // check out the sequence# of the first element
+ uint32_t buf_min_seq = buffer_heap[0].bufhdr.sequence;
+
+ dbug(3, "serializer last=%u seq=%u\n", last_sequence_number, buf_min_seq);
+
+ if ((buf_min_seq == last_sequence_number + 1) || // expected seq#
+ (buffer_heap[0].received + 2 <= now)) { // message too old
+ // "we've got one!" -- or waited too long for one
+ // get it off the head of the heap
+ gheap_pop_heap(&buffer_heap_ctx,
+ buffer_heap,
+ buffer_heap_size);
+ buffer_heap_size --; // becomes index where the head was moved
+ processed ++;
+
+ // take a copy of the whole message
+ struct serialized_message msg = buffer_heap[buffer_heap_size];
+
+ // paranoid clear this field of the now-unused slot
+ buffer_heap[buffer_heap_size].buf = NULL;
+ // update statistics
+ if (attach_mod == 1 && last_sequence_number == 0) // first message after staprun -A
+ ; // do not penalize it with lost messages
+ else
+ lost_message_count += (buf_min_seq - last_sequence_number - 1);
+ last_sequence_number = buf_min_seq;
+
+ // unlock the mutex, permitting
+ // reader_thread_serialmode threads to
+ // resume piling messages into the
+ // heap while we print stuff
+ pthread_mutex_unlock(& buffer_heap_mutex);
+
+ print_serialized_message (& msg);
+
+ // must re-take lock for next iteration of the while loop
+ pthread_mutex_lock(& buffer_heap_mutex);
+ } else {
+ // processed as much of the heap as we
+ // could this time; wait for the
+ // condition again
+ break;
+ }
+ }
+ pthread_mutex_unlock(& buffer_heap_mutex);
+ if (processed > 0)
+ dbug(2, "serializer processed n=%u\n", processed);
+ }
+ return NULL;
+}
+
+
+
+// At the end of the program main loop, flush out any the remaining
+// messages and free up all that heapy data.
+static void reader_serialized_flush()
+{
+ dbug(3, "serializer flushing messages=%u\n", buffer_heap_size);
+ while (buffer_heap_size > 0) { // consume it all
+ // check out the sequence# of the first element
+ uint32_t buf_min_seq = buffer_heap[0].bufhdr.sequence;
+ dbug(3, "serializer seq=%u\n", buf_min_seq);
+ gheap_pop_heap(&buffer_heap_ctx,
+ buffer_heap,
+ buffer_heap_size);
+ buffer_heap_size --; // also index where the head was moved
+
+ // NB: no need for mutex manipulations, this is super single threaded
+ print_serialized_message (& buffer_heap[buffer_heap_size]);
+
+ lost_message_count += (buf_min_seq - last_sequence_number - 1);
+ last_sequence_number = buf_min_seq;
+ }
+ free (buffer_heap);
+ buffer_heap = NULL;
+}
+
+
+
/**
- * reader_thread - per-cpu channel buffer reader
+ * reader_thread - per-cpu channel buffer reader, bulkmode (one output file per cpu input file)
*/
-static void *reader_thread(void *data)
+static void *reader_thread_bulkmode (void *data)
{
- char buf[128*1024]; // NB: maximum possible output amount from a single probe hit's print_flush
+ char buf[MAX_MESSAGE_LENGTH];
struct _stp_trace bufhdr;
int rc, cpu = (int)(long)data;
struct pollfd pollfd;
- /* 200ms, close to human level of "instant" */
- struct timespec tim = {.tv_sec=0, .tv_nsec=200000000}, *timeout = &tim;
sigset_t sigs;
off_t wsize = 0;
int fnum = 0;
@@ -151,44 +510,30 @@ static void *reader_thread(void *data)
CPU_SET(cpu, &cpu_mask);
if( sched_setaffinity( 0, sizeof(cpu_mask), &cpu_mask ) < 0 )
_perr("sched_setaffinity");
-#ifdef NEED_PPOLL
- /* Without a real ppoll, there is a small race condition that could */
- /* block ppoll(). So use a timeout to prevent that. */
- timeout->tv_sec = 10;
- timeout->tv_nsec = 0;
-#else
- timeout = NULL;
-#endif
-
- if (reader_timeout_ms && timeout) {
- timeout->tv_sec = reader_timeout_ms / 1000;
- timeout->tv_nsec = (reader_timeout_ms - timeout->tv_sec * 1000) * 1000000;
- }
pollfd.fd = relay_fd[cpu];
pollfd.events = POLLIN;
do {
- dbug(3, "thread %d start ppoll\n", cpu);
+ /* 200ms, close to human level of "instant" */
+ struct timespec tim, *timeout = &tim;
+ timeout->tv_sec = reader_timeout_ms / 1000;
+ timeout->tv_nsec = (reader_timeout_ms - timeout->tv_sec * 1000) * 1000000;
+
rc = ppoll(&pollfd, 1, timeout, &sigs);
- dbug(3, "thread %d end ppoll:%d\n", cpu, rc);
if (rc < 0) {
dbug(3, "cpu=%d poll=%d errno=%d\n", cpu, rc, errno);
if (errno == EINTR) {
if (stop_threads)
break;
- pthread_mutex_lock(&mutex[cpu]);
- if (switch_file[cpu]) {
- if (switch_outfile(cpu, &fnum) < 0) {
- switch_file[cpu] = 0;
- pthread_mutex_unlock(&mutex[cpu]);
+ if (sigusr2_count > sigusr2_processed[cpu]) {
+ sigusr2_processed[cpu] = sigusr2_count;
+ if (switch_outfile(cpu, &fnum) < 0) {
goto error_out;
- }
- switch_file[cpu] = 0;
- wsize = 0;
+ }
+ wsize = 0;
}
- pthread_mutex_unlock(&mutex[cpu]);
} else {
_perr("poll error");
goto error_out;
@@ -197,7 +542,7 @@ static void *reader_thread(void *data)
/* Read the header. */
rc = read(relay_fd[cpu], &bufhdr, sizeof(bufhdr));
- if (rc == 0) /* seen during normal shutdown */
+ if (rc <= 0) /* seen during normal shutdown */
continue;
if (rc != sizeof(bufhdr)) {
_perr("bufhdr read error, attempting resync");
@@ -228,41 +573,20 @@ static void *reader_thread(void *data)
bufread += rc;
}
- if (! bulkmode) {
- /* Wait until the bufhdr.sequence number indicates it's OUR TURN to go ahead. */
- struct timespec ts = {.tv_sec=time(NULL)+2, .tv_nsec=0}; /* wait 1-2 seconds */
- pthread_mutex_lock(& last_sequence_number_mutex);
- while ((last_sequence_number+1 != bufhdr.sequence) && /* normal case */
- (last_sequence_number < bufhdr.sequence)) { /* we're late!!! */
- int rc = pthread_cond_timedwait (& last_sequence_number_changed,
- & last_sequence_number_mutex,
- & ts);
- if (rc == ETIMEDOUT) {
- /* _perr("message sequencing timeout"); */
- break;
- }
- }
- pthread_mutex_unlock(& last_sequence_number_mutex);
- }
-
int wbytes = rc;
char *wbuf = buf;
dbug(3, "cpu %d: read %d bytes of data\n", cpu, rc);
/* Switching file */
- pthread_mutex_lock(&mutex[cpu]);
if ((fsize_max && ((wsize + rc) > fsize_max)) ||
- switch_file[cpu]) {
+ (sigusr2_count > sigusr2_processed[cpu])) {
+ sigusr2_processed[cpu] = sigusr2_count;
if (switch_outfile(cpu, &fnum) < 0) {
- switch_file[cpu] = 0;
- pthread_mutex_unlock(&mutex[cpu]);
goto error_out;
}
- switch_file[cpu] = 0;
wsize = 0;
}
- pthread_mutex_unlock(&mutex[cpu]);
/* Copy loop. Must repeat write(2) in case of a pipe overflow
or other transient fullness. */
@@ -291,13 +615,8 @@ static void *reader_thread(void *data)
int fd;
/* Only bulkmode and fsize_max use per-cpu output files. Otherwise,
there's just a single output fd stored at out_fd[avail_cpus[0]]. */
- if (bulkmode || fsize_max)
- fd = out_fd[cpu];
- else
- fd = out_fd[avail_cpus[0]];
- rc = 0;
- if (bulkmode)
- rc = write(fd, &bufhdr, sizeof(bufhdr)); // write header
+ fd = out_fd[cpu];
+ rc = write(fd, &bufhdr, sizeof(bufhdr)); // write header
rc |= write(fd, wbuf, wbytes); // write payload
if (rc <= 0) {
perr("Couldn't write to output %d for cpu %d, exiting.",
@@ -310,14 +629,6 @@ static void *reader_thread(void *data)
}
}
- /* update the sequence number & let other cpus go ahead */
- pthread_mutex_lock(& last_sequence_number_mutex);
- if (last_sequence_number < bufhdr.sequence) { /* not if someone leapfrogged us */
- last_sequence_number = bufhdr.sequence;
- pthread_cond_broadcast (& last_sequence_number_changed);
- }
- pthread_mutex_unlock(& last_sequence_number_mutex);
-
} while (!stop_threads);
dbug(3, "exiting thread for cpu %d\n", cpu);
return(NULL);
@@ -329,41 +640,16 @@ error_out:
return(NULL);
}
+
static void switchfile_handler(int sig)
{
- int i;
+ (void) sig;
if (stop_threads || !outfile_name)
return;
-
- for (i = 0; i < ncpus; i++) {
- pthread_mutex_lock(&mutex[avail_cpus[i]]);
- if (reader[avail_cpus[i]] && switch_file[avail_cpus[i]]) {
- pthread_mutex_unlock(&mutex[avail_cpus[i]]);
- dbug(2, "file switching is progressing, signal ignored.\n", sig);
- return;
- }
- pthread_mutex_unlock(&mutex[avail_cpus[i]]);
- }
- for (i = 0; i < ncpus; i++) {
- pthread_mutex_lock(&mutex[avail_cpus[i]]);
- if (reader[avail_cpus[i]]) {
- switch_file[avail_cpus[i]] = 1;
- pthread_mutex_unlock(&mutex[avail_cpus[i]]);
-
- // Make sure we don't send the USR2 signal to
- // ourselves.
- if (pthread_equal(pthread_self(),
- reader[avail_cpus[i]]))
- break;
- pthread_kill(reader[avail_cpus[i]], SIGUSR2);
- }
- else {
- pthread_mutex_unlock(&mutex[avail_cpus[i]]);
- break;
- }
- }
+ sigusr2_count ++;
}
+
/**
* init_relayfs - create files and threads for relayfs processing
*
@@ -507,19 +793,20 @@ int init_relayfs(void)
sigaction(SIGUSR2, &sa, NULL);
dbug(2, "starting threads\n");
- for (i = 0; i < ncpus; i++) {
- if (pthread_mutex_init(&mutex[avail_cpus[i]], NULL) < 0) {
- _perr("failed to create mutex");
- return -1;
- }
- }
for (i = 0; i < ncpus; i++) {
- if (pthread_create(&reader[avail_cpus[i]], NULL, reader_thread,
+ if (pthread_create(&reader[avail_cpus[i]], NULL,
+ bulkmode ? reader_thread_bulkmode : reader_thread_serialmode,
(void *)(long)avail_cpus[i]) < 0) {
_perr("failed to create thread");
return -1;
}
}
+ if (! bulkmode)
+ if (pthread_create(&serializer_thread, NULL,
+ reader_thread_serializer, NULL) < 0) {
+ _perr("failed to create thread");
+ return -1;
+ }
return 0;
}
@@ -529,27 +816,31 @@ void close_relayfs(void)
int i;
stop_threads = 1;
dbug(2, "closing\n");
- for (i = 0; i < ncpus; i++) {
- if (reader[avail_cpus[i]])
- pthread_kill(reader[avail_cpus[i]], SIGUSR2);
- else
- break;
- }
+
for (i = 0; i < ncpus; i++) {
if (reader[avail_cpus[i]])
pthread_join(reader[avail_cpus[i]], NULL);
else
break;
}
+ if (! bulkmode) {
+ if (serializer_thread) // =0 on load_only!
+ pthread_join(serializer_thread, NULL);
+ // at this point, we know all reader and writer
+ // threads for the buffer_heap are dead.
+ reader_serialized_flush();
+
+ if (lost_message_count > 0 || lost_byte_count > 0)
+ eprintf("WARNING: There were %u lost messages and %u lost bytes.\n",
+ lost_message_count, lost_byte_count);
+ }
+
for (i = 0; i < ncpus; i++) {
if (relay_fd[avail_cpus[i]] >= 0)
close(relay_fd[avail_cpus[i]]);
else
break;
}
- for (i = 0; i < ncpus; i++) {
- pthread_mutex_destroy(&mutex[avail_cpus[i]]);
- }
dbug(2, "done\n");
}
@@ -558,12 +849,6 @@ void kill_relayfs(void)
int i;
stop_threads = 1;
dbug(2, "killing\n");
- for (i = 0; i < ncpus; i++) {
- if (reader[avail_cpus[i]])
- pthread_kill(reader[avail_cpus[i]], SIGUSR2);
- else
- break;
- }
for (i = 0; i < ncpus; i++) {
if (reader[avail_cpus[i]])
pthread_cancel(reader[avail_cpus[i]]); /* no wait */
@@ -576,8 +861,5 @@ void kill_relayfs(void)
else
break;
}
- for (i = 0; i < ncpus; i++) {
- pthread_mutex_destroy(&mutex[avail_cpus[i]]);
- }
dbug(2, "done\n");
}
diff --git a/staprun/stap_merge.c b/staprun/stap_merge.c
index 87de7d465..b210db663 100644
--- a/staprun/stap_merge.c
+++ b/staprun/stap_merge.c
@@ -76,6 +76,7 @@ int main (int argc, char *argv[])
fprintf(stderr, "error opening file %s.\n", argv[optind - 1]);
return -1;
}
+ (void) fread(buf, 4, 1, fp[i]); // read & ignore magic word
if (fread (buf, TIMESTAMP_SIZE, 1, fp[i]))
num[i] = *((int *)buf);
else
@@ -133,6 +134,7 @@ int main (int argc, char *argv[])
count = min;
}
+ (void) fread(buf, 4, 1, fp[i]); // read & ignore magic word
if (fread (buf, TIMESTAMP_SIZE, 1, fp[j]))
num[j] = *((int *)buf);
else
diff --git a/staprun/stap_merge.tcl b/staprun/stap_merge.tcl
deleted file mode 100755
index 0c7d7b694..000000000
--- a/staprun/stap_merge.tcl
+++ /dev/null
@@ -1,101 +0,0 @@
-#!/usr/bin/env tclsh
-#
-# stap_merge.tcl - systemtap merge program
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-#
-# Copyright (C) Red Hat Inc, 2007
-#
-#
-
-proc usage {} {
- puts stderr "$::argv0 \[-v\] \[-o output_filename\] input_files ...\n"
- exit 1
-}
-
-set outfile "stdout"
-set verbose 0
-set index 0
-while {[string match -* [lindex $argv $index]]} {
- switch -glob -- [lindex $argv $index] {
- -v {set verbose 1}
- -o {incr index; set outfile [lindex $argv $index]}
- default {usage}
- }
- incr index
-}
-
-if {$tcl_platform(byteOrder) == "littleEndian"} {
- set int_format i
-} else {
- set int_format I
-}
-
-set files [lrange $argv $index end]
-
-set n 0
-foreach file $files {
- if {[catch {open $file} fd($n)]} {
- puts stderr $fd($n)
- exit 1
- }
- fconfigure $fd($n) -translation binary
- if {![binary scan [read $fd($n) 4] $int_format timestamp($n)]} {
- continue
- }
- set timestamp($n) [expr $timestamp($n) & 0xFFFFFFFF]
- incr n
-}
-set ncpus $n
-
-if {$outfile != "stdout"} {
- if {[catch {open $outfile w} outfile]} {
- puts stderr $outfile
- exit 1
- }
-}
-fconfigure $outfile -translation binary
-
-while {1} {
- set mincpu -1
- for {set n 0} {$n < $ncpus} {incr n} {
- if {[info exists fd($n)] && (![info exists min] || $timestamp($n) <= $min)} {
- set min $timestamp($n)
- set mincpu $n
- }
- }
-
- if {![info exists min]} {break}
-
- if {![binary scan [read $fd($mincpu) 4] $int_format len]} {
- puts stderr "Error reading length from channel $mincpu"
- exit 1
- }
-
- if {$verbose == 1} {
- puts stderr "\[CPU:$mincpu, seq=$min, length=$len\]"
- }
-
- set data [read $fd($mincpu) $len]
- puts -nonewline $outfile $data
-
- set data [read $fd($mincpu) 4]
- if {$data == ""} {
- unset fd($mincpu)
- } else {
- binary scan $data $int_format timestamp($mincpu)
- set timestamp($mincpu) [expr $timestamp($mincpu) & 0xFFFFFFFF]
- }
- unset min
-}
diff --git a/staprun/staprun.8 b/staprun/staprun.8
index 3bc16ab95..4e1ca9af6 100644
--- a/staprun/staprun.8
+++ b/staprun/staprun.8
@@ -120,7 +120,7 @@ remote_id() and remote_uri().
Sets the maximum size of output file and the maximum number of output files.
If the size of output file will exceed
.B size
-, systemtap switches output file to the next file. And if the number of
+megabytes, systemtap switches output file to the next file. And if the number of
output files exceed
.B N
, systemtap removes the oldest output file. You can omit the second argument.
commit 2442beb99eeab3144c2622cae1fc98b999f72108
gpg: Signature made Mon 14 Aug 2023 01:55:27 PM EDT
gpg: using RSA key 5D38116FA4D3A7CC77E378D37E83610126DCC2E8
gpg: Good signature from "Frank Ch. Eigler <fche@elastic.org>" [full]
Author: Frank Ch. Eigler <fche@redhat.com>
Date: Mon Aug 14 13:54:50 2023 -0400
PR29108 / BZ2095359 tweak: stap_merge magic handling
We don't bother do much error checking in this infrequently used
tool, but gcc warnings require us to do some.
diff --git a/staprun/stap_merge.c b/staprun/stap_merge.c
index b210db663..388b14938 100644
--- a/staprun/stap_merge.c
+++ b/staprun/stap_merge.c
@@ -76,7 +76,8 @@ int main (int argc, char *argv[])
fprintf(stderr, "error opening file %s.\n", argv[optind - 1]);
return -1;
}
- (void) fread(buf, 4, 1, fp[i]); // read & ignore magic word
+ if (fread(buf, 4, 1, fp[i]) != 1) // read magic word
+ fprintf(stderr, "warning: erro reading magic word\n");
if (fread (buf, TIMESTAMP_SIZE, 1, fp[i]))
num[i] = *((int *)buf);
else
@@ -134,7 +135,8 @@ int main (int argc, char *argv[])
count = min;
}
- (void) fread(buf, 4, 1, fp[i]); // read & ignore magic word
+ if (fread(buf, 4, 1, fp[i]) != 1) // read magic word
+ fprintf(stderr, "warning: erro reading magic word\n");
if (fread (buf, TIMESTAMP_SIZE, 1, fp[j]))
num[j] = *((int *)buf);
else