Compare commits

...

No commits in common. "c9s" and "c8" have entirely different histories.
c9s ... c8

11 changed files with 134 additions and 1762 deletions

14
.gitignore vendored
View File

@ -1,13 +1 @@
/librdkafka-0.9.2.tar.gz SOURCES/librdkafka-1.6.1.tar.gz
/librdkafka-0.9.4.tar.gz
/librdkafka-0.9.5.tar.gz
/librdkafka-0.11.0.tar.gz
/librdkafka-0.11.1.tar.gz
/librdkafka-0.11.3.tar.gz
/librdkafka-0.11.4.tar.gz
/librdkafka-0.11.5.tar.gz
/librdkafka-0.11.6.tar.gz
/librdkafka-1.3.0.tar.gz
/librdkafka-1.5.0.tar.gz
/librdkafka-1.6.0.tar.gz
/librdkafka-1.6.1.tar.gz

View File

@ -1,11 +1,11 @@
diff -up librdkafka-1.6.1/src/rdkafka_conf.c.orig librdkafka-1.6.1/src/rdkafka_conf.c diff -up librdkafka-1.6.1/src/rdkafka_conf.c.orig librdkafka-1.6.1/src/rdkafka_conf.c
--- librdkafka-1.6.1/src/rdkafka_conf.c.orig 2022-02-03 12:07:59.066520253 +0100 --- librdkafka-1.6.1/src/rdkafka_conf.c.orig 2023-11-14 08:47:54.294933845 +0100
+++ librdkafka-1.6.1/src/rdkafka_conf.c 2022-02-03 12:18:06.477192751 +0100 +++ librdkafka-1.6.1/src/rdkafka_conf.c 2023-11-14 08:48:30.777285310 +0100
@@ -707,6 +707,7 @@ static const struct rd_kafka_property rd @@ -707,6 +707,7 @@ static const struct rd_kafka_property rd
"security settings for a network connection using TLS or SSL network " "security settings for a network connection using TLS or SSL network "
"protocol. See manual page for `ciphers(1)` and " "protocol. See manual page for `ciphers(1)` and "
"`SSL_CTX_set_cipher_list(3).", "`SSL_CTX_set_cipher_list(3).",
+ .sdef = "PROFILE=SYSTEM", + .sdef = "PROFILE=SYSTEM",
_UNSUPPORTED_SSL _UNSUPPORTED_SSL
}, },
{ _RK_GLOBAL, "ssl.curves.list", _RK_C_STR, { _RK_GLOBAL, "ssl.curves.list", _RK_C_STR,

130
SPECS/librdkafka.spec Normal file
View File

@ -0,0 +1,130 @@
Name: librdkafka
Version: 1.6.1
Release: 1%{?dist}
Summary: The Apache Kafka C library
Group: Development/Libraries
License: BSD
URL: https://github.com/edenhill/librdkafka
Source0: https://github.com/edenhill/librdkafka/archive/v%{version}.tar.gz#/%{name}-%{version}.tar.gz
BuildRequires: gcc
BuildRequires: gcc-c++
BuildRequires: python3
BuildRequires: libzstd-devel
BuildRequires: lz4-devel
BuildRequires: openssl-devel
BuildRequires: cyrus-sasl-devel
BuildRequires: zlib-devel
Patch1: rsyslog-1.6.1-rhbz1842817-crypto-compliance.patch
%description
Librdkafka is a C/C++ library implementation of the Apache Kafka protocol,
containing both Producer and Consumer support.
It was designed with message delivery reliability and high performance in mind,
current figures exceed 800000 messages/second for the producer and 3 million
messages/second for the consumer.
%package devel
Summary: The Apache Kafka C library (Development Environment)
Group: Development/Libraries
Requires: %{name}%{?_isa} = %{version}-%{release}
%description devel
librdkafka is a C/C++ library implementation of the Apache Kafka protocol,
containing both Producer and Consumer support.
This package contains headers and libraries required to build applications
using librdkafka.
%prep
%setup -q
%patch -P 1 -p1
%build
%configure --enable-zlib \
--enable-zstd \
--enable-lz4 \
--enable-lz4-ext \
--enable-ssl \
--enable-gssapi \
--enable-sasl
%make_build
%check
make check
%install
%make_install
find %{buildroot} -name '*.a' -delete -print
%ldconfig_scriptlets
%files
%{_libdir}/librdkafka.so.*
%{_libdir}/librdkafka++.so.*
%doc README.md CONFIGURATION.md INTRODUCTION.md STATISTICS.md CHANGELOG.md LICENSE LICENSES.txt
%license LICENSE LICENSE.pycrc LICENSE.snappy LICENSES.txt
%files devel
%dir %{_includedir}/librdkafka
%attr(0644,root,root) %{_includedir}/librdkafka/*
%{_libdir}/librdkafka.so
%{_libdir}/librdkafka++.so
%{_libdir}/pkgconfig/rdkafka.pc
%{_libdir}/pkgconfig/rdkafka++.pc
%{_libdir}/pkgconfig/rdkafka-static.pc
%{_libdir}/pkgconfig/rdkafka++-static.pc
%changelog
* Tue Nov 14 2023 Attila Lakatos <alakatos@redhat.com> - 1.6.1-1
- Rebase to 1.6.1
resolves: RHEL-12892
- Fix warnings reported by rpmlint
- Enable support for zlib/zstd compression and GSSAPI
* Mon Nov 01 2021 Attila Lakatos <alakatos@redhat.com> - 0.11.4-3
- Set SSL_CTX_set_cipher_list to use system-wide crypto policies
resolves: rhbz#1842817
* Mon Jun 03 2019 Radovan Sroka <rsroka@redhat.com> - 0.11.4-2
- rebuild
* Fri Feb 08 2019 Jiri Vymazal <jvymazal@redhat.com> - 0.11.4-1
- rebase to v0.11.4 (0.11.5 was breaking rsyslog-kafka)
resolves: rhbz#1614697
* Fri Aug 10 2018 Jiri Vymazal <jvymazal@redhat.com> - 0.11.5-1
- rebase to v0.11.5
resolves: rhbz#1614697
- removed explicit attr macro on symlinks
* Thu Jun 28 2018 Radovan Sroka <rsroka@redhat.com> - 0.11.0-2
- switch from python2 to python3
resolves: rhbz#1595795
* Thu Aug 31 2017 Michal Luscon <mluscon@gmail.com> - 0.11.0-1
- Update to 0.11.0
* Thu Aug 03 2017 Fedora Release Engineering <releng@fedoraproject.org> - 0.9.5-3
- Rebuilt for https://fedoraproject.org/wiki/Fedora_27_Binutils_Mass_Rebuild
* Wed Jul 26 2017 Fedora Release Engineering <releng@fedoraproject.org> - 0.9.5-2
- Rebuilt for https://fedoraproject.org/wiki/Fedora_27_Mass_Rebuild
* Mon May 22 2017 Radovan Sroka <rsroka@redhat.com> - 0.9.5-1
- Update to 0.9.4
* Sat Mar 11 2017 Michal Luscon <mluscon@gmail.com> - 0.9.4-1
- Update to 0.9.4
- enable lz4, ssl, sasl
* Fri Feb 10 2017 Fedora Release Engineering <releng@fedoraproject.org> - 0.9.2-2
- Rebuilt for https://fedoraproject.org/wiki/Fedora_26_Mass_Rebuild
* Fri Nov 11 2016 Radovan Sroka <rsroka@redhat.com> 0.9.2-1
- 0.9.2 release
- package created

View File

@ -1,6 +0,0 @@
--- !Policy
product_versions:
- rhel-9
decision_context: osci_compose_gate
rules:
- !PassingTestCaseRule {test_case_name: osci.brew-build.tier0.functional}

View File

@ -1,203 +0,0 @@
Name: librdkafka
Version: 1.6.1
Release: 102%{?dist}
Summary: The Apache Kafka C library
License: BSD
URL: https://github.com/edenhill/librdkafka
Source0: %{url}/archive/v%{version}/%{name}-%{version}.tar.gz
BuildRequires: gcc
BuildRequires: gcc-c++
BuildRequires: make
BuildRequires: python3
BuildRequires: libzstd-devel
BuildRequires: lz4-devel
BuildRequires: openssl-devel
BuildRequires: cyrus-sasl-devel
BuildRequires: zlib-devel
BuildRequires: rapidjson-devel
Patch0: rsyslog-1.6.1-rhbz2032923-crypto-compliance.patch
%description
Librdkafka is a C/C++ library implementation of the Apache Kafka protocol,
containing both Producer and Consumer support.
It was designed with message delivery reliability and high performance in mind,
current figures exceed 800000 messages/second for the producer and 3 million
messages/second for the consumer.
%package devel
Summary: The Apache Kafka C library (Development Environment)
Requires: %{name}%{?_isa} = %{version}-%{release}
%description devel
librdkafka is a C/C++ library implementation of the Apache Kafka protocol,
containing both Producer and Consumer support.
This package contains headers and libraries required to build applications
using librdkafka.
%prep
%autosetup -p1
%build
# This package has a configure test which uses ASMs, but does not link the
# resultant .o files. As such the ASM test is always successful, even on
# architectures were the ASM is not valid when compiling with LTO.
#
# -ffat-lto-objects is sufficient to address this issue. It is the default
# for F33, but is expected to only be enabled for packages that need it in
# F34, so we use it here explicitly
%define _lto_cflags -flto=auto -ffat-lto-objects
%configure \
--enable-zlib \
--enable-zstd \
--enable-lz4 \
--enable-lz4-ext \
--enable-ssl \
--enable-gssapi \
--enable-sasl
%make_build
%check
make check
%install
%make_install
find %{buildroot} -name '*.a' -delete -print
find %{buildroot} -name '*-static.pc' -delete -print
%ldconfig_scriptlets
%files
%{_libdir}/librdkafka.so.*
%{_libdir}/librdkafka++.so.*
%doc README.md CONFIGURATION.md INTRODUCTION.md LICENSE LICENSES.txt STATISTICS.md CHANGELOG.md
%license LICENSE LICENSE.pycrc LICENSE.snappy
%files devel
%dir %{_includedir}/librdkafka
%attr(0644,root,root) %{_includedir}/librdkafka/*
%attr(0755,root,root) %{_libdir}/librdkafka.so
%attr(0755,root,root) %{_libdir}/librdkafka++.so
%{_libdir}/pkgconfig/rdkafka.pc
%{_libdir}/pkgconfig/rdkafka++.pc
%changelog
* Tue Feb 08 2022 Sergio Arroutbi <sarroutb@redhat.com> - 1.6.1-102
- Changes for tests to compile and run appropriately
Related: rhbz#2032923
* Mon Feb 07 2022 Sergio Arroutbi <sarroutb@redhat.com> - 1.6.1-101
- Add missing tests
Related: rhbz#2032923
* Fri Feb 04 2022 Sergio Arroutbi <sarroutb@redhat.com> - 1.6.1-100
- Fix for rpmlint reporting crypto-policy-non-compliance-openssl
resolves: rhbz#2032923
* Mon Aug 09 2021 Mohan Boddu <mboddu@redhat.com> - 1.6.1-4
- Rebuilt for IMA sigs, glibc 2.34, aarch64 flags
Related: rhbz#1991688
* Wed Jun 16 2021 Mohan Boddu <mboddu@redhat.com> - 1.6.1-3
- Rebuilt for RHEL 9 BETA for openssl 3.0
Related: rhbz#1971065
* Fri Apr 16 2021 Mohan Boddu <mboddu@redhat.com> - 1.6.1-2
- Rebuilt for RHEL 9 BETA on Apr 15th 2021. Related: rhbz#1947937
* Mon Mar 08 2021 Attila Lakatos <alakatos@redhat.com> - 1.6.1-1
- Update to upstream 1.6.1
resolves: rhbz#1932286
* Wed Feb 03 2021 Neal Gompa <ngompa@datto.com> - 1.6.0-1
- Update to upstream 1.6.0
resolves: rhbz#1883910
- Enable all missing features
- Fix linking to external lz4 library
- Minor spec cleanups
* Tue Jan 26 2021 Fedora Release Engineering <releng@fedoraproject.org> - 1.5.0-2
- Rebuilt for https://fedoraproject.org/wiki/Fedora_34_Mass_Rebuild
* Wed Sep 09 2020 Zoltan Fridrich <zfridric@redhat.com> - 1.5.0-1
- Update to upstream 1.5.0
resolves: rhbz#1818082
* Wed Sep 09 2020 Zoltan Fridrich <zfridric@redhat.com> - 1.3.0-6
- Switch BuildRequires from python2 to python3
resolves: rhbz#1808329
* Fri Aug 21 2020 Jeff Law <law@redhat.com> - 1.3.0-5
- Re-enable LTO
* Tue Jul 28 2020 Fedora Release Engineering <releng@fedoraproject.org> - 1.3.0-4
- Rebuilt for https://fedoraproject.org/wiki/Fedora_33_Mass_Rebuild
* Tue Jun 30 2020 Jeff Law <law@redhat.com> - 1.3.0-3
- Disable LTO
* Wed Jan 29 2020 Fedora Release Engineering <releng@fedoraproject.org> - 1.3.0-2
- Rebuilt for https://fedoraproject.org/wiki/Fedora_32_Mass_Rebuild
* Mon Dec 30 2019 Michal Luscon <mluscon@gmail.com> - 1.3.0-1
- Update to upstream 1.3.0
* Thu Jul 25 2019 Fedora Release Engineering <releng@fedoraproject.org> - 0.11.6-3
- Rebuilt for https://fedoraproject.org/wiki/Fedora_31_Mass_Rebuild
* Fri Feb 01 2019 Fedora Release Engineering <releng@fedoraproject.org> - 0.11.6-2
- Rebuilt for https://fedoraproject.org/wiki/Fedora_30_Mass_Rebuild
* Wed Dec 12 2018 Javier Peña <jpena@redhat.com> - 0.11.6-1
- Update to upstream 0.11.6
* Mon Sep 17 2018 Michal Luscon <mluscon@gmail.com> - 0.11.5-1
- Update to upstream 0.11.5
* Fri Jul 13 2018 Fedora Release Engineering <releng@fedoraproject.org> - 0.11.4-2
- Rebuilt for https://fedoraproject.org/wiki/Fedora_29_Mass_Rebuild
* Fri Apr 20 2018 Michal Luscon <mluscon@gmail.com> - 0.11.4-1
- Update to upstream 0.11.4
* Thu Mar 15 2018 Iryna Shcherbina <ishcherb@redhat.com> - 0.11.3-3
- Update Python 2 dependency declarations to new packaging standards
(See https://fedoraproject.org/wiki/FinalizingFedoraSwitchtoPython3)
* Wed Feb 07 2018 Fedora Release Engineering <releng@fedoraproject.org> - 0.11.3-2
- Rebuilt for https://fedoraproject.org/wiki/Fedora_28_Mass_Rebuild
* Tue Jan 09 2018 Michal Luscon <mluscon@gmail.com> - 0.11.3-1
- Update to upstream 0.11.3
* Thu Nov 02 2017 Michal Luscon <mluscon@gmail.com> - 0.11.1-1
- Update to upstream 0.11.1
* Thu Aug 31 2017 Michal Luscon <mluscon@gmail.com> - 0.11.0-1
- Update to 0.11.0
* Thu Aug 03 2017 Fedora Release Engineering <releng@fedoraproject.org> - 0.9.5-3
- Rebuilt for https://fedoraproject.org/wiki/Fedora_27_Binutils_Mass_Rebuild
* Wed Jul 26 2017 Fedora Release Engineering <releng@fedoraproject.org> - 0.9.5-2
- Rebuilt for https://fedoraproject.org/wiki/Fedora_27_Mass_Rebuild
* Mon May 22 2017 Radovan Sroka <rsroka@redhat.com> - 0.9.5-1
- Update to 0.9.4
* Sat Mar 11 2017 Michal Luscon <mluscon@gmail.com> - 0.9.4-1
- Update to 0.9.4
- enable lz4, ssl, sasl
* Fri Feb 10 2017 Fedora Release Engineering <releng@fedoraproject.org> - 0.9.2-2
- Rebuilt for https://fedoraproject.org/wiki/Fedora_26_Mass_Rebuild
* Fri Nov 11 2016 Radovan Sroka <rsroka@redhat.com> 0.9.2-1
- 0.9.2 release
- package created

View File

@ -1 +0,0 @@
SHA512 (librdkafka-1.6.1.tar.gz) = 19f64f275c7cd1c60f026a466c79021549e4acced60e6c01b364944ddb2f4a2c0784ab35031275c406b638a14b958c6f904177e51e2fcb4d058c541d046677dc

View File

@ -1 +0,0 @@
1

View File

@ -1,822 +0,0 @@
/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2012, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
/**
* Apache Kafka consumer & producer example programs
* using the Kafka driver from librdkafka
* (https://github.com/edenhill/librdkafka)
*/
#include <ctype.h>
#include <signal.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <syslog.h>
#include <time.h>
#include <sys/time.h>
#include <getopt.h>
/* Typical include path would be <librdkafka/rdkafka.h>, but this program
* is builtin from within the librdkafka source tree and thus differs. */
#include <librdkafka/rdkafka.h> /* for Kafka driver */
const char * USAGE_STR ="Usage: %s -C|-P|-L -t <topic> "
"[-p <partition>] [-b <host1:port1,host2:port2,..>]\n"
"\n"
"librdkafka version %s (0x%08x)\n"
"\n"
" Options:\n"
" -C | -P Consumer or Producer mode\n"
" -L Metadata list mode\n"
" -t <topic> Topic to fetch / produce\n"
" -p <num> Partition (random partitioner)\n"
" -b <brokers> Broker address (localhost:9092)\n"
" -z <codec> Enable compression:\n"
" none|gzip|snappy\n"
" -o <offset> Start offset (consumer):\n"
" beginning, end, NNNNN or -NNNNN\n"
" wmark returns the current hi&lo "
"watermarks.\n"
" -o report Report message offsets (producer)\n"
" -e Exit consumer when last message\n"
" in partition has been received.\n"
" -d [facs..] Enable debugging contexts:\n"
" %s\n"
" -q Be quiet\n"
" -A Raw payload output (consumer)\n"
" -X <prop=name> Set arbitrary librdkafka "
"configuration property\n"
" Properties prefixed with \"topic.\" "
"will be set on topic object.\n"
" -X list Show full list of supported "
"properties.\n"
" -X <prop> Get single property value\n"
"\n"
" In Consumer mode:\n"
" writes fetched messages to stdout\n"
" In Producer mode:\n"
" reads messages from stdin and sends to broker\n"
" In List mode:\n"
" queries broker for metadata information, "
"topic is optional.\n"
"\n"
"\n"
"\n";
static int run = 1;
static rd_kafka_t *rk;
static int exit_eof = 0;
static int quiet = 0;
static enum {
OUTPUT_HEXDUMP,
OUTPUT_RAW,
} output = OUTPUT_HEXDUMP;
static void stop (int sig) {
run = 0;
fclose(stdin); /* abort fgets() */
}
static void hexdump (FILE *fp, const char *name, const void *ptr, size_t len) {
const char *p = (const char *)ptr;
size_t of = 0;
if (name)
fprintf(fp, "%s hexdump (%zd bytes):\n", name, len);
for (of = 0 ; of < len ; of += 16) {
char hexen[16*3+1];
char charen[16+1];
int hof = 0;
int cof = 0;
int i;
for (i = of ; i < (int)of + 16 && i < (int)len ; i++) {
hof += sprintf(hexen+hof, "%02x ", p[i] & 0xff);
cof += sprintf(charen+cof, "%c",
isprint((int)p[i]) ? p[i] : '.');
}
fprintf(fp, "%08zx: %-48s %-16s\n",
of, hexen, charen);
}
}
/**
* Kafka logger callback (optional)
*/
static void logger (const rd_kafka_t *rk, int level,
const char *fac, const char *buf) {
struct timeval tv;
gettimeofday(&tv, NULL);
fprintf(stderr, "%u.%03u RDKAFKA-%i-%s: %s: %s\n",
(int)tv.tv_sec, (int)(tv.tv_usec / 1000),
level, fac, rk ? rd_kafka_name(rk) : NULL, buf);
}
/**
* Message delivery report callback.
* Called once for each message.
* See rdkafka.h for more information.
*/
static void msg_delivered (rd_kafka_t *rk,
void *payload, size_t len,
int error_code,
void *opaque, void *msg_opaque) {
if (error_code)
fprintf(stderr, "%% Message delivery failed: %s\n",
rd_kafka_err2str(error_code));
else if (!quiet)
fprintf(stderr, "%% Message delivered (%zd bytes): %.*s\n", len,
(int)len, (const char *)payload);
}
/**
* Message delivery report callback using the richer rd_kafka_message_t object.
*/
static void msg_delivered2 (rd_kafka_t *rk,
const rd_kafka_message_t *rkmessage, void *opaque) {
printf("del: %s: offset %"PRId64"\n",
rd_kafka_err2str(rkmessage->err), rkmessage->offset);
if (rkmessage->err)
fprintf(stderr, "%% Message delivery failed: %s\n",
rd_kafka_err2str(rkmessage->err));
else if (!quiet)
fprintf(stderr,
"%% Message delivered (%zd bytes, offset %"PRId64", "
"partition %"PRId32"): %.*s\n",
rkmessage->len, rkmessage->offset,
rkmessage->partition,
(int)rkmessage->len, (const char *)rkmessage->payload);
}
static void msg_consume (rd_kafka_message_t *rkmessage,
void *opaque) {
if (rkmessage->err) {
if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
fprintf(stderr,
"%% Consumer reached end of %s [%"PRId32"] "
"message queue at offset %"PRId64"\n",
rd_kafka_topic_name(rkmessage->rkt),
rkmessage->partition, rkmessage->offset);
if (exit_eof)
run = 0;
return;
}
fprintf(stderr, "%% Consume error for topic \"%s\" [%"PRId32"] "
"offset %"PRId64": %s\n",
rd_kafka_topic_name(rkmessage->rkt),
rkmessage->partition,
rkmessage->offset,
rd_kafka_message_errstr(rkmessage));
if (rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION ||
rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
run = 0;
return;
}
if (!quiet) {
rd_kafka_timestamp_type_t tstype;
int64_t timestamp;
fprintf(stdout, "%% Message (offset %"PRId64", %zd bytes):\n",
rkmessage->offset, rkmessage->len);
timestamp = rd_kafka_message_timestamp(rkmessage, &tstype);
if (tstype != RD_KAFKA_TIMESTAMP_NOT_AVAILABLE) {
const char *tsname = "?";
if (tstype == RD_KAFKA_TIMESTAMP_CREATE_TIME)
tsname = "create time";
else if (tstype == RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME)
tsname = "log append time";
fprintf(stdout, "%% Message timestamp: %s %"PRId64
" (%ds ago)\n",
tsname, timestamp,
!timestamp ? 0 :
(int)time(NULL) - (int)(timestamp/1000));
}
}
if (rkmessage->key_len) {
if (output == OUTPUT_HEXDUMP)
hexdump(stdout, "Message Key",
rkmessage->key, rkmessage->key_len);
else
printf("Key: %.*s\n",
(int)rkmessage->key_len, (char *)rkmessage->key);
}
if (output == OUTPUT_HEXDUMP)
hexdump(stdout, "Message Payload",
rkmessage->payload, rkmessage->len);
else
printf("%.*s\n",
(int)rkmessage->len, (char *)rkmessage->payload);
}
static void metadata_print (const char *topic,
const struct rd_kafka_metadata *metadata) {
int i, j, k;
printf("Metadata for %s (from broker %"PRId32": %s):\n",
topic ? : "all topics",
metadata->orig_broker_id,
metadata->orig_broker_name);
/* Iterate brokers */
printf(" %i brokers:\n", metadata->broker_cnt);
for (i = 0 ; i < metadata->broker_cnt ; i++)
printf(" broker %"PRId32" at %s:%i\n",
metadata->brokers[i].id,
metadata->brokers[i].host,
metadata->brokers[i].port);
/* Iterate topics */
printf(" %i topics:\n", metadata->topic_cnt);
for (i = 0 ; i < metadata->topic_cnt ; i++) {
const struct rd_kafka_metadata_topic *t = &metadata->topics[i];
printf(" topic \"%s\" with %i partitions:",
t->topic,
t->partition_cnt);
if (t->err) {
printf(" %s", rd_kafka_err2str(t->err));
if (t->err == RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE)
printf(" (try again)");
}
printf("\n");
/* Iterate topic's partitions */
for (j = 0 ; j < t->partition_cnt ; j++) {
const struct rd_kafka_metadata_partition *p;
p = &t->partitions[j];
printf(" partition %"PRId32", "
"leader %"PRId32", replicas: ",
p->id, p->leader);
/* Iterate partition's replicas */
for (k = 0 ; k < p->replica_cnt ; k++)
printf("%s%"PRId32,
k > 0 ? ",":"", p->replicas[k]);
/* Iterate partition's ISRs */
printf(", isrs: ");
for (k = 0 ; k < p->isr_cnt ; k++)
printf("%s%"PRId32,
k > 0 ? ",":"", p->isrs[k]);
if (p->err)
printf(", %s\n", rd_kafka_err2str(p->err));
else
printf("\n");
}
}
}
static void sig_usr1 (int sig) {
rd_kafka_dump(stdout, rk);
}
int main (int argc, char **argv) {
rd_kafka_topic_t *rkt;
char *brokers = "localhost:9092";
char mode = 'C';
char *topic = NULL;
int partition = RD_KAFKA_PARTITION_UA;
int opt;
rd_kafka_conf_t *conf;
rd_kafka_topic_conf_t *topic_conf;
char errstr[512];
int64_t start_offset = 0;
int report_offsets = 0;
int do_conf_dump = 0;
char tmp[16];
int64_t seek_offset = 0;
int64_t tmp_offset = 0;
int get_wmarks = 0;
/* Kafka configuration */
conf = rd_kafka_conf_new();
/* Set logger */
rd_kafka_conf_set_log_cb(conf, logger);
/* Quick termination */
snprintf(tmp, sizeof(tmp), "%i", SIGIO);
rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);
/* Topic configuration */
topic_conf = rd_kafka_topic_conf_new();
while ((opt = getopt(argc, argv, "hPCLt:p:b:z:qd:o:eX:As:")) != -1) {
switch (opt) {
case 'P':
case 'C':
case 'L':
mode = opt;
break;
case 't':
topic = optarg;
break;
case 'p':
partition = atoi(optarg);
break;
case 'b':
brokers = optarg;
break;
case 'z':
if (rd_kafka_conf_set(conf, "compression.codec",
optarg,
errstr, sizeof(errstr)) !=
RD_KAFKA_CONF_OK) {
fprintf(stderr, "%% %s\n", errstr);
exit(1);
}
break;
case 'o':
case 's':
if (!strcmp(optarg, "end"))
tmp_offset = RD_KAFKA_OFFSET_END;
else if (!strcmp(optarg, "beginning"))
tmp_offset = RD_KAFKA_OFFSET_BEGINNING;
else if (!strcmp(optarg, "stored"))
tmp_offset = RD_KAFKA_OFFSET_STORED;
else if (!strcmp(optarg, "report"))
report_offsets = 1;
else if (!strcmp(optarg, "wmark"))
get_wmarks = 1;
else {
tmp_offset = strtoll(optarg, NULL, 10);
if (tmp_offset < 0)
tmp_offset = RD_KAFKA_OFFSET_TAIL(-tmp_offset);
}
if (opt == 'o')
start_offset = tmp_offset;
else if (opt == 's')
seek_offset = tmp_offset;
break;
case 'e':
exit_eof = 1;
break;
case 'd':
if (rd_kafka_conf_set(conf, "debug", optarg,
errstr, sizeof(errstr)) !=
RD_KAFKA_CONF_OK) {
fprintf(stderr,
"%% Debug configuration failed: "
"%s: %s\n",
errstr, optarg);
exit(1);
}
break;
case 'q':
quiet = 1;
break;
case 'A':
output = OUTPUT_RAW;
break;
case 'X':
{
char *name, *val;
rd_kafka_conf_res_t res;
if (!strcmp(optarg, "list") ||
!strcmp(optarg, "help")) {
rd_kafka_conf_properties_show(stdout);
exit(0);
}
if (!strcmp(optarg, "dump")) {
do_conf_dump = 1;
continue;
}
name = optarg;
if (!(val = strchr(name, '='))) {
char dest[512];
size_t dest_size = sizeof(dest);
/* Return current value for property. */
res = RD_KAFKA_CONF_UNKNOWN;
if (!strncmp(name, "topic.", strlen("topic.")))
res = rd_kafka_topic_conf_get(
topic_conf,
name+strlen("topic."),
dest, &dest_size);
if (res == RD_KAFKA_CONF_UNKNOWN)
res = rd_kafka_conf_get(
conf, name, dest, &dest_size);
if (res == RD_KAFKA_CONF_OK) {
printf("%s = %s\n", name, dest);
exit(0);
} else {
fprintf(stderr,
"%% %s property\n",
res == RD_KAFKA_CONF_UNKNOWN ?
"Unknown" : "Invalid");
exit(1);
}
}
*val = '\0';
val++;
res = RD_KAFKA_CONF_UNKNOWN;
/* Try "topic." prefixed properties on topic
* conf first, and then fall through to global if
* it didnt match a topic configuration property. */
if (!strncmp(name, "topic.", strlen("topic.")))
res = rd_kafka_topic_conf_set(topic_conf,
name+
strlen("topic."),
val,
errstr,
sizeof(errstr));
if (res == RD_KAFKA_CONF_UNKNOWN)
res = rd_kafka_conf_set(conf, name, val,
errstr, sizeof(errstr));
if (res != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%% %s\n", errstr);
exit(1);
}
}
break;
case 'h':
fprintf(stdout,
USAGE_STR,
argv[0],
rd_kafka_version_str(), rd_kafka_version(),
RD_KAFKA_DEBUG_CONTEXTS);
exit(0);
break;
default:
goto usage;
}
}
if (do_conf_dump) {
const char **arr;
size_t cnt;
int pass;
for (pass = 0 ; pass < 2 ; pass++) {
int i;
if (pass == 0) {
arr = rd_kafka_conf_dump(conf, &cnt);
printf("# Global config\n");
} else {
printf("# Topic config\n");
arr = rd_kafka_topic_conf_dump(topic_conf,
&cnt);
}
for (i = 0 ; i < (int)cnt ; i += 2)
printf("%s = %s\n",
arr[i], arr[i+1]);
printf("\n");
rd_kafka_conf_dump_free(arr, cnt);
}
exit(0);
}
if (optind != argc || (mode != 'L' && !topic)) {
usage:
fprintf(stderr,
USAGE_STR,
argv[0],
rd_kafka_version_str(), rd_kafka_version(),
RD_KAFKA_DEBUG_CONTEXTS);
exit(1);
}
if ((mode == 'C' && !isatty(STDIN_FILENO)) ||
(mode != 'C' && !isatty(STDOUT_FILENO)))
quiet = 1;
signal(SIGINT, stop);
signal(SIGUSR1, sig_usr1);
if (mode == 'P') {
/*
* Producer
*/
char buf[2048];
int sendcnt = 0;
/* Set up a message delivery report callback.
* It will be called once for each message, either on successful
* delivery to broker, or upon failure to deliver to broker. */
/* If offset reporting (-o report) is enabled, use the
* richer dr_msg_cb instead. */
if (report_offsets) {
rd_kafka_topic_conf_set(topic_conf,
"produce.offset.report",
"true", errstr, sizeof(errstr));
rd_kafka_conf_set_dr_msg_cb(conf, msg_delivered2);
} else
rd_kafka_conf_set_dr_cb(conf, msg_delivered);
/* Create Kafka handle */
if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
errstr, sizeof(errstr)))) {
fprintf(stderr,
"%% Failed to create new producer: %s\n",
errstr);
exit(1);
}
/* Add brokers */
if (rd_kafka_brokers_add(rk, brokers) == 0) {
fprintf(stderr, "%% No valid brokers specified\n");
exit(1);
}
/* Create topic */
rkt = rd_kafka_topic_new(rk, topic, topic_conf);
topic_conf = NULL; /* Now owned by topic */
if (!quiet)
fprintf(stderr,
"%% Type stuff and hit enter to send\n");
while (run && fgets(buf, sizeof(buf), stdin)) {
size_t len = strlen(buf);
if (buf[len-1] == '\n')
buf[--len] = '\0';
/* Send/Produce message. */
if (rd_kafka_produce(rkt, partition,
RD_KAFKA_MSG_F_COPY,
/* Payload and length */
buf, len,
/* Optional key and its length */
NULL, 0,
/* Message opaque, provided in
* delivery report callback as
* msg_opaque. */
NULL) == -1) {
fprintf(stderr,
"%% Failed to produce to topic %s "
"partition %i: %s\n",
rd_kafka_topic_name(rkt), partition,
rd_kafka_err2str(rd_kafka_last_error()));
/* Poll to handle delivery reports */
rd_kafka_poll(rk, 0);
continue;
}
if (!quiet)
fprintf(stderr, "%% Sent %zd bytes to topic "
"%s partition %i\n",
len, rd_kafka_topic_name(rkt), partition);
sendcnt++;
/* Poll to handle delivery reports */
rd_kafka_poll(rk, 0);
}
/* Poll to handle delivery reports */
rd_kafka_poll(rk, 0);
/* Wait for messages to be delivered */
while (run && rd_kafka_outq_len(rk) > 0)
rd_kafka_poll(rk, 100);
/* Destroy topic */
rd_kafka_topic_destroy(rkt);
/* Destroy the handle */
rd_kafka_destroy(rk);
} else if (mode == 'C') {
/*
* Consumer
*/
/* Create Kafka handle */
if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
errstr, sizeof(errstr)))) {
fprintf(stderr,
"%% Failed to create new consumer: %s\n",
errstr);
exit(1);
}
/* Add brokers */
if (rd_kafka_brokers_add(rk, brokers) == 0) {
fprintf(stderr, "%% No valid brokers specified\n");
exit(1);
}
if (get_wmarks) {
int64_t lo, hi;
rd_kafka_resp_err_t err;
/* Only query for hi&lo partition watermarks */
if ((err = rd_kafka_query_watermark_offsets(
rk, topic, partition, &lo, &hi, 5000))) {
fprintf(stderr, "%% query_watermark_offsets() "
"failed: %s\n",
rd_kafka_err2str(err));
exit(1);
}
printf("%s [%d]: low - high offsets: "
"%"PRId64" - %"PRId64"\n",
topic, partition, lo, hi);
rd_kafka_destroy(rk);
exit(0);
}
/* Create topic */
rkt = rd_kafka_topic_new(rk, topic, topic_conf);
topic_conf = NULL; /* Now owned by topic */
/* Start consuming */
if (rd_kafka_consume_start(rkt, partition, start_offset) == -1){
rd_kafka_resp_err_t err = rd_kafka_last_error();
fprintf(stderr, "%% Failed to start consuming: %s\n",
rd_kafka_err2str(err));
if (err == RD_KAFKA_RESP_ERR__INVALID_ARG)
fprintf(stderr,
"%% Broker based offset storage "
"requires a group.id, "
"add: -X group.id=yourGroup\n");
exit(1);
}
while (run) {
rd_kafka_message_t *rkmessage;
rd_kafka_resp_err_t err;
/* Poll for errors, etc. */
rd_kafka_poll(rk, 0);
/* Consume single message.
* See rdkafka_performance.c for high speed
* consuming of messages. */
rkmessage = rd_kafka_consume(rkt, partition, 1000);
if (!rkmessage) /* timeout */
continue;
msg_consume(rkmessage, NULL);
/* Return message to rdkafka */
rd_kafka_message_destroy(rkmessage);
if (seek_offset) {
err = rd_kafka_seek(rkt, partition, seek_offset,
2000);
if (err)
printf("Seek failed: %s\n",
rd_kafka_err2str(err));
else
printf("Seeked to %"PRId64"\n",
seek_offset);
seek_offset = 0;
}
}
/* Stop consuming */
rd_kafka_consume_stop(rkt, partition);
while (rd_kafka_outq_len(rk) > 0)
rd_kafka_poll(rk, 10);
/* Destroy topic */
rd_kafka_topic_destroy(rkt);
/* Destroy handle */
rd_kafka_destroy(rk);
} else if (mode == 'L') {
rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
/* Create Kafka handle */
if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
errstr, sizeof(errstr)))) {
fprintf(stderr,
"%% Failed to create new producer: %s\n",
errstr);
exit(1);
}
/* Add brokers */
if (rd_kafka_brokers_add(rk, brokers) == 0) {
fprintf(stderr, "%% No valid brokers specified\n");
exit(1);
}
/* Create topic */
if (topic) {
rkt = rd_kafka_topic_new(rk, topic, topic_conf);
topic_conf = NULL; /* Now owned by topic */
} else
rkt = NULL;
while (run) {
const struct rd_kafka_metadata *metadata;
/* Fetch metadata */
err = rd_kafka_metadata(rk, rkt ? 0 : 1, rkt,
&metadata, 5000);
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
fprintf(stderr,
"%% Failed to acquire metadata: %s\n",
rd_kafka_err2str(err));
run = 0;
break;
}
metadata_print(topic, metadata);
rd_kafka_metadata_destroy(metadata);
run = 0;
}
/* Destroy topic */
if (rkt)
rd_kafka_topic_destroy(rkt);
/* Destroy the handle */
rd_kafka_destroy(rk);
if (topic_conf)
rd_kafka_topic_conf_destroy(topic_conf);
/* Exit right away, dont wait for background cleanup, we haven't
* done anything important anyway. */
exit(err ? 2 : 0);
}
if (topic_conf)
rd_kafka_topic_conf_destroy(topic_conf);
/* Let background threads clean up and terminate cleanly. */
run = 5;
while (run-- > 0 && rd_kafka_wait_destroyed(1000) == -1)
printf("Waiting for librdkafka to decommission\n");
if (run <= 0)
rd_kafka_dump(stdout, rk);
return 0;
}

