commit bf95ad72c984c9e68d12707c4d34dbe6bc1f89f2
gpg: Signature made Sat 12 Aug 2023 02:49:06 PM EDT
gpg:                using RSA key 5D38116FA4D3A7CC77E378D37E83610126DCC2E8
gpg: Good signature from "Frank Ch. Eigler <fche@elastic.org>" [full]
Author: Aliaksandr Valialkin <valyala@gmail.com>
Date:   Thu Jul 27 18:52:37 2023 -0400

    runtime/staprun: import gheap routines

    BSD-2-Clause gift from Aliaksandr Valialkin:
    https://github.com/valyala/gheap

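For orientation (not part of the commit): the header imported below is generic
over item type -- callers fill in a gheap_ctx with the item size, a
less-comparer, and an item mover, then pass arrays through the gheap_*
functions.  A minimal usage sketch for a binary max-heap of ints follows; the
helper names int_less and int_move are illustrative only, not part of gheap:

    #include <stdio.h>
    #include "gheap.h"

    /* Comparer: must return non-zero iff a < b (ctx is unused here). */
    static int int_less(const void *ctx, const void *a, const void *b)
    {
        (void) ctx;
        return *(const int *)a < *(const int *)b;
    }

    /* Mover: copies one item from src to dst. */
    static void int_move(void *dst, const void *src)
    {
        *(int *)dst = *(const int *)src;
    }

    int main(void)
    {
        static const struct gheap_ctx ctx = {
            .fanout = 2,        /* standard binary heap */
            .page_chunks = 1,   /* non-paged layout */
            .item_size = sizeof(int),
            .less_comparer = int_less,
            .item_mover = int_move,
        };
        int a[] = { 3, 1, 4, 1, 5, 9, 2, 6 };
        const size_t n = sizeof(a) / sizeof(a[0]);

        gheap_make_heap(&ctx, a, n);  /* a[0] is now the maximum */
        gheap_sort_heap(&ctx, a, n);  /* heap-sort into ascending order */
        for (size_t i = 0; i < n; i++)
            printf("%d ", a[i]);
        printf("\n");
        return 0;
    }
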
diff --git a/staprun/gheap.h b/staprun/gheap.h
new file mode 100644
index 000000000..4af4b29ed
--- /dev/null
+++ b/staprun/gheap.h
@@ -0,0 +1,561 @@
+#ifndef GHEAP_H
+#define GHEAP_H
+
+/*
+ * Generalized heap implementation for C99.
+ *
+ * Don't forget to pass the -DNDEBUG option to the compiler when creating
+ * optimized builds. This significantly speeds up gheap code by removing
+ * debug assertions.
+ *
+ * Author: Aliaksandr Valialkin <valyala@gmail.com>.
+ */
+/*
+Copyright (c) 2011 Aliaksandr Valialkin <valyala@gmail.com>
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions
+are met:
+1. Redistributions of source code must retain the above copyright
+   notice, this list of conditions and the following disclaimer.
+2. Redistributions in binary form must reproduce the above copyright
+   notice, this list of conditions and the following disclaimer in the
+   documentation and/or other materials provided with the distribution.
+
+THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+SUCH DAMAGE.
+*/
+
+
+/*******************************************************************************
+ * Interface.
+ ******************************************************************************/
+
+#include <stddef.h>     /* for size_t */
+#include <stdint.h>     /* for SIZE_MAX */
+
+/*
+ * Less comparer must return a non-zero value if a < b.
+ * Otherwise it must return 0.
+ * ctx is the gheap_ctx->less_comparer_ctx.
+ */
+typedef int (*gheap_less_comparer_t)(const void *ctx, const void *a,
+    const void *b);
+
+/*
+ * Moves the item from src to dst.
+ */
+typedef void (*gheap_item_mover_t)(void *dst, const void *src);
+
+/*
+ * Gheap context.
+ * This context must be passed to every gheap function.
+ */
+struct gheap_ctx
+{
+  /*
+   * How many children each heap item can have.
+   */
+  size_t fanout;
+
+  /*
+   * A chunk is a tuple containing fanout items arranged sequentially in memory.
+   * A page is a subheap containing page_chunks chunks arranged sequentially
+   * in memory.
+   * The number of chunks in a page is an arbitrary integer greater than 0.
+   */
+  size_t page_chunks;
+
+  /*
+   * The size of each item in bytes.
+   */
+  size_t item_size;
+
+  gheap_less_comparer_t less_comparer;
+  const void *less_comparer_ctx;
+
+  gheap_item_mover_t item_mover;
+};
+
+/*
+ * Returns parent index for the given child index.
+ * Child index must be greater than 0.
+ * Returns 0 if the parent is root.
+ */
+static inline size_t gheap_get_parent_index(const struct gheap_ctx *ctx,
+    size_t u);
+
+/*
+ * Returns the index of the first child for the given parent index.
+ * Parent index must be less than SIZE_MAX.
+ * Returns SIZE_MAX if the index of the first child for the given parent
+ * cannot fit size_t.
+ */
+static inline size_t gheap_get_child_index(const struct gheap_ctx *ctx,
+    size_t u);
+
+/*
+ * Returns the index of the first non-heap item, using less_comparer
+ * for items' comparison.
+ * Returns heap_size if base points to a valid max heap of the given size.
+ */
+static inline size_t gheap_is_heap_until(const struct gheap_ctx *ctx,
+    const void *base, size_t heap_size);
+
+/*
+ * Returns non-zero if base points to a valid max heap. Returns zero otherwise.
+ * Uses less_comparer for items' comparison.
+ */
+static inline int gheap_is_heap(const struct gheap_ctx *ctx,
+    const void *base, size_t heap_size);
+
+/*
+ * Makes max heap from items base[0] ... base[heap_size-1].
+ * Uses less_comparer for items' comparison.
+ */
+static inline void gheap_make_heap(const struct gheap_ctx *ctx,
+    void *base, size_t heap_size);
+
+/*
+ * Pushes the item base[heap_size-1] into max heap base[0] ... base[heap_size-2].
+ * Uses less_comparer for items' comparison.
+ */
+static inline void gheap_push_heap(const struct gheap_ctx *ctx,
+    void *base, size_t heap_size);
+
+/*
+ * Pops the maximum item from max heap base[0] ... base[heap_size-1] into
+ * base[heap_size-1].
+ * Uses less_comparer for items' comparison.
+ */
+static inline void gheap_pop_heap(const struct gheap_ctx *ctx,
+    void *base, size_t heap_size);
+
+/*
+ * Sorts the items of the max heap in place in ascending order.
+ * Uses less_comparer for items' comparison.
+ */
+static inline void gheap_sort_heap(const struct gheap_ctx *ctx,
+    void *base, size_t heap_size);
+
+/*
+ * Swaps the item outside the heap with the maximum item inside
+ * the heap and restores heap invariant.
+ */
+static inline void gheap_swap_max_item(const struct gheap_ctx *const ctx,
+    void *const base, const size_t heap_size, void *item);
+
+/*
+ * Restores max heap invariant after item's value has been increased,
+ * i.e. less_comparer(old_item, new_item) != 0.
+ */
+static inline void gheap_restore_heap_after_item_increase(
+    const struct gheap_ctx *ctx,
+    void *base, size_t heap_size, size_t modified_item_index);
+
+/*
+ * Restores max heap invariant after item's value has been decreased,
+ * i.e. less_comparer(new_item, old_item) != 0.
+ */
+static inline void gheap_restore_heap_after_item_decrease(
+    const struct gheap_ctx *ctx,
+    void *base, size_t heap_size, size_t modified_item_index);
+
+/*
+ * Removes the given item from the heap and puts it into base[heap_size-1].
+ * Uses less_comparer for items' comparison.
+ */
+static inline void gheap_remove_from_heap(const struct gheap_ctx *ctx,
+    void *base, size_t heap_size, size_t item_index);
+
+/*******************************************************************************
+ * Implementation.
+ *
+ * Define all functions inline, so the compiler will be able to optimize out
+ * common args (fanout, page_chunks, item_size, less_comparer and item_mover),
+ * which are usually constants, via constant folding
+ * ( http://en.wikipedia.org/wiki/Constant_folding ).
+ *****************************************************************************/
+
+#include <assert.h>     /* for assert */
+#include <stddef.h>     /* for size_t */
+#include <stdint.h>     /* for uintptr_t, SIZE_MAX and UINTPTR_MAX */
+
+static inline size_t gheap_get_parent_index(const struct gheap_ctx *const ctx,
+    size_t u)
+{
+  assert(u > 0);
+
+  const size_t fanout = ctx->fanout;
+  const size_t page_chunks = ctx->page_chunks;
+
+  --u;
+  if (page_chunks == 1) {
+    return u / fanout;
+  }
+
+  if (u < fanout) {
+    /* Parent is root. */
+    return 0;
+  }
+
+  assert(page_chunks <= SIZE_MAX / fanout);
+  const size_t page_size = fanout * page_chunks;
+  size_t v = u % page_size;
+  if (v >= fanout) {
+    /* Fast path. Parent is on the same page as the child. */
+    return u - v + v / fanout;
+  }
+
+  /* Slow path. Parent is on another page. */
+  v = u / page_size - 1;
+  const size_t page_leaves = (fanout - 1) * page_chunks + 1;
+  u = v / page_leaves + 1;
+  return u * page_size + v % page_leaves - page_leaves + 1;
+}
+
+static inline size_t gheap_get_child_index(const struct gheap_ctx *const ctx,
+    size_t u)
+{
+  assert(u < SIZE_MAX);
+
+  const size_t fanout = ctx->fanout;
+  const size_t page_chunks = ctx->page_chunks;
+
+  if (page_chunks == 1) {
+    if (u > (SIZE_MAX - 1) / fanout) {
+      /* Child overflow. */
+      return SIZE_MAX;
+    }
+    return u * fanout + 1;
+  }
+
+  if (u == 0) {
+    /* Root's child is always 1. */
+    return 1;
+  }
+
+  assert(page_chunks <= SIZE_MAX / fanout);
+  const size_t page_size = fanout * page_chunks;
+  --u;
+  size_t v = u % page_size + 1;
+  if (v < page_size / fanout) {
+    /* Fast path. Child is on the same page as the parent. */
+    v *= fanout - 1;
+    if (u > SIZE_MAX - 2 - v) {
+      /* Child overflow. */
+      return SIZE_MAX;
+    }
+    return u + v + 2;
+  }
+
+  /* Slow path. Child is on another page. */
+  const size_t page_leaves = (fanout - 1) * page_chunks + 1;
+  v += (u / page_size + 1) * page_leaves - page_size;
+  if (v > (SIZE_MAX - 1) / page_size) {
+    /* Child overflow. */
+    return SIZE_MAX;
+  }
+  return v * page_size + 1;
+}
+
+/* Returns a pointer to base[index]. */
+static inline void *_gheap_get_item_ptr(const struct gheap_ctx *const ctx,
+    const void *const base, const size_t index)
+{
+  const size_t item_size = ctx->item_size;
+
+  assert(index <= SIZE_MAX / item_size);
+
+  const size_t offset = item_size * index;
+  assert((uintptr_t)base <= UINTPTR_MAX - offset);
+
+  return ((char *)base) + offset;
+}
+
+/*
+ * Sifts the item up in the given sub-heap with the given root_index
+ * starting from the hole_index.
+ */
+static inline void _gheap_sift_up(const struct gheap_ctx *const ctx,
+    void *const base, const size_t root_index, size_t hole_index,
+    const void *const item)
+{
+  assert(hole_index >= root_index);
+
+  const gheap_less_comparer_t less_comparer = ctx->less_comparer;
+  const void *const less_comparer_ctx = ctx->less_comparer_ctx;
+  const gheap_item_mover_t item_mover = ctx->item_mover;
+
+  while (hole_index > root_index) {
+    const size_t parent_index = gheap_get_parent_index(ctx, hole_index);
+    assert(parent_index >= root_index);
+    const void *const parent = _gheap_get_item_ptr(ctx, base, parent_index);
+    if (!less_comparer(less_comparer_ctx, parent, item)) {
+      break;
+    }
+    item_mover(_gheap_get_item_ptr(ctx, base, hole_index),
+        parent);
+    hole_index = parent_index;
+  }
+
+  item_mover(_gheap_get_item_ptr(ctx, base, hole_index), item);
+}
+
+/*
+ * Moves the max child into the given hole and returns index
+ * of the new hole.
+ */
+static inline size_t _gheap_move_up_max_child(const struct gheap_ctx *const ctx,
+    void *const base, const size_t children_count,
+    const size_t hole_index, const size_t child_index)
+{
+  assert(children_count > 0);
+  assert(children_count <= ctx->fanout);
+  assert(child_index == gheap_get_child_index(ctx, hole_index));
+
+  const gheap_less_comparer_t less_comparer = ctx->less_comparer;
+  const void *const less_comparer_ctx = ctx->less_comparer_ctx;
+  const gheap_item_mover_t item_mover = ctx->item_mover;
+
+  size_t max_child_index = child_index;
+  for (size_t i = 1; i < children_count; ++i) {
+    if (!less_comparer(less_comparer_ctx,
+        _gheap_get_item_ptr(ctx, base, child_index + i),
+        _gheap_get_item_ptr(ctx, base, max_child_index))) {
+      max_child_index = child_index + i;
+    }
+  }
+  item_mover(_gheap_get_item_ptr(ctx, base, hole_index),
+      _gheap_get_item_ptr(ctx, base, max_child_index));
+  return max_child_index;
+}
+
+/*
+ * Sifts the given item down in the heap of the given size starting
+ * from the hole_index.
+ */
+static inline void _gheap_sift_down(const struct gheap_ctx *const ctx,
+    void *const base, const size_t heap_size, size_t hole_index,
+    const void *const item)
+{
+  assert(heap_size > 0);
+  assert(hole_index < heap_size);
+
+  const size_t fanout = ctx->fanout;
+
+  const size_t root_index = hole_index;
+  const size_t last_full_index = heap_size - (heap_size - 1) % fanout;
+  while (1) {
+    const size_t child_index = gheap_get_child_index(ctx, hole_index);
+    if (child_index >= last_full_index) {
+      if (child_index < heap_size) {
+        assert(child_index == last_full_index);
+        hole_index = _gheap_move_up_max_child(ctx, base,
+            heap_size - child_index, hole_index, child_index);
+      }
+      break;
+    }
+    assert(heap_size - child_index >= fanout);
+    hole_index = _gheap_move_up_max_child(ctx, base, fanout, hole_index,
+        child_index);
+  }
+  _gheap_sift_up(ctx, base, root_index, hole_index, item);
+}
+
+/*
+ * Pops the maximum item from the heap [base[0] ... base[heap_size-1]]
+ * into base[heap_size].
+ */
+static inline void _gheap_pop_max_item(const struct gheap_ctx *const ctx,
+    void *const base, const size_t heap_size)
+{
+  void *const hole = _gheap_get_item_ptr(ctx, base, heap_size);
+  gheap_swap_max_item(ctx, base, heap_size, hole);
+}
+
+static inline size_t gheap_is_heap_until(const struct gheap_ctx *const ctx,
+    const void *const base, const size_t heap_size)
+{
+  const gheap_less_comparer_t less_comparer = ctx->less_comparer;
+  const void *const less_comparer_ctx = ctx->less_comparer_ctx;
+
+  for (size_t u = 1; u < heap_size; ++u) {
+    const size_t v = gheap_get_parent_index(ctx, u);
+    const void *const a = _gheap_get_item_ptr(ctx, base, v);
+    const void *const b = _gheap_get_item_ptr(ctx, base, u);
+    if (less_comparer(less_comparer_ctx, a, b)) {
+      return u;
+    }
+  }
+  return heap_size;
+}
+
+static inline int gheap_is_heap(const struct gheap_ctx *const ctx,
+    const void *const base, const size_t heap_size)
+{
+  return (gheap_is_heap_until(ctx, base, heap_size) == heap_size);
+}
+
+static inline void gheap_make_heap(const struct gheap_ctx *const ctx,
+    void *const base, const size_t heap_size)
+{
+  const size_t fanout = ctx->fanout;
+  const size_t page_chunks = ctx->page_chunks;
+  const size_t item_size = ctx->item_size;
+  const gheap_item_mover_t item_mover = ctx->item_mover;
+
+  if (heap_size > 1) {
+    /* Skip leaf nodes without children. This is easy to do for non-paged heap,
+     * i.e. when page_chunks = 1, but it is difficult for paged heaps.
+     * So leaf nodes in paged heaps are visited anyway.
+     */
+    size_t i = (page_chunks == 1) ? ((heap_size - 2) / fanout) :
+        (heap_size - 2);
+    do {
+      char tmp[item_size];
+      item_mover(tmp, _gheap_get_item_ptr(ctx, base, i));
+      _gheap_sift_down(ctx, base, heap_size, i, tmp);
+    } while (i-- > 0);
+  }
+
+  assert(gheap_is_heap(ctx, base, heap_size));
+}
+
+static inline void gheap_push_heap(const struct gheap_ctx *const ctx,
+    void *const base, const size_t heap_size)
+{
+  assert(heap_size > 0);
+  assert(gheap_is_heap(ctx, base, heap_size - 1));
+
+  const size_t item_size = ctx->item_size;
+  const gheap_item_mover_t item_mover = ctx->item_mover;
+
+  if (heap_size > 1) {
+    const size_t u = heap_size - 1;
+    char tmp[item_size];
+    item_mover(tmp, _gheap_get_item_ptr(ctx, base, u));
+    _gheap_sift_up(ctx, base, 0, u, tmp);
+  }
+
+  assert(gheap_is_heap(ctx, base, heap_size));
+}
+
+static inline void gheap_pop_heap(const struct gheap_ctx *const ctx,
+    void *const base, const size_t heap_size)
+{
+  assert(heap_size > 0);
+  assert(gheap_is_heap(ctx, base, heap_size));
+
+  if (heap_size > 1) {
+    _gheap_pop_max_item(ctx, base, heap_size - 1);
+  }
+
+  assert(gheap_is_heap(ctx, base, heap_size - 1));
+}
+
+static inline void gheap_sort_heap(const struct gheap_ctx *const ctx,
+    void *const base, const size_t heap_size)
+{
+  for (size_t i = heap_size; i > 1; --i) {
+    _gheap_pop_max_item(ctx, base, i - 1);
+  }
+}
+
+static inline void gheap_swap_max_item(const struct gheap_ctx *const ctx,
+    void *const base, const size_t heap_size, void *item)
+{
+  assert(heap_size > 0);
+  assert(gheap_is_heap(ctx, base, heap_size));
+
+  const size_t item_size = ctx->item_size;
+  const gheap_item_mover_t item_mover = ctx->item_mover;
+
+  char tmp[item_size];
+  item_mover(tmp, item);
+  item_mover(item, base);
+  _gheap_sift_down(ctx, base, heap_size, 0, tmp);
+
+  assert(gheap_is_heap(ctx, base, heap_size));
+}
+
+static inline void gheap_restore_heap_after_item_increase(
+    const struct gheap_ctx *const ctx,
+    void *const base, const size_t heap_size, size_t modified_item_index)
+{
+  assert(heap_size > 0);
+  assert(modified_item_index < heap_size);
+  assert(gheap_is_heap(ctx, base, modified_item_index));
+
+  const size_t item_size = ctx->item_size;
+  const gheap_item_mover_t item_mover = ctx->item_mover;
+
+  if (modified_item_index > 0) {
+    char tmp[item_size];
+    item_mover(tmp, _gheap_get_item_ptr(ctx, base, modified_item_index));
+    _gheap_sift_up(ctx, base, 0, modified_item_index, tmp);
+  }
+
+  assert(gheap_is_heap(ctx, base, heap_size));
+  (void)heap_size;
+}
+
+static inline void gheap_restore_heap_after_item_decrease(
+    const struct gheap_ctx *const ctx,
+    void *const base, const size_t heap_size, size_t modified_item_index)
+{
+  assert(heap_size > 0);
+  assert(modified_item_index < heap_size);
+  assert(gheap_is_heap(ctx, base, modified_item_index));
+
+  const size_t item_size = ctx->item_size;
+  const gheap_item_mover_t item_mover = ctx->item_mover;
+
+  char tmp[item_size];
+  item_mover(tmp, _gheap_get_item_ptr(ctx, base, modified_item_index));
+  _gheap_sift_down(ctx, base, heap_size, modified_item_index, tmp);
+
+  assert(gheap_is_heap(ctx, base, heap_size));
+}
+
+static inline void gheap_remove_from_heap(const struct gheap_ctx *const ctx,
+    void *const base, const size_t heap_size, size_t item_index)
+{
+  assert(heap_size > 0);
+  assert(item_index < heap_size);
+  assert(gheap_is_heap(ctx, base, heap_size));
+
+  const size_t item_size = ctx->item_size;
+  const gheap_less_comparer_t less_comparer = ctx->less_comparer;
+  const void *const less_comparer_ctx = ctx->less_comparer_ctx;
+  const gheap_item_mover_t item_mover = ctx->item_mover;
+
+  const size_t new_heap_size = heap_size - 1;
+  if (item_index < new_heap_size) {
+    char tmp[item_size];
+    void *const hole = _gheap_get_item_ptr(ctx, base, new_heap_size);
+    item_mover(tmp, hole);
+    item_mover(hole, _gheap_get_item_ptr(ctx, base, item_index));
+    if (less_comparer(less_comparer_ctx, tmp, hole)) {
+      _gheap_sift_down(ctx, base, new_heap_size, item_index, tmp);
+    }
+    else {
+      _gheap_sift_up(ctx, base, 0, item_index, tmp);
+    }
+  }
+
+  assert(gheap_is_heap(ctx, base, new_heap_size));
+}
+
+#endif

commit 5b39471380a238469c8fc18136f12600e5e9aec7
gpg: Signature made Sat 12 Aug 2023 02:49:21 PM EDT
gpg:                using RSA key 5D38116FA4D3A7CC77E378D37E83610126DCC2E8
gpg: Good signature from "Frank Ch. Eigler <fche@elastic.org>" [full]
Author: Frank Ch. Eigler <fche@redhat.com>
Date:   Mon Jul 31 14:06:57 2023 -0400

    PR29108 / BZ2095359: rewrite staprun serializer logic

    Logic in commit cd48874296e00 (2021, PR28449) fixed the broken cross-cpu
    message ordering that followed previous transport concurrency fixes,
    but imposed a lot of userspace synchronization delays upon the threads
    that were supposed to drain messages from the kernel relayfs streams as
    fast as possible. This led to unnecessarily lossy output overall.

    The new code uses a many-writers, single-reader data structure: a
    mutex-protected heap. All the per-cpu readers copy & pump messages into
    that heap as rapidly as possible, sorted by the generally monotonic
    sequence number. The reader is signalled via a condition variable when
    it is time to print & release messages in sequence-number order. It
    also handles lost messages (jumps in the sequence numbers) by waiting a
    while to let the stragglers come in.

    The kernel-user messages now also include a framing sequence to allow
    the per-cpu readers to resynchronize to the message boundaries, in
    case a buffer overflow or something else occurs. Staprun reports how
    many bytes and/or messages were skipped in order to resynchronize.
    This is far less lossy than the previous code, which just tried to
    flush everything then-currently available, hoping that it would match
    message boundaries.

    Unfortunately, this means that the user-kernel message ABI has
    changed! Previous-version staprun instances won't work with the new
    modules, nor will current-version staprun work with old modules. This
    flag day is enforced by changing the values of the various ctl message
    numbers, so old/new kernel/user combinations will generate errors
    rather than a quasi-successful staprun startup.

    The new code also dramatically simplifies the use of signals in staprun
    (or rather stapio). Gone are the signal thread and a lot of the
    masking/blocking/waiting. Instead, a single basic signal handler just
    increments globals when signals of various kinds arrive, and all the
    per-cpu etc. threads poll those globals periodically. This includes the
    logic needed for -S (output file rotation on SIGUSR2) as well as the
    flight recorder (-L / -A) modes.

    The reader_timeout_ms value (-T) is now used in both bulk and
    serialized modes for all ppoll timeouts, to prevent those threads from
    sleeping indefinitely now that they won't be interrupted by signals.

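For orientation (not part of the commit): the heart of the serializer
described above is the decision of when the heap's smallest sequence number
may be released -- immediately if it is the next expected one, or after a
short grace period so stragglers from other cpus can still arrive.  A
simplified sketch of that policy follows; the type and function names are
illustrative only, while the real locking and statistics live in
staprun/relay.c in the diff below:

    #include <stdint.h>
    #include <time.h>

    /* Illustrative stand-in for the commit's serialized_message heap entry. */
    struct msg { uint32_t sequence; time_t received; };

    /* Return non-zero if the heap head may be printed now: either it is the
       next expected sequence number, or it has waited long enough that the
       gap is declared lost. */
    static int may_release(const struct msg *head, uint32_t last_seq, time_t now)
    {
        if (head->sequence == last_seq + 1)  /* in order: release at once */
            return 1;
        if (head->received + 2 <= now)       /* out of order: ~2s grace */
            return 1;                        /* give up on the stragglers */
        return 0;
    }
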
diff --git a/configure b/configure
index 974cc2c81..1ff5580b4 100755
--- a/configure
+++ b/configure
@@ -12694,6 +12694,14 @@ printf "%s\n" "$as_me: WARNING: cannot find librpmio" >&2;}
 fi
 fi
 
+ac_fn_c_check_header_compile "$LINENO" "stdatomic.h" "ac_cv_header_stdatomic_h" "$ac_includes_default"
+if test "x$ac_cv_header_stdatomic_h" = xyes
+then :
+  printf "%s\n" "#define HAVE_STDATOMIC_H 1" >>confdefs.h
+
+fi
+
+
 for ac_header in rpm/rpmcrypto.h
 do :
   ac_fn_c_check_header_compile "$LINENO" "rpm/rpmcrypto.h" "ac_cv_header_rpm_rpmcrypto_h" "$ac_includes_default"
diff --git a/configure.ac b/configure.ac
index 3f184f862..e9176b725 100644
--- a/configure.ac
+++ b/configure.ac
@@ -490,6 +490,8 @@ if test "$with_rpm" != "no"; then
 fi
 fi
 
+AC_CHECK_HEADERS([stdatomic.h])
+
 dnl Look for rpmcrypto.h
 AC_CHECK_HEADERS([rpm/rpmcrypto.h], [
   AC_DEFINE([HAVE_RPMCRYPTO_H],[1],[have rpmcrypto_h])
diff --git a/man/stap.1.in b/man/stap.1.in
index 4e1f0a537..c1a81fef3 100644
--- a/man/stap.1.in
+++ b/man/stap.1.in
@@ -388,7 +388,7 @@ With \-o option, run staprun in background as a daemon and show its pid.
 Sets the maximum size of output file and the maximum number of output files.
 If the size of output file will exceed
 .B size
-, systemtap switches output file to the next file. And if the number of
+megabytes, systemtap switches output file to the next file. And if the number of
 output files exceed
 .B N
 , systemtap removes the oldest output file. You can omit the second argument.
diff --git a/runtime/print_flush.c b/runtime/print_flush.c
index 35677b225..4141f95b9 100644
--- a/runtime/print_flush.c
+++ b/runtime/print_flush.c
@@ -43,6 +43,7 @@ static void __stp_print_flush(struct _stp_log *log)
         if (likely(entry && bytes_reserved > hlen)) {
                 /* copy new _stp_trace_ header */
                 struct _stp_trace t = {
+                        .magic = STAP_TRACE_MAGIC,
                         .sequence = _stp_seq_inc(),
                         .pdu_len = len
                 };
diff --git a/runtime/transport/control.c b/runtime/transport/control.c
index 3d7333403..d0a8bdf53 100644
--- a/runtime/transport/control.c
+++ b/runtime/transport/control.c
@@ -57,7 +57,7 @@ static ssize_t _stp_ctl_write_cmd(struct file *file, const char __user *buf, siz
 
 #if defined(DEBUG_TRANS) && (DEBUG_TRANS >= 2)
         if (type < STP_MAX_CMD)
-                dbug_trans2("Got %s. euid=%ld, len=%d\n", _stp_command_name[type],
+                dbug_trans2("Got %s. euid=%ld, len=%d\n", _stp_command_name[min(type,STP_MAX_CMD)] ?: "?",
                             (long)euid, (int)count);
 #endif
 
@@ -211,7 +211,9 @@ out:
 
 #if defined(DEBUG_TRANS) && (DEBUG_TRANS >= 2)
         if (type < STP_MAX_CMD)
-                dbug_trans2("Completed %s (rc=%d)\n", _stp_command_name[type], rc);
+                dbug_trans2("Completed %s (rc=%d)\n",
+                            _stp_command_name[min(type,STP_MAX_CMD)] ?: "?",
+                            rc);
 #endif
         return rc;
 }
diff --git a/runtime/transport/transport_msgs.h b/runtime/transport/transport_msgs.h
index 9e0081c80..e3aa995b1 100644
--- a/runtime/transport/transport_msgs.h
+++ b/runtime/transport/transport_msgs.h
@@ -1,7 +1,7 @@
 /* -*- linux-c -*-
  * transport_msgs.h - messages exchanged between module and userspace
  *
- * Copyright (C) Red Hat Inc, 2006-2011
+ * Copyright (C) Red Hat Inc, 2006-2023
  *
  * This file is part of systemtap, and is free software. You can
  * redistribute it and/or modify it under the terms of the GNU General
@@ -19,7 +19,9 @@
 #define STP_TZ_NAME_LEN 64
 #define STP_REMOTE_URI_LEN 128
 
+#define STAP_TRACE_MAGIC "\xF0\x9F\xA9\xBA" /* unicode stethoscope 🩺 in UTF-8 */
 struct _stp_trace {
+        char magic[4];     /* framing helper */
         uint32_t sequence; /* event number */
         uint32_t pdu_len;  /* length of data after this trace */
 };
@@ -30,7 +32,7 @@ enum
         /** stapio sends a STP_START after receiving a STP_TRANSPORT from
             the module. The module sends STP_START back with result of call
             systemtap_module_init() which will install all initial probes. */
-        STP_START,
+        STP_START = 0x50, // renumbered in version 5.0 to force incompatibility
         /** stapio sends STP_EXIT to signal it wants to stop the module
             itself or in response to receiving a STP_REQUEST_EXIT.
             The module sends STP_EXIT once _stp_clean_and_exit has been
@@ -87,16 +89,21 @@ enum
         /** Sent by staprun to notify module of remote identity, if any.
             Only sent once at startup. */
         STP_REMOTE_ID,
+        /** Placeholder; it was mistakenly labeled STP_MAX_CMD */
+        STP_MAX_CMD_PLACEHOLDER,
+        /** Sent by stapio after having received STP_TRANSPORT. Notifies
+            the module of the target namespaces pid. */
+        STP_NAMESPACES_PID,
+
+        /** INSERT NEW MESSAGE TYPES HERE */
+
         /** Max number of message types, sanity check only. */
         STP_MAX_CMD,
-        /** Sent by stapio after having recevied STP_TRANSPORT. Notifies
-            the module of the target namespaces pid.*/
-        STP_NAMESPACES_PID
 };
 
 #ifdef DEBUG_TRANS
-static const char *_stp_command_name[] = {
-        "STP_START",
+static const char *_stp_command_name[STP_MAX_CMD] = {
+        [STP_START]="STP_START",
         "STP_EXIT",
         "STP_OOB_DATA",
         "STP_SYSTEM",
@@ -113,7 +120,9 @@ static const char *_stp_command_name[] = {
         "STP_TZINFO",
         "STP_PRIVILEGE_CREDENTIALS",
         "STP_REMOTE_ID",
-        "STP_NAMESPACES_PID",
+        "STP_MAX_CMD_PLACEHOLDER",
+        "STP_NAMESPACE_PID",
+        [STP_MAX_CMD]="?" /* in control.c, STP_MAX_CMD represents unknown message numbers/names */
 };
 #endif /* DEBUG_TRANS */

diff --git a/staprun/common.c b/staprun/common.c
index 3d23d7319..f8d618e24 100644
--- a/staprun/common.c
+++ b/staprun/common.c
@@ -115,7 +115,7 @@ void parse_args(int argc, char **argv)
         target_pid = 0;
         target_namespaces_pid = 0;
         buffer_size = 0;
-        reader_timeout_ms = 0;
+        reader_timeout_ms = 200;
         target_cmd = NULL;
         outfile_name = NULL;
         rename_mod = 0;
diff --git a/staprun/mainloop.c b/staprun/mainloop.c
index 4af21e950..c507fc069 100644
--- a/staprun/mainloop.c
+++ b/staprun/mainloop.c
@@ -7,7 +7,7 @@
  * Public License (GPL); either version 2, or (at your option) any
  * later version.
  *
- * Copyright (C) 2005-2021 Red Hat Inc.
+ * Copyright (C) 2005-2023 Red Hat Inc.
  */
 
 #include "staprun.h"
@@ -23,31 +23,9 @@
 
 /* globals */
 int ncpus;
-static int pending_interrupts = 0;
+static volatile sig_atomic_t pending_interrupts = 0; // tells stp_main_loop to trigger STP_EXIT message to kernel
 static int target_pid_failed_p = 0;
 
-/* Setup by setup_main_signals, used by signal_thread to notify the
-   main thread of interruptable events. */
-static pthread_t main_thread;
-
-static void set_nonblocking_std_fds(void)
-{
-        int fd;
-        for (fd = 1; fd < 3; fd++) {
-                /* NB: writing to stderr/stdout blockingly in signal handler is
-                 * dangerous since it may prevent the stap process from quitting
-                 * gracefully on receiving SIGTERM/etc signals when the stderr/stdout
-                 * write buffer is full. PR23891 */
-                int flags = fcntl(fd, F_GETFL);
-                if (flags == -1)
-                        continue;
-
-                if (flags & O_NONBLOCK)
-                        continue;
-
-                (void) fcntl(fd, F_SETFL, flags | O_NONBLOCK);
-        }
-}
 
 static void set_blocking_std_fds(void)
 {
@@ -77,43 +55,16 @@ static void my_exit(int rc)
         _exit(rc);
 }
 
-static void *signal_thread(void *arg)
-{
-        sigset_t *s = (sigset_t *) arg;
-        int signum = 0;
 
-        while (1) {
-                if (sigwait(s, &signum) < 0) {
-                        _perr("sigwait");
-                        continue;
-                }
+
+static void interrupt_handler(int signum)
+{
         if (signum == SIGQUIT) {
                 load_only = 1; /* flag for stp_main_loop */
-                pending_interrupts ++;
-        } else if (signum == SIGINT || signum == SIGHUP || signum == SIGTERM
-                   || signum == SIGPIPE)
-        {
-                pending_interrupts ++;
         }
-        if (pending_interrupts > 2) {
-                set_nonblocking_std_fds();
-                pthread_kill (main_thread, SIGURG);
-        }
-        dbug(2, "sigproc %d (%s)\n", signum, strsignal(signum));
-        }
-        /* Notify main thread (interrupts select). */
-        pthread_kill (main_thread, SIGURG);
-        return NULL;
+        pending_interrupts ++;
 }
 
-static void urg_proc(int signum)
-{
-        /* This handler is just notified from the signal_thread
-           whenever an interruptable condition is detected. The
-           handler itself doesn't do anything. But this will
-           result select to detect an EINTR event. */
-        dbug(2, "urg_proc %d (%s)\n", signum, strsignal(signum));
-}
 
 static void chld_proc(int signum)
 {
@@ -143,9 +94,9 @@ static void chld_proc(int signum)
         (void) rc; /* XXX: notused */
 }
 
+
 static void setup_main_signals(void)
 {
-        pthread_t tid;
         struct sigaction sa;
         sigset_t *s = malloc(sizeof(*s));
         if (!s) {
@@ -153,25 +104,11 @@ static void setup_main_signals(void)
                 exit(1);
         }
 
-        /* The main thread will only handle SIGCHLD and SIGURG.
-           SIGURG is send from the signal thread in case the interrupt
-           flag is set. This will then interrupt any select call. */
-        main_thread = pthread_self();
-        sigfillset(s);
-        pthread_sigmask(SIG_SETMASK, s, NULL);
-
         memset(&sa, 0, sizeof(sa));
         /* select will report EINTR even when SA_RESTART is set. */
         sa.sa_flags = SA_RESTART;
         sigfillset(&sa.sa_mask);
 
-        /* Ignore all these events on the main thread. */
-        sa.sa_handler = SIG_IGN;
-        sigaction(SIGINT, &sa, NULL);
-        sigaction(SIGTERM, &sa, NULL);
-        sigaction(SIGHUP, &sa, NULL);
-        sigaction(SIGQUIT, &sa, NULL);
-
         /* This is to notify when our child process (-c) ends. */
         sa.sa_handler = chld_proc;
         sigaction(SIGCHLD, &sa, NULL);
@@ -182,26 +119,21 @@ static void setup_main_signals(void)
                 sigaction(SIGWINCH, &sa, NULL);
         }
 
-        /* This signal handler is notified from the signal_thread
-           whenever a interruptable event is detected. It will
-           result in an EINTR event for select or sleep. */
-        sa.sa_handler = urg_proc;
-        sigaction(SIGURG, &sa, NULL);
-
-        /* Everything else is handled on a special signal_thread. */
-        sigemptyset(s);
-        sigaddset(s, SIGINT);
-        sigaddset(s, SIGTERM);
-        sigaddset(s, SIGHUP);
-        sigaddset(s, SIGQUIT);
-        sigaddset(s, SIGPIPE);
-        pthread_sigmask(SIG_SETMASK, s, NULL);
-        if (pthread_create(&tid, NULL, signal_thread, s) < 0) {
-                _perr(_("failed to create thread"));
-                exit(1);
-        }
+        // listen to these signals via general interrupt handler in whatever thread
+        memset(&sa, 0, sizeof(sa));
+        sa.sa_flags = SA_RESTART;
+        sigfillset(&sa.sa_mask);
+
+        sa.sa_handler = interrupt_handler;
+        sigaction(SIGINT, &sa, NULL);
+        sigaction(SIGTERM, &sa, NULL);
+        sigaction(SIGHUP, &sa, NULL);
+        sigaction(SIGQUIT, &sa, NULL);
+
+        // Formerly, we had a signal catching thread.
 }
 
+
 /**
  * system_cmd() executes system commands in response
  * to an STP_SYSTEM message from the module. These
diff --git a/staprun/relay.c b/staprun/relay.c
index dea1d5ae9..08850b246 100644
--- a/staprun/relay.c
+++ b/staprun/relay.c
@@ -7,30 +7,32 @@
  * Public License (GPL); either version 2, or (at your option) any
  * later version.
  *
- * Copyright (C) 2007-2013 Red Hat Inc.
+ * Copyright (C) 2007-2023 Red Hat Inc.
  */
 
 #include "staprun.h"
+#include <string.h>
+#ifdef HAVE_STDATOMIC_H
+#include <stdatomic.h>
+#endif
+#define NDEBUG
+#include "gheap.h"
+
 
 int out_fd[MAX_NR_CPUS];
 int monitor_end = 0;
 static pthread_t reader[MAX_NR_CPUS];
-static int relay_fd[MAX_NR_CPUS];
+static int relay_fd[MAX_NR_CPUS];            // fd to kernel per-cpu relayfs
 static int avail_cpus[MAX_NR_CPUS];
-static int switch_file[MAX_NR_CPUS];
-static pthread_mutex_t mutex[MAX_NR_CPUS];
+static volatile sig_atomic_t sigusr2_count;  // number of SIGUSR2's received by process
+static int sigusr2_processed[MAX_NR_CPUS];   // each thread's count of processed SIGUSR2's
 static int bulkmode = 0;
-static volatile int stop_threads = 0;
+static volatile int stop_threads = 0;        // set during relayfs_close to signal threads to die
 static time_t *time_backlog[MAX_NR_CPUS];
 static int backlog_order=0;
 #define BACKLOG_MASK ((1 << backlog_order) - 1)
 #define MONITORLINELENGTH 4096
 
-/* tracking message sequence #s for cross-cpu merging */
-static uint32_t last_sequence_number;
-static pthread_mutex_t last_sequence_number_mutex = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t last_sequence_number_changed = PTHREAD_COND_INITIALIZER;
-
 #ifdef NEED_PPOLL
 int ppoll(struct pollfd *fds, nfds_t nfds,
           const struct timespec *timeout, const sigset_t *sigmask)
@@ -123,18 +125,375 @@ static int switch_outfile(int cpu, int *fnum)
         return 0;
 }
+
+
+/* In serialized (non-bulk) output mode, individual messages that have
+   been received from the kernel per-cpu relays are stored in a central
+   serializing data structure - in this case, a heap. They are ordered
+   by message sequence number. An additional thread (serializer_thread)
+   scans & sequences the output. */
+struct serialized_message {
+        union {
+                struct _stp_trace bufhdr;
+                char bufhdr_raw[sizeof(struct _stp_trace)];
+        };
+        time_t received; // timestamp when this message was enqueued
+        char *buf;       // malloc()'d size >= rounded(bufhdr.pdu_len)
+};
+static struct serialized_message* buffer_heap = NULL; // the heap
+
+// NB: we control memory via realloc(), gheap just manipulates entries in place
+static unsigned buffer_heap_size = 0;     // used number of entries
+static unsigned buffer_heap_alloc = 0;    // allocation length, always >= buffer_heap_size
+static unsigned last_sequence_number = 0; // last processed sequential message number
+
+#ifdef HAVE_STDATOMIC_H
+static atomic_ulong lost_message_count = 0; // how many sequence numbers we know we missed
+static atomic_ulong lost_byte_count = 0;    // how many bytes were skipped during resync
+#else
+static unsigned long lost_message_count = 0; // how many sequence numbers we know we missed
+static unsigned long lost_byte_count = 0;    // how many bytes were skipped during resync
+#endif
+
+// concurrency control for the buffer_heap
+static pthread_cond_t buffer_heap_cond = PTHREAD_COND_INITIALIZER;
+static pthread_mutex_t buffer_heap_mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_t serializer_thread; // ! bulkmode only
+
+
+static void buffer_heap_mover (void *const dest, const void *const src)
+{
+        memmove (dest, src, sizeof(struct serialized_message));
+}
+
+// NB: since we want to sort messages into an INCREASING heap sequence,
+// we reverse the normal comparison operator. gheap_pop_heap() should
+// therefore return the SMALLEST element.
+static int buffer_heap_comparer (const void *const ctx, const void *a, const void *b)
+{
+        (void) ctx;
+        uint32_t aa = ((struct serialized_message *)a)->bufhdr.sequence;
+        uint32_t bb = ((struct serialized_message *)b)->bufhdr.sequence;
+        return (aa > bb);
+}
+
+static const struct gheap_ctx buffer_heap_ctx = {
+        .item_size = sizeof(struct serialized_message),
+        .less_comparer = buffer_heap_comparer,
+        .item_mover = buffer_heap_mover,
+        .page_chunks = 16, // arbitrary
+        .fanout = 2        // standard binary heap
+};
+
+
+#define MAX_MESSAGE_LENGTH (128*1024) /* maximum likely length of a single pdu */
+
+
+
+/* Thread that reads per-cpu messages, and stuffs complete ones into
+   dynamically allocated serialized_message nodes in the heap. */
+static void* reader_thread_serialmode (void *data)
+{
+        int rc, cpu = (int)(long)data;
+        struct pollfd pollfd;
+        sigset_t sigs;
+        cpu_set_t cpu_mask;
+
+        sigemptyset(&sigs);
+        sigaddset(&sigs,SIGUSR2);
+        pthread_sigmask(SIG_BLOCK, &sigs, NULL);
+
+        sigfillset(&sigs);
+        sigdelset(&sigs,SIGUSR2);
+
+        CPU_ZERO(&cpu_mask);
+        CPU_SET(cpu, &cpu_mask);
+        if( sched_setaffinity( 0, sizeof(cpu_mask), &cpu_mask ) < 0 )
+                _perr("sched_setaffinity");
+
+        pollfd.fd = relay_fd[cpu];
+        pollfd.events = POLLIN;
+
+        while (! stop_threads) {
+                // read a message header
+                struct serialized_message message;
+
+                /* 200ms, close to human level of "instant" */
+                struct timespec tim, *timeout = &tim;
+                timeout->tv_sec = reader_timeout_ms / 1000;
+                timeout->tv_nsec = (reader_timeout_ms - timeout->tv_sec * 1000) * 1000000;
+
+                rc = ppoll(&pollfd, 1, timeout, &sigs);
+                if (rc < 0) {
+                        dbug(3, "cpu=%d poll=%d errno=%d\n", cpu, rc, errno);
+                        if (errno == EINTR) {
+                                if (stop_threads)
+                                        break;
+                        } else {
+                                _perr("poll error");
+                                goto error_out;
+                        }
+                }
+
+                // set the timestamp
+                message.received = time(NULL);
+
+                /* Read the header. */
+                rc = read(relay_fd[cpu], &message.bufhdr, sizeof(message.bufhdr));
+                if (rc <= 0) /* seen during normal shutdown or error */
+                        continue;
+                if (rc != sizeof(message.bufhdr)) {
+                        lost_byte_count += rc;
+                        continue;
+                }
+
+                /* Validate the magic value. In case of mismatch,
+                   keep on reading & shifting the header, one byte at
+                   a time, until we get a match. */
+                while (! stop_threads && memcmp(message.bufhdr.magic, STAP_TRACE_MAGIC, 4)) {
+                        lost_byte_count ++;
+                        memmove(& message.bufhdr_raw[0],
+                                & message.bufhdr_raw[1],
+                                sizeof(message.bufhdr_raw)-1);
+                        rc = read(relay_fd[cpu],
+                                  &message.bufhdr_raw[sizeof(message.bufhdr_raw)-1],
+                                  1);
+                        if (rc <= 0) /* seen during normal shutdown or error */
+                                break;
+                }
+
+                /* Validate it slightly. Because of lost messages, we might be getting
+                   not a proper _stp_trace struct but the interior of some piece of
+                   trace text message. XXX: validate bufhdr.sequence a little bit too? */
+                if (message.bufhdr.pdu_len == 0 ||
+                    message.bufhdr.pdu_len > MAX_MESSAGE_LENGTH) {
+                        lost_byte_count += sizeof(message.bufhdr);
+                        continue;
+                }
+
+                // Allocate the pdu body
+                message.buf = malloc(message.bufhdr.pdu_len);
+                if (message.buf == NULL)
+                {
+                        lost_byte_count += message.bufhdr.pdu_len;
+                        continue;
+                }
+
+                /* Read the message, perhaps in pieces (such as if crossing
+                 * relayfs subbuf boundaries). */
+                size_t bufread = 0;
+                while (bufread < message.bufhdr.pdu_len) {
+                        rc = read(relay_fd[cpu], message.buf+bufread, message.bufhdr.pdu_len-bufread);
+                        if (rc <= 0) {
+                                lost_byte_count += message.bufhdr.pdu_len-bufread;
+                                break; /* still process it; hope we can resync at next packet. */
+                        }
+                        bufread += rc;
+                }
+
+                // plop the message into the buffer_heap
+                pthread_mutex_lock(& buffer_heap_mutex);
+                if (message.bufhdr.sequence < last_sequence_number) {
+                        // whoa! is this some old message that we've assumed lost?
+                        // or are we wrapping around the uint_32 sequence numbers?
+                        _perr("unexpected sequence=%u", message.bufhdr.sequence);
+                }
+
+                // is it large enough? if not, realloc
+                if (buffer_heap_alloc - buffer_heap_size == 0) { // full
+                        unsigned new_buffer_heap_alloc = (buffer_heap_alloc + 1) * 1.5;
+                        struct serialized_message *new_buffer_heap =
+                                realloc(buffer_heap,
+                                        new_buffer_heap_alloc * sizeof(struct serialized_message));
+                        if (new_buffer_heap == NULL) {
+                                _perr("out of memory while enlarging buffer heap");
+                                free (message.buf);
+                                lost_message_count ++;
+                                pthread_mutex_unlock(& buffer_heap_mutex);
+                                continue;
+                        }
+                        buffer_heap = new_buffer_heap;
+                        buffer_heap_alloc = new_buffer_heap_alloc;
+                }
+                // plop copy of message struct into slot at end of heap
+                buffer_heap[buffer_heap_size++] = message;
+                // push it into heap
+                gheap_push_heap(&buffer_heap_ctx,
+                                buffer_heap,
+                                buffer_heap_size);
+                // and c'est tout
+                pthread_mutex_unlock(& buffer_heap_mutex);
+                pthread_cond_broadcast (& buffer_heap_cond);
+                dbug(3, "thread %d received seq=%u\n", cpu, message.bufhdr.sequence);
+        }
+
+        dbug(3, "exiting thread for cpu %d\n", cpu);
+        return NULL;
+
+error_out:
+        /* Signal the main thread that we need to quit */
+        kill(getpid(), SIGTERM);
+        dbug(2, "exiting thread for cpu %d after error\n", cpu);
+
+        return NULL;
+}
+
+
+// Print and free buffer of given serialized message.
+static void print_serialized_message (struct serialized_message *msg)
+{
+        // check if file switching is necessary, as per staprun -S
+
+        // NB: unlike reader_thread_bulkmode(), we don't need to use
+        // mutexes to protect switch_file[] or such, because we're the
+        // ONLY thread doing output.
+        unsigned cpu = 0; // arbitrary
+        static ssize_t wsize = 0; // how many bytes we've written into the serialized file so far
+        static int fnum = 0;      // which file number we're using
+
+        if ((fsize_max && (wsize > fsize_max)) ||
+            (sigusr2_count > sigusr2_processed[cpu])) {
+                dbug(2, "switching output file wsize=%ld fsize_max=%ld sigusr2 %d > %d\n",
+                     wsize, fsize_max, sigusr2_count, sigusr2_processed[cpu]);
+                sigusr2_processed[cpu] = sigusr2_count;
+                if (switch_outfile(cpu, &fnum) < 0) {
+                        perr("unable to switch output file");
+                        // but continue
+                }
+                wsize = 0;
+        }
+
+        // write loop ... could block if e.g. the output disk is slow
+        // or the user hits a ^S (XOFF) on the tty
+        ssize_t sent = 0;
+        do {
+                ssize_t ret = write (out_fd[avail_cpus[0]],
+                                     msg->buf+sent, msg->bufhdr.pdu_len-sent);
+                if (ret <= 0) {
+                        perr("error writing output");
+                        break;
+                }
+                sent += ret;
+        } while ((unsigned)sent < msg->bufhdr.pdu_len);
+        wsize += sent;
+
+        // free the associated buffer
+        free (msg->buf);
+        msg->buf = NULL;
+}
+
+
+/* Thread that checks on the heap of messages, and pumps them out to
+   the designated output fd in sequence. It waits, but only a little
+   while, if it has only fresher messages than it's expecting. It
+   exits upon a global stop_threads indication.
+*/
+static void* reader_thread_serializer (void *data) {
+        (void) data;
+        while (! stop_threads) {
+                /* timeout 0-1 seconds; this is the maximum extra time that
+                   stapio will be waiting after a ^C */
+                struct timespec ts = {.tv_sec=time(NULL)+1, .tv_nsec=0};
+                int rc;
+                pthread_mutex_lock(& buffer_heap_mutex);
+                rc = pthread_cond_timedwait (& buffer_heap_cond,
+                                             & buffer_heap_mutex,
+                                             & ts);
+
+                dbug(3, "serializer cond wait rc=%d heapsize=%u\n", rc, buffer_heap_size);
+                time_t now = time(NULL);
+                unsigned processed = 0;
+                while (buffer_heap_size > 0) { // consume as much as possible
+                        // check out the sequence# of the first element
+                        uint32_t buf_min_seq = buffer_heap[0].bufhdr.sequence;
+
+                        dbug(3, "serializer last=%u seq=%u\n", last_sequence_number, buf_min_seq);
+
+                        if ((buf_min_seq == last_sequence_number + 1) || // expected seq#
+                            (buffer_heap[0].received + 2 <= now)) { // message too old
+                                // "we've got one!" -- or waited too long for one
+                                // get it off the head of the heap
+                                gheap_pop_heap(&buffer_heap_ctx,
+                                               buffer_heap,
+                                               buffer_heap_size);
+                                buffer_heap_size --; // becomes index where the head was moved
+                                processed ++;
+
+                                // take a copy of the whole message
+                                struct serialized_message msg = buffer_heap[buffer_heap_size];
+
+                                // paranoid: clear this field of the now-unused slot
+                                buffer_heap[buffer_heap_size].buf = NULL;
+                                // update statistics
+                                if (attach_mod == 1 && last_sequence_number == 0) // first message after staprun -A
+                                        ; // do not penalize it with lost messages
+                                else
+                                        lost_message_count += (buf_min_seq - last_sequence_number - 1);
+                                last_sequence_number = buf_min_seq;
+
+                                // unlock the mutex, permitting
+                                // reader_thread_serialmode threads to
+                                // resume piling messages into the
+                                // heap while we print stuff
+                                pthread_mutex_unlock(& buffer_heap_mutex);
+
+                                print_serialized_message (& msg);
+
+                                // must re-take lock for next iteration of the while loop
+                                pthread_mutex_lock(& buffer_heap_mutex);
+                        } else {
+                                // processed as much of the heap as we
+                                // could this time; wait for the
+                                // condition again
+                                break;
+                        }
+                }
+                pthread_mutex_unlock(& buffer_heap_mutex);
+                if (processed > 0)
+                        dbug(2, "serializer processed n=%u\n", processed);
+        }
+        return NULL;
+}
+
+
+
+// At the end of the program main loop, flush out any remaining
+// messages and free up all that heapy data.
+static void reader_serialized_flush()
+{
+        dbug(3, "serializer flushing messages=%u\n", buffer_heap_size);
+        while (buffer_heap_size > 0) { // consume it all
+                // check out the sequence# of the first element
+                uint32_t buf_min_seq = buffer_heap[0].bufhdr.sequence;
+                dbug(3, "serializer seq=%u\n", buf_min_seq);
+                gheap_pop_heap(&buffer_heap_ctx,
+                               buffer_heap,
+                               buffer_heap_size);
+                buffer_heap_size --; // also index where the head was moved
+
+                // NB: no need for mutex manipulations, this is super single threaded
+                print_serialized_message (& buffer_heap[buffer_heap_size]);
+
+                lost_message_count += (buf_min_seq - last_sequence_number - 1);
+                last_sequence_number = buf_min_seq;
+        }
+        free (buffer_heap);
+        buffer_heap = NULL;
+}
+
+
+
/**
|
|
- * reader_thread - per-cpu channel buffer reader
|
|
+ * reader_thread - per-cpu channel buffer reader, bulkmode (one output file per cpu input file)
|
|
*/
|
|
-static void *reader_thread(void *data)
|
|
+static void *reader_thread_bulkmode (void *data)
|
|
{
|
|
- char buf[128*1024]; // NB: maximum possible output amount from a single probe hit's print_flush
|
|
+ char buf[MAX_MESSAGE_LENGTH];
|
|
struct _stp_trace bufhdr;
|
|
|
|
int rc, cpu = (int)(long)data;
|
|
struct pollfd pollfd;
|
|
- /* 200ms, close to human level of "instant" */
|
|
- struct timespec tim = {.tv_sec=0, .tv_nsec=200000000}, *timeout = &tim;
|
|
sigset_t sigs;
|
|
off_t wsize = 0;
|
|
int fnum = 0;
|
|
@@ -151,44 +510,30 @@ static void *reader_thread(void *data)
|
|
CPU_SET(cpu, &cpu_mask);
|
|
if( sched_setaffinity( 0, sizeof(cpu_mask), &cpu_mask ) < 0 )
|
|
_perr("sched_setaffinity");
|
|
-#ifdef NEED_PPOLL
|
|
- /* Without a real ppoll, there is a small race condition that could */
|
|
- /* block ppoll(). So use a timeout to prevent that. */
|
|
- timeout->tv_sec = 10;
|
|
- timeout->tv_nsec = 0;
|
|
-#else
|
|
- timeout = NULL;
|
|
-#endif
|
|
-
|
|
- if (reader_timeout_ms && timeout) {
|
|
- timeout->tv_sec = reader_timeout_ms / 1000;
|
|
- timeout->tv_nsec = (reader_timeout_ms - timeout->tv_sec * 1000) * 1000000;
|
|
- }
|
|
|
|
pollfd.fd = relay_fd[cpu];
|
|
pollfd.events = POLLIN;
|
|
|
|
do {
|
|
- dbug(3, "thread %d start ppoll\n", cpu);
|
|
+ /* 200ms, close to human level of "instant" */
|
|
+ struct timespec tim, *timeout = &tim;
|
|
+ timeout->tv_sec = reader_timeout_ms / 1000;
|
|
+ timeout->tv_nsec = (reader_timeout_ms - timeout->tv_sec * 1000) * 1000000;
|
|
+
|
|
rc = ppoll(&pollfd, 1, timeout, &sigs);
|
|
- dbug(3, "thread %d end ppoll:%d\n", cpu, rc);
|
|
if (rc < 0) {
|
|
dbug(3, "cpu=%d poll=%d errno=%d\n", cpu, rc, errno);
|
|
if (errno == EINTR) {
|
|
if (stop_threads)
|
|
break;
|
|
|
|
- pthread_mutex_lock(&mutex[cpu]);
|
|
- if (switch_file[cpu]) {
|
|
- if (switch_outfile(cpu, &fnum) < 0) {
|
|
- switch_file[cpu] = 0;
|
|
- pthread_mutex_unlock(&mutex[cpu]);
|
|
+ if (sigusr2_count > sigusr2_processed[cpu]) {
|
|
+ sigusr2_processed[cpu] = sigusr2_count;
|
|
+ if (switch_outfile(cpu, &fnum) < 0) {
|
|
goto error_out;
|
|
- }
|
|
- switch_file[cpu] = 0;
|
|
- wsize = 0;
|
|
+ }
|
|
+ wsize = 0;
|
|
}
|
|
- pthread_mutex_unlock(&mutex[cpu]);
|
|
} else {
|
|
_perr("poll error");
|
|
goto error_out;
|
|
@@ -197,7 +542,7 @@ static void *reader_thread(void *data)
|
|
|
|
/* Read the header. */
|
|
rc = read(relay_fd[cpu], &bufhdr, sizeof(bufhdr));
|
|
- if (rc == 0) /* seen during normal shutdown */
|
|
+ if (rc <= 0) /* seen during normal shutdown */
|
|
continue;
|
|
if (rc != sizeof(bufhdr)) {
|
|
_perr("bufhdr read error, attempting resync");
|
|
@@ -228,41 +573,20 @@ static void *reader_thread(void *data)
|
|
bufread += rc;
|
|
}
|
|
|
|
- if (! bulkmode) {
|
|
- /* Wait until the bufhdr.sequence number indicates it's OUR TURN to go ahead. */
|
|
- struct timespec ts = {.tv_sec=time(NULL)+2, .tv_nsec=0}; /* wait 1-2 seconds */
|
|
- pthread_mutex_lock(& last_sequence_number_mutex);
|
|
- while ((last_sequence_number+1 != bufhdr.sequence) && /* normal case */
|
|
- (last_sequence_number < bufhdr.sequence)) { /* we're late!!! */
|
|
- int rc = pthread_cond_timedwait (& last_sequence_number_changed,
|
|
- & last_sequence_number_mutex,
|
|
- & ts);
|
|
- if (rc == ETIMEDOUT) {
|
|
- /* _perr("message sequencing timeout"); */
|
|
- break;
|
|
- }
|
|
- }
|
|
- pthread_mutex_unlock(& last_sequence_number_mutex);
|
|
- }
|
|
-
|
|
int wbytes = rc;
|
|
char *wbuf = buf;
|
|
|
|
dbug(3, "cpu %d: read %d bytes of data\n", cpu, rc);
|
|
|
|
/* Switching file */
|
|
- pthread_mutex_lock(&mutex[cpu]);
|
|
if ((fsize_max && ((wsize + rc) > fsize_max)) ||
|
|
- switch_file[cpu]) {
|
|
+ (sigusr2_count > sigusr2_processed[cpu])) {
|
|
+ sigusr2_processed[cpu] = sigusr2_count;
|
|
if (switch_outfile(cpu, &fnum) < 0) {
|
|
- switch_file[cpu] = 0;
|
|
- pthread_mutex_unlock(&mutex[cpu]);
|
|
goto error_out;
|
|
}
|
|
- switch_file[cpu] = 0;
|
|
wsize = 0;
|
|
}
|
|
- pthread_mutex_unlock(&mutex[cpu]);
|
|
|
|
/* Copy loop. Must repeat write(2) in case of a pipe overflow
|
|
or other transient fullness. */
|
|
@@ -291,13 +615,8 @@ static void *reader_thread(void *data)
|
|
int fd;
|
|
/* Only bulkmode and fsize_max use per-cpu output files. Otherwise,
|
|
there's just a single output fd stored at out_fd[avail_cpus[0]]. */
|
|
- if (bulkmode || fsize_max)
|
|
- fd = out_fd[cpu];
|
|
- else
|
|
- fd = out_fd[avail_cpus[0]];
|
|
- rc = 0;
|
|
- if (bulkmode)
|
|
- rc = write(fd, &bufhdr, sizeof(bufhdr)); // write header
|
|
+ fd = out_fd[cpu];
|
|
+ rc = write(fd, &bufhdr, sizeof(bufhdr)); // write header
|
|
rc |= write(fd, wbuf, wbytes); // write payload
|
|
if (rc <= 0) {
|
|
perr("Couldn't write to output %d for cpu %d, exiting.",
|
|
@@ -310,14 +629,6 @@ static void *reader_thread(void *data)
|
|
}
|
|
}
|
|
|
|
- /* update the sequence number & let other cpus go ahead */
|
|
- pthread_mutex_lock(& last_sequence_number_mutex);
|
|
- if (last_sequence_number < bufhdr.sequence) { /* not if someone leapfrogged us */
|
|
- last_sequence_number = bufhdr.sequence;
|
|
- pthread_cond_broadcast (& last_sequence_number_changed);
|
|
- }
|
|
- pthread_mutex_unlock(& last_sequence_number_mutex);
|
|
-
|
|
} while (!stop_threads);
|
|
dbug(3, "exiting thread for cpu %d\n", cpu);
|
|
return(NULL);
|
|
@@ -329,41 +640,16 @@ error_out:
|
|
return(NULL);
|
|
}
|
|
|
|
+
|
|
static void switchfile_handler(int sig)
|
|
{
|
|
- int i;
|
|
+ (void) sig;
|
|
if (stop_threads || !outfile_name)
|
|
return;
|
|
-
|
|
- for (i = 0; i < ncpus; i++) {
|
|
- pthread_mutex_lock(&mutex[avail_cpus[i]]);
|
|
- if (reader[avail_cpus[i]] && switch_file[avail_cpus[i]]) {
|
|
- pthread_mutex_unlock(&mutex[avail_cpus[i]]);
|
|
- dbug(2, "file switching is progressing, signal ignored.\n", sig);
|
|
- return;
|
|
- }
|
|
- pthread_mutex_unlock(&mutex[avail_cpus[i]]);
|
|
- }
|
|
- for (i = 0; i < ncpus; i++) {
|
|
- pthread_mutex_lock(&mutex[avail_cpus[i]]);
|
|
- if (reader[avail_cpus[i]]) {
|
|
- switch_file[avail_cpus[i]] = 1;
|
|
- pthread_mutex_unlock(&mutex[avail_cpus[i]]);
|
|
-
|
|
- // Make sure we don't send the USR2 signal to
|
|
- // ourselves.
|
|
- if (pthread_equal(pthread_self(),
|
|
- reader[avail_cpus[i]]))
|
|
- break;
|
|
- pthread_kill(reader[avail_cpus[i]], SIGUSR2);
|
|
- }
|
|
- else {
|
|
- pthread_mutex_unlock(&mutex[avail_cpus[i]]);
|
|
- break;
|
|
- }
|
|
- }
|
|
+ sigusr2_count ++;
|
|
}
|
|
|
|
+
|
|
/**
|
|
* init_relayfs - create files and threads for relayfs processing
|
|
*
|
|
@@ -507,19 +793,20 @@ int init_relayfs(void)
sigaction(SIGUSR2, &sa, NULL);

dbug(2, "starting threads\n");
- for (i = 0; i < ncpus; i++) {
- if (pthread_mutex_init(&mutex[avail_cpus[i]], NULL) < 0) {
- _perr("failed to create mutex");
- return -1;
- }
- }
for (i = 0; i < ncpus; i++) {
- if (pthread_create(&reader[avail_cpus[i]], NULL, reader_thread,
+ if (pthread_create(&reader[avail_cpus[i]], NULL,
+ bulkmode ? reader_thread_bulkmode : reader_thread_serialmode,
(void *)(long)avail_cpus[i]) < 0) {
_perr("failed to create thread");
return -1;
}
}
+ if (! bulkmode)
+ if (pthread_create(&serializer_thread, NULL,
+ reader_thread_serializer, NULL) < 0) {
+ _perr("failed to create thread");
+ return -1;
+ }

return 0;
}
@@ -529,27 +816,31 @@ void close_relayfs(void)
int i;
stop_threads = 1;
dbug(2, "closing\n");
- for (i = 0; i < ncpus; i++) {
- if (reader[avail_cpus[i]])
- pthread_kill(reader[avail_cpus[i]], SIGUSR2);
- else
- break;
- }
+
for (i = 0; i < ncpus; i++) {
if (reader[avail_cpus[i]])
pthread_join(reader[avail_cpus[i]], NULL);
else
break;
}
+ if (! bulkmode) {
+ if (serializer_thread) // =0 on load_only!
+ pthread_join(serializer_thread, NULL);
+ // at this point, we know all reader and writer
+ // threads for the buffer_heap are dead.
+ reader_serialized_flush();
+
+ if (lost_message_count > 0 || lost_byte_count > 0)
+ eprintf("WARNING: There were %u lost messages and %u lost bytes.\n",
+ lost_message_count, lost_byte_count);
+ }
+
for (i = 0; i < ncpus; i++) {
if (relay_fd[avail_cpus[i]] >= 0)
close(relay_fd[avail_cpus[i]]);
else
break;
}
- for (i = 0; i < ncpus; i++) {
- pthread_mutex_destroy(&mutex[avail_cpus[i]]);
- }
dbug(2, "done\n");
}

@@ -558,12 +849,6 @@ void kill_relayfs(void)
int i;
stop_threads = 1;
dbug(2, "killing\n");
- for (i = 0; i < ncpus; i++) {
- if (reader[avail_cpus[i]])
- pthread_kill(reader[avail_cpus[i]], SIGUSR2);
- else
- break;
- }
for (i = 0; i < ncpus; i++) {
if (reader[avail_cpus[i]])
pthread_cancel(reader[avail_cpus[i]]); /* no wait */
@@ -576,8 +861,5 @@ void kill_relayfs(void)
else
break;
}
- for (i = 0; i < ncpus; i++) {
- pthread_mutex_destroy(&mutex[avail_cpus[i]]);
- }
dbug(2, "done\n");
}
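
[Commentary, not part of the patch: the hunks above show the new serialized
(non-bulk) shutdown order. Per-cpu readers feed a shared, sequence-ordered
buffer_heap; one serializer thread drains it in order; so close_relayfs() must
join the readers first, then the serializer, then call reader_serialized_flush()
for anything still queued. A rough standalone sketch of that shape, using a
linear min-scan where staprun uses its heap routines, and with all names
(q, pop_min, reader, serializer) hypothetical:

#include <pthread.h>
#include <stdio.h>

#define QCAP 16
static struct { unsigned seq; int used; } q[QCAP]; /* toy stand-in for buffer_heap */
static pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;
static int readers_done;

static int pop_min(unsigned *seq)       /* pop smallest queued sequence, if any */
{
	int i, best = -1;
	for (i = 0; i < QCAP; i++)
		if (q[i].used && (best < 0 || q[i].seq < q[best].seq))
			best = i;
	if (best < 0)
		return 0;
	*seq = q[best].seq;
	q[best].used = 0;
	return 1;
}

static void *reader(void *arg)          /* producer: queues 3 fake records */
{
	unsigned base = (unsigned)(long)arg;
	unsigned k;
	for (k = 0; k < 3; k++) {
		pthread_mutex_lock(&qlock);
		q[base * 3 + k].seq = base * 100 + k;
		q[base * 3 + k].used = 1;
		pthread_mutex_unlock(&qlock);
	}
	return NULL;
}

static void *serializer(void *arg)      /* single consumer, in-order writer */
{
	unsigned seq;
	(void) arg;
	for (;;) {                      /* busy-waits for brevity only */
		pthread_mutex_lock(&qlock);
		int stop = readers_done;
		int got = pop_min(&seq);
		pthread_mutex_unlock(&qlock);
		if (got)
			printf("write seq %u\n", seq);
		else if (stop)
			return NULL;
	}
}

int main(void)
{
	pthread_t r[2], s;
	long i;
	unsigned seq;
	pthread_create(&s, NULL, serializer, NULL);
	for (i = 0; i < 2; i++)
		pthread_create(&r[i], NULL, reader, (void *)i);
	for (i = 0; i < 2; i++)
		pthread_join(r[i], NULL);       /* 1: join readers */
	pthread_mutex_lock(&qlock);
	readers_done = 1;
	pthread_mutex_unlock(&qlock);
	pthread_join(s, NULL);                  /* 2: join serializer */
	while (pop_min(&seq))                   /* 3: flush leftovers */
		printf("flush seq %u\n", seq);
	return 0;
}

The flush only becomes safe once both producer and consumer threads are
joined, which is exactly why the patch keys the lost-message warning to that
point in close_relayfs().]
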
diff --git a/staprun/stap_merge.c b/staprun/stap_merge.c
index 87de7d465..b210db663 100644
--- a/staprun/stap_merge.c
+++ b/staprun/stap_merge.c
@@ -76,6 +76,7 @@ int main (int argc, char *argv[])
fprintf(stderr, "error opening file %s.\n", argv[optind - 1]);
return -1;
}
+ (void) fread(buf, 4, 1, fp[i]); // read & ignore magic word
if (fread (buf, TIMESTAMP_SIZE, 1, fp[i]))
num[i] = *((int *)buf);
else
@@ -133,6 +134,7 @@ int main (int argc, char *argv[])
count = min;
}

+ (void) fread(buf, 4, 1, fp[j]); // read & ignore magic word
if (fread (buf, TIMESTAMP_SIZE, 1, fp[j]))
num[j] = *((int *)buf);
else
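
[Commentary, not part of the patch: in this second hunk the merge loop has
already chosen stream j as the one with the smallest timestamp, and the
adjacent fread uses fp[j], so the magic word must also be consumed from fp[j]
(the loop index i no longer points at a valid stream here). The change itself
accounts for a new 4-byte magic word preceding each record's timestamp in the
per-cpu output files. Assuming a per-record layout of magic, timestamp, length,
then payload (the exact framing is defined elsewhere in staprun, so treat this
as a sketch with 32-bit fields), a checked reader for one record might look
like:

#include <stdio.h>

/* Hypothetical framing: [4-byte magic][4-byte timestamp][4-byte length][payload] */
static int read_record(FILE *fp, unsigned *ts_out)
{
	unsigned magic, ts, len;       /* assumes 32-bit unsigned */
	char payload[65536];

	if (fread(&magic, 4, 1, fp) != 1)
		return -1;             /* clean EOF or short read */
	if (fread(&ts, 4, 1, fp) != 1 || fread(&len, 4, 1, fp) != 1)
		return -1;
	if (len > sizeof(payload) || fread(payload, 1, len, fp) != len)
		return -1;             /* truncated record */
	*ts_out = ts;
	return (int)len;
}

int main(int argc, char **argv)
{
	FILE *fp;
	unsigned ts;
	int len;
	if (argc < 2 || !(fp = fopen(argv[1], "rb")))
		return 1;
	while ((len = read_record(fp, &ts)) >= 0)
		printf("ts=%u len=%d\n", ts, len);
	fclose(fp);
	return 0;
}

Checking each fread return, as the follow-up commit below does, also keeps
gcc's warn_unused_result diagnostics quiet.]
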
diff --git a/staprun/stap_merge.tcl b/staprun/stap_merge.tcl
deleted file mode 100755
index 0c7d7b694..000000000
--- a/staprun/stap_merge.tcl
+++ /dev/null
@@ -1,101 +0,0 @@
-#!/usr/bin/env tclsh
-#
-# stap_merge.tcl - systemtap merge program
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-#
-# Copyright (C) Red Hat Inc, 2007
-#
-#
-
-proc usage {} {
- puts stderr "$::argv0 \[-v\] \[-o output_filename\] input_files ...\n"
- exit 1
-}
-
-set outfile "stdout"
-set verbose 0
-set index 0
-while {[string match -* [lindex $argv $index]]} {
- switch -glob -- [lindex $argv $index] {
- -v {set verbose 1}
- -o {incr index; set outfile [lindex $argv $index]}
- default {usage}
- }
- incr index
-}
-
-if {$tcl_platform(byteOrder) == "littleEndian"} {
- set int_format i
-} else {
- set int_format I
-}
-
-set files [lrange $argv $index end]
-
-set n 0
-foreach file $files {
- if {[catch {open $file} fd($n)]} {
- puts stderr $fd($n)
- exit 1
- }
- fconfigure $fd($n) -translation binary
- if {![binary scan [read $fd($n) 4] $int_format timestamp($n)]} {
- continue
- }
- set timestamp($n) [expr $timestamp($n) & 0xFFFFFFFF]
- incr n
-}
-set ncpus $n
-
-if {$outfile != "stdout"} {
- if {[catch {open $outfile w} outfile]} {
- puts stderr $outfile
- exit 1
- }
-}
-fconfigure $outfile -translation binary
-
-while {1} {
- set mincpu -1
- for {set n 0} {$n < $ncpus} {incr n} {
- if {[info exists fd($n)] && (![info exists min] || $timestamp($n) <= $min)} {
- set min $timestamp($n)
- set mincpu $n
- }
- }
-
- if {![info exists min]} {break}
-
- if {![binary scan [read $fd($mincpu) 4] $int_format len]} {
- puts stderr "Error reading length from channel $mincpu"
- exit 1
- }
-
- if {$verbose == 1} {
- puts stderr "\[CPU:$mincpu, seq=$min, length=$len\]"
- }
-
- set data [read $fd($mincpu) $len]
- puts -nonewline $outfile $data
-
- set data [read $fd($mincpu) 4]
- if {$data == ""} {
- unset fd($mincpu)
- } else {
- binary scan $data $int_format timestamp($mincpu)
- set timestamp($mincpu) [expr $timestamp($mincpu) & 0xFFFFFFFF]
- }
- unset min
-}
diff --git a/staprun/staprun.8 b/staprun/staprun.8
index 3bc16ab95..4e1ca9af6 100644
--- a/staprun/staprun.8
+++ b/staprun/staprun.8
@@ -120,7 +120,7 @@ remote_id() and remote_uri().
Sets the maximum size of output file and the maximum number of output files.
If the size of output file will exceed
.B size
-, systemtap switches output file to the next file. And if the number of
+megabytes, systemtap switches output file to the next file. And if the number of
output files exceed
.B N
, systemtap removes the oldest output file. You can omit the second argument.

commit 2442beb99eeab3144c2622cae1fc98b999f72108
gpg: Signature made Mon 14 Aug 2023 01:55:27 PM EDT
gpg: using RSA key 5D38116FA4D3A7CC77E378D37E83610126DCC2E8
gpg: Good signature from "Frank Ch. Eigler <fche@elastic.org>" [full]
Author: Frank Ch. Eigler <fche@redhat.com>
Date: Mon Aug 14 13:54:50 2023 -0400

    PR29108 / BZ2095359 tweak: stap_merge magic handling

    We don't bother to do much error checking in this infrequently used
    tool, but gcc warnings require us to do some.

diff --git a/staprun/stap_merge.c b/staprun/stap_merge.c
index b210db663..388b14938 100644
--- a/staprun/stap_merge.c
+++ b/staprun/stap_merge.c
@@ -76,7 +76,8 @@ int main (int argc, char *argv[])
fprintf(stderr, "error opening file %s.\n", argv[optind - 1]);
return -1;
}
- (void) fread(buf, 4, 1, fp[i]); // read & ignore magic word
+ if (fread(buf, 4, 1, fp[i]) != 1) // read magic word
+ fprintf(stderr, "warning: error reading magic word\n");
if (fread (buf, TIMESTAMP_SIZE, 1, fp[i]))
num[i] = *((int *)buf);
else
@@ -134,7 +135,8 @@ int main (int argc, char *argv[])
count = min;
}

- (void) fread(buf, 4, 1, fp[j]); // read & ignore magic word
+ if (fread(buf, 4, 1, fp[j]) != 1) // read magic word
+ fprintf(stderr, "warning: error reading magic word\n");
if (fread (buf, TIMESTAMP_SIZE, 1, fp[j]))
num[j] = *((int *)buf);
else