Skip to content

Commit 7129d72

Browse files
authored
Merge pull request #449 from redboltz/add_store_restore_to_client
Improved reconnect.
2 parents 885be63 + f17fcfe commit 7129d72

8 files changed

Lines changed: 148 additions & 21 deletions

File tree

CHANGELOG.adoc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@
22

33
= History
44

5-
== 10.2.9
5+
== 10.3.0
6+
* Added get_qos2_publish_handled_pids(), restore_qos2_publish_handled_pids(), restore_packets(), and get_stored_packets() to client for convenient. #449
7+
* get_endpoint() can be omitted now.
8+
* Refined reconnect examples. #449
69
* Added websocket async_close timeout. #448
710
* Added documentation regarding stream reuse restrictions for TLS-related streams. #447
811

CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
# http://www.boost.org/LICENSE_1_0.txt)
66

77
cmake_minimum_required (VERSION 3.13.0)
8-
project(async_mqtt_iface VERSION 10.2.9)
8+
project(async_mqtt_iface VERSION 10.3.0)
99

1010
set(CMAKE_CXX_STANDARD 17)
1111
set(CMAKE_CXX_STANDARD_REQUIRED ON)

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
- I/O independent (also known as Sans-I/O) MQTT protocol library for C++17.
44
- Asynchronous MQTT communication library using the MQTT protocol library and Boost.Asio.
55