View File

@ -1,662 +0,0 @@
/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2014, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
/**
* Apache Kafka consumer & producer example programs
* using the Kafka driver from librdkafka
* (https://github.com/edenhill/librdkafka)
*/
#include <iostream>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <csignal>
#include <cstring>
#ifdef _MSC_VER
#include "../win32/wingetopt.h"
#elif _AIX
#include <unistd.h>
#else
#include <getopt.h>
#endif
/*
* Typically include path in a real application would be
* #include <librdkafka/rdkafkacpp.h>
*/
#include <librdkafka/rdkafkacpp.h>
const char * USAGE_STR =
"Usage: %s [-C|-P] -t <topic> "
"[-p <partition>] [-b <host1:port1,host2:port2,..>]\n"
"\n"
"librdkafka version %s (0x%08x, builtin.features \"%s\")\n"
"\n"
" Options:\n"
" -C | -P Consumer or Producer mode\n"
" -L Metadata list mode\n"
" -t <topic> Topic to fetch / produce\n"
" -p <num> Partition (random partitioner)\n"
" -p <func> Use partitioner:\n"
" random (default), hash\n"
" -b <brokers> Broker address (localhost:9092)\n"
" -z <codec> Enable compression:\n"
" none|gzip|snappy\n"
" -o <offset> Start offset (consumer)\n"
" -e Exit consumer when last message\n"
" in partition has been received.\n"
" -d [facs..] Enable debugging contexts:\n"
" %s\n"
" -M <intervalms> Enable statistics\n"
" -X <prop=name> Set arbitrary librdkafka "
"configuration property\n"
" Properties prefixed with \"topic.\" "
"will be set on topic object.\n"
" Use '-X list' to see the full list\n"
" of supported properties.\n"
" -f <flag> Set option:\n"
" ccb - use consume_callback\n"
"\n"
" In Consumer mode:\n"
" writes fetched messages to stdout\n"
" In Producer mode:\n"
" reads messages from stdin and sends to broker\n"
"\n"
"\n"
"\n";
static void metadata_print (const std::string &topic,
const RdKafka::Metadata *metadata) {
std::cout << "Metadata for " << (topic.empty() ? "" : "all topics")
<< "(from broker " << metadata->orig_broker_id()
<< ":" << metadata->orig_broker_name() << std::endl;
/* Iterate brokers */
std::cout << " " << metadata->brokers()->size() << " brokers:" << std::endl;
RdKafka::Metadata::BrokerMetadataIterator ib;
for (ib = metadata->brokers()->begin();
ib != metadata->brokers()->end();
++ib) {
std::cout << " broker " << (*ib)->id() << " at "
<< (*ib)->host() << ":" << (*ib)->port() << std::endl;
}
/* Iterate topics */
std::cout << metadata->topics()->size() << " topics:" << std::endl;
RdKafka::Metadata::TopicMetadataIterator it;
for (it = metadata->topics()->begin();
it != metadata->topics()->end();
++it) {
std::cout << " topic \""<< (*it)->topic() << "\" with "
<< (*it)->partitions()->size() << " partitions:";
if ((*it)->err() != RdKafka::ERR_NO_ERROR) {
std::cout << " " << err2str((*it)->err());
if ((*it)->err() == RdKafka::ERR_LEADER_NOT_AVAILABLE)
std::cout << " (try again)";
}
std::cout << std::endl;
/* Iterate topic's partitions */
RdKafka::TopicMetadata::PartitionMetadataIterator ip;
for (ip = (*it)->partitions()->begin();
ip != (*it)->partitions()->end();
++ip) {
std::cout << " partition " << (*ip)->id()
<< ", leader " << (*ip)->leader()
<< ", replicas: ";
/* Iterate partition's replicas */
RdKafka::PartitionMetadata::ReplicasIterator ir;
for (ir = (*ip)->replicas()->begin();
ir != (*ip)->replicas()->end();
++ir) {
std::cout << (ir == (*ip)->replicas()->begin() ? "":",") << *ir;
}
/* Iterate partition's ISRs */
std::cout << ", isrs: ";
RdKafka::PartitionMetadata::ISRSIterator iis;
for (iis = (*ip)->isrs()->begin(); iis != (*ip)->isrs()->end() ; ++iis)
std::cout << (iis == (*ip)->isrs()->begin() ? "":",") << *iis;
if ((*ip)->err() != RdKafka::ERR_NO_ERROR)
std::cout << ", " << RdKafka::err2str((*ip)->err()) << std::endl;
else
std::cout << std::endl;
}
}
}
static bool run = true;
static bool exit_eof = false;
static void sigterm (int sig) {
run = false;
}
class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb {
public:
void dr_cb (RdKafka::Message &message) {
std::cout << "Message delivery for (" << message.len() << " bytes): " <<
message.errstr() << std::endl;
if (message.key())
std::cout << "Key: " << *(message.key()) << ";" << std::endl;
}
};
class ExampleEventCb : public RdKafka::EventCb {
public:
void event_cb (RdKafka::Event &event) {
switch (event.type())
{
case RdKafka::Event::EVENT_ERROR:
std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
event.str() << std::endl;
if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
run = false;
break;
case RdKafka::Event::EVENT_STATS:
std::cerr << "\"STATS\": " << event.str() << std::endl;
break;
case RdKafka::Event::EVENT_LOG:
fprintf(stderr, "LOG-%i-%s: %s\n",
event.severity(), event.fac().c_str(), event.str().c_str());
break;
default:
std::cerr << "EVENT " << event.type() <<
" (" << RdKafka::err2str(event.err()) << "): " <<
event.str() << std::endl;
break;
}
}
};
/* Use of this partitioner is pretty pointless since no key is provided
* in the produce() call. */
class MyHashPartitionerCb : public RdKafka::PartitionerCb {
public:
int32_t partitioner_cb (const RdKafka::Topic *topic, const std::string *key,
int32_t partition_cnt, void *msg_opaque) {
return djb_hash(key->c_str(), key->size()) % partition_cnt;
}
private:
static inline unsigned int djb_hash (const char *str, size_t len) {
unsigned int hash = 5381;
for (size_t i = 0 ; i < len ; i++)
hash = ((hash << 5) + hash) + str[i];
return hash;
}
};
void msg_consume(RdKafka::Message* message, void* opaque) {
switch (message->err()) {
case RdKafka::ERR__TIMED_OUT:
break;
case RdKafka::ERR_NO_ERROR:
/* Real message */
std::cout << "Read msg at offset " << message->offset() << std::endl;
if (message->key()) {
std::cout << "Key: " << *message->key() << std::endl;
}
printf("%.*s\n",
static_cast<int>(message->len()),
static_cast<const char *>(message->payload()));
break;
case RdKafka::ERR__PARTITION_EOF:
/* Last message */
if (exit_eof) {
run = false;
}
break;
case RdKafka::ERR__UNKNOWN_TOPIC:
case RdKafka::ERR__UNKNOWN_PARTITION:
std::cerr << "Consume failed: " << message->errstr() << std::endl;
run = false;
break;
default:
/* Errors */
std::cerr << "Consume failed: " << message->errstr() << std::endl;
run = false;
}
}
class ExampleConsumeCb : public RdKafka::ConsumeCb {
public:
void consume_cb (RdKafka::Message &msg, void *opaque) {
msg_consume(&msg, opaque);
}
};
int main (int argc, char **argv) {
std::string brokers = "localhost";
std::string errstr;
std::string topic_str;
std::string mode;
std::string debug;
int32_t partition = RdKafka::Topic::PARTITION_UA;
int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
bool do_conf_dump = false;
int opt;
MyHashPartitionerCb hash_partitioner;
int use_ccb = 0;
/*
* Create configuration objects
*/
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
while ((opt = getopt(argc, argv, "hPCLt:p:b:z:qd:o:eX:AM:f:")) != -1) {
switch (opt) {
case 'P':
case 'C':
case 'L':
mode = opt;
break;
case 't':
topic_str = optarg;
break;
case 'p':
if (!strcmp(optarg, "random"))
/* default */;
else if (!strcmp(optarg, "hash")) {
if (tconf->set("partitioner_cb", &hash_partitioner, errstr) !=
RdKafka::Conf::CONF_OK) {
std::cerr << errstr << std::endl;
exit(1);
}
} else
partition = std::atoi(optarg);
break;
case 'b':
brokers = optarg;
break;
case 'z':
if (conf->set("compression.codec", optarg, errstr) !=
RdKafka::Conf::CONF_OK) {
std::cerr << errstr << std::endl;
exit(1);
}
break;
case 'o':
if (!strcmp(optarg, "end"))
start_offset = RdKafka::Topic::OFFSET_END;
else if (!strcmp(optarg, "beginning"))
start_offset = RdKafka::Topic::OFFSET_BEGINNING;
else if (!strcmp(optarg, "stored"))
start_offset = RdKafka::Topic::OFFSET_STORED;
else
start_offset = strtoll(optarg, NULL, 10);
break;
case 'e':
exit_eof = true;
break;
case 'd':
debug = optarg;
break;
case 'M':
if (conf->set("statistics.interval.ms", optarg, errstr) !=
RdKafka::Conf::CONF_OK) {
std::cerr << errstr << std::endl;
exit(1);
}
break;
case 'X':
{
char *name, *val;
if (!strcmp(optarg, "dump")) {
do_conf_dump = true;
continue;
}
name = optarg;
if (!(val = strchr(name, '='))) {
std::cerr << "%% Expected -X property=value, not " <<
name << std::endl;
exit(1);
}
*val = '\0';
val++;
/* Try "topic." prefixed properties on topic
* conf first, and then fall through to global if
* it didnt match a topic configuration property. */
RdKafka::Conf::ConfResult res;
if (!strncmp(name, "topic.", strlen("topic.")))
res = tconf->set(name+strlen("topic."), val, errstr);
else
res = conf->set(name, val, errstr);
if (res != RdKafka::Conf::CONF_OK) {
std::cerr << errstr << std::endl;
exit(1);
}
}
break;
case 'f':
if (!strcmp(optarg, "ccb"))
use_ccb = 1;
else {
std::cerr << "Unknown option: " << optarg << std::endl;
exit(1);
}
break;
case 'h':
{
std::string features;
conf->get("builtin.features", features);
fprintf(stdout,
USAGE_STR,
argv[0],
RdKafka::version_str().c_str(), RdKafka::version(),
features.c_str(),
RdKafka::get_debug_contexts().c_str());
exit(0);
}
break;
default:
goto usage;
}
}
if (mode.empty() || (topic_str.empty() && mode != "L") || optind != argc) {
usage:
std::string features;
conf->get("builtin.features", features);
fprintf(stderr,
USAGE_STR,
argv[0],
RdKafka::version_str().c_str(), RdKafka::version(),
features.c_str(),
RdKafka::get_debug_contexts().c_str());
exit(1);
}
/*
* Set configuration properties
*/
conf->set("metadata.broker.list", brokers, errstr);
if (!debug.empty()) {
if (conf->set("debug", debug, errstr) != RdKafka::Conf::CONF_OK) {
std::cerr << errstr << std::endl;
exit(1);
}
}
ExampleEventCb ex_event_cb;
conf->set("event_cb", &ex_event_cb, errstr);
if (do_conf_dump) {
int pass;
for (pass = 0 ; pass < 2 ; pass++) {
std::list<std::string> *dump;
if (pass == 0) {
dump = conf->dump();
std::cout << "# Global config" << std::endl;
} else {
dump = tconf->dump();
std::cout << "# Topic config" << std::endl;
}
for (std::list<std::string>::iterator it = dump->begin();
it != dump->end(); ) {
std::cout << *it << " = ";
it++;
std::cout << *it << std::endl;
it++;
}
std::cout << std::endl;
}
exit(0);
}
signal(SIGINT, sigterm);
signal(SIGTERM, sigterm);
if (mode == "P") {
/*
* Producer mode
*/
if(topic_str.empty())
goto usage;
ExampleDeliveryReportCb ex_dr_cb;
/* Set delivery report callback */
conf->set("dr_cb", &ex_dr_cb, errstr);
/*
* Create producer using accumulated global configuration.
*/
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
if (!producer) {
std::cerr << "Failed to create producer: " << errstr << std::endl;
exit(1);
}
std::cout << "% Created producer " << producer->name() << std::endl;
/*
* Create topic handle.
*/
RdKafka::Topic *topic = RdKafka::Topic::create(producer, topic_str,
tconf, errstr);
if (!topic) {
std::cerr << "Failed to create topic: " << errstr << std::endl;
exit(1);
}
/*
* Read messages from stdin and produce to broker.
*/
for (std::string line; run && std::getline(std::cin, line);) {
if (line.empty()) {
producer->poll(0);
continue;
}
/*
* Produce message
*/
RdKafka::ErrorCode resp =
producer->produce(topic, partition,
RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
const_cast<char *>(line.c_str()), line.size(),
NULL, NULL);
if (resp != RdKafka::ERR_NO_ERROR)
std::cerr << "% Produce failed: " <<
RdKafka::err2str(resp) << std::endl;
else
std::cerr << "% Produced message (" << line.size() << " bytes)" <<
std::endl;
producer->poll(0);
}
run = true;
while (run && producer->outq_len() > 0) {
std::cerr << "Waiting for " << producer->outq_len() << std::endl;
producer->poll(1000);
}
delete topic;
delete producer;
} else if (mode == "C") {
/*
* Consumer mode
*/
if(topic_str.empty())
goto usage;
/*
* Create consumer using accumulated global configuration.
*/
RdKafka::Consumer *consumer = RdKafka::Consumer::create(conf, errstr);
if (!consumer) {
std::cerr << "Failed to create consumer: " << errstr << std::endl;
exit(1);
}
std::cout << "% Created consumer " << consumer->name() << std::endl;
/*
* Create topic handle.
*/
RdKafka::Topic *topic = RdKafka::Topic::create(consumer, topic_str,
tconf, errstr);
if (!topic) {
std::cerr << "Failed to create topic: " << errstr << std::endl;
exit(1);
}
/*
* Start consumer for topic+partition at start offset
*/
RdKafka::ErrorCode resp = consumer->start(topic, partition, start_offset);
if (resp != RdKafka::ERR_NO_ERROR) {
std::cerr << "Failed to start consumer: " <<
RdKafka::err2str(resp) << std::endl;
exit(1);
}
ExampleConsumeCb ex_consume_cb;
/*
* Consume messages
*/
while (run) {
if (use_ccb) {
consumer->consume_callback(topic, partition, 1000,
&ex_consume_cb, &use_ccb);
} else {
RdKafka::Message *msg = consumer->consume(topic, partition, 1000);
msg_consume(msg, NULL);
delete msg;
}
consumer->poll(0);
}
/*
* Stop consumer
*/
consumer->stop(topic, partition);
consumer->poll(1000);
delete topic;
delete consumer;
} else {
/* Metadata mode */
/*
* Create producer using accumulated global configuration.
*/
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
if (!producer) {
std::cerr << "Failed to create producer: " << errstr << std::endl;
exit(1);
}
std::cout << "% Created producer " << producer->name() << std::endl;
/*
* Create topic handle.
*/
RdKafka::Topic *topic = NULL;
if(!topic_str.empty()) {
topic = RdKafka::Topic::create(producer, topic_str, tconf, errstr);
if (!topic) {
std::cerr << "Failed to create topic: " << errstr << std::endl;
exit(1);
}
}
while (run) {
class RdKafka::Metadata *metadata;
/* Fetch metadata */
RdKafka::ErrorCode err = producer->metadata(topic!=NULL, topic,
&metadata, 5000);
if (err != RdKafka::ERR_NO_ERROR) {
std::cerr << "%% Failed to acquire metadata: "
<< RdKafka::err2str(err) << std::endl;
run = 0;
break;
}
metadata_print(topic_str, metadata);
delete metadata;
run = 0;
}
}
/*
* Wait for RdKafka to decommission.
* This is not strictly needed (when check outq_len() above), but
* allows RdKafka to clean up all its resources before the application
* exits so that memory profilers such as valgrind wont complain about
* memory leaks.
*/
RdKafka::wait_destroyed(5000);
return 0;
}

