Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature to add a topic remapping when in bridge mode #127

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 4 additions & 68 deletions .clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,7 @@
Language: Cpp
# BasedOnStyle: LLVM
AccessModifierOffset: -4
AlignAfterOpenBracket: BlockIndent
AlignArrayOfStructures: None
AlignConsecutiveAssignments:
Enabled: false
AcrossEmptyLines: false
AcrossComments: false
AlignCompound: false
PadOperators: true
AlignConsecutiveBitFields:
Enabled: false
AcrossEmptyLines: false
AcrossComments: false
AlignCompound: false
PadOperators: false
AlignConsecutiveDeclarations:
Enabled: false
AcrossEmptyLines: false
AcrossComments: false
AlignCompound: false
PadOperators: false
AlignConsecutiveMacros:
Enabled: false
AcrossEmptyLines: false
AcrossComments: false
AlignCompound: false
PadOperators: false
AlignAfterOpenBracket: Align
AlignEscapedNewlines: Right
AlignOperands: Align
AlignTrailingComments: false
Expand Down Expand Up @@ -68,7 +43,6 @@ BraceWrapping:
SplitEmptyRecord: true
SplitEmptyNamespace: true
BreakBeforeBinaryOperators: None
BreakBeforeConceptDeclarations: Always
#BreakBeforeBraces: Allman
BreakBeforeBraces: Custom
BreakBeforeInheritanceComma: false
Expand All @@ -80,59 +54,46 @@ BreakAfterJavaFieldAnnotations: false
BreakStringLiterals: false
ColumnLimit: 180
CommentPragmas: '^ IWYU pragma:'
QualifierAlignment: Leave
CompactNamespaces: false
ConstructorInitializerIndentWidth: 4
ContinuationIndentWidth: 4
Cpp11BracedListStyle: true
DeriveLineEnding: true
DerivePointerAlignment: false
DisableFormat: false
EmptyLineAfterAccessModifier: Never
EmptyLineBeforeAccessModifier: LogicalBlock
ExperimentalAutoDetectBinPacking: false
PackConstructorInitializers: Never
BasedOnStyle: ''
BasedOnStyle: GNU
ConstructorInitializerAllOnOneLineOrOnePerLine: false
AllowAllConstructorInitializersOnNextLine: true
FixNamespaceComments: true
ForEachMacros:
- foreach
- Q_FOREACH
- BOOST_FOREACH
IfMacros:
- KJ_IF_MAYBE
IncludeBlocks: Preserve
IncludeCategories:
- Regex: '^"(llvm|llvm-c|clang|clang-c)/'
Priority: 2
SortPriority: 0
CaseSensitive: false
- Regex: '^(<|"(gtest|gmock|isl|json)/)'
Priority: 3
SortPriority: 0
CaseSensitive: false
- Regex: '.*'
Priority: 1
SortPriority: 0
CaseSensitive: false
IncludeIsMainRegex: '(Test)?$'
IncludeIsMainSourceRegex: ''
IndentAccessModifiers: false
IndentCaseLabels: false
IndentCaseBlocks: false
IndentGotoLabels: true
IndentPPDirectives: None
IndentExternBlock: AfterExternBlock
IndentRequiresClause: true
IndentWidth: 4
IndentWrappedFunctionNames: false
InsertBraces: false
InsertTrailingCommas: None
JavaScriptQuotes: Leave
JavaScriptWrapImports: true
KeepEmptyLinesAtTheStartOfBlocks: true
LambdaBodyIndentation: Signature
MacroBlockBegin: ''
MacroBlockEnd: ''
MaxEmptyLinesToKeep: 2
Expand All @@ -146,61 +107,36 @@ PenaltyBreakAssignment: 2
PenaltyBreakBeforeFirstCallParameter: 19
PenaltyBreakComment: 300
PenaltyBreakFirstLessLess: 120
PenaltyBreakOpenParenthesis: 0
PenaltyBreakString: 1000
PenaltyBreakTemplateDeclaration: 10
PenaltyExcessCharacter: 1000000
PenaltyReturnTypeOnItsOwnLine: 60
PenaltyIndentedWhitespace: 0
PointerAlignment: Right
PPIndentWidth: -1
ReferenceAlignment: Pointer
ReflowComments: false
RemoveBracesLLVM: false
RequiresClausePosition: OwnLine
SeparateDefinitionBlocks: Leave
ShortNamespaceLines: 1
SortIncludes: Never
SortIncludes: false
SortJavaStaticImport: Before
SortUsingDeclarations: true
SpaceAfterCStyleCast: false
SpaceAfterLogicalNot: false
SpaceAfterTemplateKeyword: true
SpaceBeforeAssignmentOperators: true
SpaceBeforeCaseColon: false
SpaceBeforeCpp11BracedList: true
SpaceBeforeCtorInitializerColon: true
SpaceBeforeInheritanceColon: true
SpaceBeforeParens: ControlStatements
SpaceBeforeParensOptions:
AfterControlStatements: true
AfterForeachMacros: true
AfterFunctionDefinitionName: false
AfterFunctionDeclarationName: false
AfterIfMacros: true
AfterOverloadedOperator: false
AfterRequiresInClause: false
AfterRequiresInExpression: false
BeforeNonEmptyParentheses: false
SpaceAroundPointerQualifiers: Default
SpaceBeforeRangeBasedForLoopColon: true
SpaceInEmptyBlock: false
SpaceInEmptyParentheses: false
SpacesBeforeTrailingComments: 1
SpacesInAngles: Never
SpacesInAngles: false
SpacesInConditionalStatement: false
SpacesInContainerLiterals: true
SpacesInCStyleCastParentheses: false
SpacesInLineCommentPrefix:
Minimum: 1
Maximum: -1
SpacesInParentheses: false
SpacesInSquareBrackets: false
SpaceBeforeSquareBrackets: false
BitFieldColonSpacing: Both
Standard: Latest
StatementAttributeLikeMacros:
- Q_EMIT
StatementMacros:
- Q_UNUSED
- QT_REQUIRE_VERSION
Expand Down
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,9 @@ FlashMQBuild*
*.swp
compile_commands.json
.clangd/
/cmake-build-debug/
/.idea/
/cmake-build-debug-remote/
/cmake-build-release/
/cmake-build-release-remote/
.*
19 changes: 9 additions & 10 deletions bridgeconfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,17 +182,16 @@ void BridgeConfig::isValid()
setClientId();
}

