aboutsummaryrefslogtreecommitdiff
path: root/net-p2p/pulsar-client-cpp
diff options
context:
space:
mode:
authorYuri Victorovich <yuri@FreeBSD.org>2025-03-28 09:21:23 +0000
committerYuri Victorovich <yuri@FreeBSD.org>2025-03-28 09:54:10 +0000
commit1559e2b08bd0410065139656f1b0c76c8b0d5831 (patch)
treed81b2ac30f3261016b41f7d2e9be45c6fc4acde5 /net-p2p/pulsar-client-cpp
parenta34ad504d51c6f26fdfe1af8f0377ab9d12486eb (diff)
Diffstat (limited to 'net-p2p/pulsar-client-cpp')
-rw-r--r--net-p2p/pulsar-client-cpp/Makefile1
-rw-r--r--net-p2p/pulsar-client-cpp/distinfo2
-rw-r--r--net-p2p/pulsar-client-cpp/files/patch-cc30a7b5f52c6d6d7ff94d93a9509fc3f6becc831111
3 files changed, 1112 insertions, 2 deletions
diff --git a/net-p2p/pulsar-client-cpp/Makefile b/net-p2p/pulsar-client-cpp/Makefile
index 23a17a8c156e..19d40f02baf2 100644
--- a/net-p2p/pulsar-client-cpp/Makefile
+++ b/net-p2p/pulsar-client-cpp/Makefile
@@ -13,7 +13,6 @@ LICENSE= APACHE20
LICENSE_FILE= ${WRKSRC}/LICENSE
ONLY_FOR_ARCHS= amd64 i386 # due to requirement of instruction sets crc32, pclmul
-BROKEN= compilation fails with boost-libs-1.87.0, see https://github.com/apache/pulsar-client-cpp/issues/475
BROKEN_i386= compilation fails due to overflow, see https://github.com/apache/pulsar-client-cpp/issues/449
BUILD_DEPENDS= ${LOCALBASE}/include/boost/algorithm/string.hpp:devel/boost-libs
diff --git a/net-p2p/pulsar-client-cpp/distinfo b/net-p2p/pulsar-client-cpp/distinfo
index 3a3e4dec1501..3d7c80b515e0 100644
--- a/net-p2p/pulsar-client-cpp/distinfo
+++ b/net-p2p/pulsar-client-cpp/distinfo
@@ -1,3 +1,3 @@
-TIMESTAMP = 1736572558
+TIMESTAMP = 1743152964
SHA256 (apache-pulsar-client-cpp-v3.7.0_GH0.tar.gz) = 33d6ea82e1f03a2e77f85d3b6ee8e3ac37bfd760ea450537ec2e59ef122c4671
SIZE (apache-pulsar-client-cpp-v3.7.0_GH0.tar.gz) = 1604627
diff --git a/net-p2p/pulsar-client-cpp/files/patch-cc30a7b5f52c6d6d7ff94d93a9509fc3f6becc83 b/net-p2p/pulsar-client-cpp/files/patch-cc30a7b5f52c6d6d7ff94d93a9509fc3f6becc83
new file mode 100644
index 000000000000..ca6cb6a02135
--- /dev/null
+++ b/net-p2p/pulsar-client-cpp/files/patch-cc30a7b5f52c6d6d7ff94d93a9509fc3f6becc83
@@ -0,0 +1,1111 @@
+- backport of https://github.com/apache/pulsar-client-cpp/pull/477 unbreaking for boost 1.87+
+
+diff --git CMakeLists.txt CMakeLists.txt
+index b0046534..2efeec89 100644
+--- CMakeLists.txt
++++ CMakeLists.txt
+@@ -19,15 +19,16 @@
+
+ cmake_minimum_required(VERSION 3.13)
+
+-option(USE_ASIO "Use Asio instead of Boost.Asio" OFF)
+-
+ option(INTEGRATE_VCPKG "Integrate with Vcpkg" OFF)
+ if (INTEGRATE_VCPKG)
+- set(USE_ASIO ON)
++ option(USE_ASIO "Use Asio instead of Boost.Asio" ON)
+ if (NOT CMAKE_TOOLCHAIN_FILE)
+ set(CMAKE_TOOLCHAIN_FILE "${CMAKE_SOURCE_DIR}/vcpkg/scripts/buildsystems/vcpkg.cmake")
+ endif ()
++else ()
++ option(USE_ASIO "Use Asio instead of Boost.Asio" OFF)
+ endif ()
++message(STATUS "USE_ASIO: ${USE_ASIO}")
+
+ option(BUILD_TESTS "Build tests" ON)
+ message(STATUS "BUILD_TESTS: " ${BUILD_TESTS})
+diff --git lib/AckGroupingTrackerEnabled.cc lib/AckGroupingTrackerEnabled.cc
+index 7233b2c9..bc8da970 100644
+--- lib/AckGroupingTrackerEnabled.cc
++++ lib/AckGroupingTrackerEnabled.cc
+@@ -117,8 +117,7 @@ void AckGroupingTrackerEnabled::close() {
+ this->flush();
+ std::lock_guard<std::mutex> lock(this->mutexTimer_);
+ if (this->timer_) {
+- ASIO_ERROR ec;
+- this->timer_->cancel(ec);
++ this->timer_->cancel();
+ }
+ }
+
+@@ -168,7 +167,7 @@ void AckGroupingTrackerEnabled::scheduleTimer() {
+
+ std::lock_guard<std::mutex> lock(this->mutexTimer_);
+ this->timer_ = this->executor_->createDeadlineTimer();
+- this->timer_->expires_from_now(std::chrono::milliseconds(std::max(1L, this->ackGroupingTimeMs_)));
++ this->timer_->expires_after(std::chrono::milliseconds(std::max(1L, this->ackGroupingTimeMs_)));
+ auto self = shared_from_this();
+ this->timer_->async_wait([this, self](const ASIO_ERROR& ec) -> void {
+ if (!ec) {
+diff --git lib/ClientConnection.cc lib/ClientConnection.cc
+index 2037722f..de226a85 100644
+--- lib/ClientConnection.cc
++++ lib/ClientConnection.cc
+@@ -266,7 +266,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
+ if (!clientConfiguration.isTlsAllowInsecureConnection() && clientConfiguration.isValidateHostName()) {
+ LOG_DEBUG("Validating hostname for " << serviceUrl.host() << ":" << serviceUrl.port());
+ std::string urlHost = isSniProxy_ ? proxyUrl.host() : serviceUrl.host();
+- tlsSocket_->set_verify_callback(ASIO::ssl::rfc2818_verification(urlHost));
++ tlsSocket_->set_verify_callback(ASIO::ssl::host_name_verification(urlHost));
+ }
+
+ LOG_DEBUG("TLS SNI Host: " << serviceUrl.host());
+@@ -309,7 +309,7 @@ void ClientConnection::handlePulsarConnected(const proto::CommandConnected& cmdC
+ // Only send keep-alive probes if the broker supports it
+ keepAliveTimer_ = executor_->createDeadlineTimer();
+ if (keepAliveTimer_) {
+- keepAliveTimer_->expires_from_now(std::chrono::seconds(keepAliveIntervalInSeconds_));
++ keepAliveTimer_->expires_after(std::chrono::seconds(keepAliveIntervalInSeconds_));
+ auto weakSelf = weak_from_this();
+ keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) {
+ auto self = weakSelf.lock();
+@@ -354,7 +354,7 @@ void ClientConnection::startConsumerStatsTimer(std::vector<uint64_t> consumerSta
+ // If the close operation has reset the consumerStatsRequestTimer_ then the use_count will be zero
+ // Check if we have a timer still before we set the request timer to pop again.
+ if (consumerStatsRequestTimer_) {
+- consumerStatsRequestTimer_->expires_from_now(operationsTimeout_);
++ consumerStatsRequestTimer_->expires_after(operationsTimeout_);
+ auto weakSelf = weak_from_this();
+ consumerStatsRequestTimer_->async_wait([weakSelf, consumerStatsRequests](const ASIO_ERROR& err) {
+ auto self = weakSelf.lock();
+@@ -388,129 +388,87 @@ typedef ASIO::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPALIVE> tcp_kee
+ typedef ASIO::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPIDLE> tcp_keep_alive_idle;
+ #endif
+
+-/*
+- * TCP Connect handler
+- *
+- * if async_connect without any error, connected_ would be set to true
+- * at this point the connection is deemed valid to be used by clients of this class
+- */
+-void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, tcp::resolver::iterator endpointIterator) {
+- if (!err) {
+- std::stringstream cnxStringStream;
+- try {
+- cnxStringStream << "[" << socket_->local_endpoint() << " -> " << socket_->remote_endpoint()
+- << "] ";
+- cnxString_ = cnxStringStream.str();
+- } catch (const ASIO_SYSTEM_ERROR& e) {
+- LOG_ERROR("Failed to get endpoint: " << e.what());
+- close(ResultRetryable);
+- return;
+- }
+- if (logicalAddress_ == physicalAddress_) {
+- LOG_INFO(cnxString_ << "Connected to broker");
+- } else {
+- LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_
+- << ", proxy: " << proxyServiceUrl_
+- << ", physical address:" << physicalAddress_);
+- }
++void ClientConnection::completeConnect(ASIO::ip::tcp::endpoint endpoint) {
++ std::stringstream cnxStringStream;
++ try {
++ cnxStringStream << "[" << socket_->local_endpoint() << " -> " << socket_->remote_endpoint() << "] ";
++ cnxString_ = cnxStringStream.str();
++ } catch (const ASIO_SYSTEM_ERROR& e) {
++ LOG_ERROR("Failed to get endpoint: " << e.what());
++ close(ResultRetryable);
++ return;
++ }
++ if (logicalAddress_ == physicalAddress_) {
++ LOG_INFO(cnxString_ << "Connected to broker");
++ } else {
++ LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_
++ << ", proxy: " << proxyServiceUrl_ << ", physical address:" << physicalAddress_);
++ }
+
+- Lock lock(mutex_);
+- if (isClosed()) {
+- LOG_INFO(cnxString_ << "Connection already closed");
+- return;
+- }
+- state_ = TcpConnected;
+- lock.unlock();
++ Lock lock(mutex_);
++ if (isClosed()) {
++ LOG_INFO(cnxString_ << "Connection already closed");
++ return;
++ }
++ state_ = TcpConnected;
++ lock.unlock();
+
+- ASIO_ERROR error;
+- socket_->set_option(tcp::no_delay(true), error);
+- if (error) {
+- LOG_WARN(cnxString_ << "Socket failed to set tcp::no_delay: " << error.message());
+- }
++ ASIO_ERROR error;
++ socket_->set_option(tcp::no_delay(true), error);
++ if (error) {
++ LOG_WARN(cnxString_ << "Socket failed to set tcp::no_delay: " << error.message());
++ }
+
+- socket_->set_option(tcp::socket::keep_alive(true), error);
+- if (error) {
+- LOG_WARN(cnxString_ << "Socket failed to set tcp::socket::keep_alive: " << error.message());
+- }
++ socket_->set_option(tcp::socket::keep_alive(true), error);
++ if (error) {
++ LOG_WARN(cnxString_ << "Socket failed to set tcp::socket::keep_alive: " << error.message());
++ }
+
+- // Start TCP keep-alive probes after connection has been idle after 1 minute. Ideally this
+- // should never happen, given that we're sending our own keep-alive probes (within the TCP
+- // connection) every 30 seconds
+- socket_->set_option(tcp_keep_alive_idle(1 * 60), error);
+- if (error) {
+- LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_idle: " << error.message());
+- }
++ // Start TCP keep-alive probes after connection has been idle after 1 minute. Ideally this
++ // should never happen, given that we're sending our own keep-alive probes (within the TCP
++ // connection) every 30 seconds
++ socket_->set_option(tcp_keep_alive_idle(1 * 60), error);
++ if (error) {
++ LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_idle: " << error.message());
++ }
+
+- // Send up to 10 probes before declaring the connection broken
+- socket_->set_option(tcp_keep_alive_count(10), error);
+- if (error) {
+- LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_count: " << error.message());
+- }
++ // Send up to 10 probes before declaring the connection broken
++ socket_->set_option(tcp_keep_alive_count(10), error);
++ if (error) {
++ LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_count: " << error.message());
++ }
+
+- // Interval between probes: 6 seconds
+- socket_->set_option(tcp_keep_alive_interval(6), error);
+- if (error) {
+- LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_interval: " << error.message());
+- }
++ // Interval between probes: 6 seconds
++ socket_->set_option(tcp_keep_alive_interval(6), error);
++ if (error) {
++ LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_interval: " << error.message());
++ }
+
+- if (tlsSocket_) {
+- if (!isTlsAllowInsecureConnection_) {
+- ASIO_ERROR err;
+- Url service_url;
+- if (!Url::parse(physicalAddress_, service_url)) {
+- LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << err << " " << err.message());
+- close();
+- return;
+- }
+- }
+- auto weakSelf = weak_from_this();
+- auto socket = socket_;
+- auto tlsSocket = tlsSocket_;
+- // socket and ssl::stream objects must exist until async_handshake is done, otherwise segmentation
+- // fault might happen
+- auto callback = [weakSelf, socket, tlsSocket](const ASIO_ERROR& err) {
+- auto self = weakSelf.lock();
+- if (self) {
+- self->handleHandshake(err);
+- }
+- };
+- tlsSocket_->async_handshake(ASIO::ssl::stream<tcp::socket>::client,
+- ASIO::bind_executor(strand_, callback));
+- } else {
+- handleHandshake(ASIO_SUCCESS);
+- }
+- } else if (endpointIterator != tcp::resolver::iterator()) {
+- LOG_WARN(cnxString_ << "Failed to establish connection: " << err.message());
+- // The connection failed. Try the next endpoint in the list.
+- ASIO_ERROR closeError;
+- socket_->close(closeError); // ignore the error of close
+- if (closeError) {
+- LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
+- }
+- connectTimeoutTask_->stop();
+- ++endpointIterator;
+- if (endpointIterator != tcp::resolver::iterator()) {
+- LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "...");
+- connectTimeoutTask_->start();
+- tcp::endpoint endpoint = *endpointIterator;
+- auto weakSelf = weak_from_this();
+- socket_->async_connect(endpoint, [weakSelf, endpointIterator](const ASIO_ERROR& err) {
+- auto self = weakSelf.lock();
+- if (self) {
+- self->handleTcpConnected(err, endpointIterator);
+- }
+- });
+- } else {
+- if (err == ASIO::error::operation_aborted) {
+- // TCP connect timeout, which is not retryable
++ if (tlsSocket_) {
++ if (!isTlsAllowInsecureConnection_) {
++ ASIO_ERROR err;
++ Url service_url;
++ if (!Url::parse(physicalAddress_, service_url)) {
++ LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << err << " " << err.message());
+ close();
+- } else {
+- close(ResultRetryable);
++ return;
+ }
+ }
++ auto weakSelf = weak_from_this();
++ auto socket = socket_;
++ auto tlsSocket = tlsSocket_;
++ // socket and ssl::stream objects must exist until async_handshake is done, otherwise segmentation
++ // fault might happen
++ auto callback = [weakSelf, socket, tlsSocket](const ASIO_ERROR& err) {
++ auto self = weakSelf.lock();
++ if (self) {
++ self->handleHandshake(err);
++ }
++ };
++ tlsSocket_->async_handshake(ASIO::ssl::stream<tcp::socket>::client,
++ ASIO::bind_executor(strand_, callback));
+ } else {
+- LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message());
+- close(ResultRetryable);
++ handleHandshake(ASIO_SUCCESS);
+ }
+ }
+
+@@ -603,60 +561,71 @@ void ClientConnection::tcpConnectAsync() {
+ }
+
+ LOG_DEBUG(cnxString_ << "Resolving " << service_url.host() << ":" << service_url.port());
+- tcp::resolver::query query(service_url.host(), std::to_string(service_url.port()));
++ tcp::resolver::endpoint_type endpoint(ASIO::ip::make_address(service_url.host()), service_url.port());
+ auto weakSelf = weak_from_this();
+- resolver_->async_resolve(query, [weakSelf](const ASIO_ERROR& err, tcp::resolver::iterator iterator) {
+- auto self = weakSelf.lock();
+- if (self) {
+- self->handleResolve(err, iterator);
+- }
+- });
++ resolver_->async_resolve(
++ endpoint, [this, weakSelf](const ASIO_ERROR& err, tcp::resolver::results_type results) {
++ auto self = weakSelf.lock();
++ if (!self) {
++ return;
++ }
++ if (err) {
++ std::string hostUrl = isSniProxy_ ? cnxString_ : proxyServiceUrl_;
++ LOG_ERROR(hostUrl << "Resolve error: " << err << " : " << err.message());
++ close();
++ return;
++ }
++ if (results.empty()) {
++ LOG_ERROR(cnxString_ << "No IP address found");
++ close();
++ return;
++ }
++ connectTimeoutTask_->setCallback([weakSelf](const PeriodicTask::ErrorCode& ec) {
++ ClientConnectionPtr ptr = weakSelf.lock();
++ if (!ptr) {
++ // Connection was already destroyed
++ return;
++ }
++
++ if (ptr->state_ != Ready) {
++ LOG_ERROR(ptr->cnxString_ << "Connection was not established in "
++ << ptr->connectTimeoutTask_->getPeriodMs()
++ << " ms, close the socket");
++ PeriodicTask::ErrorCode err;
++ ptr->socket_->close(err);
++ if (err) {
++ LOG_WARN(ptr->cnxString_ << "Failed to close socket: " << err.message());
++ }
++ }
++ ptr->connectTimeoutTask_->stop();
++ });
++ connectTimeoutTask_->start();
++ std::vector<tcp::resolver::endpoint_type> endpoints;
++ for (const auto& result : results) {
++ endpoints.emplace_back(result.endpoint());
++ }
++ asyncConnect(endpoints, 0);
++ });
+ }
+
+-void ClientConnection::handleResolve(const ASIO_ERROR& err, tcp::resolver::iterator endpointIterator) {
+- if (err) {
+- std::string hostUrl = isSniProxy_ ? cnxString_ : proxyServiceUrl_;
+- LOG_ERROR(hostUrl << "Resolve error: " << err << " : " << err.message());
+- close();
++void ClientConnection::asyncConnect(const std::vector<ASIO::ip::tcp::endpoint>& endpoints, size_t index) {
++ if (index >= endpoints.size()) {
++ close(ResultRetryable);
+ return;
+ }
+-
+ auto weakSelf = weak_from_this();
+- connectTimeoutTask_->setCallback([weakSelf](const PeriodicTask::ErrorCode& ec) {
+- ClientConnectionPtr ptr = weakSelf.lock();
+- if (!ptr) {
+- // Connection was already destroyed
++ socket_->async_connect(endpoints[index], [this, weakSelf, endpoints, index](const ASIO_ERROR& err) {
++ auto self = weakSelf.lock();
++ if (!self) {
+ return;
+ }
+-
+- if (ptr->state_ != Ready) {
+- LOG_ERROR(ptr->cnxString_ << "Connection was not established in "
+- << ptr->connectTimeoutTask_->getPeriodMs() << " ms, close the socket");
+- PeriodicTask::ErrorCode err;
+- ptr->socket_->close(err);
+- if (err) {
+- LOG_WARN(ptr->cnxString_ << "Failed to close socket: " << err.message());
+- }
++ if (err) {
++ LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message());
++ asyncConnect(endpoints, index + 1);
++ return;
+ }
+- ptr->connectTimeoutTask_->stop();
++ completeConnect(endpoints[index]);
+ });
+-
+- LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "...");
+- connectTimeoutTask_->start();
+- if (endpointIterator != tcp::resolver::iterator()) {
+- LOG_DEBUG(cnxString_ << "Resolved hostname " << endpointIterator->host_name() //
+- << " to " << endpointIterator->endpoint());
+- socket_->async_connect(*endpointIterator, [weakSelf, endpointIterator](const ASIO_ERROR& err) {
+- auto self = weakSelf.lock();
+- if (self) {
+- self->handleTcpConnected(err, endpointIterator);
+- }
+- });
+- } else {
+- LOG_WARN(cnxString_ << "No IP address found");
+- close();
+- return;
+- }
+ }
+
+ void ClientConnection::readNextCommand() {
+@@ -1058,7 +1027,7 @@ void ClientConnection::newLookup(const SharedBuffer& cmd, const uint64_t request
+ LookupRequestData requestData;
+ requestData.promise = promise;
+ requestData.timer = executor_->createDeadlineTimer();
+- requestData.timer->expires_from_now(operationsTimeout_);
++ requestData.timer->expires_after(operationsTimeout_);
+ auto weakSelf = weak_from_this();
+ requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) {
+ auto self = weakSelf.lock();
+@@ -1174,8 +1143,9 @@ void ClientConnection::sendPendingCommands() {
+ PairSharedBuffer buffer =
+ Commands::newSend(outgoingBuffer_, outgoingCmd, getChecksumType(), *args);
+
+- // Capture the buffer because asio does not copy the buffer, if the buffer is destroyed before the
+- // callback is called, an invalid buffer range might be passed to the underlying socket send.
++ // Capture the buffer because asio does not copy the buffer, if the buffer is destroyed before
++ // the callback is called, an invalid buffer range might be passed to the underlying socket
++ // send.
+ asyncWrite(buffer, customAllocWriteHandler([this, self, buffer](const ASIO_ERROR& err, size_t) {
+ handleSendPair(err);
+ }));
+@@ -1198,7 +1168,7 @@ Future<Result, ResponseData> ClientConnection::sendRequestWithId(SharedBuffer cm
+
+ PendingRequestData requestData;
+ requestData.timer = executor_->createDeadlineTimer();
+- requestData.timer->expires_from_now(operationsTimeout_);
++ requestData.timer->expires_after(operationsTimeout_);
+ auto weakSelf = weak_from_this();
+ requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) {
+ auto self = weakSelf.lock();
+@@ -1251,7 +1221,7 @@ void ClientConnection::handleKeepAliveTimeout() {
+ // be zero And we do not attempt to dereference the pointer.
+ Lock lock(mutex_);
+ if (keepAliveTimer_) {
+- keepAliveTimer_->expires_from_now(std::chrono::seconds(keepAliveIntervalInSeconds_));
++ keepAliveTimer_->expires_after(std::chrono::seconds(keepAliveIntervalInSeconds_));
+ auto weakSelf = weak_from_this();
+ keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) {
+ auto self = weakSelf.lock();
+@@ -1430,7 +1400,7 @@ Future<Result, GetLastMessageIdResponse> ClientConnection::newGetLastMessageId(u
+ LastMessageIdRequestData requestData;
+ requestData.promise = promise;
+ requestData.timer = executor_->createDeadlineTimer();
+- requestData.timer->expires_from_now(operationsTimeout_);
++ requestData.timer->expires_after(operationsTimeout_);
+ auto weakSelf = weak_from_this();
+ requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) {
+ auto self = weakSelf.lock();
+@@ -1478,7 +1448,7 @@ Future<Result, SchemaInfo> ClientConnection::newGetSchema(const std::string& top
+ lock.unlock();
+
+ auto weakSelf = weak_from_this();
+- timer->expires_from_now(operationsTimeout_);
++ timer->expires_after(operationsTimeout_);
+ timer->async_wait([this, weakSelf, requestId](const ASIO_ERROR& ec) {
+ auto self = weakSelf.lock();
+ if (!self) {
+@@ -2047,8 +2017,7 @@ void ClientConnection::unsafeRemovePendingRequest(long requestId) {
+ auto it = pendingRequests_.find(requestId);
+ if (it != pendingRequests_.end()) {
+ it->second.promise.setFailed(ResultDisconnected);
+- ASIO_ERROR ec;
+- it->second.timer->cancel(ec);
++ it->second.timer->cancel();
+ pendingRequests_.erase(it);
+ }
+ }
+diff --git lib/ClientConnection.h lib/ClientConnection.h
+index 7646f85e..14e07652 100644
+--- lib/ClientConnection.h
++++ lib/ClientConnection.h
+@@ -25,13 +25,13 @@
+ #include <atomic>
+ #ifdef USE_ASIO
+ #include <asio/bind_executor.hpp>
+-#include <asio/io_service.hpp>
++#include <asio/io_context.hpp>
+ #include <asio/ip/tcp.hpp>
+ #include <asio/ssl/stream.hpp>
+ #include <asio/strand.hpp>
+ #else
+ #include <boost/asio/bind_executor.hpp>
+-#include <boost/asio/io_service.hpp>
++#include <boost/asio/io_context.hpp>
+ #include <boost/asio/ip/tcp.hpp>
+ #include <boost/asio/ssl/stream.hpp>
+ #include <boost/asio/strand.hpp>
+@@ -231,13 +231,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
+ DeadlineTimerPtr timer;
+ };
+
+- /*
+- * handler for connectAsync
+- * creates a ConnectionPtr which has a valid ClientConnection object
+- * although not usable at this point, since this is just tcp connection
+- * Pulsar - Connect/Connected has yet to happen
+- */
+- void handleTcpConnected(const ASIO_ERROR& err, ASIO::ip::tcp::resolver::iterator endpointIterator);
++ void asyncConnect(const std::vector<ASIO::ip::tcp::endpoint>& endpoints, size_t index);
++ void completeConnect(ASIO::ip::tcp::endpoint endpoint);
+
+ void handleHandshake(const ASIO_ERROR& err);
+
+@@ -260,8 +255,6 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
+
+ void handlePulsarConnected(const proto::CommandConnected& cmdConnected);
+
+- void handleResolve(const ASIO_ERROR& err, ASIO::ip::tcp::resolver::iterator endpointIterator);
+-
+ void handleSend(const ASIO_ERROR& err, const SharedBuffer& cmd);
+ void handleSendPair(const ASIO_ERROR& err);
+ void sendPendingCommands();
+@@ -324,7 +317,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
+ */
+ SocketPtr socket_;
+ TlsSocketPtr tlsSocket_;
+- ASIO::strand<ASIO::io_service::executor_type> strand_;
++ ASIO::strand<ASIO::io_context::executor_type> strand_;
+
+ const std::string logicalAddress_;
+ /*
+diff --git lib/ConsumerImpl.cc lib/ConsumerImpl.cc
+index 250845b3..cfdb0b2d 100644
+--- lib/ConsumerImpl.cc
++++ lib/ConsumerImpl.cc
+@@ -422,7 +422,7 @@ void ConsumerImpl::discardChunkMessages(std::string uuid, MessageId messageId, b
+ }
+
+ void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
+- checkExpiredChunkedTimer_->expires_from_now(milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
++ checkExpiredChunkedTimer_->expires_after(milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
+ std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+ checkExpiredChunkedTimer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) -> void {
+ auto self = weakSelf.lock();
+@@ -1668,7 +1668,7 @@ void ConsumerImpl::internalGetLastMessageIdAsync(const BackoffPtr& backoff, Time
+ }
+ remainTime -= next;
+
+- timer->expires_from_now(next);
++ timer->expires_after(next);
+
+ auto self = shared_from_this();
+ timer->async_wait([this, backoff, remainTime, timer, next, callback,
+@@ -1791,9 +1791,8 @@ std::shared_ptr<ConsumerImpl> ConsumerImpl::get_shared_this_ptr() {
+ }
+
+ void ConsumerImpl::cancelTimers() noexcept {
+- ASIO_ERROR ec;
+- batchReceiveTimer_->cancel(ec);
+- checkExpiredChunkedTimer_->cancel(ec);
++ batchReceiveTimer_->cancel();
++ checkExpiredChunkedTimer_->cancel();
+ unAckedMessageTrackerPtr_->stop();
+ consumerStatsBasePtr_->stop();
+ }
+diff --git lib/ConsumerImplBase.cc lib/ConsumerImplBase.cc
+index 098f2d5b..76d99370 100644
+--- lib/ConsumerImplBase.cc
++++ lib/ConsumerImplBase.cc
+@@ -51,7 +51,7 @@ ConsumerImplBase::ConsumerImplBase(ClientImplPtr client, const std::string& topi
+
+ void ConsumerImplBase::triggerBatchReceiveTimerTask(long timeoutMs) {
+ if (timeoutMs > 0) {
+- batchReceiveTimer_->expires_from_now(std::chrono::milliseconds(timeoutMs));
++ batchReceiveTimer_->expires_after(std::chrono::milliseconds(timeoutMs));
+ std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+ batchReceiveTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) {
+ auto self = weakSelf.lock();
+diff --git lib/ExecutorService.cc lib/ExecutorService.cc
+index 794e3619..7f2a2c14 100644
+--- lib/ExecutorService.cc
++++ lib/ExecutorService.cc
+@@ -18,6 +18,12 @@
+ */
+ #include "ExecutorService.h"
+
++#ifdef USE_ASIO
++#include <asio/post.hpp>
++#else
++#include <boost/asio/post.hpp>
++#endif
++
+ #include "LogUtils.h"
+ #include "TimeUtils.h"
+ DECLARE_LOG_OBJECT()
+@@ -31,18 +37,13 @@ ExecutorService::~ExecutorService() { close(0); }
+ void ExecutorService::start() {
+ auto self = shared_from_this();
+ std::thread t{[this, self] {
+- LOG_DEBUG("Run io_service in a single thread");
+- ASIO_ERROR ec;
++ LOG_DEBUG("Run io_context in a single thread");
+ while (!closed_) {
+- io_service_.restart();
+- IOService::work work{getIOService()};
+- io_service_.run(ec);
+- }
+- if (ec) {
+- LOG_ERROR("Failed to run io_service: " << ec.message());
+- } else {
+- LOG_DEBUG("Event loop of ExecutorService exits successfully");
++ io_context_.restart();
++ auto work{ASIO::make_work_guard(io_context_)};
++ io_context_.run();
+ }
++ LOG_DEBUG("Event loop of ExecutorService exits successfully");
+ {
+ std::lock_guard<std::mutex> lock{mutex_};
+ ioServiceDone_ = true;
+@@ -63,12 +64,12 @@ ExecutorServicePtr ExecutorService::create() {
+ }
+
+ /*
+- * factory method of ASIO::ip::tcp::socket associated with io_service_ instance
++ * factory method of ASIO::ip::tcp::socket associated with io_context_ instance
+ * @ returns shared_ptr to this socket
+ */
+ SocketPtr ExecutorService::createSocket() {
+ try {
+- return SocketPtr(new ASIO::ip::tcp::socket(io_service_));
++ return SocketPtr(new ASIO::ip::tcp::socket(io_context_));
+ } catch (const ASIO_SYSTEM_ERROR &e) {
+ restart();
+ auto error = std::string("Failed to create socket: ") + e.what();
+@@ -82,12 +83,12 @@ TlsSocketPtr ExecutorService::createTlsSocket(SocketPtr &socket, ASIO::ssl::cont
+ }
+
+ /*
+- * factory method of Resolver object associated with io_service_ instance
++ * factory method of Resolver object associated with io_context_ instance
+ * @returns shraed_ptr to resolver object
+ */
+ TcpResolverPtr ExecutorService::createTcpResolver() {
+ try {
+- return TcpResolverPtr(new ASIO::ip::tcp::resolver(io_service_));
++ return TcpResolverPtr(new ASIO::ip::tcp::resolver(io_context_));
+ } catch (const ASIO_SYSTEM_ERROR &e) {
+ restart();
+ auto error = std::string("Failed to create resolver: ") + e.what();
+@@ -97,7 +98,7 @@ TcpResolverPtr ExecutorService::createTcpResolver() {
+
+ DeadlineTimerPtr ExecutorService::createDeadlineTimer() {
+ try {
+- return DeadlineTimerPtr(new ASIO::steady_timer(io_service_));
++ return DeadlineTimerPtr(new ASIO::steady_timer(io_context_));
+ } catch (const ASIO_SYSTEM_ERROR &e) {
+ restart();
+ auto error = std::string("Failed to create steady_timer: ") + e.what();
+@@ -105,7 +106,7 @@ DeadlineTimerPtr ExecutorService::createDeadlineTimer() {
+ }
+ }
+
+-void ExecutorService::restart() { io_service_.stop(); }
++void ExecutorService::restart() { io_context_.stop(); }
+
+ void ExecutorService::close(long timeoutMs) {
+ bool expectedState = false;
+@@ -113,12 +114,12 @@ void ExecutorService::close(long timeoutMs) {
+ return;
+ }
+ if (timeoutMs == 0) { // non-blocking
+- io_service_.stop();
++ io_context_.stop();
+ return;
+ }
+
+ std::unique_lock<std::mutex> lock{mutex_};
+- io_service_.stop();
++ io_context_.stop();
+ if (timeoutMs > 0) {
+ cond_.wait_for(lock, std::chrono::milliseconds(timeoutMs), [this] { return ioServiceDone_; });
+ } else { // < 0
+@@ -126,7 +127,7 @@ void ExecutorService::close(long timeoutMs) {
+ }
+ }
+
+-void ExecutorService::postWork(std::function<void(void)> task) { io_service_.post(task); }
++void ExecutorService::postWork(std::function<void(void)> task) { ASIO::post(io_context_, task); }
+
+ /////////////////////
+
+diff --git lib/ExecutorService.h lib/ExecutorService.h
+index 89d06d30..626cb203 100644
+--- lib/ExecutorService.h
++++ lib/ExecutorService.h
+@@ -23,11 +23,11 @@
+
+ #include <atomic>
+ #ifdef USE_ASIO
+-#include <asio/io_service.hpp>
++#include <asio/io_context.hpp>
+ #include <asio/ip/tcp.hpp>
+ #include <asio/ssl.hpp>
+ #else
+-#include <boost/asio/io_service.hpp>
++#include <boost/asio/io_context.hpp>
+ #include <boost/asio/ip/tcp.hpp>
+ #include <boost/asio/ssl.hpp>
+ #endif
+@@ -46,7 +46,7 @@ typedef std::shared_ptr<ASIO::ssl::stream<ASIO::ip::tcp::socket &> > TlsSocketPt
+ typedef std::shared_ptr<ASIO::ip::tcp::resolver> TcpResolverPtr;
+ class PULSAR_PUBLIC ExecutorService : public std::enable_shared_from_this<ExecutorService> {
+ public:
+- using IOService = ASIO::io_service;
++ using IOService = ASIO::io_context;
+ using SharedPtr = std::shared_ptr<ExecutorService>;
+
+ static SharedPtr create();
+@@ -67,14 +67,14 @@ class PULSAR_PUBLIC ExecutorService : public std::enable_shared_from_this<Execut
+ // See TimeoutProcessor for the semantics of the parameter.
+ void close(long timeoutMs = 3000);
+
+- IOService &getIOService() { return io_service_; }
++ IOService &getIOService() { return io_context_; }
+ bool isClosed() const noexcept { return closed_; }
+
+ private:
+ /*
+- * io_service is our interface to os, io object schedule async ops on this object
++ * io_context is our interface to os, io object schedule async ops on this object
+ */
+- IOService io_service_;
++ IOService io_context_;
+
+ std::atomic_bool closed_{false};
+ std::mutex mutex_;
+diff --git lib/HandlerBase.cc lib/HandlerBase.cc
+index 65aa0db1..71902481 100644
+--- lib/HandlerBase.cc
++++ lib/HandlerBase.cc
+@@ -50,9 +50,8 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic,
+ redirectedClusterURI_("") {}
+
+ HandlerBase::~HandlerBase() {
+- ASIO_ERROR ignored;
+- timer_->cancel(ignored);
+- creationTimer_->cancel(ignored);
++ timer_->cancel();
++ creationTimer_->cancel();
+ }
+
+ void HandlerBase::start() {
+@@ -61,15 +60,14 @@ void HandlerBase::start() {
+ if (state_.compare_exchange_strong(state, Pending)) {
+ grabCnx();
+ }
+- creationTimer_->expires_from_now(operationTimeut_);
++ creationTimer_->expires_after(operationTimeut_);
+ std::weak_ptr<HandlerBase> weakSelf{shared_from_this()};
+ creationTimer_->async_wait([this, weakSelf](const ASIO_ERROR& error) {
+ auto self = weakSelf.lock();
+ if (self && !error) {
+ LOG_WARN("Cancel the pending reconnection due to the start timeout");
+ connectionFailed(ResultTimeout);
+- ASIO_ERROR ignored;
+- timer_->cancel(ignored);
++ timer_->cancel();
+ }
+ });
+ }
+@@ -133,8 +131,7 @@ void HandlerBase::grabCnx(const boost::optional<std::string>& assignedBrokerUrl)
+ connectionTimeMs_ =
+ duration_cast<milliseconds>(high_resolution_clock::now() - before).count();
+ // Prevent the creationTimer_ from cancelling the timer_ in future
+- ASIO_ERROR ignored;
+- creationTimer_->cancel(ignored);
++ creationTimer_->cancel();
+ LOG_INFO("Finished connecting to broker after " << connectionTimeMs_ << " ms")
+ } else if (isResultRetryable(result)) {
+ scheduleReconnection();
+@@ -188,7 +185,7 @@ void HandlerBase::scheduleReconnection(const boost::optional<std::string>& assig
+ TimeDuration delay = assignedBrokerUrl ? std::chrono::milliseconds(0) : backoff_.next();
+
+ LOG_INFO(getName() << "Schedule reconnection in " << (toMillis(delay) / 1000.0) << " s");
+- timer_->expires_from_now(delay);
++ timer_->expires_after(delay);
+ // passing shared_ptr here since time_ will get destroyed, so tasks will be cancelled
+ // so we will not run into the case where grabCnx is invoked on out of scope handler
+ auto name = getName();
+diff --git lib/MultiTopicsConsumerImpl.cc lib/MultiTopicsConsumerImpl.cc
+index dddade5c..61fbf7b8 100644
+--- lib/MultiTopicsConsumerImpl.cc
++++ lib/MultiTopicsConsumerImpl.cc
+@@ -962,7 +962,7 @@ uint64_t MultiTopicsConsumerImpl::getNumberOfConnectedConsumer() {
+ return numberOfConnectedConsumer;
+ }
+ void MultiTopicsConsumerImpl::runPartitionUpdateTask() {
+- partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_);
++ partitionsUpdateTimer_->expires_after(partitionsUpdateInterval_);
+ auto weakSelf = weak_from_this();
+ partitionsUpdateTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) {
+ // If two requests call runPartitionUpdateTask at the same time, the timer will fail, and it
+@@ -1115,8 +1115,7 @@ void MultiTopicsConsumerImpl::beforeConnectionChange(ClientConnection& cnx) {
+
+ void MultiTopicsConsumerImpl::cancelTimers() noexcept {
+ if (partitionsUpdateTimer_) {
+- ASIO_ERROR ec;
+- partitionsUpdateTimer_->cancel(ec);
++ partitionsUpdateTimer_->cancel();
+ }
+ }
+
+diff --git lib/NegativeAcksTracker.cc lib/NegativeAcksTracker.cc
+index e443496d..e50b4ca2 100644
+--- lib/NegativeAcksTracker.cc
++++ lib/NegativeAcksTracker.cc
+@@ -50,7 +50,7 @@ void NegativeAcksTracker::scheduleTimer() {
+ return;
+ }
+ std::weak_ptr<NegativeAcksTracker> weakSelf{shared_from_this()};
+- timer_->expires_from_now(timerInterval_);
++ timer_->expires_after(timerInterval_);
+ timer_->async_wait([weakSelf](const ASIO_ERROR &ec) {
+ if (auto self = weakSelf.lock()) {
+ self->handleTimer(ec);
+@@ -107,8 +107,7 @@ void NegativeAcksTracker::add(const MessageId &m) {
+
+ void NegativeAcksTracker::close() {
+ closed_ = true;
+- ASIO_ERROR ec;
+- timer_->cancel(ec);
++ timer_->cancel();
+ std::lock_guard<std::mutex> lock(mutex_);
+ nackedMessages_.clear();
+ }
+diff --git lib/PartitionedProducerImpl.cc lib/PartitionedProducerImpl.cc
+index 4178096c..923c038b 100644
+--- lib/PartitionedProducerImpl.cc
++++ lib/PartitionedProducerImpl.cc
+@@ -421,7 +421,7 @@ void PartitionedProducerImpl::flushAsync(FlushCallback callback) {
+
+ void PartitionedProducerImpl::runPartitionUpdateTask() {
+ auto weakSelf = weak_from_this();
+- partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_);
++ partitionsUpdateTimer_->expires_after(partitionsUpdateInterval_);
+ partitionsUpdateTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) {
+ auto self = weakSelf.lock();
+ if (self) {
+@@ -524,8 +524,7 @@ uint64_t PartitionedProducerImpl::getNumberOfConnectedProducer() {
+
+ void PartitionedProducerImpl::cancelTimers() noexcept {
+ if (partitionsUpdateTimer_) {
+- ASIO_ERROR ec;
+- partitionsUpdateTimer_->cancel(ec);
++ partitionsUpdateTimer_->cancel();
+ }
+ }
+
+diff --git lib/PatternMultiTopicsConsumerImpl.cc lib/PatternMultiTopicsConsumerImpl.cc
+index 4fc7bb61..07d9a7bc 100644
+--- lib/PatternMultiTopicsConsumerImpl.cc
++++ lib/PatternMultiTopicsConsumerImpl.cc
+@@ -48,7 +48,7 @@ const PULSAR_REGEX_NAMESPACE::regex PatternMultiTopicsConsumerImpl::getPattern()
+
+ void PatternMultiTopicsConsumerImpl::resetAutoDiscoveryTimer() {
+ autoDiscoveryRunning_ = false;
+- autoDiscoveryTimer_->expires_from_now(seconds(conf_.getPatternAutoDiscoveryPeriod()));
++ autoDiscoveryTimer_->expires_after(seconds(conf_.getPatternAutoDiscoveryPeriod()));
+
+ auto weakSelf = weak_from_this();
+ autoDiscoveryTimer_->async_wait([weakSelf](const ASIO_ERROR& err) {
+@@ -228,7 +228,7 @@ void PatternMultiTopicsConsumerImpl::start() {
+ LOG_DEBUG("PatternMultiTopicsConsumerImpl start autoDiscoveryTimer_.");
+
+ if (conf_.getPatternAutoDiscoveryPeriod() > 0) {
+- autoDiscoveryTimer_->expires_from_now(seconds(conf_.getPatternAutoDiscoveryPeriod()));
++ autoDiscoveryTimer_->expires_after(seconds(conf_.getPatternAutoDiscoveryPeriod()));
+ auto weakSelf = weak_from_this();
+ autoDiscoveryTimer_->async_wait([weakSelf](const ASIO_ERROR& err) {
+ if (auto self = weakSelf.lock()) {
+@@ -248,7 +248,4 @@ void PatternMultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
+ MultiTopicsConsumerImpl::closeAsync(callback);
+ }
+
+-void PatternMultiTopicsConsumerImpl::cancelTimers() noexcept {
+- ASIO_ERROR ec;
+- autoDiscoveryTimer_->cancel(ec);
+-}
++void PatternMultiTopicsConsumerImpl::cancelTimers() noexcept { autoDiscoveryTimer_->cancel(); }
+diff --git lib/PeriodicTask.cc lib/PeriodicTask.cc
+index 9fde012a..4b5f9621 100644
+--- lib/PeriodicTask.cc
++++ lib/PeriodicTask.cc
+@@ -29,7 +29,7 @@ void PeriodicTask::start() {
+ state_ = Ready;
+ if (periodMs_ >= 0) {
+ std::weak_ptr<PeriodicTask> weakSelf{shared_from_this()};
+- timer_->expires_from_now(std::chrono::milliseconds(periodMs_));
++ timer_->expires_after(std::chrono::milliseconds(periodMs_));
+ timer_->async_wait([weakSelf](const ErrorCode& ec) {
+ auto self = weakSelf.lock();
+ if (self) {
+@@ -44,8 +44,7 @@ void PeriodicTask::stop() noexcept {
+ if (!state_.compare_exchange_strong(state, Closing)) {
+ return;
+ }
+- ErrorCode ec;
+- timer_->cancel(ec);
++ timer_->cancel();
+ state_ = Pending;
+ }
+
+@@ -59,7 +58,7 @@ void PeriodicTask::handleTimeout(const ErrorCode& ec) {
+ // state_ may be changed in handleTimeout, so we check state_ again
+ if (state_ == Ready) {
+ auto self = shared_from_this();
+- timer_->expires_from_now(std::chrono::milliseconds(periodMs_));
++ timer_->expires_after(std::chrono::milliseconds(periodMs_));
+ timer_->async_wait([this, self](const ErrorCode& ec) { handleTimeout(ec); });
+ }
+ }
+diff --git lib/ProducerImpl.cc lib/ProducerImpl.cc
+index 4399ce5f..8b112bf1 100644
+--- lib/ProducerImpl.cc
++++ lib/ProducerImpl.cc
+@@ -570,7 +570,7 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, SendCallback&& c
+ bool isFirstMessage = batchMessageContainer_->isFirstMessageToAdd(msg);
+ bool isFull = batchMessageContainer_->add(msg, callback);
+ if (isFirstMessage) {
+- batchTimer_->expires_from_now(milliseconds(conf_.getBatchingMaxPublishDelayMs()));
++ batchTimer_->expires_after(milliseconds(conf_.getBatchingMaxPublishDelayMs()));
+ auto weakSelf = weak_from_this();
+ batchTimer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) {
+ auto self = weakSelf.lock();
+@@ -1007,9 +1007,8 @@ void ProducerImpl::shutdown() {
+
+ void ProducerImpl::cancelTimers() noexcept {
+ dataKeyRefreshTask_.stop();
+- ASIO_ERROR ec;
+- batchTimer_->cancel(ec);
+- sendTimer_->cancel(ec);
++ batchTimer_->cancel();
++ sendTimer_->cancel();
+ }
+
+ bool ProducerImplCmp::operator()(const ProducerImplPtr& a, const ProducerImplPtr& b) const {
+@@ -1030,7 +1029,7 @@ void ProducerImpl::startSendTimeoutTimer() {
+ }
+
+ void ProducerImpl::asyncWaitSendTimeout(DurationType expiryTime) {
+- sendTimer_->expires_from_now(expiryTime);
++ sendTimer_->expires_after(expiryTime);
+
+ auto weakSelf = weak_from_this();
+ sendTimer_->async_wait([weakSelf](const ASIO_ERROR& err) {
+diff --git lib/RetryableOperation.h lib/RetryableOperation.h
+index dba190f4..8a235d3a 100644
+--- lib/RetryableOperation.h
++++ lib/RetryableOperation.h
+@@ -26,8 +26,8 @@
+ #include <functional>
+ #include <memory>
+
++#include "AsioTimer.h"
+ #include "Backoff.h"
+-#include "ExecutorService.h"
+ #include "Future.h"
+ #include "LogUtils.h"
+ #include "ResultUtils.h"
+@@ -68,8 +68,7 @@ class RetryableOperation : public std::enable_shared_from_this<RetryableOperatio
+
+ void cancel() {
+ promise_.setFailed(ResultDisconnected);
+- ASIO_ERROR ec;
+- timer_->cancel(ec);
++ timer_->cancel();
+ }
+
+ private:
+@@ -107,7 +106,7 @@ class RetryableOperation : public std::enable_shared_from_this<RetryableOperatio
+ }
+
+ auto delay = std::min(backoff_.next(), remainingTime);
+- timer_->expires_from_now(delay);
++ timer_->expires_after(delay);
+
+ auto nextRemainingTime = remainingTime - delay;
+ LOG_INFO("Reschedule " << name_ << " for " << toMillis(delay)
+diff --git lib/RetryableOperationCache.h lib/RetryableOperationCache.h
+index e42460dd..5030c94e 100644
+--- lib/RetryableOperationCache.h
++++ lib/RetryableOperationCache.h
+@@ -18,7 +18,6 @@
+ */
+ #pragma once
+
+-#include <chrono>
+ #include <mutex>
+ #include <unordered_map>
+
+diff --git lib/SharedBuffer.h lib/SharedBuffer.h
+index 26fc59ed..a6ced186 100644
+--- lib/SharedBuffer.h
++++ lib/SharedBuffer.h
+@@ -151,11 +151,11 @@ class SharedBuffer {
+
+ inline bool writable() const { return writableBytes() > 0; }
+
+- ASIO::const_buffers_1 const_asio_buffer() const {
+- return ASIO::const_buffers_1(ptr_ + readIdx_, readableBytes());
++ ASIO::const_buffer const_asio_buffer() const {
++ return ASIO::const_buffer(ptr_ + readIdx_, readableBytes());
+ }
+
+- ASIO::mutable_buffers_1 asio_buffer() {
++ ASIO::mutable_buffer asio_buffer() {
+ assert(data_);
+ return ASIO::buffer(ptr_ + writeIdx_, writableBytes());
+ }
+diff --git lib/UnAckedMessageTrackerEnabled.cc lib/UnAckedMessageTrackerEnabled.cc
+index e371af99..3b959d8a 100644
+--- lib/UnAckedMessageTrackerEnabled.cc
++++ lib/UnAckedMessageTrackerEnabled.cc
+@@ -34,7 +34,7 @@ void UnAckedMessageTrackerEnabled::timeoutHandler() {
+ timeoutHandlerHelper();
+ ExecutorServicePtr executorService = client_->getIOExecutorProvider()->get();
+ timer_ = executorService->createDeadlineTimer();
+- timer_->expires_from_now(std::chrono::milliseconds(tickDurationInMs_));
++ timer_->expires_after(std::chrono::milliseconds(tickDurationInMs_));
+ std::weak_ptr<UnAckedMessageTrackerEnabled> weakSelf{shared_from_this()};
+ timer_->async_wait([weakSelf](const ASIO_ERROR& ec) {
+ auto self = weakSelf.lock();
+@@ -173,9 +173,8 @@ void UnAckedMessageTrackerEnabled::clear() {
+ }
+
+ void UnAckedMessageTrackerEnabled::stop() {
+- ASIO_ERROR ec;
+ if (timer_) {
+- timer_->cancel(ec);
++ timer_->cancel();
+ }
+ }
+ } /* namespace pulsar */
+diff --git lib/stats/ConsumerStatsImpl.cc lib/stats/ConsumerStatsImpl.cc
+index 0eefabdc..e8bd919a 100644
+--- lib/stats/ConsumerStatsImpl.cc
++++ lib/stats/ConsumerStatsImpl.cc
+@@ -85,7 +85,7 @@ void ConsumerStatsImpl::messageAcknowledged(Result res, CommandAck_AckType ackTy
+ }
+
+ void ConsumerStatsImpl::scheduleTimer() {
+- timer_->expires_from_now(std::chrono::seconds(statsIntervalInSeconds_));
++ timer_->expires_after(std::chrono::seconds(statsIntervalInSeconds_));
+ std::weak_ptr<ConsumerStatsImpl> weakSelf{shared_from_this()};
+ timer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) {
+ auto self = weakSelf.lock();
+diff --git lib/stats/ConsumerStatsImpl.h lib/stats/ConsumerStatsImpl.h
+index 3333ea85..35fda9b4 100644
+--- lib/stats/ConsumerStatsImpl.h
++++ lib/stats/ConsumerStatsImpl.h
+@@ -59,10 +59,7 @@ class ConsumerStatsImpl : public std::enable_shared_from_this<ConsumerStatsImpl>
+ ConsumerStatsImpl(const ConsumerStatsImpl& stats);
+ void flushAndReset(const ASIO_ERROR&);
+ void start() override;
+- void stop() override {
+- ASIO_ERROR error;
+- timer_->cancel(error);
+- }
++ void stop() override { timer_->cancel(); }
+ void receivedMessage(Message&, Result) override;
+ void messageAcknowledged(Result, CommandAck_AckType, uint32_t ackNums) override;
+ virtual ~ConsumerStatsImpl();
+diff --git lib/stats/ProducerStatsImpl.cc lib/stats/ProducerStatsImpl.cc
+index 15e9e67e..b5e00794 100644
+--- lib/stats/ProducerStatsImpl.cc
++++ lib/stats/ProducerStatsImpl.cc
+@@ -109,7 +109,7 @@ void ProducerStatsImpl::messageReceived(Result res, const ptime& publishTime) {
+ ProducerStatsImpl::~ProducerStatsImpl() { timer_->cancel(); }
+
+ void ProducerStatsImpl::scheduleTimer() {
+- timer_->expires_from_now(std::chrono::seconds(statsIntervalInSeconds_));
++ timer_->expires_after(std::chrono::seconds(statsIntervalInSeconds_));
+ std::weak_ptr<ProducerStatsImpl> weakSelf{shared_from_this()};
+ timer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) {
+ auto self = weakSelf.lock();
+diff --git tests/AuthPluginTest.cc tests/AuthPluginTest.cc
+index 24549d7f..ed0511ea 100644
+--- tests/AuthPluginTest.cc
++++ tests/AuthPluginTest.cc
+@@ -309,16 +309,17 @@ namespace testAthenz {
+ std::string principalToken;
+ void mockZTS(Latch& latch, int port) {
+ LOG_INFO("-- MockZTS started");
+- ASIO::io_service io;
+- ASIO::ip::tcp::iostream stream;
++ ASIO::io_context io;
++ ASIO::ip::tcp::socket socket(io);
+ ASIO::ip::tcp::acceptor acceptor(io, ASIO::ip::tcp::endpoint(ASIO::ip::tcp::v4(), port));
+
+ LOG_INFO("-- MockZTS waiting for connnection");
+ latch.countdown();
+- acceptor.accept(*stream.rdbuf());
++ acceptor.accept(socket);
+ LOG_INFO("-- MockZTS got connection");
+
+ std::string headerLine;
++ ASIO::ip::tcp::iostream stream(std::move(socket));
+ while (getline(stream, headerLine)) {
+ std::vector<std::string> kv;
+ boost::algorithm::split(kv, headerLine, boost::is_any_of(" "));
+diff --git tests/ConsumerTest.h tests/ConsumerTest.h
+index 82482875..9d190c10 100644
+--- tests/ConsumerTest.h
++++ tests/ConsumerTest.h
+@@ -46,8 +46,8 @@ class ConsumerTest {
+ return nullptr;
+ }
+ auto timer = cnx->executor_->createDeadlineTimer();
+- timer->expires_from_now(delaySinceStartGrabCnx -
+- std::chrono::milliseconds(impl->connectionTimeMs_ + 50));
++ timer->expires_after(delaySinceStartGrabCnx -
++ std::chrono::milliseconds(impl->connectionTimeMs_ + 50));
+ timer->async_wait([cnx](const ASIO_ERROR&) { cnx->close(); });
+ return timer;
+ }