Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support to process tag #1806

Merged
merged 29 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions core/collection_pipeline/CollectionPipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@
#include <chrono>
#include <cstdint>

#include <memory>
#include <utility>

#include "json/value.h"

#include "app_config/AppConfig.h"
#include "collection_pipeline/batch/TimeoutFlushManager.h"
#include "collection_pipeline/plugin/PluginRegistry.h"
Expand All @@ -33,6 +36,7 @@
#include "plugin/flusher/sls/FlusherSLS.h"
#include "plugin/input/InputFeedbackInterfaceRegistry.h"
#include "plugin/processor/ProcessorParseApsaraNative.h"
#include "plugin/processor/inner/ProcessorTagNative.h"

DECLARE_FLAG_INT32(default_plugin_log_queue_size);

Expand Down Expand Up @@ -219,12 +223,36 @@ bool CollectionPipeline::Init(CollectionConfig&& config) {
if (!mContext.InitGlobalConfig(*config.mGlobal, extendedParams)) {
return false;
}
// extended global param includes: tag config
AddExtendedGlobalParamToGoPipeline(extendedParams, mGoPipelineWithInput);
AddExtendedGlobalParamToGoPipeline(extendedParams, mGoPipelineWithoutInput);
}
CopyNativeGlobalParamToGoPipeline(mGoPipelineWithInput);
CopyNativeGlobalParamToGoPipeline(mGoPipelineWithoutInput);

if (config.ShouldAddNativeTagProcessor()) {
LOG_INFO(sLogger, ("add tag processor", "native"));
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
unique_ptr<ProcessorInstance> processor
= PluginRegistry::GetInstance()->CreateProcessor(ProcessorTagNative::sName, GenNextPluginMeta(false));
Json::Value detail;
if (config.mGlobal) {
detail = *config.mGlobal;
}
if (!processor->Init(detail, mContext)) {
// should not happen
return false;
}
mPipelineInnerProcessor.emplace_back(std::move(processor));
} else {
// processor tag requires tags as input, so it is a special processor, cannot add as plugin
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
if (!mGoPipelineWithInput.isNull()) {
mGoPipelineWithInput["global"]["EnableProcessorTag"] = true;
}
if (!mGoPipelineWithoutInput.isNull()) {
mGoPipelineWithoutInput["global"]["EnableProcessorTag"] = true;
}
}

