diff options
author | Yuri Victorovich <yuri@FreeBSD.org> | 2025-03-28 09:21:23 +0000 |
---|---|---|
committer | Yuri Victorovich <yuri@FreeBSD.org> | 2025-03-28 09:54:10 +0000 |
commit | 1559e2b08bd0410065139656f1b0c76c8b0d5831 (patch) | |
tree | d81b2ac30f3261016b41f7d2e9be45c6fc4acde5 /net-p2p/pulsar-client-cpp | |
parent | a34ad504d51c6f26fdfe1af8f0377ab9d12486eb (diff) |
Diffstat (limited to 'net-p2p/pulsar-client-cpp')
-rw-r--r-- | net-p2p/pulsar-client-cpp/Makefile | 1 | ||||
-rw-r--r-- | net-p2p/pulsar-client-cpp/distinfo | 2 | ||||
-rw-r--r-- | net-p2p/pulsar-client-cpp/files/patch-cc30a7b5f52c6d6d7ff94d93a9509fc3f6becc83 | 1111 |
3 files changed, 1112 insertions, 2 deletions
diff --git a/net-p2p/pulsar-client-cpp/Makefile b/net-p2p/pulsar-client-cpp/Makefile index 23a17a8c156e..19d40f02baf2 100644 --- a/net-p2p/pulsar-client-cpp/Makefile +++ b/net-p2p/pulsar-client-cpp/Makefile @@ -13,7 +13,6 @@ LICENSE= APACHE20 LICENSE_FILE= ${WRKSRC}/LICENSE ONLY_FOR_ARCHS= amd64 i386 # due to requirement of instruction sets crc32, pclmul -BROKEN= compilation fails with boost-libs-1.87.0, see https://github.com/apache/pulsar-client-cpp/issues/475 BROKEN_i386= compilation fails due to overflow, see https://github.com/apache/pulsar-client-cpp/issues/449 BUILD_DEPENDS= ${LOCALBASE}/include/boost/algorithm/string.hpp:devel/boost-libs diff --git a/net-p2p/pulsar-client-cpp/distinfo b/net-p2p/pulsar-client-cpp/distinfo index 3a3e4dec1501..3d7c80b515e0 100644 --- a/net-p2p/pulsar-client-cpp/distinfo +++ b/net-p2p/pulsar-client-cpp/distinfo @@ -1,3 +1,3 @@ -TIMESTAMP = 1736572558 +TIMESTAMP = 1743152964 SHA256 (apache-pulsar-client-cpp-v3.7.0_GH0.tar.gz) = 33d6ea82e1f03a2e77f85d3b6ee8e3ac37bfd760ea450537ec2e59ef122c4671 SIZE (apache-pulsar-client-cpp-v3.7.0_GH0.tar.gz) = 1604627 diff --git a/net-p2p/pulsar-client-cpp/files/patch-cc30a7b5f52c6d6d7ff94d93a9509fc3f6becc83 b/net-p2p/pulsar-client-cpp/files/patch-cc30a7b5f52c6d6d7ff94d93a9509fc3f6becc83 new file mode 100644 index 000000000000..ca6cb6a02135 --- /dev/null +++ b/net-p2p/pulsar-client-cpp/files/patch-cc30a7b5f52c6d6d7ff94d93a9509fc3f6becc83 @@ -0,0 +1,1111 @@ +- backport of https://github.com/apache/pulsar-client-cpp/pull/477 unbreaking for boost 1.87+ + +diff --git CMakeLists.txt CMakeLists.txt +index b0046534..2efeec89 100644 +--- CMakeLists.txt ++++ CMakeLists.txt +@@ -19,15 +19,16 @@ + + cmake_minimum_required(VERSION 3.13) + +-option(USE_ASIO "Use Asio instead of Boost.Asio" OFF) +- + option(INTEGRATE_VCPKG "Integrate with Vcpkg" OFF) + if (INTEGRATE_VCPKG) +- set(USE_ASIO ON) ++ option(USE_ASIO "Use Asio instead of Boost.Asio" ON) + if (NOT CMAKE_TOOLCHAIN_FILE) + set(CMAKE_TOOLCHAIN_FILE "${CMAKE_SOURCE_DIR}/vcpkg/scripts/buildsystems/vcpkg.cmake") + endif () ++else () ++ option(USE_ASIO "Use Asio instead of Boost.Asio" OFF) + endif () ++message(STATUS "USE_ASIO: ${USE_ASIO}") + + option(BUILD_TESTS "Build tests" ON) + message(STATUS "BUILD_TESTS: " ${BUILD_TESTS}) +diff --git lib/AckGroupingTrackerEnabled.cc lib/AckGroupingTrackerEnabled.cc +index 7233b2c9..bc8da970 100644 +--- lib/AckGroupingTrackerEnabled.cc ++++ lib/AckGroupingTrackerEnabled.cc +@@ -117,8 +117,7 @@ void AckGroupingTrackerEnabled::close() { + this->flush(); + std::lock_guard<std::mutex> lock(this->mutexTimer_); + if (this->timer_) { +- ASIO_ERROR ec; +- this->timer_->cancel(ec); ++ this->timer_->cancel(); + } + } + +@@ -168,7 +167,7 @@ void AckGroupingTrackerEnabled::scheduleTimer() { + + std::lock_guard<std::mutex> lock(this->mutexTimer_); + this->timer_ = this->executor_->createDeadlineTimer(); +- this->timer_->expires_from_now(std::chrono::milliseconds(std::max(1L, this->ackGroupingTimeMs_))); ++ this->timer_->expires_after(std::chrono::milliseconds(std::max(1L, this->ackGroupingTimeMs_))); + auto self = shared_from_this(); + this->timer_->async_wait([this, self](const ASIO_ERROR& ec) -> void { + if (!ec) { +diff --git lib/ClientConnection.cc lib/ClientConnection.cc +index 2037722f..de226a85 100644 +--- lib/ClientConnection.cc ++++ lib/ClientConnection.cc +@@ -266,7 +266,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std: + if (!clientConfiguration.isTlsAllowInsecureConnection() && clientConfiguration.isValidateHostName()) { + LOG_DEBUG("Validating hostname for " << serviceUrl.host() << ":" << serviceUrl.port()); + std::string urlHost = isSniProxy_ ? proxyUrl.host() : serviceUrl.host(); +- tlsSocket_->set_verify_callback(ASIO::ssl::rfc2818_verification(urlHost)); ++ tlsSocket_->set_verify_callback(ASIO::ssl::host_name_verification(urlHost)); + } + + LOG_DEBUG("TLS SNI Host: " << serviceUrl.host()); +@@ -309,7 +309,7 @@ void ClientConnection::handlePulsarConnected(const proto::CommandConnected& cmdC + // Only send keep-alive probes if the broker supports it + keepAliveTimer_ = executor_->createDeadlineTimer(); + if (keepAliveTimer_) { +- keepAliveTimer_->expires_from_now(std::chrono::seconds(keepAliveIntervalInSeconds_)); ++ keepAliveTimer_->expires_after(std::chrono::seconds(keepAliveIntervalInSeconds_)); + auto weakSelf = weak_from_this(); + keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) { + auto self = weakSelf.lock(); +@@ -354,7 +354,7 @@ void ClientConnection::startConsumerStatsTimer(std::vector<uint64_t> consumerSta + // If the close operation has reset the consumerStatsRequestTimer_ then the use_count will be zero + // Check if we have a timer still before we set the request timer to pop again. + if (consumerStatsRequestTimer_) { +- consumerStatsRequestTimer_->expires_from_now(operationsTimeout_); ++ consumerStatsRequestTimer_->expires_after(operationsTimeout_); + auto weakSelf = weak_from_this(); + consumerStatsRequestTimer_->async_wait([weakSelf, consumerStatsRequests](const ASIO_ERROR& err) { + auto self = weakSelf.lock(); +@@ -388,129 +388,87 @@ typedef ASIO::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPALIVE> tcp_kee + typedef ASIO::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPIDLE> tcp_keep_alive_idle; + #endif + +-/* +- * TCP Connect handler +- * +- * if async_connect without any error, connected_ would be set to true +- * at this point the connection is deemed valid to be used by clients of this class +- */ +-void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, tcp::resolver::iterator endpointIterator) { +- if (!err) { +- std::stringstream cnxStringStream; +- try { +- cnxStringStream << "[" << socket_->local_endpoint() << " -> " << socket_->remote_endpoint() +- << "] "; +- cnxString_ = cnxStringStream.str(); +- } catch (const ASIO_SYSTEM_ERROR& e) { +- LOG_ERROR("Failed to get endpoint: " << e.what()); +- close(ResultRetryable); +- return; +- } +- if (logicalAddress_ == physicalAddress_) { +- LOG_INFO(cnxString_ << "Connected to broker"); +- } else { +- LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_ +- << ", proxy: " << proxyServiceUrl_ +- << ", physical address:" << physicalAddress_); +- } ++void ClientConnection::completeConnect(ASIO::ip::tcp::endpoint endpoint) { ++ std::stringstream cnxStringStream; ++ try { ++ cnxStringStream << "[" << socket_->local_endpoint() << " -> " << socket_->remote_endpoint() << "] "; ++ cnxString_ = cnxStringStream.str(); ++ } catch (const ASIO_SYSTEM_ERROR& e) { ++ LOG_ERROR("Failed to get endpoint: " << e.what()); ++ close(ResultRetryable); ++ return; ++ } ++ if (logicalAddress_ == physicalAddress_) { ++ LOG_INFO(cnxString_ << "Connected to broker"); ++ } else { ++ LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_ ++ << ", proxy: " << proxyServiceUrl_ << ", physical address:" << physicalAddress_); ++ } + +- Lock lock(mutex_); +- if (isClosed()) { +- LOG_INFO(cnxString_ << "Connection already closed"); +- return; +- } +- state_ = TcpConnected; +- lock.unlock(); ++ Lock lock(mutex_); ++ if (isClosed()) { ++ LOG_INFO(cnxString_ << "Connection already closed"); ++ return; ++ } ++ state_ = TcpConnected; ++ lock.unlock(); + +- ASIO_ERROR error; +- socket_->set_option(tcp::no_delay(true), error); +- if (error) { +- LOG_WARN(cnxString_ << "Socket failed to set tcp::no_delay: " << error.message()); +- } ++ ASIO_ERROR error; ++ socket_->set_option(tcp::no_delay(true), error); ++ if (error) { ++ LOG_WARN(cnxString_ << "Socket failed to set tcp::no_delay: " << error.message()); ++ } + +- socket_->set_option(tcp::socket::keep_alive(true), error); +- if (error) { +- LOG_WARN(cnxString_ << "Socket failed to set tcp::socket::keep_alive: " << error.message()); +- } ++ socket_->set_option(tcp::socket::keep_alive(true), error); ++ if (error) { ++ LOG_WARN(cnxString_ << "Socket failed to set tcp::socket::keep_alive: " << error.message()); ++ } + +- // Start TCP keep-alive probes after connection has been idle after 1 minute. Ideally this +- // should never happen, given that we're sending our own keep-alive probes (within the TCP +- // connection) every 30 seconds +- socket_->set_option(tcp_keep_alive_idle(1 * 60), error); +- if (error) { +- LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_idle: " << error.message()); +- } ++ // Start TCP keep-alive probes after connection has been idle after 1 minute. Ideally this ++ // should never happen, given that we're sending our own keep-alive probes (within the TCP ++ // connection) every 30 seconds ++ socket_->set_option(tcp_keep_alive_idle(1 * 60), error); ++ if (error) { ++ LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_idle: " << error.message()); ++ } + +- // Send up to 10 probes before declaring the connection broken +- socket_->set_option(tcp_keep_alive_count(10), error); +- if (error) { +- LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_count: " << error.message()); +- } ++ // Send up to 10 probes before declaring the connection broken ++ socket_->set_option(tcp_keep_alive_count(10), error); ++ if (error) { ++ LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_count: " << error.message()); ++ } + +- // Interval between probes: 6 seconds +- socket_->set_option(tcp_keep_alive_interval(6), error); +- if (error) { +- LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_interval: " << error.message()); +- } ++ // Interval between probes: 6 seconds ++ socket_->set_option(tcp_keep_alive_interval(6), error); ++ if (error) { ++ LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_interval: " << error.message()); ++ } + +- if (tlsSocket_) { +- if (!isTlsAllowInsecureConnection_) { +- ASIO_ERROR err; +- Url service_url; +- if (!Url::parse(physicalAddress_, service_url)) { +- LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << err << " " << err.message()); +- close(); +- return; +- } +- } +- auto weakSelf = weak_from_this(); +- auto socket = socket_; +- auto tlsSocket = tlsSocket_; +- // socket and ssl::stream objects must exist until async_handshake is done, otherwise segmentation +- // fault might happen +- auto callback = [weakSelf, socket, tlsSocket](const ASIO_ERROR& err) { +- auto self = weakSelf.lock(); +- if (self) { +- self->handleHandshake(err); +- } +- }; +- tlsSocket_->async_handshake(ASIO::ssl::stream<tcp::socket>::client, +- ASIO::bind_executor(strand_, callback)); +- } else { +- handleHandshake(ASIO_SUCCESS); +- } +- } else if (endpointIterator != tcp::resolver::iterator()) { +- LOG_WARN(cnxString_ << "Failed to establish connection: " << err.message()); +- // The connection failed. Try the next endpoint in the list. +- ASIO_ERROR closeError; +- socket_->close(closeError); // ignore the error of close +- if (closeError) { +- LOG_WARN(cnxString_ << "Failed to close socket: " << err.message()); +- } +- connectTimeoutTask_->stop(); +- ++endpointIterator; +- if (endpointIterator != tcp::resolver::iterator()) { +- LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "..."); +- connectTimeoutTask_->start(); +- tcp::endpoint endpoint = *endpointIterator; +- auto weakSelf = weak_from_this(); +- socket_->async_connect(endpoint, [weakSelf, endpointIterator](const ASIO_ERROR& err) { +- auto self = weakSelf.lock(); +- if (self) { +- self->handleTcpConnected(err, endpointIterator); +- } +- }); +- } else { +- if (err == ASIO::error::operation_aborted) { +- // TCP connect timeout, which is not retryable ++ if (tlsSocket_) { ++ if (!isTlsAllowInsecureConnection_) { ++ ASIO_ERROR err; ++ Url service_url; ++ if (!Url::parse(physicalAddress_, service_url)) { ++ LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << err << " " << err.message()); + close(); +- } else { +- close(ResultRetryable); ++ return; + } + } ++ auto weakSelf = weak_from_this(); ++ auto socket = socket_; ++ auto tlsSocket = tlsSocket_; ++ // socket and ssl::stream objects must exist until async_handshake is done, otherwise segmentation ++ // fault might happen ++ auto callback = [weakSelf, socket, tlsSocket](const ASIO_ERROR& err) { ++ auto self = weakSelf.lock(); ++ if (self) { ++ self->handleHandshake(err); ++ } ++ }; ++ tlsSocket_->async_handshake(ASIO::ssl::stream<tcp::socket>::client, ++ ASIO::bind_executor(strand_, callback)); + } else { +- LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message()); +- close(ResultRetryable); ++ handleHandshake(ASIO_SUCCESS); + } + } + +@@ -603,60 +561,71 @@ void ClientConnection::tcpConnectAsync() { + } + + LOG_DEBUG(cnxString_ << "Resolving " << service_url.host() << ":" << service_url.port()); +- tcp::resolver::query query(service_url.host(), std::to_string(service_url.port())); ++ tcp::resolver::endpoint_type endpoint(ASIO::ip::make_address(service_url.host()), service_url.port()); + auto weakSelf = weak_from_this(); +- resolver_->async_resolve(query, [weakSelf](const ASIO_ERROR& err, tcp::resolver::iterator iterator) { +- auto self = weakSelf.lock(); +- if (self) { +- self->handleResolve(err, iterator); +- } +- }); ++ resolver_->async_resolve( ++ endpoint, [this, weakSelf](const ASIO_ERROR& err, tcp::resolver::results_type results) { ++ auto self = weakSelf.lock(); ++ if (!self) { ++ return; ++ } ++ if (err) { ++ std::string hostUrl = isSniProxy_ ? cnxString_ : proxyServiceUrl_; ++ LOG_ERROR(hostUrl << "Resolve error: " << err << " : " << err.message()); ++ close(); ++ return; ++ } ++ if (results.empty()) { ++ LOG_ERROR(cnxString_ << "No IP address found"); ++ close(); ++ return; ++ } ++ connectTimeoutTask_->setCallback([weakSelf](const PeriodicTask::ErrorCode& ec) { ++ ClientConnectionPtr ptr = weakSelf.lock(); ++ if (!ptr) { ++ // Connection was already destroyed ++ return; ++ } ++ ++ if (ptr->state_ != Ready) { ++ LOG_ERROR(ptr->cnxString_ << "Connection was not established in " ++ << ptr->connectTimeoutTask_->getPeriodMs() ++ << " ms, close the socket"); ++ PeriodicTask::ErrorCode err; ++ ptr->socket_->close(err); ++ if (err) { ++ LOG_WARN(ptr->cnxString_ << "Failed to close socket: " << err.message()); ++ } ++ } ++ ptr->connectTimeoutTask_->stop(); ++ }); ++ connectTimeoutTask_->start(); ++ std::vector<tcp::resolver::endpoint_type> endpoints; ++ for (const auto& result : results) { ++ endpoints.emplace_back(result.endpoint()); ++ } ++ asyncConnect(endpoints, 0); ++ }); + } + +-void ClientConnection::handleResolve(const ASIO_ERROR& err, tcp::resolver::iterator endpointIterator) { +- if (err) { +- std::string hostUrl = isSniProxy_ ? cnxString_ : proxyServiceUrl_; +- LOG_ERROR(hostUrl << "Resolve error: " << err << " : " << err.message()); +- close(); ++void ClientConnection::asyncConnect(const std::vector<ASIO::ip::tcp::endpoint>& endpoints, size_t index) { ++ if (index >= endpoints.size()) { ++ close(ResultRetryable); + return; + } +- + auto weakSelf = weak_from_this(); +- connectTimeoutTask_->setCallback([weakSelf](const PeriodicTask::ErrorCode& ec) { +- ClientConnectionPtr ptr = weakSelf.lock(); +- if (!ptr) { +- // Connection was already destroyed ++ socket_->async_connect(endpoints[index], [this, weakSelf, endpoints, index](const ASIO_ERROR& err) { ++ auto self = weakSelf.lock(); ++ if (!self) { + return; + } +- +- if (ptr->state_ != Ready) { +- LOG_ERROR(ptr->cnxString_ << "Connection was not established in " +- << ptr->connectTimeoutTask_->getPeriodMs() << " ms, close the socket"); +- PeriodicTask::ErrorCode err; +- ptr->socket_->close(err); +- if (err) { +- LOG_WARN(ptr->cnxString_ << "Failed to close socket: " << err.message()); +- } ++ if (err) { ++ LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message()); ++ asyncConnect(endpoints, index + 1); ++ return; + } +- ptr->connectTimeoutTask_->stop(); ++ completeConnect(endpoints[index]); + }); +- +- LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "..."); +- connectTimeoutTask_->start(); +- if (endpointIterator != tcp::resolver::iterator()) { +- LOG_DEBUG(cnxString_ << "Resolved hostname " << endpointIterator->host_name() // +- << " to " << endpointIterator->endpoint()); +- socket_->async_connect(*endpointIterator, [weakSelf, endpointIterator](const ASIO_ERROR& err) { +- auto self = weakSelf.lock(); +- if (self) { +- self->handleTcpConnected(err, endpointIterator); +- } +- }); +- } else { +- LOG_WARN(cnxString_ << "No IP address found"); +- close(); +- return; +- } + } + + void ClientConnection::readNextCommand() { +@@ -1058,7 +1027,7 @@ void ClientConnection::newLookup(const SharedBuffer& cmd, const uint64_t request + LookupRequestData requestData; + requestData.promise = promise; + requestData.timer = executor_->createDeadlineTimer(); +- requestData.timer->expires_from_now(operationsTimeout_); ++ requestData.timer->expires_after(operationsTimeout_); + auto weakSelf = weak_from_this(); + requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) { + auto self = weakSelf.lock(); +@@ -1174,8 +1143,9 @@ void ClientConnection::sendPendingCommands() { + PairSharedBuffer buffer = + Commands::newSend(outgoingBuffer_, outgoingCmd, getChecksumType(), *args); + +- // Capture the buffer because asio does not copy the buffer, if the buffer is destroyed before the +- // callback is called, an invalid buffer range might be passed to the underlying socket send. ++ // Capture the buffer because asio does not copy the buffer, if the buffer is destroyed before ++ // the callback is called, an invalid buffer range might be passed to the underlying socket ++ // send. + asyncWrite(buffer, customAllocWriteHandler([this, self, buffer](const ASIO_ERROR& err, size_t) { + handleSendPair(err); + })); +@@ -1198,7 +1168,7 @@ Future<Result, ResponseData> ClientConnection::sendRequestWithId(SharedBuffer cm + + PendingRequestData requestData; + requestData.timer = executor_->createDeadlineTimer(); +- requestData.timer->expires_from_now(operationsTimeout_); ++ requestData.timer->expires_after(operationsTimeout_); + auto weakSelf = weak_from_this(); + requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) { + auto self = weakSelf.lock(); +@@ -1251,7 +1221,7 @@ void ClientConnection::handleKeepAliveTimeout() { + // be zero And we do not attempt to dereference the pointer. + Lock lock(mutex_); + if (keepAliveTimer_) { +- keepAliveTimer_->expires_from_now(std::chrono::seconds(keepAliveIntervalInSeconds_)); ++ keepAliveTimer_->expires_after(std::chrono::seconds(keepAliveIntervalInSeconds_)); + auto weakSelf = weak_from_this(); + keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) { + auto self = weakSelf.lock(); +@@ -1430,7 +1400,7 @@ Future<Result, GetLastMessageIdResponse> ClientConnection::newGetLastMessageId(u + LastMessageIdRequestData requestData; + requestData.promise = promise; + requestData.timer = executor_->createDeadlineTimer(); +- requestData.timer->expires_from_now(operationsTimeout_); ++ requestData.timer->expires_after(operationsTimeout_); + auto weakSelf = weak_from_this(); + requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) { + auto self = weakSelf.lock(); +@@ -1478,7 +1448,7 @@ Future<Result, SchemaInfo> ClientConnection::newGetSchema(const std::string& top + lock.unlock(); + + auto weakSelf = weak_from_this(); +- timer->expires_from_now(operationsTimeout_); ++ timer->expires_after(operationsTimeout_); + timer->async_wait([this, weakSelf, requestId](const ASIO_ERROR& ec) { + auto self = weakSelf.lock(); + if (!self) { +@@ -2047,8 +2017,7 @@ void ClientConnection::unsafeRemovePendingRequest(long requestId) { + auto it = pendingRequests_.find(requestId); + if (it != pendingRequests_.end()) { + it->second.promise.setFailed(ResultDisconnected); +- ASIO_ERROR ec; +- it->second.timer->cancel(ec); ++ it->second.timer->cancel(); + pendingRequests_.erase(it); + } + } +diff --git lib/ClientConnection.h lib/ClientConnection.h +index 7646f85e..14e07652 100644 +--- lib/ClientConnection.h ++++ lib/ClientConnection.h +@@ -25,13 +25,13 @@ + #include <atomic> + #ifdef USE_ASIO + #include <asio/bind_executor.hpp> +-#include <asio/io_service.hpp> ++#include <asio/io_context.hpp> + #include <asio/ip/tcp.hpp> + #include <asio/ssl/stream.hpp> + #include <asio/strand.hpp> + #else + #include <boost/asio/bind_executor.hpp> +-#include <boost/asio/io_service.hpp> ++#include <boost/asio/io_context.hpp> + #include <boost/asio/ip/tcp.hpp> + #include <boost/asio/ssl/stream.hpp> + #include <boost/asio/strand.hpp> +@@ -231,13 +231,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien + DeadlineTimerPtr timer; + }; + +- /* +- * handler for connectAsync +- * creates a ConnectionPtr which has a valid ClientConnection object +- * although not usable at this point, since this is just tcp connection +- * Pulsar - Connect/Connected has yet to happen +- */ +- void handleTcpConnected(const ASIO_ERROR& err, ASIO::ip::tcp::resolver::iterator endpointIterator); ++ void asyncConnect(const std::vector<ASIO::ip::tcp::endpoint>& endpoints, size_t index); ++ void completeConnect(ASIO::ip::tcp::endpoint endpoint); + + void handleHandshake(const ASIO_ERROR& err); + +@@ -260,8 +255,6 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien + + void handlePulsarConnected(const proto::CommandConnected& cmdConnected); + +- void handleResolve(const ASIO_ERROR& err, ASIO::ip::tcp::resolver::iterator endpointIterator); +- + void handleSend(const ASIO_ERROR& err, const SharedBuffer& cmd); + void handleSendPair(const ASIO_ERROR& err); + void sendPendingCommands(); +@@ -324,7 +317,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien + */ + SocketPtr socket_; + TlsSocketPtr tlsSocket_; +- ASIO::strand<ASIO::io_service::executor_type> strand_; ++ ASIO::strand<ASIO::io_context::executor_type> strand_; + + const std::string logicalAddress_; + /* +diff --git lib/ConsumerImpl.cc lib/ConsumerImpl.cc +index 250845b3..cfdb0b2d 100644 +--- lib/ConsumerImpl.cc ++++ lib/ConsumerImpl.cc +@@ -422,7 +422,7 @@ void ConsumerImpl::discardChunkMessages(std::string uuid, MessageId messageId, b + } + + void ConsumerImpl::triggerCheckExpiredChunkedTimer() { +- checkExpiredChunkedTimer_->expires_from_now(milliseconds(expireTimeOfIncompleteChunkedMessageMs_)); ++ checkExpiredChunkedTimer_->expires_after(milliseconds(expireTimeOfIncompleteChunkedMessageMs_)); + std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()}; + checkExpiredChunkedTimer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) -> void { + auto self = weakSelf.lock(); +@@ -1668,7 +1668,7 @@ void ConsumerImpl::internalGetLastMessageIdAsync(const BackoffPtr& backoff, Time + } + remainTime -= next; + +- timer->expires_from_now(next); ++ timer->expires_after(next); + + auto self = shared_from_this(); + timer->async_wait([this, backoff, remainTime, timer, next, callback, +@@ -1791,9 +1791,8 @@ std::shared_ptr<ConsumerImpl> ConsumerImpl::get_shared_this_ptr() { + } + + void ConsumerImpl::cancelTimers() noexcept { +- ASIO_ERROR ec; +- batchReceiveTimer_->cancel(ec); +- checkExpiredChunkedTimer_->cancel(ec); ++ batchReceiveTimer_->cancel(); ++ checkExpiredChunkedTimer_->cancel(); + unAckedMessageTrackerPtr_->stop(); + consumerStatsBasePtr_->stop(); + } +diff --git lib/ConsumerImplBase.cc lib/ConsumerImplBase.cc +index 098f2d5b..76d99370 100644 +--- lib/ConsumerImplBase.cc ++++ lib/ConsumerImplBase.cc +@@ -51,7 +51,7 @@ ConsumerImplBase::ConsumerImplBase(ClientImplPtr client, const std::string& topi + + void ConsumerImplBase::triggerBatchReceiveTimerTask(long timeoutMs) { + if (timeoutMs > 0) { +- batchReceiveTimer_->expires_from_now(std::chrono::milliseconds(timeoutMs)); ++ batchReceiveTimer_->expires_after(std::chrono::milliseconds(timeoutMs)); + std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()}; + batchReceiveTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) { + auto self = weakSelf.lock(); +diff --git lib/ExecutorService.cc lib/ExecutorService.cc +index 794e3619..7f2a2c14 100644 +--- lib/ExecutorService.cc ++++ lib/ExecutorService.cc +@@ -18,6 +18,12 @@ + */ + #include "ExecutorService.h" + ++#ifdef USE_ASIO ++#include <asio/post.hpp> ++#else ++#include <boost/asio/post.hpp> ++#endif ++ + #include "LogUtils.h" + #include "TimeUtils.h" + DECLARE_LOG_OBJECT() +@@ -31,18 +37,13 @@ ExecutorService::~ExecutorService() { close(0); } + void ExecutorService::start() { + auto self = shared_from_this(); + std::thread t{[this, self] { +- LOG_DEBUG("Run io_service in a single thread"); +- ASIO_ERROR ec; ++ LOG_DEBUG("Run io_context in a single thread"); + while (!closed_) { +- io_service_.restart(); +- IOService::work work{getIOService()}; +- io_service_.run(ec); +- } +- if (ec) { +- LOG_ERROR("Failed to run io_service: " << ec.message()); +- } else { +- LOG_DEBUG("Event loop of ExecutorService exits successfully"); ++ io_context_.restart(); ++ auto work{ASIO::make_work_guard(io_context_)}; ++ io_context_.run(); + } ++ LOG_DEBUG("Event loop of ExecutorService exits successfully"); + { + std::lock_guard<std::mutex> lock{mutex_}; + ioServiceDone_ = true; +@@ -63,12 +64,12 @@ ExecutorServicePtr ExecutorService::create() { + } + + /* +- * factory method of ASIO::ip::tcp::socket associated with io_service_ instance ++ * factory method of ASIO::ip::tcp::socket associated with io_context_ instance + * @ returns shared_ptr to this socket + */ + SocketPtr ExecutorService::createSocket() { + try { +- return SocketPtr(new ASIO::ip::tcp::socket(io_service_)); ++ return SocketPtr(new ASIO::ip::tcp::socket(io_context_)); + } catch (const ASIO_SYSTEM_ERROR &e) { + restart(); + auto error = std::string("Failed to create socket: ") + e.what(); +@@ -82,12 +83,12 @@ TlsSocketPtr ExecutorService::createTlsSocket(SocketPtr &socket, ASIO::ssl::cont + } + + /* +- * factory method of Resolver object associated with io_service_ instance ++ * factory method of Resolver object associated with io_context_ instance + * @returns shraed_ptr to resolver object + */ + TcpResolverPtr ExecutorService::createTcpResolver() { + try { +- return TcpResolverPtr(new ASIO::ip::tcp::resolver(io_service_)); ++ return TcpResolverPtr(new ASIO::ip::tcp::resolver(io_context_)); + } catch (const ASIO_SYSTEM_ERROR &e) { + restart(); + auto error = std::string("Failed to create resolver: ") + e.what(); +@@ -97,7 +98,7 @@ TcpResolverPtr ExecutorService::createTcpResolver() { + + DeadlineTimerPtr ExecutorService::createDeadlineTimer() { + try { +- return DeadlineTimerPtr(new ASIO::steady_timer(io_service_)); ++ return DeadlineTimerPtr(new ASIO::steady_timer(io_context_)); + } catch (const ASIO_SYSTEM_ERROR &e) { + restart(); + auto error = std::string("Failed to create steady_timer: ") + e.what(); +@@ -105,7 +106,7 @@ DeadlineTimerPtr ExecutorService::createDeadlineTimer() { + } + } + +-void ExecutorService::restart() { io_service_.stop(); } ++void ExecutorService::restart() { io_context_.stop(); } + + void ExecutorService::close(long timeoutMs) { + bool expectedState = false; +@@ -113,12 +114,12 @@ void ExecutorService::close(long timeoutMs) { + return; + } + if (timeoutMs == 0) { // non-blocking +- io_service_.stop(); ++ io_context_.stop(); + return; + } + + std::unique_lock<std::mutex> lock{mutex_}; +- io_service_.stop(); ++ io_context_.stop(); + if (timeoutMs > 0) { + cond_.wait_for(lock, std::chrono::milliseconds(timeoutMs), [this] { return ioServiceDone_; }); + } else { // < 0 +@@ -126,7 +127,7 @@ void ExecutorService::close(long timeoutMs) { + } + } + +-void ExecutorService::postWork(std::function<void(void)> task) { io_service_.post(task); } ++void ExecutorService::postWork(std::function<void(void)> task) { ASIO::post(io_context_, task); } + + ///////////////////// + +diff --git lib/ExecutorService.h lib/ExecutorService.h +index 89d06d30..626cb203 100644 +--- lib/ExecutorService.h ++++ lib/ExecutorService.h +@@ -23,11 +23,11 @@ + + #include <atomic> + #ifdef USE_ASIO +-#include <asio/io_service.hpp> ++#include <asio/io_context.hpp> + #include <asio/ip/tcp.hpp> + #include <asio/ssl.hpp> + #else +-#include <boost/asio/io_service.hpp> ++#include <boost/asio/io_context.hpp> + #include <boost/asio/ip/tcp.hpp> + #include <boost/asio/ssl.hpp> + #endif +@@ -46,7 +46,7 @@ typedef std::shared_ptr<ASIO::ssl::stream<ASIO::ip::tcp::socket &> > TlsSocketPt + typedef std::shared_ptr<ASIO::ip::tcp::resolver> TcpResolverPtr; + class PULSAR_PUBLIC ExecutorService : public std::enable_shared_from_this<ExecutorService> { + public: +- using IOService = ASIO::io_service; ++ using IOService = ASIO::io_context; + using SharedPtr = std::shared_ptr<ExecutorService>; + + static SharedPtr create(); +@@ -67,14 +67,14 @@ class PULSAR_PUBLIC ExecutorService : public std::enable_shared_from_this<Execut + // See TimeoutProcessor for the semantics of the parameter. + void close(long timeoutMs = 3000); + +- IOService &getIOService() { return io_service_; } ++ IOService &getIOService() { return io_context_; } + bool isClosed() const noexcept { return closed_; } + + private: + /* +- * io_service is our interface to os, io object schedule async ops on this object ++ * io_context is our interface to os, io object schedule async ops on this object + */ +- IOService io_service_; ++ IOService io_context_; + + std::atomic_bool closed_{false}; + std::mutex mutex_; +diff --git lib/HandlerBase.cc lib/HandlerBase.cc +index 65aa0db1..71902481 100644 +--- lib/HandlerBase.cc ++++ lib/HandlerBase.cc +@@ -50,9 +50,8 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic, + redirectedClusterURI_("") {} + + HandlerBase::~HandlerBase() { +- ASIO_ERROR ignored; +- timer_->cancel(ignored); +- creationTimer_->cancel(ignored); ++ timer_->cancel(); ++ creationTimer_->cancel(); + } + + void HandlerBase::start() { +@@ -61,15 +60,14 @@ void HandlerBase::start() { + if (state_.compare_exchange_strong(state, Pending)) { + grabCnx(); + } +- creationTimer_->expires_from_now(operationTimeut_); ++ creationTimer_->expires_after(operationTimeut_); + std::weak_ptr<HandlerBase> weakSelf{shared_from_this()}; + creationTimer_->async_wait([this, weakSelf](const ASIO_ERROR& error) { + auto self = weakSelf.lock(); + if (self && !error) { + LOG_WARN("Cancel the pending reconnection due to the start timeout"); + connectionFailed(ResultTimeout); +- ASIO_ERROR ignored; +- timer_->cancel(ignored); ++ timer_->cancel(); + } + }); + } +@@ -133,8 +131,7 @@ void HandlerBase::grabCnx(const boost::optional<std::string>& assignedBrokerUrl) + connectionTimeMs_ = + duration_cast<milliseconds>(high_resolution_clock::now() - before).count(); + // Prevent the creationTimer_ from cancelling the timer_ in future +- ASIO_ERROR ignored; +- creationTimer_->cancel(ignored); ++ creationTimer_->cancel(); + LOG_INFO("Finished connecting to broker after " << connectionTimeMs_ << " ms") + } else if (isResultRetryable(result)) { + scheduleReconnection(); +@@ -188,7 +185,7 @@ void HandlerBase::scheduleReconnection(const boost::optional<std::string>& assig + TimeDuration delay = assignedBrokerUrl ? std::chrono::milliseconds(0) : backoff_.next(); + + LOG_INFO(getName() << "Schedule reconnection in " << (toMillis(delay) / 1000.0) << " s"); +- timer_->expires_from_now(delay); ++ timer_->expires_after(delay); + // passing shared_ptr here since time_ will get destroyed, so tasks will be cancelled + // so we will not run into the case where grabCnx is invoked on out of scope handler + auto name = getName(); +diff --git lib/MultiTopicsConsumerImpl.cc lib/MultiTopicsConsumerImpl.cc +index dddade5c..61fbf7b8 100644 +--- lib/MultiTopicsConsumerImpl.cc ++++ lib/MultiTopicsConsumerImpl.cc +@@ -962,7 +962,7 @@ uint64_t MultiTopicsConsumerImpl::getNumberOfConnectedConsumer() { + return numberOfConnectedConsumer; + } + void MultiTopicsConsumerImpl::runPartitionUpdateTask() { +- partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_); ++ partitionsUpdateTimer_->expires_after(partitionsUpdateInterval_); + auto weakSelf = weak_from_this(); + partitionsUpdateTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) { + // If two requests call runPartitionUpdateTask at the same time, the timer will fail, and it +@@ -1115,8 +1115,7 @@ void MultiTopicsConsumerImpl::beforeConnectionChange(ClientConnection& cnx) { + + void MultiTopicsConsumerImpl::cancelTimers() noexcept { + if (partitionsUpdateTimer_) { +- ASIO_ERROR ec; +- partitionsUpdateTimer_->cancel(ec); ++ partitionsUpdateTimer_->cancel(); + } + } + +diff --git lib/NegativeAcksTracker.cc lib/NegativeAcksTracker.cc +index e443496d..e50b4ca2 100644 +--- lib/NegativeAcksTracker.cc ++++ lib/NegativeAcksTracker.cc +@@ -50,7 +50,7 @@ void NegativeAcksTracker::scheduleTimer() { + return; + } + std::weak_ptr<NegativeAcksTracker> weakSelf{shared_from_this()}; +- timer_->expires_from_now(timerInterval_); ++ timer_->expires_after(timerInterval_); + timer_->async_wait([weakSelf](const ASIO_ERROR &ec) { + if (auto self = weakSelf.lock()) { + self->handleTimer(ec); +@@ -107,8 +107,7 @@ void NegativeAcksTracker::add(const MessageId &m) { + + void NegativeAcksTracker::close() { + closed_ = true; +- ASIO_ERROR ec; +- timer_->cancel(ec); ++ timer_->cancel(); + std::lock_guard<std::mutex> lock(mutex_); + nackedMessages_.clear(); + } +diff --git lib/PartitionedProducerImpl.cc lib/PartitionedProducerImpl.cc +index 4178096c..923c038b 100644 +--- lib/PartitionedProducerImpl.cc ++++ lib/PartitionedProducerImpl.cc +@@ -421,7 +421,7 @@ void PartitionedProducerImpl::flushAsync(FlushCallback callback) { + + void PartitionedProducerImpl::runPartitionUpdateTask() { + auto weakSelf = weak_from_this(); +- partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_); ++ partitionsUpdateTimer_->expires_after(partitionsUpdateInterval_); + partitionsUpdateTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) { + auto self = weakSelf.lock(); + if (self) { +@@ -524,8 +524,7 @@ uint64_t PartitionedProducerImpl::getNumberOfConnectedProducer() { + + void PartitionedProducerImpl::cancelTimers() noexcept { + if (partitionsUpdateTimer_) { +- ASIO_ERROR ec; +- partitionsUpdateTimer_->cancel(ec); ++ partitionsUpdateTimer_->cancel(); + } + } + +diff --git lib/PatternMultiTopicsConsumerImpl.cc lib/PatternMultiTopicsConsumerImpl.cc +index 4fc7bb61..07d9a7bc 100644 +--- lib/PatternMultiTopicsConsumerImpl.cc ++++ lib/PatternMultiTopicsConsumerImpl.cc +@@ -48,7 +48,7 @@ const PULSAR_REGEX_NAMESPACE::regex PatternMultiTopicsConsumerImpl::getPattern() + + void PatternMultiTopicsConsumerImpl::resetAutoDiscoveryTimer() { + autoDiscoveryRunning_ = false; +- autoDiscoveryTimer_->expires_from_now(seconds(conf_.getPatternAutoDiscoveryPeriod())); ++ autoDiscoveryTimer_->expires_after(seconds(conf_.getPatternAutoDiscoveryPeriod())); + + auto weakSelf = weak_from_this(); + autoDiscoveryTimer_->async_wait([weakSelf](const ASIO_ERROR& err) { +@@ -228,7 +228,7 @@ void PatternMultiTopicsConsumerImpl::start() { + LOG_DEBUG("PatternMultiTopicsConsumerImpl start autoDiscoveryTimer_."); + + if (conf_.getPatternAutoDiscoveryPeriod() > 0) { +- autoDiscoveryTimer_->expires_from_now(seconds(conf_.getPatternAutoDiscoveryPeriod())); ++ autoDiscoveryTimer_->expires_after(seconds(conf_.getPatternAutoDiscoveryPeriod())); + auto weakSelf = weak_from_this(); + autoDiscoveryTimer_->async_wait([weakSelf](const ASIO_ERROR& err) { + if (auto self = weakSelf.lock()) { +@@ -248,7 +248,4 @@ void PatternMultiTopicsConsumerImpl::closeAsync(ResultCallback callback) { + MultiTopicsConsumerImpl::closeAsync(callback); + } + +-void PatternMultiTopicsConsumerImpl::cancelTimers() noexcept { +- ASIO_ERROR ec; +- autoDiscoveryTimer_->cancel(ec); +-} ++void PatternMultiTopicsConsumerImpl::cancelTimers() noexcept { autoDiscoveryTimer_->cancel(); } +diff --git lib/PeriodicTask.cc lib/PeriodicTask.cc +index 9fde012a..4b5f9621 100644 +--- lib/PeriodicTask.cc ++++ lib/PeriodicTask.cc +@@ -29,7 +29,7 @@ void PeriodicTask::start() { + state_ = Ready; + if (periodMs_ >= 0) { + std::weak_ptr<PeriodicTask> weakSelf{shared_from_this()}; +- timer_->expires_from_now(std::chrono::milliseconds(periodMs_)); ++ timer_->expires_after(std::chrono::milliseconds(periodMs_)); + timer_->async_wait([weakSelf](const ErrorCode& ec) { + auto self = weakSelf.lock(); + if (self) { +@@ -44,8 +44,7 @@ void PeriodicTask::stop() noexcept { + if (!state_.compare_exchange_strong(state, Closing)) { + return; + } +- ErrorCode ec; +- timer_->cancel(ec); ++ timer_->cancel(); + state_ = Pending; + } + +@@ -59,7 +58,7 @@ void PeriodicTask::handleTimeout(const ErrorCode& ec) { + // state_ may be changed in handleTimeout, so we check state_ again + if (state_ == Ready) { + auto self = shared_from_this(); +- timer_->expires_from_now(std::chrono::milliseconds(periodMs_)); ++ timer_->expires_after(std::chrono::milliseconds(periodMs_)); + timer_->async_wait([this, self](const ErrorCode& ec) { handleTimeout(ec); }); + } + } +diff --git lib/ProducerImpl.cc lib/ProducerImpl.cc +index 4399ce5f..8b112bf1 100644 +--- lib/ProducerImpl.cc ++++ lib/ProducerImpl.cc +@@ -570,7 +570,7 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, SendCallback&& c + bool isFirstMessage = batchMessageContainer_->isFirstMessageToAdd(msg); + bool isFull = batchMessageContainer_->add(msg, callback); + if (isFirstMessage) { +- batchTimer_->expires_from_now(milliseconds(conf_.getBatchingMaxPublishDelayMs())); ++ batchTimer_->expires_after(milliseconds(conf_.getBatchingMaxPublishDelayMs())); + auto weakSelf = weak_from_this(); + batchTimer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) { + auto self = weakSelf.lock(); +@@ -1007,9 +1007,8 @@ void ProducerImpl::shutdown() { + + void ProducerImpl::cancelTimers() noexcept { + dataKeyRefreshTask_.stop(); +- ASIO_ERROR ec; +- batchTimer_->cancel(ec); +- sendTimer_->cancel(ec); ++ batchTimer_->cancel(); ++ sendTimer_->cancel(); + } + + bool ProducerImplCmp::operator()(const ProducerImplPtr& a, const ProducerImplPtr& b) const { +@@ -1030,7 +1029,7 @@ void ProducerImpl::startSendTimeoutTimer() { + } + + void ProducerImpl::asyncWaitSendTimeout(DurationType expiryTime) { +- sendTimer_->expires_from_now(expiryTime); ++ sendTimer_->expires_after(expiryTime); + + auto weakSelf = weak_from_this(); + sendTimer_->async_wait([weakSelf](const ASIO_ERROR& err) { +diff --git lib/RetryableOperation.h lib/RetryableOperation.h +index dba190f4..8a235d3a 100644 +--- lib/RetryableOperation.h ++++ lib/RetryableOperation.h +@@ -26,8 +26,8 @@ + #include <functional> + #include <memory> + ++#include "AsioTimer.h" + #include "Backoff.h" +-#include "ExecutorService.h" + #include "Future.h" + #include "LogUtils.h" + #include "ResultUtils.h" +@@ -68,8 +68,7 @@ class RetryableOperation : public std::enable_shared_from_this<RetryableOperatio + + void cancel() { + promise_.setFailed(ResultDisconnected); +- ASIO_ERROR ec; +- timer_->cancel(ec); ++ timer_->cancel(); + } + + private: +@@ -107,7 +106,7 @@ class RetryableOperation : public std::enable_shared_from_this<RetryableOperatio + } + + auto delay = std::min(backoff_.next(), remainingTime); +- timer_->expires_from_now(delay); ++ timer_->expires_after(delay); + + auto nextRemainingTime = remainingTime - delay; + LOG_INFO("Reschedule " << name_ << " for " << toMillis(delay) +diff --git lib/RetryableOperationCache.h lib/RetryableOperationCache.h +index e42460dd..5030c94e 100644 +--- lib/RetryableOperationCache.h ++++ lib/RetryableOperationCache.h +@@ -18,7 +18,6 @@ + */ + #pragma once + +-#include <chrono> + #include <mutex> + #include <unordered_map> + +diff --git lib/SharedBuffer.h lib/SharedBuffer.h +index 26fc59ed..a6ced186 100644 +--- lib/SharedBuffer.h ++++ lib/SharedBuffer.h +@@ -151,11 +151,11 @@ class SharedBuffer { + + inline bool writable() const { return writableBytes() > 0; } + +- ASIO::const_buffers_1 const_asio_buffer() const { +- return ASIO::const_buffers_1(ptr_ + readIdx_, readableBytes()); ++ ASIO::const_buffer const_asio_buffer() const { ++ return ASIO::const_buffer(ptr_ + readIdx_, readableBytes()); + } + +- ASIO::mutable_buffers_1 asio_buffer() { ++ ASIO::mutable_buffer asio_buffer() { + assert(data_); + return ASIO::buffer(ptr_ + writeIdx_, writableBytes()); + } +diff --git lib/UnAckedMessageTrackerEnabled.cc lib/UnAckedMessageTrackerEnabled.cc +index e371af99..3b959d8a 100644 +--- lib/UnAckedMessageTrackerEnabled.cc ++++ lib/UnAckedMessageTrackerEnabled.cc +@@ -34,7 +34,7 @@ void UnAckedMessageTrackerEnabled::timeoutHandler() { + timeoutHandlerHelper(); + ExecutorServicePtr executorService = client_->getIOExecutorProvider()->get(); + timer_ = executorService->createDeadlineTimer(); +- timer_->expires_from_now(std::chrono::milliseconds(tickDurationInMs_)); ++ timer_->expires_after(std::chrono::milliseconds(tickDurationInMs_)); + std::weak_ptr<UnAckedMessageTrackerEnabled> weakSelf{shared_from_this()}; + timer_->async_wait([weakSelf](const ASIO_ERROR& ec) { + auto self = weakSelf.lock(); +@@ -173,9 +173,8 @@ void UnAckedMessageTrackerEnabled::clear() { + } + + void UnAckedMessageTrackerEnabled::stop() { +- ASIO_ERROR ec; + if (timer_) { +- timer_->cancel(ec); ++ timer_->cancel(); + } + } + } /* namespace pulsar */ +diff --git lib/stats/ConsumerStatsImpl.cc lib/stats/ConsumerStatsImpl.cc +index 0eefabdc..e8bd919a 100644 +--- lib/stats/ConsumerStatsImpl.cc ++++ lib/stats/ConsumerStatsImpl.cc +@@ -85,7 +85,7 @@ void ConsumerStatsImpl::messageAcknowledged(Result res, CommandAck_AckType ackTy + } + + void ConsumerStatsImpl::scheduleTimer() { +- timer_->expires_from_now(std::chrono::seconds(statsIntervalInSeconds_)); ++ timer_->expires_after(std::chrono::seconds(statsIntervalInSeconds_)); + std::weak_ptr<ConsumerStatsImpl> weakSelf{shared_from_this()}; + timer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) { + auto self = weakSelf.lock(); +diff --git lib/stats/ConsumerStatsImpl.h lib/stats/ConsumerStatsImpl.h +index 3333ea85..35fda9b4 100644 +--- lib/stats/ConsumerStatsImpl.h ++++ lib/stats/ConsumerStatsImpl.h +@@ -59,10 +59,7 @@ class ConsumerStatsImpl : public std::enable_shared_from_this<ConsumerStatsImpl> + ConsumerStatsImpl(const ConsumerStatsImpl& stats); + void flushAndReset(const ASIO_ERROR&); + void start() override; +- void stop() override { +- ASIO_ERROR error; +- timer_->cancel(error); +- } ++ void stop() override { timer_->cancel(); } + void receivedMessage(Message&, Result) override; + void messageAcknowledged(Result, CommandAck_AckType, uint32_t ackNums) override; + virtual ~ConsumerStatsImpl(); +diff --git lib/stats/ProducerStatsImpl.cc lib/stats/ProducerStatsImpl.cc +index 15e9e67e..b5e00794 100644 +--- lib/stats/ProducerStatsImpl.cc ++++ lib/stats/ProducerStatsImpl.cc +@@ -109,7 +109,7 @@ void ProducerStatsImpl::messageReceived(Result res, const ptime& publishTime) { + ProducerStatsImpl::~ProducerStatsImpl() { timer_->cancel(); } + + void ProducerStatsImpl::scheduleTimer() { +- timer_->expires_from_now(std::chrono::seconds(statsIntervalInSeconds_)); ++ timer_->expires_after(std::chrono::seconds(statsIntervalInSeconds_)); + std::weak_ptr<ProducerStatsImpl> weakSelf{shared_from_this()}; + timer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) { + auto self = weakSelf.lock(); +diff --git tests/AuthPluginTest.cc tests/AuthPluginTest.cc +index 24549d7f..ed0511ea 100644 +--- tests/AuthPluginTest.cc ++++ tests/AuthPluginTest.cc +@@ -309,16 +309,17 @@ namespace testAthenz { + std::string principalToken; + void mockZTS(Latch& latch, int port) { + LOG_INFO("-- MockZTS started"); +- ASIO::io_service io; +- ASIO::ip::tcp::iostream stream; ++ ASIO::io_context io; ++ ASIO::ip::tcp::socket socket(io); + ASIO::ip::tcp::acceptor acceptor(io, ASIO::ip::tcp::endpoint(ASIO::ip::tcp::v4(), port)); + + LOG_INFO("-- MockZTS waiting for connnection"); + latch.countdown(); +- acceptor.accept(*stream.rdbuf()); ++ acceptor.accept(socket); + LOG_INFO("-- MockZTS got connection"); + + std::string headerLine; ++ ASIO::ip::tcp::iostream stream(std::move(socket)); + while (getline(stream, headerLine)) { + std::vector<std::string> kv; + boost::algorithm::split(kv, headerLine, boost::is_any_of(" ")); +diff --git tests/ConsumerTest.h tests/ConsumerTest.h +index 82482875..9d190c10 100644 +--- tests/ConsumerTest.h ++++ tests/ConsumerTest.h +@@ -46,8 +46,8 @@ class ConsumerTest { + return nullptr; + } + auto timer = cnx->executor_->createDeadlineTimer(); +- timer->expires_from_now(delaySinceStartGrabCnx - +- std::chrono::milliseconds(impl->connectionTimeMs_ + 50)); ++ timer->expires_after(delaySinceStartGrabCnx - ++ std::chrono::milliseconds(impl->connectionTimeMs_ + 50)); + timer->async_wait([cnx](const ASIO_ERROR&) { cnx->close(); }); + return timer; + } |