6-
Version 10.2.9 [![Actions Status](https://github.com/redboltz/async_mqtt/workflows/CI/badge.svg)](https://github.com/redboltz/async_mqtt/actions)[![codecov](https://codecov.io/gh/redboltz/async_mqtt/branch/main/graph/badge.svg)](https://codecov.io/gh/redboltz/async_mqtt)
6+
Version 10.3.0 [![Actions Status](https://github.com/redboltz/async_mqtt/workflows/CI/badge.svg)](https://github.com/redboltz/async_mqtt/actions)[![codecov](https://codecov.io/gh/redboltz/async_mqtt/branch/main/graph/badge.svg)](https://codecov.io/gh/redboltz/async_mqtt)
77

88
## Document
99

example/cl_cpp17_mqtt_sub.cpp

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,20 @@ struct app {
3232

3333
private:
3434
void connect() {
35-
cli_.async_underlying_handshake(
35+
// For TLS (e.g. mqtts, wss), step 1, 2, and 3 are required due to Boost.Asio limitation.
36+
// 1. Store MQTT session information.
37+
auto packets = cli_->get_stored_packets();
38+
auto pids = cli_->get_qos2_publish_handled_pids();
39+
40+
// 2. Re-construct client
41+
cli_.emplace(cli_->get_executor());
42+
43+
// 3. Restore MQTT session information.
44+
cli_->restore_packets(am::force_move(packets));
45+
cli_->restore_qos2_publish_handled_pids(am::force_move(pids));
46+
47+
// 4. handshake
48+
cli_->async_underlying_handshake(
3649
host_,
3750
port_,
3851
[this](auto&&... args) {
@@ -62,7 +75,7 @@ struct app {
6275
reconnect();
6376
return;
6477
}
65-
cli_.async_start(
78+
cli_->async_start(
6679
true, // clean_start
6780
std::uint16_t(0), // keep_alive
6881
"", // Client Identifier, empty means generated by the broker
@@ -95,8 +108,8 @@ struct app {
95108
{"topic2", am::qos::at_least_once},
96109
{"topic3", am::qos::exactly_once},
97110
};
98-
cli_.async_subscribe(
99-
*cli_.acquire_unique_packet_id(), // sync version only works thread safe context
111+
cli_->async_subscribe(
112+
*cli_->acquire_unique_packet_id(), // sync version only works thread safe context
100113
am::force_move(sub_entry),
101114
[this](auto&&... args) {
102115
handle_subscribe_response(
@@ -118,7 +131,7 @@ struct app {
118131
if (suback_opt) {
119132
std::cout << *suback_opt << std::endl;
120133
}
121-
cli_.async_recv(
134+
cli_->async_recv(
122135
[this](auto&&... args) {
123136
handle_recv(
124137
std::forward<std::remove_reference_t<decltype(args)>>(args)...
@@ -139,7 +152,7 @@ struct app {
139152
BOOST_ASSERT(pv_opt);
140153
std::cout << *pv_opt << std::endl;
141154
// next receive
142-
cli_.async_recv(
155+
cli_->async_recv(
143156
[this](auto&&... args) {
144157
handle_recv(
145158
std::forward<std::remove_reference_t<decltype(args)>>(args)...
@@ -148,10 +161,10 @@ struct app {
148161
);
149162
}
150163

151-
client_t cli_;
164+
std::optional<client_t> cli_;
152165
std::string host_;
153166
std::string port_;
154-
as::steady_timer tim_{cli_.get_executor()};
167+
as::steady_timer tim_{cli_->get_executor()};
155168
};
156169

157170
int main(int argc, char* argv[]) {

example/cl_cpp20coro_mqtt_sub.cpp

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,23 +29,34 @@ using awaitable_client =
2929

3030
as::awaitable<void>
3131
proc(
32-
awaitable_client& amcl,
3332
std::string_view host,
3433
std::string_view port) {
3534

3635
auto exe = co_await as::this_coro::executor;
3736

3837
std::cout << "start" << std::endl;
3938

39+
std::optional<awaitable_client> amcl{exe};
4040
while (true) {
4141
try {
42-
// all underlying layer handshaking
43-
// (Resolve hostname, TCP handshake)
44-
co_await amcl.async_underlying_handshake(host, port);
42+
// For TLS (e.g. mqtts, wss), step 1, 2, and 3 are required due to Boost.Asio limitation.
43+
// 1. Store MQTT session information.
44+
auto packets = amcl->get_stored_packets();
45+
auto pids = amcl->get_qos2_publish_handled_pids();
46+
47+
// 2. Re-construct client
48+
amcl.emplace(amcl->get_executor());
49+
50+
// 3. Restore MQTT session information.
51+
amcl->restore_packets(am::force_move(packets));
52+
amcl->restore_qos2_publish_handled_pids(am::force_move(pids));
53+
54+
// 4. handshake
55+
co_await amcl->async_underlying_handshake(host, port);
4556
std::cout << "mqtt undlerlying handshaked" << std::endl;
4657

4758
// MQTT connect and receive loop start
48-
auto connack_opt = co_await amcl.async_start(
59+
auto connack_opt = co_await amcl->async_start(
4960
true, // clean_start
5061
std::uint16_t(0), // keep_alive
5162
"", // Client Identifier, empty means generated by the broker
@@ -64,8 +75,8 @@ proc(
6475
{"topic2", am::qos::at_least_once},
6576
{"topic3", am::qos::exactly_once},
6677
};
67-
auto suback_opt = co_await amcl.async_subscribe(
68-
*amcl.acquire_unique_packet_id(), // sync version only works thread safe context
78+
auto suback_opt = co_await amcl->async_subscribe(
79+
*amcl->acquire_unique_packet_id(), // sync version only works thread safe context
6980
am::force_move(sub_entry)
7081
);
7182
if (suback_opt) {
@@ -74,7 +85,7 @@ proc(
7485

7586
// recv (coroutine)
7687
while (true) {
77-
auto pv_opt = co_await amcl.async_recv(as::use_awaitable);
88+
auto pv_opt = co_await amcl->async_recv(as::use_awaitable);
7889
BOOST_ASSERT(pv_opt);
7990
pv_opt->visit(
8091
am::overload{
@@ -115,7 +126,6 @@ int main(int argc, char* argv[]) {
115126
return -1;
116127
}
117128
as::io_context ioc;
118-
auto amcl = awaitable_client{ioc.get_executor()};
119-
as::co_spawn(amcl.get_executor(), proc(amcl, argv[1], argv[2]), as::detached);
129+
as::co_spawn(ioc.get_executor(), proc(argv[1], argv[2]), as::detached);
120130
ioc.run();
121131
}

include/async_mqtt/asio_bind/client.hpp

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
#define ASYNC_MQTT_ASIO_BIND_CLIENT_HPP
99

1010
#include <optional>
11+
#include <set>
12+
#include <vector>
1113
#include <boost/asio/async_result.hpp>
1214
#include <boost/asio/any_io_executor.hpp>
1315

@@ -17,6 +19,7 @@
1719
#include <async_mqtt/protocol/error.hpp>
1820
#include <async_mqtt/protocol/role.hpp>
1921
#include <async_mqtt/protocol/packet/packet_id_type.hpp>
22+
#include <async_mqtt/protocol/packet/store_packet_variant_fwd.hpp>
2023
#include <async_mqtt/asio_bind/endpoint_fwd.hpp>
2124

2225
namespace async_mqtt {
@@ -835,6 +838,39 @@ class client {
835838
*/
836839
void release_packet_id(packet_id_type packet_id);
837840

841+
/**
842+
* @brief Get processed but not released QoS2 packet ids
843+
* This function should be called after disconnection
844+
* @return set of packet_ids
845+
*/
846+
std::set<packet_id_type> get_qos2_publish_handled_pids() const;
847+
848+
/**
849+
* @brief Restore processed but not released QoS2 packet ids
850+
* This function should be called before receive the first publish
851+
* @param pids packet ids
852+
*/
853+
void restore_qos2_publish_handled_pids(std::set<packet_id_type> pids);
854+
855+
/**
856+
* @brief restore packets
857+
* the restored packets would automatically send when CONNACK packet is received
858+
* @param pvs packets to restore
859+
*/
860+
void restore_packets(
861+
std::vector<store_packet_variant> pvs
862+
);
863+
864+
/**
865+
* @brief get stored packets
866+
* sotred packets mean inflight packets.
867+
* @li PUBLISH packet (QoS1) not received PUBACK packet
868+
* @li PUBLISH packet (QoS1) not received PUBREC packet
869+
* @li PUBREL packet not received PUBCOMP packet
870+
* @return std::vector<store_packet_variant>
871+
*/
872+
std::vector<store_packet_variant> get_stored_packets() const;
873+
838874
/**
839875
* @brief rebinds the client type to another executor
840876
*/

include/async_mqtt/asio_bind/impl/client_impl.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,11 @@ class client_impl {
8181
bool register_packet_id(packet_id_type packet_id);
8282
void release_packet_id(packet_id_type packet_id);
8383

84+
std::set<packet_id_type> get_qos2_publish_handled_pids() const;
85+
void restore_qos2_publish_handled_pids(std::set<packet_id_type> pids);
86+
void restore_packets(std::vector<store_packet_variant> pvs);
87+
std::vector<store_packet_variant> get_stored_packets() const;
88+
8489
template <typename Executor1>
8590
struct rebind_executor {
8691
using other = client_impl<

include/async_mqtt/asio_bind/impl/client_misc.hpp

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,34 @@ client_impl<Version, NextLayer>::set_read_buffer_size(std::size_t val) {
134134
ep_.set_read_buffer_size(val);
135135
}
136136

137+
template <protocol_version Version, typename NextLayer>
138+
inline
139+
std::set<packet_id_type>
140+
client_impl<Version, NextLayer>::get_qos2_publish_handled_pids() const {
141+
return ep_.get_qos2_publish_handled_pids();
142+
}
143+
144+
template <protocol_version Version, typename NextLayer>
145+
inline
146+
void
147+
client_impl<Version, NextLayer>::restore_qos2_publish_handled_pids(std::set<packet_id_type> pids) {
148+
ep_.restore_qos2_publish_handled_pids(force_move(pids));
149+
}
150+
151+
template <protocol_version Version, typename NextLayer>
152+
inline
153+
void
154+
client_impl<Version, NextLayer>::restore_packets(std::vector<store_packet_variant> pvs) {
155+
ep_.restore_packets(force_move(pvs));
156+
}
157+
158+
template <protocol_version Version, typename NextLayer>
159+
inline
160+
std::vector<store_packet_variant>
161+
client_impl<Version, NextLayer>::get_stored_packets() const {
162+
return ep_.get_stored_packets();
163+
}
164+
137165
} // namespace detail
138166

139167
// member functions
@@ -262,6 +290,38 @@ client<Version, NextLayer>::set_read_buffer_size(std::size_t val) {
262290
impl_->set_read_buffer_size(val);
263291
}
264292

293+
template <protocol_version Version, typename NextLayer>
294+
inline
295+
std::set<packet_id_type>
296+
client<Version, NextLayer>::get_qos2_publish_handled_pids() const {
297+
BOOST_ASSERT(impl_);
298+
return impl_->get_qos2_publish_handled_pids();
299+
}
300+
301+
template <protocol_version Version, typename NextLayer>
302+
inline
303+
void
304+
client<Version, NextLayer>::restore_qos2_publish_handled_pids(std::set<packet_id_type> pids) {
305+
BOOST_ASSERT(impl_);
306+
impl_->restore_qos2_publish_handled_pids(force_move(pids));
307+
}
308+
309+
template <protocol_version Version, typename NextLayer>
310+
inline
311+
void
312+
client<Version, NextLayer>::restore_packets(std::vector<store_packet_variant> pvs) {
313+
BOOST_ASSERT(impl_);
314+
impl_->restore_packets(force_move(pvs));
315+
}
316+
317+
template <protocol_version Version, typename NextLayer>
318+
inline
319+
std::vector<store_packet_variant>
320+
client<Version, NextLayer>::get_stored_packets() const {
321+
BOOST_ASSERT(impl_);
322+
return impl_->get_stored_packets();
323+
}
324+
265325
} // namespace async_mqtt
266326

267327
#if !defined(ASYNC_MQTT_SEPARATE_COMPILATION)

0 commit comments

Comments
 (0)