Compare commits
No commits in common. "c9s" and "c8" have entirely different histories.
14
.gitignore
vendored
14
.gitignore
vendored
@ -1,13 +1 @@
|
|||||||
/librdkafka-0.9.2.tar.gz
|
SOURCES/librdkafka-1.6.1.tar.gz
|
||||||
/librdkafka-0.9.4.tar.gz
|
|
||||||
/librdkafka-0.9.5.tar.gz
|
|
||||||
/librdkafka-0.11.0.tar.gz
|
|
||||||
/librdkafka-0.11.1.tar.gz
|
|
||||||
/librdkafka-0.11.3.tar.gz
|
|
||||||
/librdkafka-0.11.4.tar.gz
|
|
||||||
/librdkafka-0.11.5.tar.gz
|
|
||||||
/librdkafka-0.11.6.tar.gz
|
|
||||||
/librdkafka-1.3.0.tar.gz
|
|
||||||
/librdkafka-1.5.0.tar.gz
|
|
||||||
/librdkafka-1.6.0.tar.gz
|
|
||||||
/librdkafka-1.6.1.tar.gz
|
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
diff -up librdkafka-1.6.1/src/rdkafka_conf.c.orig librdkafka-1.6.1/src/rdkafka_conf.c
|
diff -up librdkafka-1.6.1/src/rdkafka_conf.c.orig librdkafka-1.6.1/src/rdkafka_conf.c
|
||||||
--- librdkafka-1.6.1/src/rdkafka_conf.c.orig 2022-02-03 12:07:59.066520253 +0100
|
--- librdkafka-1.6.1/src/rdkafka_conf.c.orig 2023-11-14 08:47:54.294933845 +0100
|
||||||
+++ librdkafka-1.6.1/src/rdkafka_conf.c 2022-02-03 12:18:06.477192751 +0100
|
+++ librdkafka-1.6.1/src/rdkafka_conf.c 2023-11-14 08:48:30.777285310 +0100
|
||||||
@@ -707,6 +707,7 @@ static const struct rd_kafka_property rd
|
@@ -707,6 +707,7 @@ static const struct rd_kafka_property rd
|
||||||
"security settings for a network connection using TLS or SSL network "
|
"security settings for a network connection using TLS or SSL network "
|
||||||
"protocol. See manual page for `ciphers(1)` and "
|
"protocol. See manual page for `ciphers(1)` and "
|
130
SPECS/librdkafka.spec
Normal file
130
SPECS/librdkafka.spec
Normal file
@ -0,0 +1,130 @@
|
|||||||
|
Name: librdkafka
|
||||||
|
Version: 1.6.1
|
||||||
|
Release: 1%{?dist}
|
||||||
|
Summary: The Apache Kafka C library
|
||||||
|
|
||||||
|
Group: Development/Libraries
|
||||||
|
License: BSD
|
||||||
|
URL: https://github.com/edenhill/librdkafka
|
||||||
|
Source0: https://github.com/edenhill/librdkafka/archive/v%{version}.tar.gz#/%{name}-%{version}.tar.gz
|
||||||
|
|
||||||
|
BuildRequires: gcc
|
||||||
|
BuildRequires: gcc-c++
|
||||||
|
BuildRequires: python3
|
||||||
|
BuildRequires: libzstd-devel
|
||||||
|
BuildRequires: lz4-devel
|
||||||
|
BuildRequires: openssl-devel
|
||||||
|
BuildRequires: cyrus-sasl-devel
|
||||||
|
BuildRequires: zlib-devel
|
||||||
|
|
||||||
|
Patch1: rsyslog-1.6.1-rhbz1842817-crypto-compliance.patch
|
||||||
|
|
||||||
|
%description
|
||||||
|
Librdkafka is a C/C++ library implementation of the Apache Kafka protocol,
|
||||||
|
containing both Producer and Consumer support.
|
||||||
|
It was designed with message delivery reliability and high performance in mind,
|
||||||
|
current figures exceed 800000 messages/second for the producer and 3 million
|
||||||
|
messages/second for the consumer.
|
||||||
|
|
||||||
|
%package devel
|
||||||
|
Summary: The Apache Kafka C library (Development Environment)
|
||||||
|
Group: Development/Libraries
|
||||||
|
Requires: %{name}%{?_isa} = %{version}-%{release}
|
||||||
|
|
||||||
|
%description devel
|
||||||
|
librdkafka is a C/C++ library implementation of the Apache Kafka protocol,
|
||||||
|
containing both Producer and Consumer support.
|
||||||
|
This package contains headers and libraries required to build applications
|
||||||
|
using librdkafka.
|
||||||
|
|
||||||
|
%prep
|
||||||
|
%setup -q
|
||||||
|
|
||||||
|
%patch -P 1 -p1
|
||||||
|
|
||||||
|
%build
|
||||||
|
%configure --enable-zlib \
|
||||||
|
--enable-zstd \
|
||||||
|
--enable-lz4 \
|
||||||
|
--enable-lz4-ext \
|
||||||
|
--enable-ssl \
|
||||||
|
--enable-gssapi \
|
||||||
|
--enable-sasl
|
||||||
|
|
||||||
|
%make_build
|
||||||
|
|
||||||
|
%check
|
||||||
|
make check
|
||||||
|
|
||||||
|
%install
|
||||||
|
%make_install
|
||||||
|
find %{buildroot} -name '*.a' -delete -print
|
||||||
|
|
||||||
|
%ldconfig_scriptlets
|
||||||
|
|
||||||
|
%files
|
||||||
|
%{_libdir}/librdkafka.so.*
|
||||||
|
%{_libdir}/librdkafka++.so.*
|
||||||
|
%doc README.md CONFIGURATION.md INTRODUCTION.md STATISTICS.md CHANGELOG.md LICENSE LICENSES.txt
|
||||||
|
%license LICENSE LICENSE.pycrc LICENSE.snappy LICENSES.txt
|
||||||
|
|
||||||
|
%files devel
|
||||||
|
%dir %{_includedir}/librdkafka
|
||||||
|
%attr(0644,root,root) %{_includedir}/librdkafka/*
|
||||||
|
%{_libdir}/librdkafka.so
|
||||||
|
%{_libdir}/librdkafka++.so
|
||||||
|
%{_libdir}/pkgconfig/rdkafka.pc
|
||||||
|
%{_libdir}/pkgconfig/rdkafka++.pc
|
||||||
|
%{_libdir}/pkgconfig/rdkafka-static.pc
|
||||||
|
%{_libdir}/pkgconfig/rdkafka++-static.pc
|
||||||
|
|
||||||
|
%changelog
|
||||||
|
* Tue Nov 14 2023 Attila Lakatos <alakatos@redhat.com> - 1.6.1-1
|
||||||
|
- Rebase to 1.6.1
|
||||||
|
resolves: RHEL-12892
|
||||||
|
- Fix warnings reported by rpmlint
|
||||||
|
- Enable support for zlib/zstd compression and GSSAPI
|
||||||
|
|
||||||
|
* Mon Nov 01 2021 Attila Lakatos <alakatos@redhat.com> - 0.11.4-3
|
||||||
|
- Set SSL_CTX_set_cipher_list to use system-wide crypto policies
|
||||||
|
resolves: rhbz#1842817
|
||||||
|
|
||||||
|
* Mon Jun 03 2019 Radovan Sroka <rsroka@redhat.com> - 0.11.4-2
|
||||||
|
- rebuild
|
||||||
|
|
||||||
|
* Fri Feb 08 2019 Jiri Vymazal <jvymazal@redhat.com> - 0.11.4-1
|
||||||
|
- rebase to v0.11.4 (0.11.5 was breaking rsyslog-kafka)
|
||||||
|
resolves: rhbz#1614697
|
||||||
|
|
||||||
|
* Fri Aug 10 2018 Jiri Vymazal <jvymazal@redhat.com> - 0.11.5-1
|
||||||
|
- rebase to v0.11.5
|
||||||
|
resolves: rhbz#1614697
|
||||||
|
- removed explicit attr macro on symlinks
|
||||||
|
|
||||||
|
* Thu Jun 28 2018 Radovan Sroka <rsroka@redhat.com> - 0.11.0-2
|
||||||
|
- switch from python2 to python3
|
||||||
|
resolves: rhbz#1595795
|
||||||
|
|
||||||
|
* Thu Aug 31 2017 Michal Luscon <mluscon@gmail.com> - 0.11.0-1
|
||||||
|
- Update to 0.11.0
|
||||||
|
|
||||||
|
* Thu Aug 03 2017 Fedora Release Engineering <releng@fedoraproject.org> - 0.9.5-3
|
||||||
|
- Rebuilt for https://fedoraproject.org/wiki/Fedora_27_Binutils_Mass_Rebuild
|
||||||
|
|
||||||
|
* Wed Jul 26 2017 Fedora Release Engineering <releng@fedoraproject.org> - 0.9.5-2
|
||||||
|
- Rebuilt for https://fedoraproject.org/wiki/Fedora_27_Mass_Rebuild
|
||||||
|
|
||||||
|
* Mon May 22 2017 Radovan Sroka <rsroka@redhat.com> - 0.9.5-1
|
||||||
|
- Update to 0.9.4
|
||||||
|
|
||||||
|
* Sat Mar 11 2017 Michal Luscon <mluscon@gmail.com> - 0.9.4-1
|
||||||
|
- Update to 0.9.4
|
||||||
|
- enable lz4, ssl, sasl
|
||||||
|
|
||||||
|
* Fri Feb 10 2017 Fedora Release Engineering <releng@fedoraproject.org> - 0.9.2-2
|
||||||
|
- Rebuilt for https://fedoraproject.org/wiki/Fedora_26_Mass_Rebuild
|
||||||
|
|
||||||
|
|
||||||
|
* Fri Nov 11 2016 Radovan Sroka <rsroka@redhat.com> 0.9.2-1
|
||||||
|
- 0.9.2 release
|
||||||
|
- package created
|
@ -1,6 +0,0 @@
|
|||||||
--- !Policy
|
|
||||||
product_versions:
|
|
||||||
- rhel-9
|
|
||||||
decision_context: osci_compose_gate
|
|
||||||
rules:
|
|
||||||
- !PassingTestCaseRule {test_case_name: osci.brew-build.tier0.functional}
|
|
203
librdkafka.spec
203
librdkafka.spec
@ -1,203 +0,0 @@
|
|||||||
Name: librdkafka
|
|
||||||
Version: 1.6.1
|
|
||||||
Release: 102%{?dist}
|
|
||||||
Summary: The Apache Kafka C library
|
|
||||||
|
|
||||||
License: BSD
|
|
||||||
URL: https://github.com/edenhill/librdkafka
|
|
||||||
Source0: %{url}/archive/v%{version}/%{name}-%{version}.tar.gz
|
|
||||||
|
|
||||||
BuildRequires: gcc
|
|
||||||
BuildRequires: gcc-c++
|
|
||||||
BuildRequires: make
|
|
||||||
BuildRequires: python3
|
|
||||||
BuildRequires: libzstd-devel
|
|
||||||
BuildRequires: lz4-devel
|
|
||||||
BuildRequires: openssl-devel
|
|
||||||
BuildRequires: cyrus-sasl-devel
|
|
||||||
BuildRequires: zlib-devel
|
|
||||||
BuildRequires: rapidjson-devel
|
|
||||||
|
|
||||||
Patch0: rsyslog-1.6.1-rhbz2032923-crypto-compliance.patch
|
|
||||||
|
|
||||||
%description
|
|
||||||
Librdkafka is a C/C++ library implementation of the Apache Kafka protocol,
|
|
||||||
containing both Producer and Consumer support.
|
|
||||||
It was designed with message delivery reliability and high performance in mind,
|
|
||||||
current figures exceed 800000 messages/second for the producer and 3 million
|
|
||||||
messages/second for the consumer.
|
|
||||||
|
|
||||||
%package devel
|
|
||||||
Summary: The Apache Kafka C library (Development Environment)
|
|
||||||
Requires: %{name}%{?_isa} = %{version}-%{release}
|
|
||||||
|
|
||||||
%description devel
|
|
||||||
librdkafka is a C/C++ library implementation of the Apache Kafka protocol,
|
|
||||||
containing both Producer and Consumer support.
|
|
||||||
This package contains headers and libraries required to build applications
|
|
||||||
using librdkafka.
|
|
||||||
|
|
||||||
%prep
|
|
||||||
%autosetup -p1
|
|
||||||
|
|
||||||
%build
|
|
||||||
# This package has a configure test which uses ASMs, but does not link the
|
|
||||||
# resultant .o files. As such the ASM test is always successful, even on
|
|
||||||
# architectures were the ASM is not valid when compiling with LTO.
|
|
||||||
#
|
|
||||||
# -ffat-lto-objects is sufficient to address this issue. It is the default
|
|
||||||
# for F33, but is expected to only be enabled for packages that need it in
|
|
||||||
# F34, so we use it here explicitly
|
|
||||||
%define _lto_cflags -flto=auto -ffat-lto-objects
|
|
||||||
|
|
||||||
%configure \
|
|
||||||
--enable-zlib \
|
|
||||||
--enable-zstd \
|
|
||||||
--enable-lz4 \
|
|
||||||
--enable-lz4-ext \
|
|
||||||
--enable-ssl \
|
|
||||||
--enable-gssapi \
|
|
||||||
--enable-sasl
|
|
||||||
|
|
||||||
%make_build
|
|
||||||
|
|
||||||
%check
|
|
||||||
make check
|
|
||||||
|
|
||||||
%install
|
|
||||||
%make_install
|
|
||||||
find %{buildroot} -name '*.a' -delete -print
|
|
||||||
find %{buildroot} -name '*-static.pc' -delete -print
|
|
||||||
|
|
||||||
%ldconfig_scriptlets
|
|
||||||
|
|
||||||
%files
|
|
||||||
%{_libdir}/librdkafka.so.*
|
|
||||||
%{_libdir}/librdkafka++.so.*
|
|
||||||
%doc README.md CONFIGURATION.md INTRODUCTION.md LICENSE LICENSES.txt STATISTICS.md CHANGELOG.md
|
|
||||||
%license LICENSE LICENSE.pycrc LICENSE.snappy
|
|
||||||
|
|
||||||
%files devel
|
|
||||||
%dir %{_includedir}/librdkafka
|
|
||||||
%attr(0644,root,root) %{_includedir}/librdkafka/*
|
|
||||||
%attr(0755,root,root) %{_libdir}/librdkafka.so
|
|
||||||
%attr(0755,root,root) %{_libdir}/librdkafka++.so
|
|
||||||
%{_libdir}/pkgconfig/rdkafka.pc
|
|
||||||
%{_libdir}/pkgconfig/rdkafka++.pc
|
|
||||||
|
|
||||||
|
|
||||||
%changelog
|
|
||||||
* Tue Feb 08 2022 Sergio Arroutbi <sarroutb@redhat.com> - 1.6.1-102
|
|
||||||
- Changes for tests to compile and run appropriately
|
|
||||||
Related: rhbz#2032923
|
|
||||||
|
|
||||||
* Mon Feb 07 2022 Sergio Arroutbi <sarroutb@redhat.com> - 1.6.1-101
|
|
||||||
- Add missing tests
|
|
||||||
Related: rhbz#2032923
|
|
||||||
|
|
||||||
* Fri Feb 04 2022 Sergio Arroutbi <sarroutb@redhat.com> - 1.6.1-100
|
|
||||||
- Fix for rpmlint reporting crypto-policy-non-compliance-openssl
|
|
||||||
resolves: rhbz#2032923
|
|
||||||
|
|
||||||
* Mon Aug 09 2021 Mohan Boddu <mboddu@redhat.com> - 1.6.1-4
|
|
||||||
- Rebuilt for IMA sigs, glibc 2.34, aarch64 flags
|
|
||||||
Related: rhbz#1991688
|
|
||||||
|
|
||||||
* Wed Jun 16 2021 Mohan Boddu <mboddu@redhat.com> - 1.6.1-3
|
|
||||||
- Rebuilt for RHEL 9 BETA for openssl 3.0
|
|
||||||
Related: rhbz#1971065
|
|
||||||
|
|
||||||
* Fri Apr 16 2021 Mohan Boddu <mboddu@redhat.com> - 1.6.1-2
|
|
||||||
- Rebuilt for RHEL 9 BETA on Apr 15th 2021. Related: rhbz#1947937
|
|
||||||
|
|
||||||
* Mon Mar 08 2021 Attila Lakatos <alakatos@redhat.com> - 1.6.1-1
|
|
||||||
- Update to upstream 1.6.1
|
|
||||||
resolves: rhbz#1932286
|
|
||||||
|
|
||||||
* Wed Feb 03 2021 Neal Gompa <ngompa@datto.com> - 1.6.0-1
|
|
||||||
- Update to upstream 1.6.0
|
|
||||||
resolves: rhbz#1883910
|
|
||||||
- Enable all missing features
|
|
||||||
- Fix linking to external lz4 library
|
|
||||||
- Minor spec cleanups
|
|
||||||
|
|
||||||
* Tue Jan 26 2021 Fedora Release Engineering <releng@fedoraproject.org> - 1.5.0-2
|
|
||||||
- Rebuilt for https://fedoraproject.org/wiki/Fedora_34_Mass_Rebuild
|
|
||||||
|
|
||||||
* Wed Sep 09 2020 Zoltan Fridrich <zfridric@redhat.com> - 1.5.0-1
|
|
||||||
- Update to upstream 1.5.0
|
|
||||||
resolves: rhbz#1818082
|
|
||||||
|
|
||||||
* Wed Sep 09 2020 Zoltan Fridrich <zfridric@redhat.com> - 1.3.0-6
|
|
||||||
- Switch BuildRequires from python2 to python3
|
|
||||||
resolves: rhbz#1808329
|
|
||||||
|
|
||||||
* Fri Aug 21 2020 Jeff Law <law@redhat.com> - 1.3.0-5
|
|
||||||
- Re-enable LTO
|
|
||||||
|
|
||||||
* Tue Jul 28 2020 Fedora Release Engineering <releng@fedoraproject.org> - 1.3.0-4
|
|
||||||
- Rebuilt for https://fedoraproject.org/wiki/Fedora_33_Mass_Rebuild
|
|
||||||
|
|
||||||
* Tue Jun 30 2020 Jeff Law <law@redhat.com> - 1.3.0-3
|
|
||||||
- Disable LTO
|
|
||||||
|
|
||||||
* Wed Jan 29 2020 Fedora Release Engineering <releng@fedoraproject.org> - 1.3.0-2
|
|
||||||
- Rebuilt for https://fedoraproject.org/wiki/Fedora_32_Mass_Rebuild
|
|
||||||
|
|
||||||
* Mon Dec 30 2019 Michal Luscon <mluscon@gmail.com> - 1.3.0-1
|
|
||||||
- Update to upstream 1.3.0
|
|
||||||
|
|
||||||
* Thu Jul 25 2019 Fedora Release Engineering <releng@fedoraproject.org> - 0.11.6-3
|
|
||||||
- Rebuilt for https://fedoraproject.org/wiki/Fedora_31_Mass_Rebuild
|
|
||||||
|
|
||||||
* Fri Feb 01 2019 Fedora Release Engineering <releng@fedoraproject.org> - 0.11.6-2
|
|
||||||
- Rebuilt for https://fedoraproject.org/wiki/Fedora_30_Mass_Rebuild
|
|
||||||
|
|
||||||
* Wed Dec 12 2018 Javier Peña <jpena@redhat.com> - 0.11.6-1
|
|
||||||
- Update to upstream 0.11.6
|
|
||||||
|
|
||||||
* Mon Sep 17 2018 Michal Luscon <mluscon@gmail.com> - 0.11.5-1
|
|
||||||
- Update to upstream 0.11.5
|
|
||||||
|
|
||||||
* Fri Jul 13 2018 Fedora Release Engineering <releng@fedoraproject.org> - 0.11.4-2
|
|
||||||
- Rebuilt for https://fedoraproject.org/wiki/Fedora_29_Mass_Rebuild
|
|
||||||
|
|
||||||
* Fri Apr 20 2018 Michal Luscon <mluscon@gmail.com> - 0.11.4-1
|
|
||||||
- Update to upstream 0.11.4
|
|
||||||
|
|
||||||
* Thu Mar 15 2018 Iryna Shcherbina <ishcherb@redhat.com> - 0.11.3-3
|
|
||||||
- Update Python 2 dependency declarations to new packaging standards
|
|
||||||
(See https://fedoraproject.org/wiki/FinalizingFedoraSwitchtoPython3)
|
|
||||||
|
|
||||||
* Wed Feb 07 2018 Fedora Release Engineering <releng@fedoraproject.org> - 0.11.3-2
|
|
||||||
- Rebuilt for https://fedoraproject.org/wiki/Fedora_28_Mass_Rebuild
|
|
||||||
|
|
||||||
* Tue Jan 09 2018 Michal Luscon <mluscon@gmail.com> - 0.11.3-1
|
|
||||||
- Update to upstream 0.11.3
|
|
||||||
|
|
||||||
* Thu Nov 02 2017 Michal Luscon <mluscon@gmail.com> - 0.11.1-1
|
|
||||||
- Update to upstream 0.11.1
|
|
||||||
|
|
||||||
* Thu Aug 31 2017 Michal Luscon <mluscon@gmail.com> - 0.11.0-1
|
|
||||||
- Update to 0.11.0
|
|
||||||
|
|
||||||
* Thu Aug 03 2017 Fedora Release Engineering <releng@fedoraproject.org> - 0.9.5-3
|
|
||||||
- Rebuilt for https://fedoraproject.org/wiki/Fedora_27_Binutils_Mass_Rebuild
|
|
||||||
|
|
||||||
* Wed Jul 26 2017 Fedora Release Engineering <releng@fedoraproject.org> - 0.9.5-2
|
|
||||||
- Rebuilt for https://fedoraproject.org/wiki/Fedora_27_Mass_Rebuild
|
|
||||||
|
|
||||||
* Mon May 22 2017 Radovan Sroka <rsroka@redhat.com> - 0.9.5-1
|
|
||||||
- Update to 0.9.4
|
|
||||||
|
|
||||||
* Sat Mar 11 2017 Michal Luscon <mluscon@gmail.com> - 0.9.4-1
|
|
||||||
- Update to 0.9.4
|
|
||||||
- enable lz4, ssl, sasl
|
|
||||||
|
|
||||||
* Fri Feb 10 2017 Fedora Release Engineering <releng@fedoraproject.org> - 0.9.2-2
|
|
||||||
- Rebuilt for https://fedoraproject.org/wiki/Fedora_26_Mass_Rebuild
|
|
||||||
|
|
||||||
|
|
||||||
* Fri Nov 11 2016 Radovan Sroka <rsroka@redhat.com> 0.9.2-1
|
|
||||||
- 0.9.2 release
|
|
||||||
- package created
|
|
1
sources
1
sources
@ -1 +0,0 @@
|
|||||||
SHA512 (librdkafka-1.6.1.tar.gz) = 19f64f275c7cd1c60f026a466c79021549e4acced60e6c01b364944ddb2f4a2c0784ab35031275c406b638a14b958c6f904177e51e2fcb4d058c541d046677dc
|
|
@ -1 +0,0 @@
|
|||||||
1
|
|
@ -1,822 +0,0 @@
|
|||||||
/*
|
|
||||||
* librdkafka - Apache Kafka C library
|
|
||||||
*
|
|
||||||
* Copyright (c) 2012, Magnus Edenhill
|
|
||||||
* All rights reserved.
|
|
||||||
*
|
|
||||||
* Redistribution and use in source and binary forms, with or without
|
|
||||||
* modification, are permitted provided that the following conditions are met:
|
|
||||||
*
|
|
||||||
* 1. Redistributions of source code must retain the above copyright notice,
|
|
||||||
* this list of conditions and the following disclaimer.
|
|
||||||
* 2. Redistributions in binary form must reproduce the above copyright notice,
|
|
||||||
* this list of conditions and the following disclaimer in the documentation
|
|
||||||
* and/or other materials provided with the distribution.
|
|
||||||
*
|
|
||||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
|
||||||
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
|
||||||
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
|
||||||
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
|
|
||||||
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
|
||||||
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
|
||||||
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
|
||||||
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
|
||||||
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
|
||||||
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
|
||||||
* POSSIBILITY OF SUCH DAMAGE.
|
|
||||||
*/
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Apache Kafka consumer & producer example programs
|
|
||||||
* using the Kafka driver from librdkafka
|
|
||||||
* (https://github.com/edenhill/librdkafka)
|
|
||||||
*/
|
|
||||||
|
|
||||||
#include <ctype.h>
|
|
||||||
#include <signal.h>
|
|
||||||
#include <string.h>
|
|
||||||
#include <unistd.h>
|
|
||||||
#include <stdlib.h>
|
|
||||||
#include <syslog.h>
|
|
||||||
#include <time.h>
|
|
||||||
#include <sys/time.h>
|
|
||||||
#include <getopt.h>
|
|
||||||
|
|
||||||
/* Typical include path would be <librdkafka/rdkafka.h>, but this program
|
|
||||||
* is builtin from within the librdkafka source tree and thus differs. */
|
|
||||||
#include <librdkafka/rdkafka.h> /* for Kafka driver */
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
const char * USAGE_STR ="Usage: %s -C|-P|-L -t <topic> "
|
|
||||||
"[-p <partition>] [-b <host1:port1,host2:port2,..>]\n"
|
|
||||||
"\n"
|
|
||||||
"librdkafka version %s (0x%08x)\n"
|
|
||||||
"\n"
|
|
||||||
" Options:\n"
|
|
||||||
" -C | -P Consumer or Producer mode\n"
|
|
||||||
" -L Metadata list mode\n"
|
|
||||||
" -t <topic> Topic to fetch / produce\n"
|
|
||||||
" -p <num> Partition (random partitioner)\n"
|
|
||||||
" -b <brokers> Broker address (localhost:9092)\n"
|
|
||||||
" -z <codec> Enable compression:\n"
|
|
||||||
" none|gzip|snappy\n"
|
|
||||||
" -o <offset> Start offset (consumer):\n"
|
|
||||||
" beginning, end, NNNNN or -NNNNN\n"
|
|
||||||
" wmark returns the current hi&lo "
|
|
||||||
"watermarks.\n"
|
|
||||||
" -o report Report message offsets (producer)\n"
|
|
||||||
" -e Exit consumer when last message\n"
|
|
||||||
" in partition has been received.\n"
|
|
||||||
" -d [facs..] Enable debugging contexts:\n"
|
|
||||||
" %s\n"
|
|
||||||
" -q Be quiet\n"
|
|
||||||
" -A Raw payload output (consumer)\n"
|
|
||||||
" -X <prop=name> Set arbitrary librdkafka "
|
|
||||||
"configuration property\n"
|
|
||||||
" Properties prefixed with \"topic.\" "
|
|
||||||
"will be set on topic object.\n"
|
|
||||||
" -X list Show full list of supported "
|
|
||||||
"properties.\n"
|
|
||||||
" -X <prop> Get single property value\n"
|
|
||||||
"\n"
|
|
||||||
" In Consumer mode:\n"
|
|
||||||
" writes fetched messages to stdout\n"
|
|
||||||
" In Producer mode:\n"
|
|
||||||
" reads messages from stdin and sends to broker\n"
|
|
||||||
" In List mode:\n"
|
|
||||||
" queries broker for metadata information, "
|
|
||||||
"topic is optional.\n"
|
|
||||||
"\n"
|
|
||||||
"\n"
|
|
||||||
"\n";
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
static int run = 1;
|
|
||||||
static rd_kafka_t *rk;
|
|
||||||
static int exit_eof = 0;
|
|
||||||
static int quiet = 0;
|
|
||||||
static enum {
|
|
||||||
OUTPUT_HEXDUMP,
|
|
||||||
OUTPUT_RAW,
|
|
||||||
} output = OUTPUT_HEXDUMP;
|
|
||||||
|
|
||||||
static void stop (int sig) {
|
|
||||||
run = 0;
|
|
||||||
fclose(stdin); /* abort fgets() */
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
static void hexdump (FILE *fp, const char *name, const void *ptr, size_t len) {
|
|
||||||
const char *p = (const char *)ptr;
|
|
||||||
size_t of = 0;
|
|
||||||
|
|
||||||
|
|
||||||
if (name)
|
|
||||||
fprintf(fp, "%s hexdump (%zd bytes):\n", name, len);
|
|
||||||
|
|
||||||
for (of = 0 ; of < len ; of += 16) {
|
|
||||||
char hexen[16*3+1];
|
|
||||||
char charen[16+1];
|
|
||||||
int hof = 0;
|
|
||||||
|
|
||||||
int cof = 0;
|
|
||||||
int i;
|
|
||||||
|
|
||||||
for (i = of ; i < (int)of + 16 && i < (int)len ; i++) {
|
|
||||||
hof += sprintf(hexen+hof, "%02x ", p[i] & 0xff);
|
|
||||||
cof += sprintf(charen+cof, "%c",
|
|
||||||
isprint((int)p[i]) ? p[i] : '.');
|
|
||||||
}
|
|
||||||
fprintf(fp, "%08zx: %-48s %-16s\n",
|
|
||||||
of, hexen, charen);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Kafka logger callback (optional)
|
|
||||||
*/
|
|
||||||
static void logger (const rd_kafka_t *rk, int level,
|
|
||||||
const char *fac, const char *buf) {
|
|
||||||
struct timeval tv;
|
|
||||||
gettimeofday(&tv, NULL);
|
|
||||||
fprintf(stderr, "%u.%03u RDKAFKA-%i-%s: %s: %s\n",
|
|
||||||
(int)tv.tv_sec, (int)(tv.tv_usec / 1000),
|
|
||||||
level, fac, rk ? rd_kafka_name(rk) : NULL, buf);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Message delivery report callback.
|
|
||||||
* Called once for each message.
|
|
||||||
* See rdkafka.h for more information.
|
|
||||||
*/
|
|
||||||
static void msg_delivered (rd_kafka_t *rk,
|
|
||||||
void *payload, size_t len,
|
|
||||||
int error_code,
|
|
||||||
void *opaque, void *msg_opaque) {
|
|
||||||
|
|
||||||
if (error_code)
|
|
||||||
fprintf(stderr, "%% Message delivery failed: %s\n",
|
|
||||||
rd_kafka_err2str(error_code));
|
|
||||||
else if (!quiet)
|
|
||||||
fprintf(stderr, "%% Message delivered (%zd bytes): %.*s\n", len,
|
|
||||||
(int)len, (const char *)payload);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Message delivery report callback using the richer rd_kafka_message_t object.
|
|
||||||
*/
|
|
||||||
static void msg_delivered2 (rd_kafka_t *rk,
|
|
||||||
const rd_kafka_message_t *rkmessage, void *opaque) {
|
|
||||||
printf("del: %s: offset %"PRId64"\n",
|
|
||||||
rd_kafka_err2str(rkmessage->err), rkmessage->offset);
|
|
||||||
if (rkmessage->err)
|
|
||||||
fprintf(stderr, "%% Message delivery failed: %s\n",
|
|
||||||
rd_kafka_err2str(rkmessage->err));
|
|
||||||
else if (!quiet)
|
|
||||||
fprintf(stderr,
|
|
||||||
"%% Message delivered (%zd bytes, offset %"PRId64", "
|
|
||||||
"partition %"PRId32"): %.*s\n",
|
|
||||||
rkmessage->len, rkmessage->offset,
|
|
||||||
rkmessage->partition,
|
|
||||||
(int)rkmessage->len, (const char *)rkmessage->payload);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
static void msg_consume (rd_kafka_message_t *rkmessage,
|
|
||||||
void *opaque) {
|
|
||||||
if (rkmessage->err) {
|
|
||||||
if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
|
|
||||||
fprintf(stderr,
|
|
||||||
"%% Consumer reached end of %s [%"PRId32"] "
|
|
||||||
"message queue at offset %"PRId64"\n",
|
|
||||||
rd_kafka_topic_name(rkmessage->rkt),
|
|
||||||
rkmessage->partition, rkmessage->offset);
|
|
||||||
|
|
||||||
if (exit_eof)
|
|
||||||
run = 0;
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
fprintf(stderr, "%% Consume error for topic \"%s\" [%"PRId32"] "
|
|
||||||
"offset %"PRId64": %s\n",
|
|
||||||
rd_kafka_topic_name(rkmessage->rkt),
|
|
||||||
rkmessage->partition,
|
|
||||||
rkmessage->offset,
|
|
||||||
rd_kafka_message_errstr(rkmessage));
|
|
||||||
|
|
||||||
if (rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION ||
|
|
||||||
rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
|
|
||||||
run = 0;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!quiet) {
|
|
||||||
rd_kafka_timestamp_type_t tstype;
|
|
||||||
int64_t timestamp;
|
|
||||||
fprintf(stdout, "%% Message (offset %"PRId64", %zd bytes):\n",
|
|
||||||
rkmessage->offset, rkmessage->len);
|
|
||||||
|
|
||||||
timestamp = rd_kafka_message_timestamp(rkmessage, &tstype);
|
|
||||||
if (tstype != RD_KAFKA_TIMESTAMP_NOT_AVAILABLE) {
|
|
||||||
const char *tsname = "?";
|
|
||||||
if (tstype == RD_KAFKA_TIMESTAMP_CREATE_TIME)
|
|
||||||
tsname = "create time";
|
|
||||||
else if (tstype == RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME)
|
|
||||||
tsname = "log append time";
|
|
||||||
|
|
||||||
fprintf(stdout, "%% Message timestamp: %s %"PRId64
|
|
||||||
" (%ds ago)\n",
|
|
||||||
tsname, timestamp,
|
|
||||||
!timestamp ? 0 :
|
|
||||||
(int)time(NULL) - (int)(timestamp/1000));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (rkmessage->key_len) {
|
|
||||||
if (output == OUTPUT_HEXDUMP)
|
|
||||||
hexdump(stdout, "Message Key",
|
|
||||||
rkmessage->key, rkmessage->key_len);
|
|
||||||
else
|
|
||||||
printf("Key: %.*s\n",
|
|
||||||
(int)rkmessage->key_len, (char *)rkmessage->key);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (output == OUTPUT_HEXDUMP)
|
|
||||||
hexdump(stdout, "Message Payload",
|
|
||||||
rkmessage->payload, rkmessage->len);
|
|
||||||
else
|
|
||||||
printf("%.*s\n",
|
|
||||||
(int)rkmessage->len, (char *)rkmessage->payload);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
static void metadata_print (const char *topic,
|
|
||||||
const struct rd_kafka_metadata *metadata) {
|
|
||||||
int i, j, k;
|
|
||||||
|
|
||||||
printf("Metadata for %s (from broker %"PRId32": %s):\n",
|
|
||||||
topic ? : "all topics",
|
|
||||||
metadata->orig_broker_id,
|
|
||||||
metadata->orig_broker_name);
|
|
||||||
|
|
||||||
|
|
||||||
/* Iterate brokers */
|
|
||||||
printf(" %i brokers:\n", metadata->broker_cnt);
|
|
||||||
for (i = 0 ; i < metadata->broker_cnt ; i++)
|
|
||||||
printf(" broker %"PRId32" at %s:%i\n",
|
|
||||||
metadata->brokers[i].id,
|
|
||||||
metadata->brokers[i].host,
|
|
||||||
metadata->brokers[i].port);
|
|
||||||
|
|
||||||
/* Iterate topics */
|
|
||||||
printf(" %i topics:\n", metadata->topic_cnt);
|
|
||||||
for (i = 0 ; i < metadata->topic_cnt ; i++) {
|
|
||||||
const struct rd_kafka_metadata_topic *t = &metadata->topics[i];
|
|
||||||
printf(" topic \"%s\" with %i partitions:",
|
|
||||||
t->topic,
|
|
||||||
t->partition_cnt);
|
|
||||||
if (t->err) {
|
|
||||||
printf(" %s", rd_kafka_err2str(t->err));
|
|
||||||
if (t->err == RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE)
|
|
||||||
printf(" (try again)");
|
|
||||||
}
|
|
||||||
printf("\n");
|
|
||||||
|
|
||||||
/* Iterate topic's partitions */
|
|
||||||
for (j = 0 ; j < t->partition_cnt ; j++) {
|
|
||||||
const struct rd_kafka_metadata_partition *p;
|
|
||||||
p = &t->partitions[j];
|
|
||||||
printf(" partition %"PRId32", "
|
|
||||||
"leader %"PRId32", replicas: ",
|
|
||||||
p->id, p->leader);
|
|
||||||
|
|
||||||
/* Iterate partition's replicas */
|
|
||||||
for (k = 0 ; k < p->replica_cnt ; k++)
|
|
||||||
printf("%s%"PRId32,
|
|
||||||
k > 0 ? ",":"", p->replicas[k]);
|
|
||||||
|
|
||||||
/* Iterate partition's ISRs */
|
|
||||||
printf(", isrs: ");
|
|
||||||
for (k = 0 ; k < p->isr_cnt ; k++)
|
|
||||||
printf("%s%"PRId32,
|
|
||||||
k > 0 ? ",":"", p->isrs[k]);
|
|
||||||
if (p->err)
|
|
||||||
printf(", %s\n", rd_kafka_err2str(p->err));
|
|
||||||
else
|
|
||||||
printf("\n");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
static void sig_usr1 (int sig) {
|
|
||||||
rd_kafka_dump(stdout, rk);
|
|
||||||
}
|
|
||||||
|
|
||||||
int main (int argc, char **argv) {
|
|
||||||
rd_kafka_topic_t *rkt;
|
|
||||||
char *brokers = "localhost:9092";
|
|
||||||
char mode = 'C';
|
|
||||||
char *topic = NULL;
|
|
||||||
int partition = RD_KAFKA_PARTITION_UA;
|
|
||||||
int opt;
|
|
||||||
rd_kafka_conf_t *conf;
|
|
||||||
rd_kafka_topic_conf_t *topic_conf;
|
|
||||||
char errstr[512];
|
|
||||||
int64_t start_offset = 0;
|
|
||||||
int report_offsets = 0;
|
|
||||||
int do_conf_dump = 0;
|
|
||||||
char tmp[16];
|
|
||||||
int64_t seek_offset = 0;
|
|
||||||
int64_t tmp_offset = 0;
|
|
||||||
int get_wmarks = 0;
|
|
||||||
|
|
||||||
/* Kafka configuration */
|
|
||||||
conf = rd_kafka_conf_new();
|
|
||||||
|
|
||||||
/* Set logger */
|
|
||||||
rd_kafka_conf_set_log_cb(conf, logger);
|
|
||||||
|
|
||||||
/* Quick termination */
|
|
||||||
snprintf(tmp, sizeof(tmp), "%i", SIGIO);
|
|
||||||
rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);
|
|
||||||
|
|
||||||
/* Topic configuration */
|
|
||||||
topic_conf = rd_kafka_topic_conf_new();
|
|
||||||
|
|
||||||
while ((opt = getopt(argc, argv, "hPCLt:p:b:z:qd:o:eX:As:")) != -1) {
|
|
||||||
switch (opt) {
|
|
||||||
case 'P':
|
|
||||||
case 'C':
|
|
||||||
case 'L':
|
|
||||||
mode = opt;
|
|
||||||
break;
|
|
||||||
case 't':
|
|
||||||
topic = optarg;
|
|
||||||
break;
|
|
||||||
case 'p':
|
|
||||||
partition = atoi(optarg);
|
|
||||||
break;
|
|
||||||
case 'b':
|
|
||||||
brokers = optarg;
|
|
||||||
break;
|
|
||||||
case 'z':
|
|
||||||
if (rd_kafka_conf_set(conf, "compression.codec",
|
|
||||||
optarg,
|
|
||||||
errstr, sizeof(errstr)) !=
|
|
||||||
RD_KAFKA_CONF_OK) {
|
|
||||||
fprintf(stderr, "%% %s\n", errstr);
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case 'o':
|
|
||||||
case 's':
|
|
||||||
if (!strcmp(optarg, "end"))
|
|
||||||
tmp_offset = RD_KAFKA_OFFSET_END;
|
|
||||||
else if (!strcmp(optarg, "beginning"))
|
|
||||||
tmp_offset = RD_KAFKA_OFFSET_BEGINNING;
|
|
||||||
else if (!strcmp(optarg, "stored"))
|
|
||||||
tmp_offset = RD_KAFKA_OFFSET_STORED;
|
|
||||||
else if (!strcmp(optarg, "report"))
|
|
||||||
report_offsets = 1;
|
|
||||||
else if (!strcmp(optarg, "wmark"))
|
|
||||||
get_wmarks = 1;
|
|
||||||
else {
|
|
||||||
tmp_offset = strtoll(optarg, NULL, 10);
|
|
||||||
|
|
||||||
if (tmp_offset < 0)
|
|
||||||
tmp_offset = RD_KAFKA_OFFSET_TAIL(-tmp_offset);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (opt == 'o')
|
|
||||||
start_offset = tmp_offset;
|
|
||||||
else if (opt == 's')
|
|
||||||
seek_offset = tmp_offset;
|
|
||||||
break;
|
|
||||||
case 'e':
|
|
||||||
exit_eof = 1;
|
|
||||||
break;
|
|
||||||
case 'd':
|
|
||||||
if (rd_kafka_conf_set(conf, "debug", optarg,
|
|
||||||
errstr, sizeof(errstr)) !=
|
|
||||||
RD_KAFKA_CONF_OK) {
|
|
||||||
fprintf(stderr,
|
|
||||||
"%% Debug configuration failed: "
|
|
||||||
"%s: %s\n",
|
|
||||||
errstr, optarg);
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case 'q':
|
|
||||||
quiet = 1;
|
|
||||||
break;
|
|
||||||
case 'A':
|
|
||||||
output = OUTPUT_RAW;
|
|
||||||
break;
|
|
||||||
case 'X':
|
|
||||||
{
|
|
||||||
char *name, *val;
|
|
||||||
rd_kafka_conf_res_t res;
|
|
||||||
|
|
||||||
if (!strcmp(optarg, "list") ||
|
|
||||||
!strcmp(optarg, "help")) {
|
|
||||||
rd_kafka_conf_properties_show(stdout);
|
|
||||||
exit(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!strcmp(optarg, "dump")) {
|
|
||||||
do_conf_dump = 1;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
name = optarg;
|
|
||||||
if (!(val = strchr(name, '='))) {
|
|
||||||
char dest[512];
|
|
||||||
size_t dest_size = sizeof(dest);
|
|
||||||
/* Return current value for property. */
|
|
||||||
|
|
||||||
res = RD_KAFKA_CONF_UNKNOWN;
|
|
||||||
if (!strncmp(name, "topic.", strlen("topic.")))
|
|
||||||
res = rd_kafka_topic_conf_get(
|
|
||||||
topic_conf,
|
|
||||||
name+strlen("topic."),
|
|
||||||
dest, &dest_size);
|
|
||||||
if (res == RD_KAFKA_CONF_UNKNOWN)
|
|
||||||
res = rd_kafka_conf_get(
|
|
||||||
conf, name, dest, &dest_size);
|
|
||||||
|
|
||||||
if (res == RD_KAFKA_CONF_OK) {
|
|
||||||
printf("%s = %s\n", name, dest);
|
|
||||||
exit(0);
|
|
||||||
} else {
|
|
||||||
fprintf(stderr,
|
|
||||||
"%% %s property\n",
|
|
||||||
res == RD_KAFKA_CONF_UNKNOWN ?
|
|
||||||
"Unknown" : "Invalid");
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
*val = '\0';
|
|
||||||
val++;
|
|
||||||
|
|
||||||
res = RD_KAFKA_CONF_UNKNOWN;
|
|
||||||
/* Try "topic." prefixed properties on topic
|
|
||||||
* conf first, and then fall through to global if
|
|
||||||
* it didnt match a topic configuration property. */
|
|
||||||
if (!strncmp(name, "topic.", strlen("topic.")))
|
|
||||||
res = rd_kafka_topic_conf_set(topic_conf,
|
|
||||||
name+
|
|
||||||
strlen("topic."),
|
|
||||||
val,
|
|
||||||
errstr,
|
|
||||||
sizeof(errstr));
|
|
||||||
|
|
||||||
if (res == RD_KAFKA_CONF_UNKNOWN)
|
|
||||||
res = rd_kafka_conf_set(conf, name, val,
|
|
||||||
errstr, sizeof(errstr));
|
|
||||||
|
|
||||||
if (res != RD_KAFKA_CONF_OK) {
|
|
||||||
fprintf(stderr, "%% %s\n", errstr);
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
|
|
||||||
case 'h':
|
|
||||||
fprintf(stdout,
|
|
||||||
USAGE_STR,
|
|
||||||
argv[0],
|
|
||||||
rd_kafka_version_str(), rd_kafka_version(),
|
|
||||||
RD_KAFKA_DEBUG_CONTEXTS);
|
|
||||||
exit(0);
|
|
||||||
break;
|
|
||||||
|
|
||||||
default:
|
|
||||||
goto usage;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
if (do_conf_dump) {
|
|
||||||
const char **arr;
|
|
||||||
size_t cnt;
|
|
||||||
int pass;
|
|
||||||
|
|
||||||
for (pass = 0 ; pass < 2 ; pass++) {
|
|
||||||
int i;
|
|
||||||
|
|
||||||
if (pass == 0) {
|
|
||||||
arr = rd_kafka_conf_dump(conf, &cnt);
|
|
||||||
printf("# Global config\n");
|
|
||||||
} else {
|
|
||||||
printf("# Topic config\n");
|
|
||||||
arr = rd_kafka_topic_conf_dump(topic_conf,
|
|
||||||
&cnt);
|
|
||||||
}
|
|
||||||
|
|
||||||
for (i = 0 ; i < (int)cnt ; i += 2)
|
|
||||||
printf("%s = %s\n",
|
|
||||||
arr[i], arr[i+1]);
|
|
||||||
|
|
||||||
printf("\n");
|
|
||||||
|
|
||||||
rd_kafka_conf_dump_free(arr, cnt);
|
|
||||||
}
|
|
||||||
|
|
||||||
exit(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
if (optind != argc || (mode != 'L' && !topic)) {
|
|
||||||
usage:
|
|
||||||
fprintf(stderr,
|
|
||||||
USAGE_STR,
|
|
||||||
argv[0],
|
|
||||||
rd_kafka_version_str(), rd_kafka_version(),
|
|
||||||
RD_KAFKA_DEBUG_CONTEXTS);
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
if ((mode == 'C' && !isatty(STDIN_FILENO)) ||
|
|
||||||
(mode != 'C' && !isatty(STDOUT_FILENO)))
|
|
||||||
quiet = 1;
|
|
||||||
|
|
||||||
|
|
||||||
signal(SIGINT, stop);
|
|
||||||
signal(SIGUSR1, sig_usr1);
|
|
||||||
|
|
||||||
if (mode == 'P') {
|
|
||||||
/*
|
|
||||||
* Producer
|
|
||||||
*/
|
|
||||||
char buf[2048];
|
|
||||||
int sendcnt = 0;
|
|
||||||
|
|
||||||
/* Set up a message delivery report callback.
|
|
||||||
* It will be called once for each message, either on successful
|
|
||||||
* delivery to broker, or upon failure to deliver to broker. */
|
|
||||||
|
|
||||||
/* If offset reporting (-o report) is enabled, use the
|
|
||||||
* richer dr_msg_cb instead. */
|
|
||||||
if (report_offsets) {
|
|
||||||
rd_kafka_topic_conf_set(topic_conf,
|
|
||||||
"produce.offset.report",
|
|
||||||
"true", errstr, sizeof(errstr));
|
|
||||||
rd_kafka_conf_set_dr_msg_cb(conf, msg_delivered2);
|
|
||||||
} else
|
|
||||||
rd_kafka_conf_set_dr_cb(conf, msg_delivered);
|
|
||||||
|
|
||||||
/* Create Kafka handle */
|
|
||||||
if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
|
|
||||||
errstr, sizeof(errstr)))) {
|
|
||||||
fprintf(stderr,
|
|
||||||
"%% Failed to create new producer: %s\n",
|
|
||||||
errstr);
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Add brokers */
|
|
||||||
if (rd_kafka_brokers_add(rk, brokers) == 0) {
|
|
||||||
fprintf(stderr, "%% No valid brokers specified\n");
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Create topic */
|
|
||||||
rkt = rd_kafka_topic_new(rk, topic, topic_conf);
|
|
||||||
topic_conf = NULL; /* Now owned by topic */
|
|
||||||
|
|
||||||
if (!quiet)
|
|
||||||
fprintf(stderr,
|
|
||||||
"%% Type stuff and hit enter to send\n");
|
|
||||||
|
|
||||||
while (run && fgets(buf, sizeof(buf), stdin)) {
|
|
||||||
size_t len = strlen(buf);
|
|
||||||
if (buf[len-1] == '\n')
|
|
||||||
buf[--len] = '\0';
|
|
||||||
|
|
||||||
/* Send/Produce message. */
|
|
||||||
if (rd_kafka_produce(rkt, partition,
|
|
||||||
RD_KAFKA_MSG_F_COPY,
|
|
||||||
/* Payload and length */
|
|
||||||
buf, len,
|
|
||||||
/* Optional key and its length */
|
|
||||||
NULL, 0,
|
|
||||||
/* Message opaque, provided in
|
|
||||||
* delivery report callback as
|
|
||||||
* msg_opaque. */
|
|
||||||
NULL) == -1) {
|
|
||||||
fprintf(stderr,
|
|
||||||
"%% Failed to produce to topic %s "
|
|
||||||
"partition %i: %s\n",
|
|
||||||
rd_kafka_topic_name(rkt), partition,
|
|
||||||
rd_kafka_err2str(rd_kafka_last_error()));
|
|
||||||
/* Poll to handle delivery reports */
|
|
||||||
rd_kafka_poll(rk, 0);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!quiet)
|
|
||||||
fprintf(stderr, "%% Sent %zd bytes to topic "
|
|
||||||
"%s partition %i\n",
|
|
||||||
len, rd_kafka_topic_name(rkt), partition);
|
|
||||||
sendcnt++;
|
|
||||||
/* Poll to handle delivery reports */
|
|
||||||
rd_kafka_poll(rk, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Poll to handle delivery reports */
|
|
||||||
rd_kafka_poll(rk, 0);
|
|
||||||
|
|
||||||
/* Wait for messages to be delivered */
|
|
||||||
while (run && rd_kafka_outq_len(rk) > 0)
|
|
||||||
rd_kafka_poll(rk, 100);
|
|
||||||
|
|
||||||
/* Destroy topic */
|
|
||||||
rd_kafka_topic_destroy(rkt);
|
|
||||||
|
|
||||||
/* Destroy the handle */
|
|
||||||
rd_kafka_destroy(rk);
|
|
||||||
|
|
||||||
} else if (mode == 'C') {
|
|
||||||
/*
|
|
||||||
* Consumer
|
|
||||||
*/
|
|
||||||
|
|
||||||
/* Create Kafka handle */
|
|
||||||
if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
|
|
||||||
errstr, sizeof(errstr)))) {
|
|
||||||
fprintf(stderr,
|
|
||||||
"%% Failed to create new consumer: %s\n",
|
|
||||||
errstr);
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Add brokers */
|
|
||||||
if (rd_kafka_brokers_add(rk, brokers) == 0) {
|
|
||||||
fprintf(stderr, "%% No valid brokers specified\n");
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (get_wmarks) {
|
|
||||||
int64_t lo, hi;
|
|
||||||
rd_kafka_resp_err_t err;
|
|
||||||
|
|
||||||
/* Only query for hi&lo partition watermarks */
|
|
||||||
|
|
||||||
if ((err = rd_kafka_query_watermark_offsets(
|
|
||||||
rk, topic, partition, &lo, &hi, 5000))) {
|
|
||||||
fprintf(stderr, "%% query_watermark_offsets() "
|
|
||||||
"failed: %s\n",
|
|
||||||
rd_kafka_err2str(err));
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
printf("%s [%d]: low - high offsets: "
|
|
||||||
"%"PRId64" - %"PRId64"\n",
|
|
||||||
topic, partition, lo, hi);
|
|
||||||
|
|
||||||
rd_kafka_destroy(rk);
|
|
||||||
exit(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/* Create topic */
|
|
||||||
rkt = rd_kafka_topic_new(rk, topic, topic_conf);
|
|
||||||
topic_conf = NULL; /* Now owned by topic */
|
|
||||||
|
|
||||||
/* Start consuming */
|
|
||||||
if (rd_kafka_consume_start(rkt, partition, start_offset) == -1){
|
|
||||||
rd_kafka_resp_err_t err = rd_kafka_last_error();
|
|
||||||
fprintf(stderr, "%% Failed to start consuming: %s\n",
|
|
||||||
rd_kafka_err2str(err));
|
|
||||||
if (err == RD_KAFKA_RESP_ERR__INVALID_ARG)
|
|
||||||
fprintf(stderr,
|
|
||||||
"%% Broker based offset storage "
|
|
||||||
"requires a group.id, "
|
|
||||||
"add: -X group.id=yourGroup\n");
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
while (run) {
|
|
||||||
rd_kafka_message_t *rkmessage;
|
|
||||||
rd_kafka_resp_err_t err;
|
|
||||||
|
|
||||||
/* Poll for errors, etc. */
|
|
||||||
rd_kafka_poll(rk, 0);
|
|
||||||
|
|
||||||
/* Consume single message.
|
|
||||||
* See rdkafka_performance.c for high speed
|
|
||||||
* consuming of messages. */
|
|
||||||
rkmessage = rd_kafka_consume(rkt, partition, 1000);
|
|
||||||
if (!rkmessage) /* timeout */
|
|
||||||
continue;
|
|
||||||
|
|
||||||
msg_consume(rkmessage, NULL);
|
|
||||||
|
|
||||||
/* Return message to rdkafka */
|
|
||||||
rd_kafka_message_destroy(rkmessage);
|
|
||||||
|
|
||||||
if (seek_offset) {
|
|
||||||
err = rd_kafka_seek(rkt, partition, seek_offset,
|
|
||||||
2000);
|
|
||||||
if (err)
|
|
||||||
printf("Seek failed: %s\n",
|
|
||||||
rd_kafka_err2str(err));
|
|
||||||
else
|
|
||||||
printf("Seeked to %"PRId64"\n",
|
|
||||||
seek_offset);
|
|
||||||
seek_offset = 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Stop consuming */
|
|
||||||
rd_kafka_consume_stop(rkt, partition);
|
|
||||||
|
|
||||||
while (rd_kafka_outq_len(rk) > 0)
|
|
||||||
rd_kafka_poll(rk, 10);
|
|
||||||
|
|
||||||
/* Destroy topic */
|
|
||||||
rd_kafka_topic_destroy(rkt);
|
|
||||||
|
|
||||||
/* Destroy handle */
|
|
||||||
rd_kafka_destroy(rk);
|
|
||||||
|
|
||||||
} else if (mode == 'L') {
|
|
||||||
rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
|
|
||||||
|
|
||||||
/* Create Kafka handle */
|
|
||||||
if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
|
|
||||||
errstr, sizeof(errstr)))) {
|
|
||||||
fprintf(stderr,
|
|
||||||
"%% Failed to create new producer: %s\n",
|
|
||||||
errstr);
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Add brokers */
|
|
||||||
if (rd_kafka_brokers_add(rk, brokers) == 0) {
|
|
||||||
fprintf(stderr, "%% No valid brokers specified\n");
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Create topic */
|
|
||||||
if (topic) {
|
|
||||||
rkt = rd_kafka_topic_new(rk, topic, topic_conf);
|
|
||||||
topic_conf = NULL; /* Now owned by topic */
|
|
||||||
} else
|
|
||||||
rkt = NULL;
|
|
||||||
|
|
||||||
while (run) {
|
|
||||||
const struct rd_kafka_metadata *metadata;
|
|
||||||
|
|
||||||
/* Fetch metadata */
|
|
||||||
err = rd_kafka_metadata(rk, rkt ? 0 : 1, rkt,
|
|
||||||
&metadata, 5000);
|
|
||||||
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
|
|
||||||
fprintf(stderr,
|
|
||||||
"%% Failed to acquire metadata: %s\n",
|
|
||||||
rd_kafka_err2str(err));
|
|
||||||
run = 0;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
metadata_print(topic, metadata);
|
|
||||||
|
|
||||||
rd_kafka_metadata_destroy(metadata);
|
|
||||||
run = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Destroy topic */
|
|
||||||
if (rkt)
|
|
||||||
rd_kafka_topic_destroy(rkt);
|
|
||||||
|
|
||||||
/* Destroy the handle */
|
|
||||||
rd_kafka_destroy(rk);
|
|
||||||
|
|
||||||
if (topic_conf)
|
|
||||||
rd_kafka_topic_conf_destroy(topic_conf);
|
|
||||||
|
|
||||||
|
|
||||||
/* Exit right away, dont wait for background cleanup, we haven't
|
|
||||||
* done anything important anyway. */
|
|
||||||
exit(err ? 2 : 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (topic_conf)
|
|
||||||
rd_kafka_topic_conf_destroy(topic_conf);
|
|
||||||
|
|
||||||
/* Let background threads clean up and terminate cleanly. */
|
|
||||||
run = 5;
|
|
||||||
while (run-- > 0 && rd_kafka_wait_destroyed(1000) == -1)
|
|
||||||
printf("Waiting for librdkafka to decommission\n");
|
|
||||||
if (run <= 0)
|
|
||||||
rd_kafka_dump(stdout, rk);
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
@ -1,662 +0,0 @@
|
|||||||
/*
|
|
||||||
* librdkafka - Apache Kafka C library
|
|
||||||
*
|
|
||||||
* Copyright (c) 2014, Magnus Edenhill
|
|
||||||
* All rights reserved.
|
|
||||||
*
|
|
||||||
* Redistribution and use in source and binary forms, with or without
|
|
||||||
* modification, are permitted provided that the following conditions are met:
|
|
||||||
*
|
|
||||||
* 1. Redistributions of source code must retain the above copyright notice,
|
|
||||||
* this list of conditions and the following disclaimer.
|
|
||||||
* 2. Redistributions in binary form must reproduce the above copyright notice,
|
|
||||||
* this list of conditions and the following disclaimer in the documentation
|
|
||||||
* and/or other materials provided with the distribution.
|
|
||||||
*
|
|
||||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
|
||||||
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
|
||||||
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
|
||||||
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
|
|
||||||
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
|
||||||
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
|
||||||
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
|
||||||
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
|
||||||
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
|
||||||
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
|
||||||
* POSSIBILITY OF SUCH DAMAGE.
|
|
||||||
*/
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Apache Kafka consumer & producer example programs
|
|
||||||
* using the Kafka driver from librdkafka
|
|
||||||
* (https://github.com/edenhill/librdkafka)
|
|
||||||
*/
|
|
||||||
|
|
||||||
#include <iostream>
|
|
||||||
#include <string>
|
|
||||||
#include <cstdlib>
|
|
||||||
#include <cstdio>
|
|
||||||
#include <csignal>
|
|
||||||
#include <cstring>
|
|
||||||
|
|
||||||
#ifdef _MSC_VER
|
|
||||||
#include "../win32/wingetopt.h"
|
|
||||||
#elif _AIX
|
|
||||||
#include <unistd.h>
|
|
||||||
#else
|
|
||||||
#include <getopt.h>
|
|
||||||
#endif
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Typically include path in a real application would be
|
|
||||||
* #include <librdkafka/rdkafkacpp.h>
|
|
||||||
*/
|
|
||||||
#include <librdkafka/rdkafkacpp.h>
|
|
||||||
|
|
||||||
const char * USAGE_STR =
|
|
||||||
"Usage: %s [-C|-P] -t <topic> "
|
|
||||||
"[-p <partition>] [-b <host1:port1,host2:port2,..>]\n"
|
|
||||||
"\n"
|
|
||||||
"librdkafka version %s (0x%08x, builtin.features \"%s\")\n"
|
|
||||||
"\n"
|
|
||||||
" Options:\n"
|
|
||||||
" -C | -P Consumer or Producer mode\n"
|
|
||||||
" -L Metadata list mode\n"
|
|
||||||
" -t <topic> Topic to fetch / produce\n"
|
|
||||||
" -p <num> Partition (random partitioner)\n"
|
|
||||||
" -p <func> Use partitioner:\n"
|
|
||||||
" random (default), hash\n"
|
|
||||||
" -b <brokers> Broker address (localhost:9092)\n"
|
|
||||||
" -z <codec> Enable compression:\n"
|
|
||||||
" none|gzip|snappy\n"
|
|
||||||
" -o <offset> Start offset (consumer)\n"
|
|
||||||
" -e Exit consumer when last message\n"
|
|
||||||
" in partition has been received.\n"
|
|
||||||
" -d [facs..] Enable debugging contexts:\n"
|
|
||||||
" %s\n"
|
|
||||||
" -M <intervalms> Enable statistics\n"
|
|
||||||
" -X <prop=name> Set arbitrary librdkafka "
|
|
||||||
"configuration property\n"
|
|
||||||
" Properties prefixed with \"topic.\" "
|
|
||||||
"will be set on topic object.\n"
|
|
||||||
" Use '-X list' to see the full list\n"
|
|
||||||
" of supported properties.\n"
|
|
||||||
" -f <flag> Set option:\n"
|
|
||||||
" ccb - use consume_callback\n"
|
|
||||||
"\n"
|
|
||||||
" In Consumer mode:\n"
|
|
||||||
" writes fetched messages to stdout\n"
|
|
||||||
" In Producer mode:\n"
|
|
||||||
" reads messages from stdin and sends to broker\n"
|
|
||||||
"\n"
|
|
||||||
"\n"
|
|
||||||
"\n";
|
|
||||||
|
|
||||||
|
|
||||||
static void metadata_print (const std::string &topic,
|
|
||||||
const RdKafka::Metadata *metadata) {
|
|
||||||
std::cout << "Metadata for " << (topic.empty() ? "" : "all topics")
|
|
||||||
<< "(from broker " << metadata->orig_broker_id()
|
|
||||||
<< ":" << metadata->orig_broker_name() << std::endl;
|
|
||||||
|
|
||||||
/* Iterate brokers */
|
|
||||||
std::cout << " " << metadata->brokers()->size() << " brokers:" << std::endl;
|
|
||||||
RdKafka::Metadata::BrokerMetadataIterator ib;
|
|
||||||
for (ib = metadata->brokers()->begin();
|
|
||||||
ib != metadata->brokers()->end();
|
|
||||||
++ib) {
|
|
||||||
std::cout << " broker " << (*ib)->id() << " at "
|
|
||||||
<< (*ib)->host() << ":" << (*ib)->port() << std::endl;
|
|
||||||
}
|
|
||||||
/* Iterate topics */
|
|
||||||
std::cout << metadata->topics()->size() << " topics:" << std::endl;
|
|
||||||
RdKafka::Metadata::TopicMetadataIterator it;
|
|
||||||
for (it = metadata->topics()->begin();
|
|
||||||
it != metadata->topics()->end();
|
|
||||||
++it) {
|
|
||||||
std::cout << " topic \""<< (*it)->topic() << "\" with "
|
|
||||||
<< (*it)->partitions()->size() << " partitions:";
|
|
||||||
|
|
||||||
if ((*it)->err() != RdKafka::ERR_NO_ERROR) {
|
|
||||||
std::cout << " " << err2str((*it)->err());
|
|
||||||
if ((*it)->err() == RdKafka::ERR_LEADER_NOT_AVAILABLE)
|
|
||||||
std::cout << " (try again)";
|
|
||||||
}
|
|
||||||
std::cout << std::endl;
|
|
||||||
|
|
||||||
/* Iterate topic's partitions */
|
|
||||||
RdKafka::TopicMetadata::PartitionMetadataIterator ip;
|
|
||||||
for (ip = (*it)->partitions()->begin();
|
|
||||||
ip != (*it)->partitions()->end();
|
|
||||||
++ip) {
|
|
||||||
std::cout << " partition " << (*ip)->id()
|
|
||||||
<< ", leader " << (*ip)->leader()
|
|
||||||
<< ", replicas: ";
|
|
||||||
|
|
||||||
/* Iterate partition's replicas */
|
|
||||||
RdKafka::PartitionMetadata::ReplicasIterator ir;
|
|
||||||
for (ir = (*ip)->replicas()->begin();
|
|
||||||
ir != (*ip)->replicas()->end();
|
|
||||||
++ir) {
|
|
||||||
std::cout << (ir == (*ip)->replicas()->begin() ? "":",") << *ir;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Iterate partition's ISRs */
|
|
||||||
std::cout << ", isrs: ";
|
|
||||||
RdKafka::PartitionMetadata::ISRSIterator iis;
|
|
||||||
for (iis = (*ip)->isrs()->begin(); iis != (*ip)->isrs()->end() ; ++iis)
|
|
||||||
std::cout << (iis == (*ip)->isrs()->begin() ? "":",") << *iis;
|
|
||||||
|
|
||||||
if ((*ip)->err() != RdKafka::ERR_NO_ERROR)
|
|
||||||
std::cout << ", " << RdKafka::err2str((*ip)->err()) << std::endl;
|
|
||||||
else
|
|
||||||
std::cout << std::endl;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static bool run = true;
|
|
||||||
static bool exit_eof = false;
|
|
||||||
|
|
||||||
static void sigterm (int sig) {
|
|
||||||
run = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb {
|
|
||||||
public:
|
|
||||||
void dr_cb (RdKafka::Message &message) {
|
|
||||||
std::cout << "Message delivery for (" << message.len() << " bytes): " <<
|
|
||||||
message.errstr() << std::endl;
|
|
||||||
if (message.key())
|
|
||||||
std::cout << "Key: " << *(message.key()) << ";" << std::endl;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
class ExampleEventCb : public RdKafka::EventCb {
|
|
||||||
public:
|
|
||||||
void event_cb (RdKafka::Event &event) {
|
|
||||||
switch (event.type())
|
|
||||||
{
|
|
||||||
case RdKafka::Event::EVENT_ERROR:
|
|
||||||
std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
|
|
||||||
event.str() << std::endl;
|
|
||||||
if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
|
|
||||||
run = false;
|
|
||||||
break;
|
|
||||||
|
|
||||||
case RdKafka::Event::EVENT_STATS:
|
|
||||||
std::cerr << "\"STATS\": " << event.str() << std::endl;
|
|
||||||
break;
|
|
||||||
|
|
||||||
case RdKafka::Event::EVENT_LOG:
|
|
||||||
fprintf(stderr, "LOG-%i-%s: %s\n",
|
|
||||||
event.severity(), event.fac().c_str(), event.str().c_str());
|
|
||||||
break;
|
|
||||||
|
|
||||||
default:
|
|
||||||
std::cerr << "EVENT " << event.type() <<
|
|
||||||
" (" << RdKafka::err2str(event.err()) << "): " <<
|
|
||||||
event.str() << std::endl;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
/* Use of this partitioner is pretty pointless since no key is provided
|
|
||||||
* in the produce() call. */
|
|
||||||
class MyHashPartitionerCb : public RdKafka::PartitionerCb {
|
|
||||||
public:
|
|
||||||
int32_t partitioner_cb (const RdKafka::Topic *topic, const std::string *key,
|
|
||||||
int32_t partition_cnt, void *msg_opaque) {
|
|
||||||
return djb_hash(key->c_str(), key->size()) % partition_cnt;
|
|
||||||
}
|
|
||||||
private:
|
|
||||||
|
|
||||||
static inline unsigned int djb_hash (const char *str, size_t len) {
|
|
||||||
unsigned int hash = 5381;
|
|
||||||
for (size_t i = 0 ; i < len ; i++)
|
|
||||||
hash = ((hash << 5) + hash) + str[i];
|
|
||||||
return hash;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
void msg_consume(RdKafka::Message* message, void* opaque) {
|
|
||||||
switch (message->err()) {
|
|
||||||
case RdKafka::ERR__TIMED_OUT:
|
|
||||||
break;
|
|
||||||
|
|
||||||
case RdKafka::ERR_NO_ERROR:
|
|
||||||
/* Real message */
|
|
||||||
std::cout << "Read msg at offset " << message->offset() << std::endl;
|
|
||||||
if (message->key()) {
|
|
||||||
std::cout << "Key: " << *message->key() << std::endl;
|
|
||||||
}
|
|
||||||
printf("%.*s\n",
|
|
||||||
static_cast<int>(message->len()),
|
|
||||||
static_cast<const char *>(message->payload()));
|
|
||||||
break;
|
|
||||||
|
|
||||||
case RdKafka::ERR__PARTITION_EOF:
|
|
||||||
/* Last message */
|
|
||||||
if (exit_eof) {
|
|
||||||
run = false;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
|
|
||||||
case RdKafka::ERR__UNKNOWN_TOPIC:
|
|
||||||
case RdKafka::ERR__UNKNOWN_PARTITION:
|
|
||||||
std::cerr << "Consume failed: " << message->errstr() << std::endl;
|
|
||||||
run = false;
|
|
||||||
break;
|
|
||||||
|
|
||||||
default:
|
|
||||||
/* Errors */
|
|
||||||
std::cerr << "Consume failed: " << message->errstr() << std::endl;
|
|
||||||
run = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
class ExampleConsumeCb : public RdKafka::ConsumeCb {
|
|
||||||
public:
|
|
||||||
void consume_cb (RdKafka::Message &msg, void *opaque) {
|
|
||||||
msg_consume(&msg, opaque);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
int main (int argc, char **argv) {
|
|
||||||
std::string brokers = "localhost";
|
|
||||||
std::string errstr;
|
|
||||||
std::string topic_str;
|
|
||||||
std::string mode;
|
|
||||||
std::string debug;
|
|
||||||
int32_t partition = RdKafka::Topic::PARTITION_UA;
|
|
||||||
int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
|
|
||||||
bool do_conf_dump = false;
|
|
||||||
int opt;
|
|
||||||
MyHashPartitionerCb hash_partitioner;
|
|
||||||
int use_ccb = 0;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Create configuration objects
|
|
||||||
*/
|
|
||||||
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
|
|
||||||
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
|
|
||||||
|
|
||||||
|
|
||||||
while ((opt = getopt(argc, argv, "hPCLt:p:b:z:qd:o:eX:AM:f:")) != -1) {
|
|
||||||
switch (opt) {
|
|
||||||
case 'P':
|
|
||||||
case 'C':
|
|
||||||
case 'L':
|
|
||||||
mode = opt;
|
|
||||||
break;
|
|
||||||
case 't':
|
|
||||||
topic_str = optarg;
|
|
||||||
break;
|
|
||||||
case 'p':
|
|
||||||
if (!strcmp(optarg, "random"))
|
|
||||||
/* default */;
|
|
||||||
else if (!strcmp(optarg, "hash")) {
|
|
||||||
if (tconf->set("partitioner_cb", &hash_partitioner, errstr) !=
|
|
||||||
RdKafka::Conf::CONF_OK) {
|
|
||||||
std::cerr << errstr << std::endl;
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
} else
|
|
||||||
partition = std::atoi(optarg);
|
|
||||||
break;
|
|
||||||
case 'b':
|
|
||||||
brokers = optarg;
|
|
||||||
break;
|
|
||||||
case 'z':
|
|
||||||
if (conf->set("compression.codec", optarg, errstr) !=
|
|
||||||
RdKafka::Conf::CONF_OK) {
|
|
||||||
std::cerr << errstr << std::endl;
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case 'o':
|
|
||||||
if (!strcmp(optarg, "end"))
|
|
||||||
start_offset = RdKafka::Topic::OFFSET_END;
|
|
||||||
else if (!strcmp(optarg, "beginning"))
|
|
||||||
start_offset = RdKafka::Topic::OFFSET_BEGINNING;
|
|
||||||
else if (!strcmp(optarg, "stored"))
|
|
||||||
start_offset = RdKafka::Topic::OFFSET_STORED;
|
|
||||||
else
|
|
||||||
start_offset = strtoll(optarg, NULL, 10);
|
|
||||||
break;
|
|
||||||
case 'e':
|
|
||||||
exit_eof = true;
|
|
||||||
break;
|
|
||||||
case 'd':
|
|
||||||
debug = optarg;
|
|
||||||
break;
|
|
||||||
case 'M':
|
|
||||||
if (conf->set("statistics.interval.ms", optarg, errstr) !=
|
|
||||||
RdKafka::Conf::CONF_OK) {
|
|
||||||
std::cerr << errstr << std::endl;
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case 'X':
|
|
||||||
{
|
|
||||||
char *name, *val;
|
|
||||||
|
|
||||||
if (!strcmp(optarg, "dump")) {
|
|
||||||
do_conf_dump = true;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
name = optarg;
|
|
||||||
if (!(val = strchr(name, '='))) {
|
|
||||||
std::cerr << "%% Expected -X property=value, not " <<
|
|
||||||
name << std::endl;
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
*val = '\0';
|
|
||||||
val++;
|
|
||||||
|
|
||||||
/* Try "topic." prefixed properties on topic
|
|
||||||
* conf first, and then fall through to global if
|
|
||||||
* it didnt match a topic configuration property. */
|
|
||||||
RdKafka::Conf::ConfResult res;
|
|
||||||
if (!strncmp(name, "topic.", strlen("topic.")))
|
|
||||||
res = tconf->set(name+strlen("topic."), val, errstr);
|
|
||||||
else
|
|
||||||
res = conf->set(name, val, errstr);
|
|
||||||
|
|
||||||
if (res != RdKafka::Conf::CONF_OK) {
|
|
||||||
std::cerr << errstr << std::endl;
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
|
|
||||||
case 'f':
|
|
||||||
if (!strcmp(optarg, "ccb"))
|
|
||||||
use_ccb = 1;
|
|
||||||
else {
|
|
||||||
std::cerr << "Unknown option: " << optarg << std::endl;
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
|
|
||||||
case 'h':
|
|
||||||
{
|
|
||||||
std::string features;
|
|
||||||
conf->get("builtin.features", features);
|
|
||||||
fprintf(stdout,
|
|
||||||
USAGE_STR,
|
|
||||||
argv[0],
|
|
||||||
RdKafka::version_str().c_str(), RdKafka::version(),
|
|
||||||
features.c_str(),
|
|
||||||
RdKafka::get_debug_contexts().c_str());
|
|
||||||
exit(0);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
|
|
||||||
default:
|
|
||||||
goto usage;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (mode.empty() || (topic_str.empty() && mode != "L") || optind != argc) {
|
|
||||||
usage:
|
|
||||||
std::string features;
|
|
||||||
conf->get("builtin.features", features);
|
|
||||||
fprintf(stderr,
|
|
||||||
USAGE_STR,
|
|
||||||
argv[0],
|
|
||||||
RdKafka::version_str().c_str(), RdKafka::version(),
|
|
||||||
features.c_str(),
|
|
||||||
RdKafka::get_debug_contexts().c_str());
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Set configuration properties
|
|
||||||
*/
|
|
||||||
conf->set("metadata.broker.list", brokers, errstr);
|
|
||||||
|
|
||||||
if (!debug.empty()) {
|
|
||||||
if (conf->set("debug", debug, errstr) != RdKafka::Conf::CONF_OK) {
|
|
||||||
std::cerr << errstr << std::endl;
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ExampleEventCb ex_event_cb;
|
|
||||||
conf->set("event_cb", &ex_event_cb, errstr);
|
|
||||||
|
|
||||||
if (do_conf_dump) {
|
|
||||||
int pass;
|
|
||||||
|
|
||||||
for (pass = 0 ; pass < 2 ; pass++) {
|
|
||||||
std::list<std::string> *dump;
|
|
||||||
if (pass == 0) {
|
|
||||||
dump = conf->dump();
|
|
||||||
std::cout << "# Global config" << std::endl;
|
|
||||||
} else {
|
|
||||||
dump = tconf->dump();
|
|
||||||
std::cout << "# Topic config" << std::endl;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (std::list<std::string>::iterator it = dump->begin();
|
|
||||||
it != dump->end(); ) {
|
|
||||||
std::cout << *it << " = ";
|
|
||||||
it++;
|
|
||||||
std::cout << *it << std::endl;
|
|
||||||
it++;
|
|
||||||
}
|
|
||||||
std::cout << std::endl;
|
|
||||||
}
|
|
||||||
exit(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
signal(SIGINT, sigterm);
|
|
||||||
signal(SIGTERM, sigterm);
|
|
||||||
|
|
||||||
|
|
||||||
if (mode == "P") {
|
|
||||||
/*
|
|
||||||
* Producer mode
|
|
||||||
*/
|
|
||||||
|
|
||||||
if(topic_str.empty())
|
|
||||||
goto usage;
|
|
||||||
|
|
||||||
ExampleDeliveryReportCb ex_dr_cb;
|
|
||||||
|
|
||||||
/* Set delivery report callback */
|
|
||||||
conf->set("dr_cb", &ex_dr_cb, errstr);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Create producer using accumulated global configuration.
|
|
||||||
*/
|
|
||||||
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
|
|
||||||
if (!producer) {
|
|
||||||
std::cerr << "Failed to create producer: " << errstr << std::endl;
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
std::cout << "% Created producer " << producer->name() << std::endl;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Create topic handle.
|
|
||||||
*/
|
|
||||||
RdKafka::Topic *topic = RdKafka::Topic::create(producer, topic_str,
|
|
||||||
tconf, errstr);
|
|
||||||
if (!topic) {
|
|
||||||
std::cerr << "Failed to create topic: " << errstr << std::endl;
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Read messages from stdin and produce to broker.
|
|
||||||
*/
|
|
||||||
for (std::string line; run && std::getline(std::cin, line);) {
|
|
||||||
if (line.empty()) {
|
|
||||||
producer->poll(0);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Produce message
|
|
||||||
*/
|
|
||||||
RdKafka::ErrorCode resp =
|
|
||||||
producer->produce(topic, partition,
|
|
||||||
RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
|
|
||||||
const_cast<char *>(line.c_str()), line.size(),
|
|
||||||
NULL, NULL);
|
|
||||||
if (resp != RdKafka::ERR_NO_ERROR)
|
|
||||||
std::cerr << "% Produce failed: " <<
|
|
||||||
RdKafka::err2str(resp) << std::endl;
|
|
||||||
else
|
|
||||||
std::cerr << "% Produced message (" << line.size() << " bytes)" <<
|
|
||||||
std::endl;
|
|
||||||
|
|
||||||
producer->poll(0);
|
|
||||||
}
|
|
||||||
run = true;
|
|
||||||
|
|
||||||
while (run && producer->outq_len() > 0) {
|
|
||||||
std::cerr << "Waiting for " << producer->outq_len() << std::endl;
|
|
||||||
producer->poll(1000);
|
|
||||||
}
|
|
||||||
|
|
||||||
delete topic;
|
|
||||||
delete producer;
|
|
||||||
|
|
||||||
|
|
||||||
} else if (mode == "C") {
|
|
||||||
/*
|
|
||||||
* Consumer mode
|
|
||||||
*/
|
|
||||||
|
|
||||||
if(topic_str.empty())
|
|
||||||
goto usage;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Create consumer using accumulated global configuration.
|
|
||||||
*/
|
|
||||||
RdKafka::Consumer *consumer = RdKafka::Consumer::create(conf, errstr);
|
|
||||||
if (!consumer) {
|
|
||||||
std::cerr << "Failed to create consumer: " << errstr << std::endl;
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
std::cout << "% Created consumer " << consumer->name() << std::endl;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Create topic handle.
|
|
||||||
*/
|
|
||||||
RdKafka::Topic *topic = RdKafka::Topic::create(consumer, topic_str,
|
|
||||||
tconf, errstr);
|
|
||||||
if (!topic) {
|
|
||||||
std::cerr << "Failed to create topic: " << errstr << std::endl;
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Start consumer for topic+partition at start offset
|
|
||||||
*/
|
|
||||||
RdKafka::ErrorCode resp = consumer->start(topic, partition, start_offset);
|
|
||||||
if (resp != RdKafka::ERR_NO_ERROR) {
|
|
||||||
std::cerr << "Failed to start consumer: " <<
|
|
||||||
RdKafka::err2str(resp) << std::endl;
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
ExampleConsumeCb ex_consume_cb;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Consume messages
|
|
||||||
*/
|
|
||||||
while (run) {
|
|
||||||
if (use_ccb) {
|
|
||||||
consumer->consume_callback(topic, partition, 1000,
|
|
||||||
&ex_consume_cb, &use_ccb);
|
|
||||||
} else {
|
|
||||||
RdKafka::Message *msg = consumer->consume(topic, partition, 1000);
|
|
||||||
msg_consume(msg, NULL);
|
|
||||||
delete msg;
|
|
||||||
}
|
|
||||||
consumer->poll(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Stop consumer
|
|
||||||
*/
|
|
||||||
consumer->stop(topic, partition);
|
|
||||||
|
|
||||||
consumer->poll(1000);
|
|
||||||
|
|
||||||
delete topic;
|
|
||||||
delete consumer;
|
|
||||||
} else {
|
|
||||||
/* Metadata mode */
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Create producer using accumulated global configuration.
|
|
||||||
*/
|
|
||||||
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
|
|
||||||
if (!producer) {
|
|
||||||
std::cerr << "Failed to create producer: " << errstr << std::endl;
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
std::cout << "% Created producer " << producer->name() << std::endl;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Create topic handle.
|
|
||||||
*/
|
|
||||||
RdKafka::Topic *topic = NULL;
|
|
||||||
if(!topic_str.empty()) {
|
|
||||||
topic = RdKafka::Topic::create(producer, topic_str, tconf, errstr);
|
|
||||||
if (!topic) {
|
|
||||||
std::cerr << "Failed to create topic: " << errstr << std::endl;
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
while (run) {
|
|
||||||
class RdKafka::Metadata *metadata;
|
|
||||||
|
|
||||||
/* Fetch metadata */
|
|
||||||
RdKafka::ErrorCode err = producer->metadata(topic!=NULL, topic,
|
|
||||||
&metadata, 5000);
|
|
||||||
if (err != RdKafka::ERR_NO_ERROR) {
|
|
||||||
std::cerr << "%% Failed to acquire metadata: "
|
|
||||||
<< RdKafka::err2str(err) << std::endl;
|
|
||||||
run = 0;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
metadata_print(topic_str, metadata);
|
|
||||||
|
|
||||||
delete metadata;
|
|
||||||
run = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Wait for RdKafka to decommission.
|
|
||||||
* This is not strictly needed (when check outq_len() above), but
|
|
||||||
* allows RdKafka to clean up all its resources before the application
|
|
||||||
* exits so that memory profilers such as valgrind wont complain about
|
|
||||||
* memory leaks.
|
|
||||||
*/
|
|
||||||
RdKafka::wait_destroyed(5000);
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
@ -1,37 +0,0 @@
|
|||||||
#!/bin/bash
|
|
||||||
set -e
|
|
||||||
set -x
|
|
||||||
|
|
||||||
TEST_SOURCE=kafka-test
|
|
||||||
|
|
||||||
C_TEST_TARGET="${TEST_SOURCE}.c"
|
|
||||||
CXX_TEST_TARGET="${TEST_SOURCE}.cpp"
|
|
||||||
|
|
||||||
C_BINARY="${TEST_SOURCE}-c"
|
|
||||||
CXX_BINARY="${TEST_SOURCE}-cpp"
|
|
||||||
|
|
||||||
CFLAGS="$(rpm --eval '%{build_cflags}')"
|
|
||||||
CXXFLAGS="$(rpm --eval '%{build_cxxflags}')"
|
|
||||||
|
|
||||||
LDFLAGS="$(rpm --eval '%{build_ldflags}')"
|
|
||||||
|
|
||||||
LIBCFLAGS="$(pkg-config rdkafka --libs --cflags)"
|
|
||||||
LIBCXXFLAGS="$(pkg-config rdkafka++ --libs --cflags)"
|
|
||||||
|
|
||||||
# build target using distribution-specific flags
|
|
||||||
gcc -std=c11 $CFLAGS $LDFLAGS -o $C_BINARY $C_TEST_TARGET $LIBCFLAGS
|
|
||||||
g++ -std=c++11 $CXXFLAGS $LDFLAGS -o $CXX_BINARY $CXX_TEST_TARGET $LIBCXXFLAGS
|
|
||||||
|
|
||||||
|
|
||||||
# test that target exists
|
|
||||||
test -f ./$C_BINARY
|
|
||||||
test -f ./$CXX_BINARY
|
|
||||||
|
|
||||||
# test that target is executable
|
|
||||||
test -x ./$C_BINARY
|
|
||||||
test -x ./$CXX_BINARY
|
|
||||||
|
|
||||||
|
|
||||||
# test that target runs successfully
|
|
||||||
./$C_BINARY -h
|
|
||||||
./$CXX_BINARY -h
|
|
@ -1,14 +0,0 @@
|
|||||||
---
|
|
||||||
- hosts: localhost
|
|
||||||
tags:
|
|
||||||
- classic
|
|
||||||
roles:
|
|
||||||
- role: standard-test-basic
|
|
||||||
tests:
|
|
||||||
- devel-usability
|
|
||||||
required_packages:
|
|
||||||
- gcc-c++
|
|
||||||
- pkgconf-pkg-config
|
|
||||||
- rpm
|
|
||||||
- redhat-rpm-config
|
|
||||||
- librdkafka-devel
|
|
Loading…
Reference in New Issue
Block a user