diff --git a/.gitignore b/.gitignore index 202488c..f14b0fc 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ -SOURCES/librdkafka-0.11.4.tar.gz +/librdkafka-0.11.0.tar.gz +/librdkafka-0.11.5.tar.gz /librdkafka-0.11.4.tar.gz diff --git a/tests/.fmf/version b/tests/.fmf/version new file mode 100644 index 0000000..d00491f --- /dev/null +++ b/tests/.fmf/version @@ -0,0 +1 @@ +1 diff --git a/tests/devel-usability/kafka-test.c b/tests/devel-usability/kafka-test.c new file mode 100644 index 0000000..96a31ff --- /dev/null +++ b/tests/devel-usability/kafka-test.c @@ -0,0 +1,822 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2012, Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +/** + * Apache Kafka consumer & producer example programs + * using the Kafka driver from librdkafka + * (https://github.com/edenhill/librdkafka) + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/* Typical include path would be , but this program + * is builtin from within the librdkafka source tree and thus differs. */ +#include /* for Kafka driver */ + + + +const char * USAGE_STR ="Usage: %s -C|-P|-L -t " + "[-p ] [-b ]\n" + "\n" + "librdkafka version %s (0x%08x)\n" + "\n" + " Options:\n" + " -C | -P Consumer or Producer mode\n" + " -L Metadata list mode\n" + " -t Topic to fetch / produce\n" + " -p Partition (random partitioner)\n" + " -b Broker address (localhost:9092)\n" + " -z Enable compression:\n" + " none|gzip|snappy\n" + " -o Start offset (consumer):\n" + " beginning, end, NNNNN or -NNNNN\n" + " wmark returns the current hi&lo " + "watermarks.\n" + " -o report Report message offsets (producer)\n" + " -e Exit consumer when last message\n" + " in partition has been received.\n" + " -d [facs..] Enable debugging contexts:\n" + " %s\n" + " -q Be quiet\n" + " -A Raw payload output (consumer)\n" + " -X Set arbitrary librdkafka " + "configuration property\n" + " Properties prefixed with \"topic.\" " + "will be set on topic object.\n" + " -X list Show full list of supported " + "properties.\n" + " -X Get single property value\n" + "\n" + " In Consumer mode:\n" + " writes fetched messages to stdout\n" + " In Producer mode:\n" + " reads messages from stdin and sends to broker\n" + " In List mode:\n" + " queries broker for metadata information, " + "topic is optional.\n" + "\n" + "\n" + "\n"; + + + + + +static int run = 1; +static rd_kafka_t *rk; +static int exit_eof = 0; +static int quiet = 0; +static enum { + OUTPUT_HEXDUMP, + OUTPUT_RAW, +} output = OUTPUT_HEXDUMP; + +static void stop (int sig) { + run = 0; + fclose(stdin); /* abort fgets() */ +} + + +static void hexdump (FILE *fp, const char *name, const void *ptr, size_t len) { + const char *p = (const char *)ptr; + size_t of = 0; + + + if (name) + fprintf(fp, "%s hexdump (%zd bytes):\n", name, len); + + for (of = 0 ; of < len ; of += 16) { + char hexen[16*3+1]; + char charen[16+1]; + int hof = 0; + + int cof = 0; + int i; + + for (i = of ; i < (int)of + 16 && i < (int)len ; i++) { + hof += sprintf(hexen+hof, "%02x ", p[i] & 0xff); + cof += sprintf(charen+cof, "%c", + isprint((int)p[i]) ? p[i] : '.'); + } + fprintf(fp, "%08zx: %-48s %-16s\n", + of, hexen, charen); + } +} + +/** + * Kafka logger callback (optional) + */ +static void logger (const rd_kafka_t *rk, int level, + const char *fac, const char *buf) { + struct timeval tv; + gettimeofday(&tv, NULL); + fprintf(stderr, "%u.%03u RDKAFKA-%i-%s: %s: %s\n", + (int)tv.tv_sec, (int)(tv.tv_usec / 1000), + level, fac, rk ? rd_kafka_name(rk) : NULL, buf); +} + +/** + * Message delivery report callback. + * Called once for each message. + * See rdkafka.h for more information. + */ +static void msg_delivered (rd_kafka_t *rk, + void *payload, size_t len, + int error_code, + void *opaque, void *msg_opaque) { + + if (error_code) + fprintf(stderr, "%% Message delivery failed: %s\n", + rd_kafka_err2str(error_code)); + else if (!quiet) + fprintf(stderr, "%% Message delivered (%zd bytes): %.*s\n", len, + (int)len, (const char *)payload); +} + +/** + * Message delivery report callback using the richer rd_kafka_message_t object. + */ +static void msg_delivered2 (rd_kafka_t *rk, + const rd_kafka_message_t *rkmessage, void *opaque) { + printf("del: %s: offset %"PRId64"\n", + rd_kafka_err2str(rkmessage->err), rkmessage->offset); + if (rkmessage->err) + fprintf(stderr, "%% Message delivery failed: %s\n", + rd_kafka_err2str(rkmessage->err)); + else if (!quiet) + fprintf(stderr, + "%% Message delivered (%zd bytes, offset %"PRId64", " + "partition %"PRId32"): %.*s\n", + rkmessage->len, rkmessage->offset, + rkmessage->partition, + (int)rkmessage->len, (const char *)rkmessage->payload); +} + + +static void msg_consume (rd_kafka_message_t *rkmessage, + void *opaque) { + if (rkmessage->err) { + if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) { + fprintf(stderr, + "%% Consumer reached end of %s [%"PRId32"] " + "message queue at offset %"PRId64"\n", + rd_kafka_topic_name(rkmessage->rkt), + rkmessage->partition, rkmessage->offset); + + if (exit_eof) + run = 0; + + return; + } + + fprintf(stderr, "%% Consume error for topic \"%s\" [%"PRId32"] " + "offset %"PRId64": %s\n", + rd_kafka_topic_name(rkmessage->rkt), + rkmessage->partition, + rkmessage->offset, + rd_kafka_message_errstr(rkmessage)); + + if (rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION || + rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC) + run = 0; + return; + } + + if (!quiet) { + rd_kafka_timestamp_type_t tstype; + int64_t timestamp; + fprintf(stdout, "%% Message (offset %"PRId64", %zd bytes):\n", + rkmessage->offset, rkmessage->len); + + timestamp = rd_kafka_message_timestamp(rkmessage, &tstype); + if (tstype != RD_KAFKA_TIMESTAMP_NOT_AVAILABLE) { + const char *tsname = "?"; + if (tstype == RD_KAFKA_TIMESTAMP_CREATE_TIME) + tsname = "create time"; + else if (tstype == RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME) + tsname = "log append time"; + + fprintf(stdout, "%% Message timestamp: %s %"PRId64 + " (%ds ago)\n", + tsname, timestamp, + !timestamp ? 0 : + (int)time(NULL) - (int)(timestamp/1000)); + } + } + + if (rkmessage->key_len) { + if (output == OUTPUT_HEXDUMP) + hexdump(stdout, "Message Key", + rkmessage->key, rkmessage->key_len); + else + printf("Key: %.*s\n", + (int)rkmessage->key_len, (char *)rkmessage->key); + } + + if (output == OUTPUT_HEXDUMP) + hexdump(stdout, "Message Payload", + rkmessage->payload, rkmessage->len); + else + printf("%.*s\n", + (int)rkmessage->len, (char *)rkmessage->payload); +} + + +static void metadata_print (const char *topic, + const struct rd_kafka_metadata *metadata) { + int i, j, k; + + printf("Metadata for %s (from broker %"PRId32": %s):\n", + topic ? : "all topics", + metadata->orig_broker_id, + metadata->orig_broker_name); + + + /* Iterate brokers */ + printf(" %i brokers:\n", metadata->broker_cnt); + for (i = 0 ; i < metadata->broker_cnt ; i++) + printf(" broker %"PRId32" at %s:%i\n", + metadata->brokers[i].id, + metadata->brokers[i].host, + metadata->brokers[i].port); + + /* Iterate topics */ + printf(" %i topics:\n", metadata->topic_cnt); + for (i = 0 ; i < metadata->topic_cnt ; i++) { + const struct rd_kafka_metadata_topic *t = &metadata->topics[i]; + printf(" topic \"%s\" with %i partitions:", + t->topic, + t->partition_cnt); + if (t->err) { + printf(" %s", rd_kafka_err2str(t->err)); + if (t->err == RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE) + printf(" (try again)"); + } + printf("\n"); + + /* Iterate topic's partitions */ + for (j = 0 ; j < t->partition_cnt ; j++) { + const struct rd_kafka_metadata_partition *p; + p = &t->partitions[j]; + printf(" partition %"PRId32", " + "leader %"PRId32", replicas: ", + p->id, p->leader); + + /* Iterate partition's replicas */ + for (k = 0 ; k < p->replica_cnt ; k++) + printf("%s%"PRId32, + k > 0 ? ",":"", p->replicas[k]); + + /* Iterate partition's ISRs */ + printf(", isrs: "); + for (k = 0 ; k < p->isr_cnt ; k++) + printf("%s%"PRId32, + k > 0 ? ",":"", p->isrs[k]); + if (p->err) + printf(", %s\n", rd_kafka_err2str(p->err)); + else + printf("\n"); + } + } +} + + +static void sig_usr1 (int sig) { + rd_kafka_dump(stdout, rk); +} + +int main (int argc, char **argv) { + rd_kafka_topic_t *rkt; + char *brokers = "localhost:9092"; + char mode = 'C'; + char *topic = NULL; + int partition = RD_KAFKA_PARTITION_UA; + int opt; + rd_kafka_conf_t *conf; + rd_kafka_topic_conf_t *topic_conf; + char errstr[512]; + int64_t start_offset = 0; + int report_offsets = 0; + int do_conf_dump = 0; + char tmp[16]; + int64_t seek_offset = 0; + int64_t tmp_offset = 0; + int get_wmarks = 0; + + /* Kafka configuration */ + conf = rd_kafka_conf_new(); + + /* Set logger */ + rd_kafka_conf_set_log_cb(conf, logger); + + /* Quick termination */ + snprintf(tmp, sizeof(tmp), "%i", SIGIO); + rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0); + + /* Topic configuration */ + topic_conf = rd_kafka_topic_conf_new(); + + while ((opt = getopt(argc, argv, "hPCLt:p:b:z:qd:o:eX:As:")) != -1) { + switch (opt) { + case 'P': + case 'C': + case 'L': + mode = opt; + break; + case 't': + topic = optarg; + break; + case 'p': + partition = atoi(optarg); + break; + case 'b': + brokers = optarg; + break; + case 'z': + if (rd_kafka_conf_set(conf, "compression.codec", + optarg, + errstr, sizeof(errstr)) != + RD_KAFKA_CONF_OK) { + fprintf(stderr, "%% %s\n", errstr); + exit(1); + } + break; + case 'o': + case 's': + if (!strcmp(optarg, "end")) + tmp_offset = RD_KAFKA_OFFSET_END; + else if (!strcmp(optarg, "beginning")) + tmp_offset = RD_KAFKA_OFFSET_BEGINNING; + else if (!strcmp(optarg, "stored")) + tmp_offset = RD_KAFKA_OFFSET_STORED; + else if (!strcmp(optarg, "report")) + report_offsets = 1; + else if (!strcmp(optarg, "wmark")) + get_wmarks = 1; + else { + tmp_offset = strtoll(optarg, NULL, 10); + + if (tmp_offset < 0) + tmp_offset = RD_KAFKA_OFFSET_TAIL(-tmp_offset); + } + + if (opt == 'o') + start_offset = tmp_offset; + else if (opt == 's') + seek_offset = tmp_offset; + break; + case 'e': + exit_eof = 1; + break; + case 'd': + if (rd_kafka_conf_set(conf, "debug", optarg, + errstr, sizeof(errstr)) != + RD_KAFKA_CONF_OK) { + fprintf(stderr, + "%% Debug configuration failed: " + "%s: %s\n", + errstr, optarg); + exit(1); + } + break; + case 'q': + quiet = 1; + break; + case 'A': + output = OUTPUT_RAW; + break; + case 'X': + { + char *name, *val; + rd_kafka_conf_res_t res; + + if (!strcmp(optarg, "list") || + !strcmp(optarg, "help")) { + rd_kafka_conf_properties_show(stdout); + exit(0); + } + + if (!strcmp(optarg, "dump")) { + do_conf_dump = 1; + continue; + } + + name = optarg; + if (!(val = strchr(name, '='))) { + char dest[512]; + size_t dest_size = sizeof(dest); + /* Return current value for property. */ + + res = RD_KAFKA_CONF_UNKNOWN; + if (!strncmp(name, "topic.", strlen("topic."))) + res = rd_kafka_topic_conf_get( + topic_conf, + name+strlen("topic."), + dest, &dest_size); + if (res == RD_KAFKA_CONF_UNKNOWN) + res = rd_kafka_conf_get( + conf, name, dest, &dest_size); + + if (res == RD_KAFKA_CONF_OK) { + printf("%s = %s\n", name, dest); + exit(0); + } else { + fprintf(stderr, + "%% %s property\n", + res == RD_KAFKA_CONF_UNKNOWN ? + "Unknown" : "Invalid"); + exit(1); + } + } + + *val = '\0'; + val++; + + res = RD_KAFKA_CONF_UNKNOWN; + /* Try "topic." prefixed properties on topic + * conf first, and then fall through to global if + * it didnt match a topic configuration property. */ + if (!strncmp(name, "topic.", strlen("topic."))) + res = rd_kafka_topic_conf_set(topic_conf, + name+ + strlen("topic."), + val, + errstr, + sizeof(errstr)); + + if (res == RD_KAFKA_CONF_UNKNOWN) + res = rd_kafka_conf_set(conf, name, val, + errstr, sizeof(errstr)); + + if (res != RD_KAFKA_CONF_OK) { + fprintf(stderr, "%% %s\n", errstr); + exit(1); + } + } + break; + + case 'h': + fprintf(stdout, + USAGE_STR, + argv[0], + rd_kafka_version_str(), rd_kafka_version(), + RD_KAFKA_DEBUG_CONTEXTS); + exit(0); + break; + + default: + goto usage; + } + } + + + if (do_conf_dump) { + const char **arr; + size_t cnt; + int pass; + + for (pass = 0 ; pass < 2 ; pass++) { + int i; + + if (pass == 0) { + arr = rd_kafka_conf_dump(conf, &cnt); + printf("# Global config\n"); + } else { + printf("# Topic config\n"); + arr = rd_kafka_topic_conf_dump(topic_conf, + &cnt); + } + + for (i = 0 ; i < (int)cnt ; i += 2) + printf("%s = %s\n", + arr[i], arr[i+1]); + + printf("\n"); + + rd_kafka_conf_dump_free(arr, cnt); + } + + exit(0); + } + + + if (optind != argc || (mode != 'L' && !topic)) { + usage: + fprintf(stderr, + USAGE_STR, + argv[0], + rd_kafka_version_str(), rd_kafka_version(), + RD_KAFKA_DEBUG_CONTEXTS); + exit(1); + } + + if ((mode == 'C' && !isatty(STDIN_FILENO)) || + (mode != 'C' && !isatty(STDOUT_FILENO))) + quiet = 1; + + + signal(SIGINT, stop); + signal(SIGUSR1, sig_usr1); + + if (mode == 'P') { + /* + * Producer + */ + char buf[2048]; + int sendcnt = 0; + + /* Set up a message delivery report callback. + * It will be called once for each message, either on successful + * delivery to broker, or upon failure to deliver to broker. */ + + /* If offset reporting (-o report) is enabled, use the + * richer dr_msg_cb instead. */ + if (report_offsets) { + rd_kafka_topic_conf_set(topic_conf, + "produce.offset.report", + "true", errstr, sizeof(errstr)); + rd_kafka_conf_set_dr_msg_cb(conf, msg_delivered2); + } else + rd_kafka_conf_set_dr_cb(conf, msg_delivered); + + /* Create Kafka handle */ + if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, + errstr, sizeof(errstr)))) { + fprintf(stderr, + "%% Failed to create new producer: %s\n", + errstr); + exit(1); + } + + /* Add brokers */ + if (rd_kafka_brokers_add(rk, brokers) == 0) { + fprintf(stderr, "%% No valid brokers specified\n"); + exit(1); + } + + /* Create topic */ + rkt = rd_kafka_topic_new(rk, topic, topic_conf); + topic_conf = NULL; /* Now owned by topic */ + + if (!quiet) + fprintf(stderr, + "%% Type stuff and hit enter to send\n"); + + while (run && fgets(buf, sizeof(buf), stdin)) { + size_t len = strlen(buf); + if (buf[len-1] == '\n') + buf[--len] = '\0'; + + /* Send/Produce message. */ + if (rd_kafka_produce(rkt, partition, + RD_KAFKA_MSG_F_COPY, + /* Payload and length */ + buf, len, + /* Optional key and its length */ + NULL, 0, + /* Message opaque, provided in + * delivery report callback as + * msg_opaque. */ + NULL) == -1) { + fprintf(stderr, + "%% Failed to produce to topic %s " + "partition %i: %s\n", + rd_kafka_topic_name(rkt), partition, + rd_kafka_err2str(rd_kafka_last_error())); + /* Poll to handle delivery reports */ + rd_kafka_poll(rk, 0); + continue; + } + + if (!quiet) + fprintf(stderr, "%% Sent %zd bytes to topic " + "%s partition %i\n", + len, rd_kafka_topic_name(rkt), partition); + sendcnt++; + /* Poll to handle delivery reports */ + rd_kafka_poll(rk, 0); + } + + /* Poll to handle delivery reports */ + rd_kafka_poll(rk, 0); + + /* Wait for messages to be delivered */ + while (run && rd_kafka_outq_len(rk) > 0) + rd_kafka_poll(rk, 100); + + /* Destroy topic */ + rd_kafka_topic_destroy(rkt); + + /* Destroy the handle */ + rd_kafka_destroy(rk); + + } else if (mode == 'C') { + /* + * Consumer + */ + + /* Create Kafka handle */ + if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, + errstr, sizeof(errstr)))) { + fprintf(stderr, + "%% Failed to create new consumer: %s\n", + errstr); + exit(1); + } + + /* Add brokers */ + if (rd_kafka_brokers_add(rk, brokers) == 0) { + fprintf(stderr, "%% No valid brokers specified\n"); + exit(1); + } + + if (get_wmarks) { + int64_t lo, hi; + rd_kafka_resp_err_t err; + + /* Only query for hi&lo partition watermarks */ + + if ((err = rd_kafka_query_watermark_offsets( + rk, topic, partition, &lo, &hi, 5000))) { + fprintf(stderr, "%% query_watermark_offsets() " + "failed: %s\n", + rd_kafka_err2str(err)); + exit(1); + } + + printf("%s [%d]: low - high offsets: " + "%"PRId64" - %"PRId64"\n", + topic, partition, lo, hi); + + rd_kafka_destroy(rk); + exit(0); + } + + + /* Create topic */ + rkt = rd_kafka_topic_new(rk, topic, topic_conf); + topic_conf = NULL; /* Now owned by topic */ + + /* Start consuming */ + if (rd_kafka_consume_start(rkt, partition, start_offset) == -1){ + rd_kafka_resp_err_t err = rd_kafka_last_error(); + fprintf(stderr, "%% Failed to start consuming: %s\n", + rd_kafka_err2str(err)); + if (err == RD_KAFKA_RESP_ERR__INVALID_ARG) + fprintf(stderr, + "%% Broker based offset storage " + "requires a group.id, " + "add: -X group.id=yourGroup\n"); + exit(1); + } + + while (run) { + rd_kafka_message_t *rkmessage; + rd_kafka_resp_err_t err; + + /* Poll for errors, etc. */ + rd_kafka_poll(rk, 0); + + /* Consume single message. + * See rdkafka_performance.c for high speed + * consuming of messages. */ + rkmessage = rd_kafka_consume(rkt, partition, 1000); + if (!rkmessage) /* timeout */ + continue; + + msg_consume(rkmessage, NULL); + + /* Return message to rdkafka */ + rd_kafka_message_destroy(rkmessage); + + if (seek_offset) { + err = rd_kafka_seek(rkt, partition, seek_offset, + 2000); + if (err) + printf("Seek failed: %s\n", + rd_kafka_err2str(err)); + else + printf("Seeked to %"PRId64"\n", + seek_offset); + seek_offset = 0; + } + } + + /* Stop consuming */ + rd_kafka_consume_stop(rkt, partition); + + while (rd_kafka_outq_len(rk) > 0) + rd_kafka_poll(rk, 10); + + /* Destroy topic */ + rd_kafka_topic_destroy(rkt); + + /* Destroy handle */ + rd_kafka_destroy(rk); + + } else if (mode == 'L') { + rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR; + + /* Create Kafka handle */ + if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, + errstr, sizeof(errstr)))) { + fprintf(stderr, + "%% Failed to create new producer: %s\n", + errstr); + exit(1); + } + + /* Add brokers */ + if (rd_kafka_brokers_add(rk, brokers) == 0) { + fprintf(stderr, "%% No valid brokers specified\n"); + exit(1); + } + + /* Create topic */ + if (topic) { + rkt = rd_kafka_topic_new(rk, topic, topic_conf); + topic_conf = NULL; /* Now owned by topic */ + } else + rkt = NULL; + + while (run) { + const struct rd_kafka_metadata *metadata; + + /* Fetch metadata */ + err = rd_kafka_metadata(rk, rkt ? 0 : 1, rkt, + &metadata, 5000); + if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { + fprintf(stderr, + "%% Failed to acquire metadata: %s\n", + rd_kafka_err2str(err)); + run = 0; + break; + } + + metadata_print(topic, metadata); + + rd_kafka_metadata_destroy(metadata); + run = 0; + } + + /* Destroy topic */ + if (rkt) + rd_kafka_topic_destroy(rkt); + + /* Destroy the handle */ + rd_kafka_destroy(rk); + + if (topic_conf) + rd_kafka_topic_conf_destroy(topic_conf); + + + /* Exit right away, dont wait for background cleanup, we haven't + * done anything important anyway. */ + exit(err ? 2 : 0); + } + + if (topic_conf) + rd_kafka_topic_conf_destroy(topic_conf); + + /* Let background threads clean up and terminate cleanly. */ + run = 5; + while (run-- > 0 && rd_kafka_wait_destroyed(1000) == -1) + printf("Waiting for librdkafka to decommission\n"); + if (run <= 0) + rd_kafka_dump(stdout, rk); + + return 0; +} diff --git a/tests/devel-usability/kafka-test.cpp b/tests/devel-usability/kafka-test.cpp new file mode 100644 index 0000000..dec0a7c --- /dev/null +++ b/tests/devel-usability/kafka-test.cpp @@ -0,0 +1,662 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2014, Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +/** + * Apache Kafka consumer & producer example programs + * using the Kafka driver from librdkafka + * (https://github.com/edenhill/librdkafka) + */ + +#include +#include +#include +#include +#include +#include + +#ifdef _MSC_VER +#include "../win32/wingetopt.h" +#elif _AIX +#include +#else +#include +#endif + +/* + * Typically include path in a real application would be + * #include + */ +#include + +const char * USAGE_STR = + "Usage: %s [-C|-P] -t " + "[-p ] [-b ]\n" + "\n" + "librdkafka version %s (0x%08x, builtin.features \"%s\")\n" + "\n" + " Options:\n" + " -C | -P Consumer or Producer mode\n" + " -L Metadata list mode\n" + " -t Topic to fetch / produce\n" + " -p Partition (random partitioner)\n" + " -p Use partitioner:\n" + " random (default), hash\n" + " -b Broker address (localhost:9092)\n" + " -z Enable compression:\n" + " none|gzip|snappy\n" + " -o Start offset (consumer)\n" + " -e Exit consumer when last message\n" + " in partition has been received.\n" + " -d [facs..] Enable debugging contexts:\n" + " %s\n" + " -M Enable statistics\n" + " -X Set arbitrary librdkafka " + "configuration property\n" + " Properties prefixed with \"topic.\" " + "will be set on topic object.\n" + " Use '-X list' to see the full list\n" + " of supported properties.\n" + " -f Set option:\n" + " ccb - use consume_callback\n" + "\n" + " In Consumer mode:\n" + " writes fetched messages to stdout\n" + " In Producer mode:\n" + " reads messages from stdin and sends to broker\n" + "\n" + "\n" + "\n"; + + +static void metadata_print (const std::string &topic, + const RdKafka::Metadata *metadata) { + std::cout << "Metadata for " << (topic.empty() ? "" : "all topics") + << "(from broker " << metadata->orig_broker_id() + << ":" << metadata->orig_broker_name() << std::endl; + + /* Iterate brokers */ + std::cout << " " << metadata->brokers()->size() << " brokers:" << std::endl; + RdKafka::Metadata::BrokerMetadataIterator ib; + for (ib = metadata->brokers()->begin(); + ib != metadata->brokers()->end(); + ++ib) { + std::cout << " broker " << (*ib)->id() << " at " + << (*ib)->host() << ":" << (*ib)->port() << std::endl; + } + /* Iterate topics */ + std::cout << metadata->topics()->size() << " topics:" << std::endl; + RdKafka::Metadata::TopicMetadataIterator it; + for (it = metadata->topics()->begin(); + it != metadata->topics()->end(); + ++it) { + std::cout << " topic \""<< (*it)->topic() << "\" with " + << (*it)->partitions()->size() << " partitions:"; + + if ((*it)->err() != RdKafka::ERR_NO_ERROR) { + std::cout << " " << err2str((*it)->err()); + if ((*it)->err() == RdKafka::ERR_LEADER_NOT_AVAILABLE) + std::cout << " (try again)"; + } + std::cout << std::endl; + + /* Iterate topic's partitions */ + RdKafka::TopicMetadata::PartitionMetadataIterator ip; + for (ip = (*it)->partitions()->begin(); + ip != (*it)->partitions()->end(); + ++ip) { + std::cout << " partition " << (*ip)->id() + << ", leader " << (*ip)->leader() + << ", replicas: "; + + /* Iterate partition's replicas */ + RdKafka::PartitionMetadata::ReplicasIterator ir; + for (ir = (*ip)->replicas()->begin(); + ir != (*ip)->replicas()->end(); + ++ir) { + std::cout << (ir == (*ip)->replicas()->begin() ? "":",") << *ir; + } + + /* Iterate partition's ISRs */ + std::cout << ", isrs: "; + RdKafka::PartitionMetadata::ISRSIterator iis; + for (iis = (*ip)->isrs()->begin(); iis != (*ip)->isrs()->end() ; ++iis) + std::cout << (iis == (*ip)->isrs()->begin() ? "":",") << *iis; + + if ((*ip)->err() != RdKafka::ERR_NO_ERROR) + std::cout << ", " << RdKafka::err2str((*ip)->err()) << std::endl; + else + std::cout << std::endl; + } + } +} + +static bool run = true; +static bool exit_eof = false; + +static void sigterm (int sig) { + run = false; +} + + +class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb { + public: + void dr_cb (RdKafka::Message &message) { + std::cout << "Message delivery for (" << message.len() << " bytes): " << + message.errstr() << std::endl; + if (message.key()) + std::cout << "Key: " << *(message.key()) << ";" << std::endl; + } +}; + + +class ExampleEventCb : public RdKafka::EventCb { + public: + void event_cb (RdKafka::Event &event) { + switch (event.type()) + { + case RdKafka::Event::EVENT_ERROR: + std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " << + event.str() << std::endl; + if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN) + run = false; + break; + + case RdKafka::Event::EVENT_STATS: + std::cerr << "\"STATS\": " << event.str() << std::endl; + break; + + case RdKafka::Event::EVENT_LOG: + fprintf(stderr, "LOG-%i-%s: %s\n", + event.severity(), event.fac().c_str(), event.str().c_str()); + break; + + default: + std::cerr << "EVENT " << event.type() << + " (" << RdKafka::err2str(event.err()) << "): " << + event.str() << std::endl; + break; + } + } +}; + + +/* Use of this partitioner is pretty pointless since no key is provided + * in the produce() call. */ +class MyHashPartitionerCb : public RdKafka::PartitionerCb { + public: + int32_t partitioner_cb (const RdKafka::Topic *topic, const std::string *key, + int32_t partition_cnt, void *msg_opaque) { + return djb_hash(key->c_str(), key->size()) % partition_cnt; + } + private: + + static inline unsigned int djb_hash (const char *str, size_t len) { + unsigned int hash = 5381; + for (size_t i = 0 ; i < len ; i++) + hash = ((hash << 5) + hash) + str[i]; + return hash; + } +}; + +void msg_consume(RdKafka::Message* message, void* opaque) { + switch (message->err()) { + case RdKafka::ERR__TIMED_OUT: + break; + + case RdKafka::ERR_NO_ERROR: + /* Real message */ + std::cout << "Read msg at offset " << message->offset() << std::endl; + if (message->key()) { + std::cout << "Key: " << *message->key() << std::endl; + } + printf("%.*s\n", + static_cast(message->len()), + static_cast(message->payload())); + break; + + case RdKafka::ERR__PARTITION_EOF: + /* Last message */ + if (exit_eof) { + run = false; + } + break; + + case RdKafka::ERR__UNKNOWN_TOPIC: + case RdKafka::ERR__UNKNOWN_PARTITION: + std::cerr << "Consume failed: " << message->errstr() << std::endl; + run = false; + break; + + default: + /* Errors */ + std::cerr << "Consume failed: " << message->errstr() << std::endl; + run = false; + } +} + + +class ExampleConsumeCb : public RdKafka::ConsumeCb { + public: + void consume_cb (RdKafka::Message &msg, void *opaque) { + msg_consume(&msg, opaque); + } +}; + + + +int main (int argc, char **argv) { + std::string brokers = "localhost"; + std::string errstr; + std::string topic_str; + std::string mode; + std::string debug; + int32_t partition = RdKafka::Topic::PARTITION_UA; + int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING; + bool do_conf_dump = false; + int opt; + MyHashPartitionerCb hash_partitioner; + int use_ccb = 0; + + /* + * Create configuration objects + */ + RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); + RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); + + + while ((opt = getopt(argc, argv, "hPCLt:p:b:z:qd:o:eX:AM:f:")) != -1) { + switch (opt) { + case 'P': + case 'C': + case 'L': + mode = opt; + break; + case 't': + topic_str = optarg; + break; + case 'p': + if (!strcmp(optarg, "random")) + /* default */; + else if (!strcmp(optarg, "hash")) { + if (tconf->set("partitioner_cb", &hash_partitioner, errstr) != + RdKafka::Conf::CONF_OK) { + std::cerr << errstr << std::endl; + exit(1); + } + } else + partition = std::atoi(optarg); + break; + case 'b': + brokers = optarg; + break; + case 'z': + if (conf->set("compression.codec", optarg, errstr) != + RdKafka::Conf::CONF_OK) { + std::cerr << errstr << std::endl; + exit(1); + } + break; + case 'o': + if (!strcmp(optarg, "end")) + start_offset = RdKafka::Topic::OFFSET_END; + else if (!strcmp(optarg, "beginning")) + start_offset = RdKafka::Topic::OFFSET_BEGINNING; + else if (!strcmp(optarg, "stored")) + start_offset = RdKafka::Topic::OFFSET_STORED; + else + start_offset = strtoll(optarg, NULL, 10); + break; + case 'e': + exit_eof = true; + break; + case 'd': + debug = optarg; + break; + case 'M': + if (conf->set("statistics.interval.ms", optarg, errstr) != + RdKafka::Conf::CONF_OK) { + std::cerr << errstr << std::endl; + exit(1); + } + break; + case 'X': + { + char *name, *val; + + if (!strcmp(optarg, "dump")) { + do_conf_dump = true; + continue; + } + + name = optarg; + if (!(val = strchr(name, '='))) { + std::cerr << "%% Expected -X property=value, not " << + name << std::endl; + exit(1); + } + + *val = '\0'; + val++; + + /* Try "topic." prefixed properties on topic + * conf first, and then fall through to global if + * it didnt match a topic configuration property. */ + RdKafka::Conf::ConfResult res; + if (!strncmp(name, "topic.", strlen("topic."))) + res = tconf->set(name+strlen("topic."), val, errstr); + else + res = conf->set(name, val, errstr); + + if (res != RdKafka::Conf::CONF_OK) { + std::cerr << errstr << std::endl; + exit(1); + } + } + break; + + case 'f': + if (!strcmp(optarg, "ccb")) + use_ccb = 1; + else { + std::cerr << "Unknown option: " << optarg << std::endl; + exit(1); + } + break; + + case 'h': + { + std::string features; + conf->get("builtin.features", features); + fprintf(stdout, + USAGE_STR, + argv[0], + RdKafka::version_str().c_str(), RdKafka::version(), + features.c_str(), + RdKafka::get_debug_contexts().c_str()); + exit(0); + } + break; + + default: + goto usage; + } + } + + if (mode.empty() || (topic_str.empty() && mode != "L") || optind != argc) { + usage: + std::string features; + conf->get("builtin.features", features); + fprintf(stderr, + USAGE_STR, + argv[0], + RdKafka::version_str().c_str(), RdKafka::version(), + features.c_str(), + RdKafka::get_debug_contexts().c_str()); + exit(1); + } + + + /* + * Set configuration properties + */ + conf->set("metadata.broker.list", brokers, errstr); + + if (!debug.empty()) { + if (conf->set("debug", debug, errstr) != RdKafka::Conf::CONF_OK) { + std::cerr << errstr << std::endl; + exit(1); + } + } + + ExampleEventCb ex_event_cb; + conf->set("event_cb", &ex_event_cb, errstr); + + if (do_conf_dump) { + int pass; + + for (pass = 0 ; pass < 2 ; pass++) { + std::list *dump; + if (pass == 0) { + dump = conf->dump(); + std::cout << "# Global config" << std::endl; + } else { + dump = tconf->dump(); + std::cout << "# Topic config" << std::endl; + } + + for (std::list::iterator it = dump->begin(); + it != dump->end(); ) { + std::cout << *it << " = "; + it++; + std::cout << *it << std::endl; + it++; + } + std::cout << std::endl; + } + exit(0); + } + + signal(SIGINT, sigterm); + signal(SIGTERM, sigterm); + + + if (mode == "P") { + /* + * Producer mode + */ + + if(topic_str.empty()) + goto usage; + + ExampleDeliveryReportCb ex_dr_cb; + + /* Set delivery report callback */ + conf->set("dr_cb", &ex_dr_cb, errstr); + + /* + * Create producer using accumulated global configuration. + */ + RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr); + if (!producer) { + std::cerr << "Failed to create producer: " << errstr << std::endl; + exit(1); + } + + std::cout << "% Created producer " << producer->name() << std::endl; + + /* + * Create topic handle. + */ + RdKafka::Topic *topic = RdKafka::Topic::create(producer, topic_str, + tconf, errstr); + if (!topic) { + std::cerr << "Failed to create topic: " << errstr << std::endl; + exit(1); + } + + /* + * Read messages from stdin and produce to broker. + */ + for (std::string line; run && std::getline(std::cin, line);) { + if (line.empty()) { + producer->poll(0); + continue; + } + + /* + * Produce message + */ + RdKafka::ErrorCode resp = + producer->produce(topic, partition, + RdKafka::Producer::RK_MSG_COPY /* Copy payload */, + const_cast(line.c_str()), line.size(), + NULL, NULL); + if (resp != RdKafka::ERR_NO_ERROR) + std::cerr << "% Produce failed: " << + RdKafka::err2str(resp) << std::endl; + else + std::cerr << "% Produced message (" << line.size() << " bytes)" << + std::endl; + + producer->poll(0); + } + run = true; + + while (run && producer->outq_len() > 0) { + std::cerr << "Waiting for " << producer->outq_len() << std::endl; + producer->poll(1000); + } + + delete topic; + delete producer; + + + } else if (mode == "C") { + /* + * Consumer mode + */ + + if(topic_str.empty()) + goto usage; + + /* + * Create consumer using accumulated global configuration. + */ + RdKafka::Consumer *consumer = RdKafka::Consumer::create(conf, errstr); + if (!consumer) { + std::cerr << "Failed to create consumer: " << errstr << std::endl; + exit(1); + } + + std::cout << "% Created consumer " << consumer->name() << std::endl; + + /* + * Create topic handle. + */ + RdKafka::Topic *topic = RdKafka::Topic::create(consumer, topic_str, + tconf, errstr); + if (!topic) { + std::cerr << "Failed to create topic: " << errstr << std::endl; + exit(1); + } + + /* + * Start consumer for topic+partition at start offset + */ + RdKafka::ErrorCode resp = consumer->start(topic, partition, start_offset); + if (resp != RdKafka::ERR_NO_ERROR) { + std::cerr << "Failed to start consumer: " << + RdKafka::err2str(resp) << std::endl; + exit(1); + } + + ExampleConsumeCb ex_consume_cb; + + /* + * Consume messages + */ + while (run) { + if (use_ccb) { + consumer->consume_callback(topic, partition, 1000, + &ex_consume_cb, &use_ccb); + } else { + RdKafka::Message *msg = consumer->consume(topic, partition, 1000); + msg_consume(msg, NULL); + delete msg; + } + consumer->poll(0); + } + + /* + * Stop consumer + */ + consumer->stop(topic, partition); + + consumer->poll(1000); + + delete topic; + delete consumer; + } else { + /* Metadata mode */ + + /* + * Create producer using accumulated global configuration. + */ + RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr); + if (!producer) { + std::cerr << "Failed to create producer: " << errstr << std::endl; + exit(1); + } + + std::cout << "% Created producer " << producer->name() << std::endl; + + /* + * Create topic handle. + */ + RdKafka::Topic *topic = NULL; + if(!topic_str.empty()) { + topic = RdKafka::Topic::create(producer, topic_str, tconf, errstr); + if (!topic) { + std::cerr << "Failed to create topic: " << errstr << std::endl; + exit(1); + } + } + + while (run) { + class RdKafka::Metadata *metadata; + + /* Fetch metadata */ + RdKafka::ErrorCode err = producer->metadata(topic!=NULL, topic, + &metadata, 5000); + if (err != RdKafka::ERR_NO_ERROR) { + std::cerr << "%% Failed to acquire metadata: " + << RdKafka::err2str(err) << std::endl; + run = 0; + break; + } + + metadata_print(topic_str, metadata); + + delete metadata; + run = 0; + } + + } + + + /* + * Wait for RdKafka to decommission. + * This is not strictly needed (when check outq_len() above), but + * allows RdKafka to clean up all its resources before the application + * exits so that memory profilers such as valgrind wont complain about + * memory leaks. + */ + RdKafka::wait_destroyed(5000); + + return 0; +} diff --git a/tests/devel-usability/runtest.sh b/tests/devel-usability/runtest.sh new file mode 100755 index 0000000..2567d35 --- /dev/null +++ b/tests/devel-usability/runtest.sh @@ -0,0 +1,37 @@ +#!/bin/bash +set -e +set -x + +TEST_SOURCE=kafka-test + +C_TEST_TARGET="${TEST_SOURCE}.c" +CXX_TEST_TARGET="${TEST_SOURCE}.cpp" + +C_BINARY="${TEST_SOURCE}-c" +CXX_BINARY="${TEST_SOURCE}-cpp" + +CFLAGS="$(rpm --eval '%{build_cflags}')" +CXXFLAGS="$(rpm --eval '%{build_cxxflags}')" + +LDFLAGS="$(rpm --eval '%{build_ldflags}')" + +LIBCFLAGS="$(pkg-config rdkafka --libs --cflags)" +LIBCXXFLAGS="$(pkg-config rdkafka++ --libs --cflags)" + +# build target using distribution-specific flags +gcc -std=c11 $CFLAGS $LDFLAGS $LIBCFLAGS -o $C_BINARY $C_TEST_TARGET +g++ -std=c++11 $CXXFLAGS $LDFLAGS $LIBCXXFLAGS -o $CXX_BINARY $CXX_TEST_TARGET + + +# test that target exists +test -f ./$C_BINARY +test -f ./$CXX_BINARY + +# test that target is executable +test -x ./$C_BINARY +test -x ./$CXX_BINARY + + +# test that target runs successfully +./$C_BINARY -h +./$CXX_BINARY -h diff --git a/tests/tests.yml b/tests/tests.yml new file mode 100644 index 0000000..e77f2b5 --- /dev/null +++ b/tests/tests.yml @@ -0,0 +1,14 @@ +--- +- hosts: localhost + tags: + - classic + roles: + - role: standard-test-basic + tests: + - devel-usability + required_packages: + - gcc-c++ + - pkgconf-pkg-config + - rpm + - redhat-rpm-config + - librdkafka-devel