149 lines
6.4 KiB
Diff
149 lines
6.4 KiB
Diff
|
diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c
|
||
|
index ed9359732c..8200403eaf 100644
|
||
|
--- a/plugins/omelasticsearch/omelasticsearch.c
|
||
|
+++ b/plugins/omelasticsearch/omelasticsearch.c
|
||
|
@@ -86,12 +86,14 @@ STATSCOUNTER_DEF(rebinds, mutRebinds)
|
||
|
static prop_t *pInputName = NULL;
|
||
|
|
||
|
# define META_STRT "{\"index\":{\"_index\": \""
|
||
|
-# define META_STRT_CREATE "{\"create\":{\"_index\": \""
|
||
|
+# define META_STRT_CREATE "{\"create\":{" /* \"_index\": \" */
|
||
|
+# define META_IX "\"_index\": \""
|
||
|
# define META_TYPE "\",\"_type\":\""
|
||
|
# define META_PIPELINE "\",\"pipeline\":\""
|
||
|
# define META_PARENT "\",\"_parent\":\""
|
||
|
# define META_ID "\", \"_id\":\""
|
||
|
# define META_END "\"}}\n"
|
||
|
+# define META_END_NOQUOTE " }}\n"
|
||
|
|
||
|
typedef enum {
|
||
|
ES_WRITE_INDEX,
|
||
|
@@ -362,8 +364,8 @@ CODESTARTdbgPrintInstInfo
|
||
|
dbgprintf("\tdefaultPort=%d\n", pData->defaultPort);
|
||
|
dbgprintf("\tuid='%s'\n", pData->uid == NULL ? (uchar*)"(not configured)" : pData->uid);
|
||
|
dbgprintf("\tpwd=(%sconfigured)\n", pData->pwd == NULL ? "not " : "");
|
||
|
- dbgprintf("\tsearch index='%s'\n", pData->searchIndex);
|
||
|
- dbgprintf("\tsearch type='%s'\n", pData->searchType);
|
||
|
+ dbgprintf("\tsearch index='%s'\n", pData->searchIndex == NULL ? (uchar*)"(not configured)" : pData->searchIndex);
|
||
|
+ dbgprintf("\tsearch type='%s'\n", pData->searchType == NULL ? (uchar*)"(not configured)" : pData->searchType);
|
||
|
dbgprintf("\tpipeline name='%s'\n", pData->pipelineName);
|
||
|
dbgprintf("\tdynamic pipeline name=%d\n", pData->dynPipelineName);
|
||
|
dbgprintf("\tskipPipelineIfEmpty=%d\n", pData->skipPipelineIfEmpty);
|
||
|
@@ -596,8 +598,8 @@ getIndexTypeAndParent(const instanceData *const pData, uchar **const tpls,
|
||
|
}
|
||
|
|
||
|
done:
|
||
|
- assert(srchIndex != NULL);
|
||
|
- assert(srchType != NULL);
|
||
|
+ //assert(srchIndex != NULL);
|
||
|
+ //assert(srchType != NULL);
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
@@ -633,9 +635,14 @@ setPostURL(wrkrInstanceData_t *const pWrkrData, uchar **const tpls)
|
||
|
parent = NULL;
|
||
|
} else {
|
||
|
getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent, &bulkId, &pipelineName);
|
||
|
- r = es_addBuf(&url, (char*)searchIndex, ustrlen(searchIndex));
|
||
|
- if(r == 0) r = es_addChar(&url, '/');
|
||
|
- if(r == 0) r = es_addBuf(&url, (char*)searchType, ustrlen(searchType));
|
||
|
+ if(searchIndex != NULL) {
|
||
|
+ r = es_addBuf(&url, (char*)searchIndex, ustrlen(searchIndex));
|
||
|
+ if(r == 0) r = es_addChar(&url, '/');
|
||
|
+ if(searchType != NULL) {
|
||
|
+ if(r == 0) r = es_addBuf(&url, (char*)searchType, ustrlen(searchType));
|
||
|
+ }
|
||
|
+ } else
|
||
|
+ r = 0;
|
||
|
if(pipelineName != NULL && (!pData->skipPipelineIfEmpty || pipelineName[0] != '\0')) {
|
||
|
if(r == 0) r = es_addChar(&url, separator);
|
||
|
if(r == 0) r = es_addBuf(&url, "pipeline=", sizeof("pipeline=")-1);
|
||
|
@@ -692,7 +699,11 @@ computeMessageSize(const wrkrInstanceData_t *const pWrkrData,
|
||
|
uchar *pipelineName;
|
||
|
|
||
|
getIndexTypeAndParent(pWrkrData->pData, tpls, &searchIndex, &searchType, &parent, &bulkId, &pipelineName);
|
||
|
- r += ustrlen((char *)message) + ustrlen(searchIndex) + ustrlen(searchType);
|
||
|
+ r += ustrlen((char *)message);
|
||
|
+ if(searchIndex != NULL)
|
||
|
+ r += ustrlen(searchIndex);
|
||
|
+ if(searchType != NULL)
|
||
|
+ r += ustrlen(searchType);
|
||
|
|
||
|
if(parent != NULL) {
|
||
|
r += sizeof(META_PARENT)-1 + ustrlen(parent);
|
||
|
@@ -717,6 +728,7 @@ buildBatch(wrkrInstanceData_t *pWrkrData, uchar *message, uchar **tpls)
|
||
|
{
|
||
|
int length = strlen((char *)message);
|
||
|
int r;
|
||
|
+ int endQuote = 1;
|
||
|
uchar *searchIndex = NULL;
|
||
|
uchar *searchType;
|
||
|
uchar *parent = NULL;
|
||
|
@@ -725,28 +737,43 @@ buildBatch(wrkrInstanceData_t *pWrkrData, uchar *message, uchar **tpls)
|
||
|
DEFiRet;
|
||
|
|
||
|
getIndexTypeAndParent(pWrkrData->pData, tpls, &searchIndex, &searchType, &parent, &bulkId, &pipelineName);
|
||
|
- if (pWrkrData->pData->writeOperation == ES_WRITE_CREATE)
|
||
|
+ if (pWrkrData->pData->writeOperation == ES_WRITE_CREATE) {
|
||
|
r = es_addBuf(&pWrkrData->batch.data, META_STRT_CREATE, sizeof(META_STRT_CREATE)-1);
|
||
|
- else
|
||
|
+ endQuote = 0;
|
||
|
+ } else
|
||
|
r = es_addBuf(&pWrkrData->batch.data, META_STRT, sizeof(META_STRT)-1);
|
||
|
- if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)searchIndex,
|
||
|
+ if(searchIndex != NULL) {
|
||
|
+ endQuote = 1;
|
||
|
+ if (pWrkrData->pData->writeOperation == ES_WRITE_CREATE)
|
||
|
+ if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_IX, sizeof(META_IX)-1);
|
||
|
+ if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)searchIndex,
|
||
|
ustrlen(searchIndex));
|
||
|
- if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_TYPE, sizeof(META_TYPE)-1);
|
||
|
- if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)searchType,
|
||
|
+ if(searchType != NULL) {
|
||
|
+ if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_TYPE, sizeof(META_TYPE)-1);
|
||
|
+ if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)searchType,
|
||
|
ustrlen(searchType));
|
||
|
+ }
|
||
|
+ }
|
||
|
if(parent != NULL) {
|
||
|
+ endQuote = 1;
|
||
|
if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_PARENT, sizeof(META_PARENT)-1);
|
||
|
if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)parent, ustrlen(parent));
|
||
|
}
|
||
|
if(pipelineName != NULL && (!pWrkrData->pData->skipPipelineIfEmpty || pipelineName[0] != '\0')) {
|
||
|
+ endQuote = 1;
|
||
|
if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_PIPELINE, sizeof(META_PIPELINE)-1);
|
||
|
if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)pipelineName, ustrlen(pipelineName));
|
||
|
}
|
||
|
if(bulkId != NULL) {
|
||
|
+ endQuote = 1;
|
||
|
if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_ID, sizeof(META_ID)-1);
|
||
|
if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)bulkId, ustrlen(bulkId));
|
||
|
}
|
||
|
- if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_END, sizeof(META_END)-1);
|
||
|
+ if(endQuote == 0) {
|
||
|
+ if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_END_NOQUOTE, sizeof(META_END_NOQUOTE)-1);
|
||
|
+ } else {
|
||
|
+ if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_END, sizeof(META_END)-1);
|
||
|
+ }
|
||
|
if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)message, length);
|
||
|
if(r == 0) r = es_addBuf(&pWrkrData->batch.data, "\n", sizeof("\n")-1);
|
||
|
if(r != 0) {
|
||
|
@@ -2094,6 +2121,8 @@ CODESTARTnewActInst
|
||
|
CHKiRet(computeBaseUrl("localhost", pData->defaultPort, pData->useHttps, pData->serverBaseUrls));
|
||
|
}
|
||
|
|
||
|
+ //Only needed before ES-Version 7.x
|
||
|
+ /*
|
||
|
if(pData->searchIndex == NULL)
|
||
|
pData->searchIndex = (uchar*) strdup("system");
|
||
|
if(pData->searchType == NULL)
|
||
|
@@ -2104,6 +2133,7 @@ CODESTARTnewActInst
|
||
|
"omelasticsearch: writeoperation '%d' requires bulkid", pData->writeOperation);
|
||
|
ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
|
||
|
}
|
||
|
+ */
|
||
|
|
||
|
if (pData->retryFailures) {
|
||
|
CHKiRet(ratelimitNew(&pData->ratelimiter, "omelasticsearch", NULL));
|