View File

@ -1,37 +0,0 @@
#!/bin/bash
set -e
set -x
TEST_SOURCE=kafka-test
C_TEST_TARGET="${TEST_SOURCE}.c"
CXX_TEST_TARGET="${TEST_SOURCE}.cpp"
C_BINARY="${TEST_SOURCE}-c"
CXX_BINARY="${TEST_SOURCE}-cpp"
CFLAGS="$(rpm --eval '%{build_cflags}')"
CXXFLAGS="$(rpm --eval '%{build_cxxflags}')"
LDFLAGS="$(rpm --eval '%{build_ldflags}')"
LIBCFLAGS="$(pkg-config rdkafka --libs --cflags)"
LIBCXXFLAGS="$(pkg-config rdkafka++ --libs --cflags)"
# build target using distribution-specific flags
gcc -std=c11 $CFLAGS $LDFLAGS -o $C_BINARY $C_TEST_TARGET $LIBCFLAGS
g++ -std=c++11 $CXXFLAGS $LDFLAGS -o $CXX_BINARY $CXX_TEST_TARGET $LIBCXXFLAGS
# test that target exists
test -f ./$C_BINARY
test -f ./$CXX_BINARY
# test that target is executable
test -x ./$C_BINARY
test -x ./$CXX_BINARY
# test that target runs successfully
./$C_BINARY -h
./$CXX_BINARY -h

View File

@ -1,14 +0,0 @@
---
- hosts: localhost
tags:
- classic
roles:
- role: standard-test-basic
tests:
- devel-usability
required_packages:
- gcc-c++
- pkgconf-pkg-config
- rpm
- redhat-rpm-config
- librdkafka-devel