diff options
author | Michal 'vorner' Vaner <michal.vaner@nic.cz> | 2012-05-21 13:56:57 +0200 |
---|---|---|
committer | Michal 'vorner' Vaner <michal.vaner@nic.cz> | 2012-05-21 13:59:00 +0200 |
commit | 0e0d28e6bf3276aaf59c1c9da97e1bee538eae09 (patch) | |
tree | 4df8ed648de1c86980b0c2d4f21bf91153670375 /src/lib/config | |
parent | [master] Merge branch 'trac1207' (diff) | |
parent | [1914] Use .empty() instead of == "" (diff) | |
download | kea-0e0d28e6bf3276aaf59c1c9da97e1bee538eae09.tar.xz kea-0e0d28e6bf3276aaf59c1c9da97e1bee538eae09.zip |
Merge #1914
This is the asynchronous read on ModuleCCSession.
Diffstat (limited to 'src/lib/config')
-rw-r--r-- | src/lib/config/ccsession.cc | 95 | ||||
-rw-r--r-- | src/lib/config/ccsession.h | 132 | ||||
-rw-r--r-- | src/lib/config/tests/ccsession_unittests.cc | 283 | ||||
-rw-r--r-- | src/lib/config/tests/fake_session.cc | 8 | ||||
-rw-r--r-- | src/lib/config/tests/fake_session.h | 2 |
5 files changed, 511 insertions, 9 deletions
diff --git a/src/lib/config/ccsession.cc b/src/lib/config/ccsession.cc index 63fa4cdcf9..d4c6653b6c 100644 --- a/src/lib/config/ccsession.cc +++ b/src/lib/config/ccsession.cc @@ -601,6 +601,11 @@ ModuleCCSession::checkCommand() { ConstElementPtr cmd, routing, data; if (session_.group_recvmsg(routing, data, true)) { + // In case the message is wanted asynchronously, it gets used. + if (checkAsyncRecv(routing, data)) { + return (0); + } + /* ignore result messages (in case we're out of sync, to prevent * pingpongs */ if (data->getType() != Element::map || data->contains("result")) { @@ -764,5 +769,95 @@ ModuleCCSession::sendStopping() { session_.group_sendmsg(cmd, "ConfigManager"); } +class ModuleCCSession::AsyncRecvRequest { +public: // Everything is public here, as the definition is hidden anyway + AsyncRecvRequest(const AsyncRecvCallback& cb, const string& rcp, int sq, + bool reply) : + callback(cb), + recipient(rcp), + seq(sq), + is_reply(reply) + {} + const AsyncRecvCallback callback; + const string recipient; + const int seq; + const bool is_reply; +}; + +ModuleCCSession::AsyncRecvRequestID +ModuleCCSession::groupRecvMsgAsync(const AsyncRecvCallback& callback, + bool is_reply, int seq, + const string& recipient) { + // This just stores the request, the handling is done in checkCommand() + + // push_back would be simpler, but it does not return the iterator we need + return (async_recv_requests_.insert(async_recv_requests_.end(), + AsyncRecvRequest(callback, recipient, + seq, is_reply))); +} + +bool +ModuleCCSession::checkAsyncRecv(const ConstElementPtr& envelope, + const ConstElementPtr& msg) +{ + for (AsyncRecvRequestID request(async_recv_requests_.begin()); + request != async_recv_requests_.end(); ++request) { + // Just go through all the requests and look for a matching one + if (requestMatch(*request, envelope)) { + // We want the request to be still alive at the time we + // call the callback. But we need to remove it on an exception + // too, so we use the class. If just C++ had the finally keyword. + class RequestDeleter { + public: + RequestDeleter(AsyncRecvRequests& requests, + AsyncRecvRequestID& request) : + requests_(requests), + request_(request) + { } + ~RequestDeleter() { + requests_.erase(request_); + } + private: + AsyncRecvRequests& requests_; + AsyncRecvRequestID& request_; + }; + RequestDeleter deleter(async_recv_requests_, request); + // Call the callback + request->callback(envelope, msg, request); + return (true); + } + } + return (false); +} + +bool +ModuleCCSession::requestMatch(const AsyncRecvRequest& request, + const ConstElementPtr& envelope) const +{ + if (request.is_reply != envelope->contains("reply")) { + // Wrong type of message + return (false); + } + if (request.is_reply && + (request.seq == -1 || + request.seq == envelope->get("reply")->intValue())) { + // This is the correct reply + return (true); + } + if (!request.is_reply && + (request.recipient.empty() || + request.recipient == envelope->get("group")->stringValue())) { + // This is the correct command + return (true); + } + // If nothing from the above, we don't want it + return (false); +} + +void +ModuleCCSession::cancelAsyncRecv(const AsyncRecvRequestID& id) { + async_recv_requests_.erase(id); +} + } } diff --git a/src/lib/config/ccsession.h b/src/lib/config/ccsession.h index 059968c71a..e96a33d44b 100644 --- a/src/lib/config/ccsession.h +++ b/src/lib/config/ccsession.h @@ -15,13 +15,16 @@ #ifndef __CCSESSION_H #define __CCSESSION_H 1 -#include <string> - #include <config/config_data.h> #include <config/module_spec.h> + #include <cc/session.h> #include <cc/data.h> +#include <string> +#include <list> +#include <boost/function.hpp> + namespace isc { namespace config { @@ -358,15 +361,140 @@ public: return (session_.group_recvmsg(envelope, msg, nonblock, seq)); }; + /// \brief Forward declaration of internal data structure. + /// + /// This holds information about one asynchronous request to receive + /// a message. It is declared as public to allow declaring other derived + /// types, but without showing the internal representation. + class AsyncRecvRequest; + + /// \brief List of all requests for asynchronous reads. + typedef std::list<AsyncRecvRequest> AsyncRecvRequests; + + /// \brief Identifier of single request for asynchronous read. + typedef AsyncRecvRequests::iterator AsyncRecvRequestID; + + /// \brief Callback which is called when an asynchronous receive finishes. + /// + /// This is the callback used by groupRecvMsgAsync() function. It is called + /// when a matching message arrives. It receives following parameters when + /// called: + /// - The envelope of the message + /// - The message itself + /// - The ID of the request, as returned by corresponding groupRecvMsgAsync + /// call. + /// + /// It is possible to throw exceptions from the callback, but they will not + /// be caught and they will get propagated out through the checkCommand() + /// call. This, if not handled on higher level, will likely terminate the + /// application. However, the ModuleCCSession internals will be in + /// well-defined state after the call (both the callback and the message + /// will be removed from the queues as already called). + typedef boost::function3<void, const isc::data::ConstElementPtr&, + const isc::data::ConstElementPtr&, + const AsyncRecvRequestID&> + AsyncRecvCallback; + + /// \brief Receive a message from the CC session asynchronously. + /// + /// This registers a callback which is called when a matching message + /// is received. This message returns immediately. + /// + /// Once a matching message arrives, the callback is called with the + /// envelope of the message, the message itself and the result of this + /// function call (which might be useful for identifying which of many + /// events the recipient is waiting for this is). This makes the callback + /// used and is not called again even if a message that would match + /// arrives later (this is a single-shot callback). + /// + /// The callback is never called from within this function. Even if there + /// are queued messages, the callback would be called once checkCommand() + /// is invoked (possibly from start() or the constructor). + /// + /// The matching is as follows. If is_reply is true, only replies are + /// considered. In that case, if seq is -1, any reply is accepted. If + /// it is something else than -1, only the reply with matching seq is + /// taken. This may be used to receive replies to commands + /// asynchronously. + /// + /// In case the is_reply is false, the function looks for command messages. + /// The seq parameter is ignored, but the recipient one is considered. If + /// it is an empty string, any command is taken. If it is non-empty, only + /// commands addressed to the recipient channel (eg. group - instance is + /// ignored for now) are taken. This can be used to receive foreign commands + /// or notifications. In such case, it might be desirable to call the + /// groupRecvMsgAsync again from within the callback, to receive any future + /// commands or events of the same type. + /// + /// The interaction with other receiving functions is slightly complicated. + /// The groupRecvMsg call takes precedence. If the message matches its + /// parameters, it steals the message and no callback matching it as well + /// is called. Then, all the queued asynchronous receives are considered, + /// with the oldest active ones taking precedence (they work as FIFO). + /// If none of them matches, generic command and config handling takes + /// place. If it is not handled by that, the message is dropped. However, + /// it is better if there's just one place that wants to receive each given + /// message. + /// + /// \exception std::bad_alloc if there isn't enough memory to store the + /// callback. + /// \param callback is the function to be called when a matching message + /// arrives. + /// \param is_reply specifies if the desired message should be a reply or + /// a command. + /// \param seq specifies the reply sequence number in case a reply is + /// desired. The default -1 means any reply is OK. + /// \param recipient is the CC channel to which the command should be + /// addressed to match (in case is_reply is false). Empty means any + /// command is good one. + /// \return An identifier of the request. This will be passed to the + /// callback or can be used to cancel the request by cancelAsyncRecv. + /// \todo Decide what to do with instance and what was it meant for anyway. + AsyncRecvRequestID groupRecvMsgAsync(const AsyncRecvCallback& callback, + bool is_reply, int seq = -1, + const std::string& recipient = + std::string()); + + /// \brief Removes yet unused request for asynchronous receive. + /// + /// This function cancels a request previously queued by + /// groupRecvMsgAsync(). You may use it only before the callback was + /// already triggered. If you call it with an ID of callback that + /// already happened or was already canceled, the behaviour is undefined + /// (but something like a crash is very likely, as the function removes + /// an item from a list and this would be removing it from a list that + /// does not contain the item). + /// + /// It is important to cancel requests that are no longer going to happen + /// for some reason, as the request would occupy memory forever. + /// + /// \param id The id of request as returned by groupRecvMsgAsync. + void cancelAsyncRecv(const AsyncRecvRequestID& id); + private: ModuleSpec readModuleSpecification(const std::string& filename); void startCheck(); void sendStopping(); + /// \brief Check if the message is wanted by asynchronous read + /// + /// It checks if any of the previously queued requests match + /// the message. If so, the callback is dispatched and removed. + /// + /// \param envelope The envelope of the message. + /// \param msg The actual message data. + /// \return True if the message was used for a callback, false + /// otherwise. + bool checkAsyncRecv(const data::ConstElementPtr& envelope, + const data::ConstElementPtr& msg); + /// \brief Checks if a message with this envelope matches the request + bool requestMatch(const AsyncRecvRequest& request, + const data::ConstElementPtr& envelope) const; bool started_; std::string module_name_; isc::cc::AbstractSession& session_; ModuleSpec module_specification_; + AsyncRecvRequests async_recv_requests_; isc::data::ConstElementPtr handleConfigUpdate( isc::data::ConstElementPtr new_config); diff --git a/src/lib/config/tests/ccsession_unittests.cc b/src/lib/config/tests/ccsession_unittests.cc index abaff8e95d..3fca741bbb 100644 --- a/src/lib/config/tests/ccsession_unittests.cc +++ b/src/lib/config/tests/ccsession_unittests.cc @@ -27,11 +27,13 @@ #include <log/logger_name.h> #include <boost/scoped_ptr.hpp> +#include <boost/bind.hpp> using namespace isc::data; using namespace isc::config; using namespace isc::cc; using namespace std; +using namespace boost; namespace { std::string @@ -497,10 +499,10 @@ TEST_F(CCSessionTest, remoteConfig) { const size_t qsize(session.getMsgQueue()->size()); EXPECT_TRUE(session.getMsgQueue()->get(qsize - 2)->equals(*el( "[ \"ConfigManager\", \"*\", { \"command\": [" - "\"get_module_spec\", { \"module_name\": \"Spec2\" } ] } ]"))); + "\"get_module_spec\", { \"module_name\": \"Spec2\" } ] }, -1 ]"))); EXPECT_TRUE(session.getMsgQueue()->get(qsize - 1)->equals(*el( "[ \"ConfigManager\", \"*\", { \"command\": [ \"get_config\"," - "{ \"module_name\": \"Spec2\" } ] } ]"))); + "{ \"module_name\": \"Spec2\" } ] }, -1 ]"))); EXPECT_EQ("Spec2", module_name); // Since we returned an empty local config above, the default value // for "item1", which is 1, should be used. @@ -709,13 +711,286 @@ TEST_F(CCSessionTest, doubleStartWithAddRemoteConfig) { FakeSession::DoubleRead); } -namespace { +/// \brief Test fixture for asynchronous receiving of messages. +/// +/// This is an extension to the CCSessionTest. It would be possible to add +/// the functionality to the CCSessionTest, but it is going to be used +/// only by few tests and is non-trivial, so it is placed to a separate +/// sub-class. +class AsyncReceiveCCSessionTest : public CCSessionTest { +protected: + AsyncReceiveCCSessionTest() : + mccs_(ccspecfile("spec29.spec"), session, NULL, NULL, false, false), + msg_(el("{\"result\": [0]}")), + next_flag_(0) + { + // This is just to make sure the messages get through the fake + // session. + session.subscribe("test group"); + session.subscribe("other group"); + session.subscribe("<ignored>"); + // Get rid of all unrelated stray messages + while (session.getMsgQueue()->size() > 0) { + session.getMsgQueue()->remove(0); + } + } + /// \brief Convenience function to queue a request to get a command + /// message. + ModuleCCSession::AsyncRecvRequestID + registerCommand(const string& recipient) + { + return (mccs_.groupRecvMsgAsync( + bind(&AsyncReceiveCCSessionTest::callback, this, next_flag_ ++, _1, + _2, _3), false, -1, recipient)); + } + /// \brief Convenience function to queue a request to get a reply + /// message. + ModuleCCSession::AsyncRecvRequestID + registerReply(int seq) + { + return (mccs_.groupRecvMsgAsync( + bind(&AsyncReceiveCCSessionTest::callback, this, next_flag_ ++, _1, + _2, _3), true, seq)); + } + /// \brief Check the next called callback was with this flag + void called(int flag) { + ASSERT_FALSE(called_.empty()); + EXPECT_EQ(flag, *called_.begin()); + called_.pop_front(); + } + /// \brief Checks that no more callbacks were called. + void nothingCalled() { + EXPECT_TRUE(called_.empty()); + } + /// \brief The tested session. + ModuleCCSession mccs_; + /// \brief The value of message on the last called callback. + ConstElementPtr last_msg_; + /// \brief A message that can be used + ConstElementPtr msg_; + // Shared part of the simpleCommand and similar tests. + void commandTest(const string& group) { + // Push the message inside + session.addMessage(msg_, "test group", "<unused>"); + EXPECT_TRUE(mccs_.hasQueuedMsgs()); + // Register the callback + registerCommand(group); + // But the callback should not be called yet + // (even if the message is there). + nothingCalled(); + // But when we call the checkCommand(), it should be called. + mccs_.checkCommand(); + called(0); + EXPECT_EQ(msg_, last_msg_); + // But only once + nothingCalled(); + // And the message should be eaten + EXPECT_FALSE(mccs_.hasQueuedMsgs()); + // The callback should have been eaten as well, inserting another + // message will not invoke it again + session.addMessage(msg_, "test group", "<unused>"); + mccs_.checkCommand(); + nothingCalled(); + } + /// \brief Shared part of the simpleResponse and wildcardResponse tests. + void responseTest(int seq) { + // Push the message inside + session.addMessage(msg_, "<ignored>", "<unused>", 1); + EXPECT_TRUE(mccs_.hasQueuedMsgs()); + // Register the callback + registerReply(seq); + // But the callback should not be called yet + // (even if the message is there). + nothingCalled(); + // But when we call the checkCommand(), it should be called. + mccs_.checkCommand(); + called(0); + EXPECT_EQ(msg_, last_msg_); + // But only once + nothingCalled(); + // And the message should be eaten + EXPECT_FALSE(mccs_.hasQueuedMsgs()); + // The callback should have been eaten as well, inserting another + // message will not invoke it again + session.addMessage(msg_, "test group", "<unused>"); + mccs_.checkCommand(); + nothingCalled(); + } + /// \brief Shared part of the noMatch* tests + void noMatchTest(int seq, int wanted_seq, bool is_reply) { + // Push the message inside + session.addMessage(msg_, "other group", "<unused>", seq); + EXPECT_TRUE(mccs_.hasQueuedMsgs()); + // Register the callback + if (is_reply) { + registerReply(wanted_seq); + } else { + registerCommand("test group"); + } + // But the callback should not be called yet + // (even if the message is there). + nothingCalled(); + // And even not now, because it does not match. + mccs_.checkCommand(); + nothingCalled(); + // And the message should be eaten by the checkCommand + EXPECT_FALSE(mccs_.hasQueuedMsgs()); + } +private: + /// \brief The next flag to be handed out + int next_flag_; + /// \brief Flags of callbacks already called (as FIFO) + list<int> called_; + /// \brief This is the callback registered to the tested groupRecvMsgAsync + /// function. + void callback(int store_flag, const ConstElementPtr&, + const ConstElementPtr& msg, + const ModuleCCSession::AsyncRecvRequestID&) + { + called_.push_back(store_flag); + last_msg_ = msg; + } +}; + +// Test we can receive a command, without anything fancy yet +TEST_F(AsyncReceiveCCSessionTest, simpleCommand) { + commandTest("test group"); +} + +// Test we can receive a "wildcard" command - without specifying the +// group to subscribe to. Very similar to simpleCommand test. +TEST_F(AsyncReceiveCCSessionTest, wildcardCommand) { + commandTest(""); +} + +// Very similar to simpleCommand, but with a response message +TEST_F(AsyncReceiveCCSessionTest, simpleResponse) { + responseTest(1); +} + +// Matching a response message with wildcard +TEST_F(AsyncReceiveCCSessionTest, wildcardResponse) { + responseTest(-1); +} + +// Check that a wrong command message is not matched +TEST_F(AsyncReceiveCCSessionTest, noMatchCommand) { + noMatchTest(-1, -1, false); +} + +// Check that a wrong response message is not matched +TEST_F(AsyncReceiveCCSessionTest, noMatchResponse) { + noMatchTest(2, 3, true); +} + +// Check that a command will not match on a reply check and vice versa +TEST_F(AsyncReceiveCCSessionTest, noMatchResponseAgainstCommand) { + // Send a command and check it is not matched as a response + noMatchTest(-1, -1, true); +} + +TEST_F(AsyncReceiveCCSessionTest, noMatchCommandAgainstResponse) { + noMatchTest(2, -1, false); +} + +// We check for command several times before the message actually arrives. +TEST_F(AsyncReceiveCCSessionTest, delayedCallback) { + // First, register the callback + registerReply(1); + // And see it is not called, because the message is not there yet + EXPECT_FALSE(mccs_.hasQueuedMsgs()); + for (size_t i(0); i < 100; ++ i) { + mccs_.checkCommand(); + EXPECT_FALSE(mccs_.hasQueuedMsgs()); + nothingCalled(); + } + // Now the message finally arrives + session.addMessage(msg_, "<ignored>", "<unused>", 1); + EXPECT_TRUE(mccs_.hasQueuedMsgs()); + // And now, the callback is happily triggered. + mccs_.checkCommand(); + called(0); + EXPECT_EQ(msg_, last_msg_); + // But only once + nothingCalled(); +} + +// See that if we put multiple messages inside, and request some callbacks, +// the callbacks are called in the order of messages, not in the order they +// were registered. +TEST_F(AsyncReceiveCCSessionTest, outOfOrder) { + // First, put some messages there + session.addMessage(msg_, "<ignored>", "<unused>", 1); + session.addMessage(msg_, "test group", "<unused>"); + session.addMessage(msg_, "other group", "<unused>"); + session.addMessage(msg_, "<ignored>", "<unused>", 2); + session.addMessage(msg_, "<ignored>", "<unused>", 3); + session.addMessage(msg_, "<ignored>", "<unused>", 4); + // Now register some callbacks + registerReply(13); // Will not be called + registerCommand("other group"); // Matches 3rd message + registerReply(2); // Matches 4th message + registerCommand(""); // Matches the 2nd message + registerCommand("test group"); // Will not be called + registerReply(-1); // Matches the 1st message + registerReply(-1); // Matches the 5th message + // Process all messages there + while (mccs_.hasQueuedMsgs()) { + mccs_.checkCommand(); + } + // These are the numbers of callbacks in the order of messages + called(5); + called(3); + called(1); + called(2); + called(6); + // The last message doesn't trigger anything, so nothing more is called + nothingCalled(); +} + +// We first add, then remove the callback again and check that nothing is +// matched. +TEST_F(AsyncReceiveCCSessionTest, cancel) { + // Add the callback + ModuleCCSession::AsyncRecvRequestID request(registerReply(1)); + // Add corresponding message + session.addMessage(msg_, "<ignored>", "<unused>", 1); + EXPECT_TRUE(mccs_.hasQueuedMsgs()); + // And now, remove the callback again + mccs_.cancelAsyncRecv(request); + // And see that Nothing Happens(TM) + mccs_.checkCommand(); + EXPECT_FALSE(mccs_.hasQueuedMsgs()); + nothingCalled(); +} + +// We add multiple requests and cancel only one of them to see the rest +// is unaffected. +TEST_F(AsyncReceiveCCSessionTest, cancelSome) { + // Register few callbacks + registerReply(1); + ModuleCCSession::AsyncRecvRequestID request(registerCommand("")); + registerCommand("test group"); + // Put some messages there + session.addMessage(msg_, "test group", "<unused>"); + session.addMessage(msg_, "<ignored>", "<unused>", 1); + // Cancel the second callback. Therefore the first message will be matched + // by the third callback, not by the second. + mccs_.cancelAsyncRecv(request); + // Now, process the messages + mccs_.checkCommand(); + mccs_.checkCommand(); + // And see how they matched + called(2); + called(0); + nothingCalled(); +} + void doRelatedLoggersTest(const char* input, const char* expected) { ConstElementPtr all_conf = isc::data::Element::fromJSON(input); ConstElementPtr expected_conf = isc::data::Element::fromJSON(expected); EXPECT_EQ(*expected_conf, *isc::config::getRelatedLoggers(all_conf)); } -} // end anonymous namespace TEST(LogConfigTest, relatedLoggersTest) { // make sure logger configs for 'other' programs are ignored, diff --git a/src/lib/config/tests/fake_session.cc b/src/lib/config/tests/fake_session.cc index 177e62942c..157d4d658f 100644 --- a/src/lib/config/tests/fake_session.cc +++ b/src/lib/config/tests/fake_session.cc @@ -139,6 +139,9 @@ FakeSession::recvmsg(ConstElementPtr& env, ConstElementPtr& msg, bool nonblock, ElementPtr new_env = Element::createMap(); new_env->set("group", c_m->get(0)); new_env->set("to", c_m->get(1)); + if (c_m->get(3)->intValue() != -1) { + new_env->set("reply", c_m->get(3)); + } env = new_env; msg = c_m->get(2); to_remove = c_m; @@ -207,7 +210,7 @@ FakeSession::reply(ConstElementPtr envelope, ConstElementPtr newmsg) { bool FakeSession::hasQueuedMsgs() const { - return (false); + return (msg_queue_ && msg_queue_->size() > 0); } ConstElementPtr @@ -228,12 +231,13 @@ FakeSession::getFirstMessage(std::string& group, std::string& to) const { void FakeSession::addMessage(ConstElementPtr msg, const std::string& group, - const std::string& to) + const std::string& to, int seq) { ElementPtr m_el = Element::createList(); m_el->add(Element::create(group)); m_el->add(Element::create(to)); m_el->add(msg); + m_el->add(Element::create(seq)); if (!msg_queue_) { msg_queue_ = Element::createList(); } diff --git a/src/lib/config/tests/fake_session.h b/src/lib/config/tests/fake_session.h index 79ff174110..c91b5199e3 100644 --- a/src/lib/config/tests/fake_session.h +++ b/src/lib/config/tests/fake_session.h @@ -74,7 +74,7 @@ public: isc::data::ConstElementPtr getFirstMessage(std::string& group, std::string& to) const; void addMessage(isc::data::ConstElementPtr, const std::string& group, - const std::string& to); + const std::string& to, int seq = -1); bool haveSubscription(const std::string& group, const std::string& instance); bool haveSubscription(const isc::data::ConstElementPtr group, |