// mandatory override global.DefaultLogQueueSize in Go pipeline when input_file and Go processing coexist.
if ((inputFile != nullptr || inputContainerStdio != nullptr) && IsFlushingThroughGoPipeline()) {
mGoPipelineWithoutInput["global"]["DefaultLogQueueSize"]
Expand Down Expand Up @@ -374,6 +402,9 @@ void CollectionPipeline::Process(vector<PipelineEventGroup>& logGroupList, size_
for (auto& p : mInputs[inputIndex]->GetInnerProcessors()) {
p->Process(logGroupList);
}
for (auto& p : mPipelineInnerProcessor) {
p->Process(logGroupList);
}
for (auto& p : mProcessorLine) {
p->Process(logGroupList);
}
Expand Down
1 change: 1 addition & 0 deletions core/collection_pipeline/CollectionPipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class CollectionPipeline {

std::string mName;
std::vector<std::unique_ptr<InputInstance>> mInputs;
std::vector<std::unique_ptr<ProcessorInstance>> mPipelineInnerProcessor;
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
std::vector<std::unique_ptr<ProcessorInstance>> mProcessorLine;
std::vector<std::unique_ptr<FlusherInstance>> mFlushers;
Router mRouter;
Expand Down
4 changes: 3 additions & 1 deletion core/collection_pipeline/GlobalConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

#include "collection_pipeline/GlobalConfig.h"

#include <string>
#include <unordered_map>

Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
#include "json/json.h"

#include "collection_pipeline/CollectionPipelineContext.h"
Expand Down Expand Up @@ -151,7 +154,6 @@ bool GlobalConfig::Init(const Json::Value& config, const CollectionPipelineConte
extendedParams[itr.name()] = *itr;
}
}

return true;
}

Expand Down
3 changes: 3 additions & 0 deletions core/collection_pipeline/GlobalConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@
#include <cstdint>

#include <string>
#include <unordered_map>
#include <unordered_set>

#include "json/json.h"

#include "constants/TagConstants.h"
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved

namespace logtail {

class CollectionPipelineContext;
Expand Down
78 changes: 78 additions & 0 deletions core/common/ParamExtractor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@

#include "common/ParamExtractor.h"

#include <unordered_map>
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved

#include "boost/regex.hpp"

#include "constants/TagConstants.h"
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved

using namespace std;

namespace logtail {
Expand Down Expand Up @@ -192,4 +196,78 @@ bool IsValidMap(const Json::Value& config, const string& key, string& errorMsg)
return true;
}

void ParseDefaultAddedTag(const Json::Value* config,
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
const string& configField,
const string& defaultTagKeyValue,
const CollectionPipelineContext& context,
const string& pluginType,
string& customTagKey) {
string errorMsg;
customTagKey = DEFAULT_CONFIG_TAG_KEY_VALUE;
if (config && config->isMember(configField)) {
if (!GetOptionalStringParam(*config, "Tags." + configField, customTagKey, errorMsg)) {
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
PARAM_WARNING_DEFAULT(context.GetLogger(),
context.GetAlarm(),
errorMsg,
customTagKey,
pluginType,
context.GetConfigName(),
context.GetProjectName(),
context.GetLogstoreName(),
context.GetRegion());
}
if (customTagKey == DEFAULT_CONFIG_TAG_KEY_VALUE) {
customTagKey = defaultTagKeyValue;
}
} else {
customTagKey = defaultTagKeyValue;
}
}

void ParseOptionalTag(const Json::Value* config,
const string& configField,
const string& defaultTagKeyValue,
const CollectionPipelineContext& context,
const string& pluginType,
string& customTagKey) {
string errorMsg;
if (config && config->isMember(configField)) {
if (!GetOptionalStringParam(*config, "Tags." + configField, customTagKey, errorMsg)) {
PARAM_WARNING_DEFAULT(context.GetLogger(),
context.GetAlarm(),
errorMsg,
customTagKey,
pluginType,
context.GetConfigName(),
context.GetProjectName(),
context.GetLogstoreName(),
context.GetRegion());
}
if (customTagKey == DEFAULT_CONFIG_TAG_KEY_VALUE) {
customTagKey = defaultTagKeyValue;
}
} else {
customTagKey = "";
}
}

// if there is no tag config, config maybe nullptr, will act as default (default added or optional)
void ParseTagKey(const Json::Value* config,
const string& configField,
TagKey tagKey,
unordered_map<TagKey, string>& tagKeyMap,
const CollectionPipelineContext& context,
const std::string& pluginType,
bool defaultAdded) {
string customTagKey;
if (defaultAdded) {
ParseDefaultAddedTag(config, configField, GetDefaultTagKeyString(tagKey), context, pluginType, customTagKey);
} else {
ParseOptionalTag(config, configField, GetDefaultTagKeyString(tagKey), context, pluginType, customTagKey);
}
if (!customTagKey.empty()) {
tagKeyMap[tagKey] = customTagKey;
}
}

} // namespace logtail
9 changes: 9 additions & 0 deletions core/common/ParamExtractor.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@

#include "json/json.h"

#include "collection_pipeline/CollectionPipelineContext.h"
#include "common/StringTools.h"
#include "constants/TagConstants.h"
#include "logger/Logger.h"
#include "monitor/AlarmManager.h"

Expand Down Expand Up @@ -325,4 +327,11 @@ bool IsValidList(const Json::Value& config, const std::string& key, std::string&

bool IsValidMap(const Json::Value& config, const std::string& key, std::string& errorMsg);

void ParseTagKey(const Json::Value* config,
const std::string& configField,
TagKey tagKey,
std::unordered_map<TagKey, std::string>& tagKeyMap,
const CollectionPipelineContext& context,
const std::string& pluginType,
bool defaultAdded);
} // namespace logtail
2 changes: 2 additions & 0 deletions core/config/CollectionConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ struct CollectionConfig {
return mHasGoFlusher || ShouldNativeFlusherConnectedByGoPipeline();
}

bool ShouldAddNativeTagProcessor() const { return mHasNativeProcessor || (mHasNativeInput && !mHasGoProcessor); }
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved

// bool IsProcessRunnerInvolved() const {
// // 长期过渡使用,待C++部分的时序聚合能力与Go持平后恢复下面的正式版
// return !(mHasGoInput && !mHasNativeProcessor);
Expand Down
12 changes: 0 additions & 12 deletions core/constants/Constants.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,6 @@ const std::string OS_NAME = "Linux";
const std::string OS_NAME = "Windows";
#endif

const std::string LOG_RESERVED_KEY_SOURCE = "__source__";
const std::string LOG_RESERVED_KEY_TOPIC = "__topic__";
const std::string LOG_RESERVED_KEY_USER_DEFINED_ID = "__user_defined_id__";
const std::string LOG_RESERVED_KEY_MACHINE_UUID = "__machine_uuid__";
const std::string LOG_RESERVED_KEY_HOSTNAME = "__hostname__";
const std::string LOG_RESERVED_KEY_PATH = "__path__";
const std::string LOG_RESERVED_KEY_PACKAGE_ID = "__pack_id__";
const std::string LOG_RESERVED_KEY_TRUNCATE_INFO = "__truncate_info__";
// const std::string LOG_RESERVED_KEY_ALIPAY_ZONE = "__alipay_zone__";
const std::string LOG_RESERVED_KEY_INODE = "__inode__";
const std::string LOG_RESERVED_KEY_FILE_OFFSET = "__file_offset__";

const char* SLS_EMPTY_STR_FOR_INDEX = "\01";

// profile project
Expand Down
13 changes: 0 additions & 13 deletions core/constants/Constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,6 @@ namespace logtail {
// OS name, Linux, Windows.
extern const std::string OS_NAME;

// Resevered key in log.
extern const std::string LOG_RESERVED_KEY_SOURCE;
extern const std::string LOG_RESERVED_KEY_TOPIC;
extern const std::string LOG_RESERVED_KEY_USER_DEFINED_ID;
extern const std::string LOG_RESERVED_KEY_MACHINE_UUID;
extern const std::string LOG_RESERVED_KEY_HOSTNAME;
extern const std::string LOG_RESERVED_KEY_PATH;
extern const std::string LOG_RESERVED_KEY_PACKAGE_ID;
extern const std::string LOG_RESERVED_KEY_TRUNCATE_INFO;
// extern const std::string LOG_RESERVED_KEY_ALIPAY_ZONE;
extern const std::string LOG_RESERVED_KEY_INODE;
extern const std::string LOG_RESERVED_KEY_FILE_OFFSET;

extern const char* SLS_EMPTY_STR_FOR_INDEX;

// profile project
Expand Down
26 changes: 26 additions & 0 deletions core/constants/EntityConstants.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2024 iLogtail Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "constants/EntityConstants.h"

using namespace std;

namespace logtail {

const string DEFAULT_VALUE_DOMAIN_ACS = "acs";
const string DEFAULT_VALUE_DOMAIN_INFRA = "infra";

} // namespace logtail
24 changes: 24 additions & 0 deletions core/constants/EntityConstants.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2024 iLogtail Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <string>

namespace logtail {

extern const std::string DEFAULT_VALUE_DOMAIN_ACS;
extern const std::string DEFAULT_VALUE_DOMAIN_INFRA;

} // namespace logtail
Loading
Loading