bool BridgeConfig::operator ==(const BridgeConfig &other) const
bool BridgeConfig::operator==(const BridgeConfig &other) const
{
return this->address == other.address && this->port == other.port && this->inet_protocol == other.inet_protocol && this->tlsMode == other.tlsMode
&& this->caFile == other.caFile && this->caDir == other.caDir && this->protocolVersion == other.protocolVersion
&& this->bridgeProtocolBit == other.bridgeProtocolBit && this->keepalive == other.keepalive && this->clientidPrefix == other.clientidPrefix
&& this->publishes == other.publishes && this->subscribes == other.subscribes && this->local_username == other.local_username
&& this->remote_username == other.remote_username && this->remote_password == other.remote_password && this->remoteCleanStart == other.remoteCleanStart
&& this->localCleanStart == other.localCleanStart && this->remoteSessionExpiryInterval == other.remoteSessionExpiryInterval
&& this->localSessionExpiryInterval == other.localSessionExpiryInterval && this->remoteRetainAvailable == other.remoteRetainAvailable
&& this->useSavedClientId == other.useSavedClientId && this->maxOutgoingTopicAliases == other.maxOutgoingTopicAliases
&& this->maxIncomingTopicAliases == other.maxIncomingTopicAliases && this->tcpNoDelay == other.tcpNoDelay;
return this->address == other.address && this->port == other.port && this->inet_protocol == other.inet_protocol && this->tlsMode == other.tlsMode &&
this->caFile == other.caFile && this->caDir == other.caDir && this->protocolVersion == other.protocolVersion && this->bridgeProtocolBit == other.bridgeProtocolBit &&
this->keepalive == other.keepalive && this->clientidPrefix == other.clientidPrefix && this->topicPrefix == other.topicPrefix && this->publishes == other.publishes &&
this->subscribes == other.subscribes && this->local_username == other.local_username && this->remote_username == other.remote_username &&
this->remote_password == other.remote_password && this->remoteCleanStart == other.remoteCleanStart && this->localCleanStart == other.localCleanStart &&
this->remoteSessionExpiryInterval == other.remoteSessionExpiryInterval && this->localSessionExpiryInterval == other.localSessionExpiryInterval &&
this->remoteRetainAvailable == other.remoteRetainAvailable && this->useSavedClientId == other.useSavedClientId &&
this->maxOutgoingTopicAliases == other.maxOutgoingTopicAliases && this->maxIncomingTopicAliases == other.maxIncomingTopicAliases && this->tcpNoDelay == other.tcpNoDelay;
}

