diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index ed9359732c..8200403eaf 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -86,12 +86,14 @@ STATSCOUNTER_DEF(rebinds, mutRebinds) static prop_t *pInputName = NULL; # define META_STRT "{\"index\":{\"_index\": \"" -# define META_STRT_CREATE "{\"create\":{\"_index\": \"" +# define META_STRT_CREATE "{\"create\":{" /* \"_index\": \" */ +# define META_IX "\"_index\": \"" # define META_TYPE "\",\"_type\":\"" # define META_PIPELINE "\",\"pipeline\":\"" # define META_PARENT "\",\"_parent\":\"" # define META_ID "\", \"_id\":\"" # define META_END "\"}}\n" +# define META_END_NOQUOTE " }}\n" typedef enum { ES_WRITE_INDEX, @@ -362,8 +364,8 @@ CODESTARTdbgPrintInstInfo dbgprintf("\tdefaultPort=%d\n", pData->defaultPort); dbgprintf("\tuid='%s'\n", pData->uid == NULL ? (uchar*)"(not configured)" : pData->uid); dbgprintf("\tpwd=(%sconfigured)\n", pData->pwd == NULL ? "not " : ""); - dbgprintf("\tsearch index='%s'\n", pData->searchIndex); - dbgprintf("\tsearch type='%s'\n", pData->searchType); + dbgprintf("\tsearch index='%s'\n", pData->searchIndex == NULL ? (uchar*)"(not configured)" : pData->searchIndex); + dbgprintf("\tsearch type='%s'\n", pData->searchType == NULL ? (uchar*)"(not configured)" : pData->searchType); dbgprintf("\tpipeline name='%s'\n", pData->pipelineName); dbgprintf("\tdynamic pipeline name=%d\n", pData->dynPipelineName); dbgprintf("\tskipPipelineIfEmpty=%d\n", pData->skipPipelineIfEmpty); @@ -596,8 +598,8 @@ getIndexTypeAndParent(const instanceData *const pData, uchar **const tpls, } done: - assert(srchIndex != NULL); - assert(srchType != NULL); + //assert(srchIndex != NULL); + //assert(srchType != NULL); return; } @@ -633,9 +635,14 @@ setPostURL(wrkrInstanceData_t *const pWrkrData, uchar **const tpls) parent = NULL; } else { getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent, &bulkId, &pipelineName); - r = es_addBuf(&url, (char*)searchIndex, ustrlen(searchIndex)); - if(r == 0) r = es_addChar(&url, '/'); - if(r == 0) r = es_addBuf(&url, (char*)searchType, ustrlen(searchType)); + if(searchIndex != NULL) { + r = es_addBuf(&url, (char*)searchIndex, ustrlen(searchIndex)); + if(r == 0) r = es_addChar(&url, '/'); + if(searchType != NULL) { + if(r == 0) r = es_addBuf(&url, (char*)searchType, ustrlen(searchType)); + } + } else + r = 0; if(pipelineName != NULL && (!pData->skipPipelineIfEmpty || pipelineName[0] != '\0')) { if(r == 0) r = es_addChar(&url, separator); if(r == 0) r = es_addBuf(&url, "pipeline=", sizeof("pipeline=")-1); @@ -692,7 +699,11 @@ computeMessageSize(const wrkrInstanceData_t *const pWrkrData, uchar *pipelineName; getIndexTypeAndParent(pWrkrData->pData, tpls, &searchIndex, &searchType, &parent, &bulkId, &pipelineName); - r += ustrlen((char *)message) + ustrlen(searchIndex) + ustrlen(searchType); + r += ustrlen((char *)message); + if(searchIndex != NULL) + r += ustrlen(searchIndex); + if(searchType != NULL) + r += ustrlen(searchType); if(parent != NULL) { r += sizeof(META_PARENT)-1 + ustrlen(parent); @@ -717,6 +728,7 @@ buildBatch(wrkrInstanceData_t *pWrkrData, uchar *message, uchar **tpls) { int length = strlen((char *)message); int r; + int endQuote = 1; uchar *searchIndex = NULL; uchar *searchType; uchar *parent = NULL; @@ -725,28 +737,43 @@ buildBatch(wrkrInstanceData_t *pWrkrData, uchar *message, uchar **tpls) DEFiRet; getIndexTypeAndParent(pWrkrData->pData, tpls, &searchIndex, &searchType, &parent, &bulkId, &pipelineName); - if (pWrkrData->pData->writeOperation == ES_WRITE_CREATE) + if (pWrkrData->pData->writeOperation == ES_WRITE_CREATE) { r = es_addBuf(&pWrkrData->batch.data, META_STRT_CREATE, sizeof(META_STRT_CREATE)-1); - else + endQuote = 0; + } else r = es_addBuf(&pWrkrData->batch.data, META_STRT, sizeof(META_STRT)-1); - if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)searchIndex, + if(searchIndex != NULL) { + endQuote = 1; + if (pWrkrData->pData->writeOperation == ES_WRITE_CREATE) + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_IX, sizeof(META_IX)-1); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)searchIndex, ustrlen(searchIndex)); - if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_TYPE, sizeof(META_TYPE)-1); - if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)searchType, + if(searchType != NULL) { + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_TYPE, sizeof(META_TYPE)-1); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)searchType, ustrlen(searchType)); + } + } if(parent != NULL) { + endQuote = 1; if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_PARENT, sizeof(META_PARENT)-1); if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)parent, ustrlen(parent)); } if(pipelineName != NULL && (!pWrkrData->pData->skipPipelineIfEmpty || pipelineName[0] != '\0')) { + endQuote = 1; if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_PIPELINE, sizeof(META_PIPELINE)-1); if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)pipelineName, ustrlen(pipelineName)); } if(bulkId != NULL) { + endQuote = 1; if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_ID, sizeof(META_ID)-1); if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)bulkId, ustrlen(bulkId)); } - if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_END, sizeof(META_END)-1); + if(endQuote == 0) { + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_END_NOQUOTE, sizeof(META_END_NOQUOTE)-1); + } else { + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_END, sizeof(META_END)-1); + } if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)message, length); if(r == 0) r = es_addBuf(&pWrkrData->batch.data, "\n", sizeof("\n")-1); if(r != 0) { @@ -2094,6 +2121,8 @@ CODESTARTnewActInst CHKiRet(computeBaseUrl("localhost", pData->defaultPort, pData->useHttps, pData->serverBaseUrls)); } + //Only needed befor ES-Version 7.x + /* if(pData->searchIndex == NULL) pData->searchIndex = (uchar*) strdup("system"); if(pData->searchType == NULL) @@ -2104,6 +2133,7 @@ CODESTARTnewActInst "omelasticsearch: writeoperation '%d' requires bulkid", pData->writeOperation); ABORT_FINALIZE(RS_RET_CONFIG_ERROR); } + */ if (pData->retryFailures) { CHKiRet(ratelimitNew(&pData->ratelimiter, "omelasticsearch", NULL));