diff options
Diffstat (limited to 'lib/libdevdctl/consumer.cc')
-rw-r--r-- | lib/libdevdctl/consumer.cc | 253 |
1 files changed, 253 insertions, 0 deletions
diff --git a/lib/libdevdctl/consumer.cc b/lib/libdevdctl/consumer.cc new file mode 100644 index 000000000000..2d5e7ebf549b --- /dev/null +++ b/lib/libdevdctl/consumer.cc @@ -0,0 +1,253 @@ +/*- + * Copyright (c) 2011, 2012, 2013, 2014 Spectra Logic Corporation + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions, and the following disclaimer, + * without modification. + * 2. Redistributions in binary form must reproduce at minimum a disclaimer + * substantially similar to the "NO WARRANTY" disclaimer below + * ("Disclaimer") and any redistribution must be conditioned upon + * including a substantially similar Disclaimer requirement for further + * binary redistribution. + * + * NO WARRANTY + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTIBILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * HOLDERS OR CONTRIBUTORS BE LIABLE FOR SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, + * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING + * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGES. + * + * Authors: Justin T. Gibbs (Spectra Logic Corporation) + */ + +/** + * \file consumer.cc + */ + +#include <sys/cdefs.h> +#include <sys/poll.h> +#include <sys/socket.h> +#include <sys/un.h> + +#include <err.h> +#include <errno.h> +#include <fcntl.h> +#include <syslog.h> +#include <unistd.h> + +#include <cstdarg> +#include <cstring> +#include <list> +#include <map> +#include <string> + +#include "guid.h" +#include "event.h" +#include "event_factory.h" +#include "exception.h" + +#include "consumer.h" +/*================================== Macros ==================================*/ +#define NUM_ELEMENTS(x) (sizeof(x) / sizeof(*x)) + +/*============================ Namespace Control =============================*/ +using std::string; +namespace DevdCtl +{ + +/*============================= Class Definitions ============================*/ +/*----------------------------- DevdCtl::Consumer ----------------------------*/ +//- Consumer Static Private Data ----------------------------------------------- +const char Consumer::s_devdSockPath[] = "/var/run/devd.seqpacket.pipe"; + +//- Consumer Public Methods ---------------------------------------------------- +Consumer::Consumer(Event::BuildMethod *defBuilder, + EventFactory::Record *regEntries, + size_t numEntries) + : m_devdSockFD(-1), + m_eventFactory(defBuilder), + m_replayingEvents(false) +{ + m_eventFactory.UpdateRegistry(regEntries, numEntries); +} + +Consumer::~Consumer() +{ + DisconnectFromDevd(); +} + +bool +Consumer::ConnectToDevd() +{ + struct sockaddr_un devdAddr; + int sLen; + int result; + + if (m_devdSockFD != -1) { + /* Already connected. */ + syslog(LOG_DEBUG, "%s: Already connected.", __func__); + return (true); + } + syslog(LOG_INFO, "%s: Connecting to devd.", __func__); + + memset(&devdAddr, 0, sizeof(devdAddr)); + devdAddr.sun_family= AF_UNIX; + strlcpy(devdAddr.sun_path, s_devdSockPath, sizeof(devdAddr.sun_path)); + sLen = SUN_LEN(&devdAddr); + + m_devdSockFD = socket(AF_UNIX, SOCK_SEQPACKET | SOCK_NONBLOCK, 0); + if (m_devdSockFD == -1) + err(1, "Unable to create socket"); + result = connect(m_devdSockFD, + reinterpret_cast<sockaddr *>(&devdAddr), + sLen); + if (result == -1) { + syslog(LOG_INFO, "Unable to connect to devd"); + DisconnectFromDevd(); + return (false); + } + + syslog(LOG_INFO, "Connection to devd successful"); + return (true); +} + +void +Consumer::DisconnectFromDevd() +{ + if (m_devdSockFD != -1) { + syslog(LOG_INFO, "Disconnecting from devd."); + close(m_devdSockFD); + } + m_devdSockFD = -1; +} + +std::string +Consumer::ReadEvent() +{ + char buf[MAX_EVENT_SIZE + 1]; + ssize_t len; + + len = ::recv(m_devdSockFD, buf, MAX_EVENT_SIZE, MSG_WAITALL); + if (len == -1) + return (std::string("")); + else { + /* NULL-terminate the result */ + buf[len] = '\0'; + return (std::string(buf)); + } +} + +void +Consumer::ReplayUnconsumedEvents(bool discardUnconsumed) +{ + EventList::iterator event(m_unconsumedEvents.begin()); + bool replayed_any = (event != m_unconsumedEvents.end()); + + m_replayingEvents = true; + if (replayed_any) + syslog(LOG_INFO, "Started replaying unconsumed events"); + while (event != m_unconsumedEvents.end()) { + bool consumed((*event)->Process()); + if (consumed || discardUnconsumed) { + delete *event; + event = m_unconsumedEvents.erase(event); + } else { + event++; + } + } + if (replayed_any) + syslog(LOG_INFO, "Finished replaying unconsumed events"); + m_replayingEvents = false; +} + +bool +Consumer::SaveEvent(const Event &event) +{ + if (m_replayingEvents) + return (false); + m_unconsumedEvents.push_back(event.DeepCopy()); + return (true); +} + +Event * +Consumer::NextEvent() +{ + if (!Connected()) + return(NULL); + + Event *event(NULL); + try { + string evString; + + evString = ReadEvent(); + if (! evString.empty()) { + Event::TimestampEventString(evString); + event = Event::CreateEvent(m_eventFactory, evString); + } + } catch (const Exception &exp) { + exp.Log(); + DisconnectFromDevd(); + } + return (event); +} + +/* Capture and process buffered events. */ +void +Consumer::ProcessEvents() +{ + Event *event; + while ((event = NextEvent()) != NULL) { + if (event->Process()) + SaveEvent(*event); + delete event; + } +} + +void +Consumer::FlushEvents() +{ + std::string s; + + do + s = ReadEvent(); + while (! s.empty()) ; +} + +bool +Consumer::EventsPending() +{ + struct pollfd fds[1]; + int result; + + do { + fds->fd = m_devdSockFD; + fds->events = POLLIN; + fds->revents = 0; + result = poll(fds, NUM_ELEMENTS(fds), /*timeout*/0); + } while (result == -1 && errno == EINTR); + + if (result == -1) + err(1, "Polling for devd events failed"); + + if ((fds->revents & POLLERR) != 0) + throw Exception("Consumer::EventsPending(): " + "POLLERR detected on devd socket."); + + if ((fds->revents & POLLHUP) != 0) + throw Exception("Consumer::EventsPending(): " + "POLLHUP detected on devd socket."); + + return ((fds->revents & POLLIN) != 0); +} + +} // namespace DevdCtl |