bool BridgeConfig::operator !=(const BridgeConfig &other) const
Expand Down
1 change: 1 addition & 0 deletions bridgeconfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class BridgeConfig
uint16_t maxOutgoingTopicAliases = 0;
bool useSavedClientId = false;
bool remoteRetainAvailable = true;
std::optional<std::string> topicPrefix;
std::vector<BridgeTopicPath> subscribes;
std::vector<BridgeTopicPath> publishes;
std::weak_ptr<ThreadData> owner;
Expand Down
8 changes: 8 additions & 0 deletions client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,14 @@ PacketDropReason Client::writeMqttPacketAndBlameThisClient(

MqttPacket *p = copyFactory.getOptimumPacket(max_qos, this->protocolVersion, topic_alias, skip_topic, subscriptionIdentifier);

// If client is a bridge, add the topic prefix, if set.
if (getBridgeState() && getBridgeState()->c.topicPrefix)
p->addTopicPrefix(getBridgeState()->c.topicPrefix.value());
else
p->removeTopicPrefix();

logger->log(LOG_DEBUG) << "writeMqttPacketAndBlameThisClient(): " << p->getTopic();

assert(static_cast<bool>(p->getQos()) == static_cast<bool>(max_qos));
assert(PublishCopyFactory::getPublishLayoutCompareKey(this->protocolVersion, p->getQos()) ==
PublishCopyFactory::getPublishLayoutCompareKey(p->getProtocolVersion(), p->getQos()));
Expand Down
8 changes: 8 additions & 0 deletions configfileparser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ ConfigFileParser::ConfigFileParser(const std::string &path) :
validBridgeKeys.insert("remote_retain_available");
validBridgeKeys.insert("local_clean_start");
validBridgeKeys.insert("local_session_expiry_interval");
validBridgeKeys.insert("topic_prefix");
validBridgeKeys.insert("subscribe");
validBridgeKeys.insert("publish");
validBridgeKeys.insert("clientid_prefix");
Expand Down Expand Up @@ -582,6 +583,13 @@ void ConfigFileParser::loadFile(bool test)
}
curBridge->localSessionExpiryInterval = newVal;
}
if (testKeyValidity(key, "topic_prefix", validBridgeKeys))
{
if (!isValidUtf8(value) || !isValidSubscribePath(value))
throw ConfigFileException(formatString("Path '%s' is not a valid subscribe match", value.c_str()));

curBridge->topicPrefix = value;
}
if (testKeyValidity(key, "subscribe", validBridgeKeys))
{
if (!isValidUtf8(value) || !isValidSubscribePath(value))
Expand Down
27 changes: 26 additions & 1 deletion man/flashmq.conf.5
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
\\$2 \(la\\$1\(ra\\$3
..
.if \n(.g .mso www.tmac
.TH flashmq.conf 5 "Dec 23 2024" "" ""
.TH flashmq.conf 5 "Jan 07 2025" "" ""
.SH NAME
flashmq.conf \- FlashMQ configuration file format
.SH SYNOPSIS
Expand Down Expand Up @@ -518,6 +518,31 @@ The QoS value is like any subscription at a server. Messages received by the oth

Default: \fI0\fR
.TP
\*(T<\fB\m[green]topic_prefix\m[] \fI\m[cyan]prefix\m[]\fR\fR\*(T>
When sending messages to a remote server, the topic defined in \fB\m[green]subscribe\fR\m[] or \fB\m[green]publish\fR\m[] will be prepended with \fI\m[cyan]prefix\m[]\fR. This gives the ability to publish all topics from this broker under a common topic on the remote site.

The difference in using \fB\m[green]topic_prefix\fR\m[] instead of adding it manually to \fB\m[green]subscribe\fR\m[] or \fB\m[green]publish\fR\m[] is, that the \fI\m[cyan]prefix\m[]\fR is being dynamically added or removed when messages are sent or received to/from the broker. This applies to all \fB\m[green]subscribe\fR\m[] or \fB\m[green]publish\fR\m[] options at once.

Always add a trailing slash (/) to the prefix! Otherwise something like \fItopic1/\fR on the local site becomes \fIprefixtopic1/\fR on the remote site and this is probably not intended.

Example 1: Publish all messages in all local topics to the remote system under topic \fIbridge1/\fR and receive all messages from the remote system in topic \fIbridge1/\fR in local. For instance, the local topic \fIhome/sensor1/temp\fR becomes \fIbridge1/home/sensor1/temp\fR on the remote site and vice versa.

Example 2: Publish all messages from local \fItopic1/\fR to the remote system under \fIbridge1/topic1/\fR and receive all messages from the remote system under \fIbridge1/topic2/\fR to local \fItopic2/\fR.
.PP
.nf
.in +7
\m[blue]# Example 1:\m[]
\m[green]topic_prefix \m[]\m[cyan]bridge1/\m[]
\m[green]publish \m[]\m[cyan]#\m[]
\m[green]subscribe \m[]\m[cyan]#\m[]
\m[blue]# Example 2:\m[]
\m[green]topic_prefix \m[]\m[cyan]bridge1/\m[]
\m[green]publish \m[]\m[cyan]topic1/#\m[]
\m[green]subscribe \m[]\m[cyan]topic2/#\m[]

.in
.fi
.TP
\*(T<\fB\m[green]local_username\m[] \fI\m[cyan]username\m[]\fR\fR\*(T>
Username as seen by the local FlashMQ's plugin or ACL checks. This is not always necessary.
.TP
Expand Down
30 changes: 30 additions & 0 deletions man/flashmq.conf.5.dbk5
Original file line number Diff line number Diff line change
Expand Up @@ -1057,6 +1057,36 @@ listen {
</listitem>
</varlistentry>

<varlistentry xml:id="bridge__topic_prefix" condition="flashmq ≥ 1.7.0">
<term><option>topic_prefix</option> <replaceable>prefix</replaceable></term>
<listitem>
<para>
When sending messages to a remote server, the topic defined in <option>subscribe</option> or <option>publish</option> will be prepended with <replaceable>prefix</replaceable>. This gives the ability to publish all topics from this broker under a common topic on the remote site.
</para>
<para>
The difference in using <option>topic_prefix</option> instead of adding it manually to <option>subscribe</option> or <option>publish</option> is, that the <replaceable>prefix</replaceable> is being dynamically added or removed when messages are sent or received to/from the broker. This applies to all <option>subscribe</option> or <option>publish</option> options at once.
</para>
<para>
Always add a trailing slash (/) to the prefix! Otherwise something like <literal>topic1/</literal> on the local site becomes <literal>prefixtopic1/</literal> on the remote site and this is probably not intended.
</para>
<para>
Example 1: Publish all messages in all local topics to the remote system under topic <literal>bridge1/</literal> and receive all messages from the remote system in topic <literal>bridge1/</literal> in local. For instance, the local topic <literal>home/sensor1/temp</literal> becomes <literal>bridge1/home/sensor1/temp</literal> on the remote site and vice versa.
</para>
<para>
Example 2: Publish all messages from local <literal>topic1/</literal> to the remote system under <literal>bridge1/topic1/</literal> and receive all messages from the remote system under <literal>bridge1/topic2/</literal> to local <literal>topic2/</literal>.
</para>
<literallayout language="flashmq.conf" class="monospaced"><![CDATA[# Example 1:
topic_prefix bridge1/
publish #
subscribe #

# Example 2:
topic_prefix bridge1/
publish topic1/#
subscribe topic2/#]]></literallayout>
</listitem>
</varlistentry>

<varlistentry xml:id="bridge__local_username" condition="flashmq ≥ 1.7.0">
<term><option>local_username</option> <replaceable>username</replaceable></term>
<listitem>
Expand Down
30 changes: 30 additions & 0 deletions man/flashmq.conf.5.html
Original file line number Diff line number Diff line change
Expand Up @@ -1267,6 +1267,36 @@



<dt id="bridge__topic_prefix"><div xmlns="http://docbook.org/ns/docbook" class="flashmq_version_requirement" data-since-flashmq-version="1.7.0">≥ v1.7.0</div><code class="option">topic_prefix</code> <code class="replaceable">prefix</code><a class="hash-anchor" href="#bridge__topic_prefix">#</a></dt>
<dd>
<p>
When sending messages to a remote server, the topic defined in <code class="option">subscribe</code> or <code class="option">publish</code> will be prepended with <code class="replaceable">prefix</code>. This gives the ability to publish all topics from this broker under a common topic on the remote site.
</p>
<p>
The difference in using <code class="option">topic_prefix</code> instead of adding it manually to <code class="option">subscribe</code> or <code class="option">publish</code> is, that the <code class="replaceable">prefix</code> is being dynamically added or removed when messages are sent or received to/from the broker. This applies to all <code class="option">subscribe</code> or <code class="option">publish</code> options at once.
</p>
<p>
Always add a trailing slash (/) to the prefix! Otherwise something like <code class="literal">topic1/</code> on the local site becomes <code class="literal">prefixtopic1/</code> on the remote site and this is probably not intended.
</p>
<p>
Example 1: Publish all messages in all local topics to the remote system under topic <code class="literal">bridge1/</code> and receive all messages from the remote system in topic <code class="literal">bridge1/</code> in local. For instance, the local topic <code class="literal">home/sensor1/temp</code> becomes <code class="literal">bridge1/home/sensor1/temp</code> on the remote site and vice versa.
</p>
<p>
Example 2: Publish all messages from local <code class="literal">topic1/</code> to the remote system under <code class="literal">bridge1/topic1/</code> and receive all messages from the remote system under <code class="literal">bridge1/topic2/</code> to local <code class="literal">topic2/</code>.
</p>
<pre class="literallayout monospaced"># Example 1:
topic_prefix bridge1/
publish #
subscribe #

# Example 2:
topic_prefix bridge1/
publish topic1/#
subscribe topic2/#</pre>
</dd>



<dt id="bridge__local_username"><div xmlns="http://docbook.org/ns/docbook" class="flashmq_version_requirement" data-since-flashmq-version="1.7.0">≥ v1.7.0</div><code class="option">local_username</code> <code class="replaceable">username</code><a class="hash-anchor" href="#bridge__local_username">#</a></dt>
<dd>
<p>
Expand Down
Loading