Skip to content

Commit 171b4fa

Browse files
authored
Merge pull request #443 from redboltz/fix_442
Fixed broker tool not delivering all retained messages when wildcard …
2 parents cb3d37d + 38b5b96 commit 171b4fa

6 files changed

Lines changed: 423 additions & 3 deletions

File tree

CHANGELOG.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
= History
44

5+
== 10.2.7
6+
* Fixed broker tool not delivering all retained messages when wildcard subscription matches multiple topics. #442, #443
7+
58
== 10.2.6
69
* Removed unintentional copy requiment from some of async functions parameter. #439, #440
710
* Fixed Heap-use-after-free during broker shutdown. #437

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
- I/O independent (also known as Sans-I/O) MQTT protocol library for C++17.
44
- Asynchronous MQTT communication library using the MQTT protocol library and Boost.Asio.
55

6-
Version 10.2.6 [![Actions Status](https://github.com/redboltz/async_mqtt/workflows/CI/badge.svg)](https://github.com/redboltz/async_mqtt/actions)[![codecov](https://codecov.io/gh/redboltz/async_mqtt/branch/main/graph/badge.svg)](https://codecov.io/gh/redboltz/async_mqtt)
6+
Version 10.2.7 [![Actions Status](https://github.com/redboltz/async_mqtt/workflows/CI/badge.svg)](https://github.com/redboltz/async_mqtt/actions)[![codecov](https://codecov.io/gh/redboltz/async_mqtt/branch/main/graph/badge.svg)](https://codecov.io/gh/redboltz/async_mqtt)
77

88
## Document
99

test/system/st_retain.cpp

Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -747,6 +747,210 @@ BOOST_AUTO_TEST_CASE(v5_clear) {
747747
BOOST_TEST(t.finish());
748748
}
749749

750+
BOOST_AUTO_TEST_CASE(wildcard_multiple_matches) {
751+
// Test for issue #442: Multiple retained messages should be delivered
752+
// when subscribing with wildcard that matches multiple topics
753+
broker_runner br;
754+
as::io_context ioc;
755+
static auto guard{as::make_work_guard(ioc.get_executor())};
756+
using ep_t = am::endpoint<am::role::client, am::protocol::mqtt>;
757+
auto amep_pub = ep_t{
758+
am::protocol_version::v5,
759+
ioc.get_executor()
760+
};
761+
auto amep_sub = ep_t{
762+
am::protocol_version::v5,
763+
ioc.get_executor()
764+
};
765+
766+
static std::set<std::string> received_messages;
767+
struct tc : coro_base<ep_t> {
768+
using coro_base<ep_t>::coro_base;
769+
private:
770+
enum : std::size_t {
771+
pub,
772+
sub
773+
};
774+
void proc(
775+
am::error_code ec,
776+
std::optional<am::packet_variant> pv_opt,
777+
am::packet_id_type /*pid*/
778+
) override {
779+
reenter(this) {
780+
ep(pub).set_auto_pub_response(true);
781+
ep(sub).set_auto_pub_response(true);
782+
783+
yield as::dispatch(
784+
as::bind_executor(
785+
ep(pub).get_executor(),
786+
*this
787+
)
788+
);
789+
// connect pub
790+
yield ep(pub).async_underlying_handshake(
791+
"127.0.0.1",
792+
"1883",
793+
*this
794+
);
795+
BOOST_TEST(ec == am::error_code{});
796+
yield ep(pub).async_send(
797+
am::v5::connect_packet{
798+
true, // clean_start
799+
0, // keep_alive
800+
"pub",
801+
std::nullopt, // will
802+
"u1",
803+
"passforu1"
804+
},
805+
*this
806+
);
807+
BOOST_TEST(!ec);
808+
yield ep(pub).async_recv(*this);
809+
BOOST_TEST(pv_opt->get_if<am::v5::connack_packet>());
810+
811+
// publish retained message 1: a/x/b/c
812+
yield ep(pub).async_send(
813+
am::v5::publish_packet{
814+
"a/x/b/c",
815+
"message1",
816+
am::qos::at_most_once | am::pub::retain::yes
817+
},
818+
*this
819+
);
820+
BOOST_TEST(!ec);
821+
822+
// publish retained message 2: a/y/b/c
823+
yield ep(pub).async_send(
824+
am::v5::publish_packet{
825+
"a/y/b/c",
826+
"message2",
827+
am::qos::at_most_once | am::pub::retain::yes
828+
},
829+
*this
830+
);
831+
BOOST_TEST(!ec);
832+
833+
yield as::dispatch(
834+
as::bind_executor(
835+
ep(sub).get_executor(),
836+
*this
837+
)
838+
);
839+
840+
// connect sub
841+
yield ep(sub).async_underlying_handshake(
842+
"127.0.0.1",
843+
"1883",
844+
*this
845+
);
846+
BOOST_TEST(ec == am::error_code{});
847+
yield ep(sub).async_send(
848+
am::v5::connect_packet{
849+
true, // clean_start
850+
0, // keep_alive
851+
"sub",
852+
std::nullopt, // will
853+
"u1",
854+
"passforu1"
855+
},
856+
*this
857+
);
858+
BOOST_TEST(!ec);
859+
yield ep(sub).async_recv(*this);
860+
pv_opt->visit(
861+
am::overload {
862+
[&](am::v5::connack_packet const& p) {
863+
BOOST_TEST(!p.session_present());
864+
},
865+
[](auto const&) {
866+
BOOST_TEST(false);
867+
}
868+
}
869+
);
870+
871+
// wait retain data stored (pub/sub are different endpoint so timing adjustment is required)
872+
yield {
873+
auto tim = std::make_shared<as::steady_timer>(ep(sub).get_executor());
874+
tim->expires_after(std::chrono::seconds(1));
875+
tim->async_wait(
876+
as::consign(
877+
*this,
878+
tim
879+
)
880+
);
881+
}
882+
// Subscribe with wildcard a/+/b/#
883+
yield ep(sub).async_send(
884+
am::v5::subscribe_packet{
885+
*ep(sub).acquire_unique_packet_id(),
886+
{
887+
{"a/+/b/#", am::qos::at_most_once},
888+
}
889+
},
890+
*this
891+
);
892+
BOOST_TEST(!ec);
893+
yield ep(sub).async_recv(*this);
894+
BOOST_TEST(pv_opt->get_if<am::v5::suback_packet>());
895+
896+
// Receive first retained message
897+
yield ep(sub).async_recv(*this);
898+
BOOST_TEST(pv_opt.has_value());
899+
pv_opt->visit(
900+
am::overload {
901+
[&](am::v5::publish_packet const& p) {
902+
BOOST_TEST(p.opts().get_retain() == am::pub::retain::yes);
903+
std::string msg = std::string(p.topic()) + ":" + std::string(p.payload());
904+
received_messages.insert(msg);
905+
},
906+
[](auto const&) {
907+
BOOST_TEST(false);
908+
}
909+
}
910+
);
911+
912+
// Receive second retained message
913+
yield ep(sub).async_recv(*this);
914+
BOOST_TEST(pv_opt.has_value());
915+
pv_opt->visit(
916+
am::overload {
917+
[&](am::v5::publish_packet const& p) {
918+
BOOST_TEST(p.opts().get_retain() == am::pub::retain::yes);
919+
std::string msg = std::string(p.topic()) + ":" + std::string(p.payload());
920+
received_messages.insert(msg);
921+
},
922+
[](auto const&) {
923+
BOOST_TEST(false);
924+
}
925+
}
926+
);
927+
928+
// Verify we received exactly 2 messages with expected content
929+
BOOST_TEST(received_messages.size() == 2);
930+
BOOST_TEST(received_messages.count("a/x/b/c:message1") == 1);
931+
BOOST_TEST(received_messages.count("a/y/b/c:message2") == 1);
932+
933+
yield ep(sub).async_close(*this);
934+
935+
yield as::dispatch(
936+
as::bind_executor(
937+
ep(pub).get_executor(),
938+
*this
939+
)
940+
);
941+
yield ep(pub).async_close(*this);
942+
set_finish();
943+
guard.reset();
944+
}
945+
}
946+
};
947+
948+
tc t{{amep_pub, amep_sub}};
949+
t();
950+
ioc.run();
951+
BOOST_TEST(t.finish());
952+
}
953+
750954
BOOST_AUTO_TEST_SUITE_END()
751955

752956
#include <boost/asio/unyield.hpp>

test/unit/ut_retained_topic_map.cpp

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,4 +256,48 @@ BOOST_AUTO_TEST_CASE(large_number_of_topics) {
256256
BOOST_TEST(map.internal_size() == 1);
257257
}
258258

259+
BOOST_AUTO_TEST_CASE(wildcard_multiple_matches) {
260+
// Test case for issue #442: Multiple retained messages should be delivered
261+
// when subscribing with wildcard that matches multiple topics
262+
am::retained_topic_map<std::string> map;
263+
264+
// Insert two retained messages
265+
map.insert_or_assign("a/x/b/c", "message1");
266+
map.insert_or_assign("a/y/b/c", "message2");
267+
268+
BOOST_TEST(map.size() == 2);
269+
270+
// Subscribe with wildcard a/+/b/# should match both
271+
std::vector<std::string> matches;
272+
map.find("a/+/b/#", [&matches](std::string const &a) {
273+
matches.push_back(a);
274+
});
275+
276+
// Both messages should be delivered
277+
BOOST_TEST(matches.size() == 2);
278+
std::sort(matches.begin(), matches.end());
279+
BOOST_TEST(matches[0] == "message1");
280+
BOOST_TEST(matches[1] == "message2");
281+
282+
// Also test a/+/b/c pattern
283+
matches.clear();
284+
map.find("a/+/b/c", [&matches](std::string const &a) {
285+
matches.push_back(a);
286+
});
287+
288+
BOOST_TEST(matches.size() == 2);
289+
std::sort(matches.begin(), matches.end());
290+
BOOST_TEST(matches[0] == "message1");
291+
BOOST_TEST(matches[1] == "message2");
292+
293+
// Test non-wildcard exact match (should return only one)
294+
matches.clear();
295+
map.find("a/x/b/c", [&matches](std::string const &a) {
296+
matches.push_back(a);
297+
});
298+
299+
BOOST_TEST(matches.size() == 1);
300+
BOOST_TEST(matches[0] == "message1");
301+
}
302+
259303
BOOST_AUTO_TEST_SUITE_END()

0 commit comments

Comments
 (0)