diff options
author | Stephen Morris <stephen@isc.org> | 2011-02-18 15:31:20 +0100 |
---|---|---|
committer | Stephen Morris <stephen@isc.org> | 2011-02-18 15:31:20 +0100 |
commit | 85b6fa72d68d019149b8c751d495e34bbd4246a8 (patch) | |
tree | 6b0d011675b98fa97e62588e76647294a14a8051 /src/lib | |
parent | [trac554] Update function names and prepare for TCP looping (diff) | |
download | kea-85b6fa72d68d019149b8c751d495e34bbd4246a8.tar.xz kea-85b6fa72d68d019149b8c751d495e34bbd4246a8.zip |
[trac554] Added IOFetch
IOFetch is a general upstream "fetch" class that should be able to
operate over TCP or UDP. Related changes have been made in the
associated classes. So far, only the unit tests for a UDP fetch
have been made (and passed).
Diffstat (limited to 'src/lib')
-rw-r--r-- | src/lib/asiolink/Makefile.am | 30 | ||||
-rw-r--r-- | src/lib/asiolink/asiolink.h | 3 | ||||
-rw-r--r-- | src/lib/asiolink/io_address.cc | 5 | ||||
-rw-r--r-- | src/lib/asiolink/io_address.h | 6 | ||||
-rw-r--r-- | src/lib/asiolink/io_asio_socket.h | 304 | ||||
-rw-r--r-- | src/lib/asiolink/io_completion_cb.h | 75 | ||||
-rw-r--r-- | src/lib/asiolink/io_endpoint.cc | 3 | ||||
-rw-r--r-- | src/lib/asiolink/io_endpoint.h | 6 | ||||
-rw-r--r-- | src/lib/asiolink/io_fetch.cc | 133 | ||||
-rw-r--r-- | src/lib/asiolink/io_fetch.h | 134 | ||||
-rw-r--r-- | src/lib/asiolink/io_message.h | 6 | ||||
-rw-r--r-- | src/lib/asiolink/io_socket.cc | 72 | ||||
-rw-r--r-- | src/lib/asiolink/io_socket.h | 94 | ||||
-rw-r--r-- | src/lib/asiolink/tcp_server.cc | 12 | ||||
-rw-r--r-- | src/lib/asiolink/tcp_socket.h | 245 | ||||
-rw-r--r-- | src/lib/asiolink/tests/Makefile.am | 13 | ||||
-rw-r--r-- | src/lib/asiolink/tests/io_fetch_unittest.cc | 189 | ||||
-rw-r--r-- | src/lib/asiolink/tests/io_socket_unittest.cc | 2 | ||||
-rw-r--r-- | src/lib/asiolink/tests/udp_socket_unittest.cc | 32 | ||||
-rw-r--r-- | src/lib/asiolink/udp_server.cc | 16 | ||||
-rw-r--r-- | src/lib/asiolink/udp_socket.cc | 131 | ||||
-rw-r--r-- | src/lib/asiolink/udp_socket.h | 163 |
22 files changed, 1078 insertions, 596 deletions
diff --git a/src/lib/asiolink/Makefile.am b/src/lib/asiolink/Makefile.am index fe4d7b08a5..4b9b8f85f8 100644 --- a/src/lib/asiolink/Makefile.am +++ b/src/lib/asiolink/Makefile.am @@ -12,26 +12,28 @@ CLEANFILES = *.gcno *.gcda # have some code fragments that would hit gcc's unused-parameter warning, # which would make the build fail with -Werror (our default setting). lib_LTLIBRARIES = libasiolink.la -libasiolink_la_SOURCES = asiolink.h -libasiolink_la_SOURCES += io_service.cc io_service.h -libasiolink_la_SOURCES += dns_service.cc dns_service.h -libasiolink_la_SOURCES += dns_server.h -libasiolink_la_SOURCES += dns_lookup.h +libasiolink_la_SOURCES = asiolink.h libasiolink_la_SOURCES += dns_answer.h -libasiolink_la_SOURCES += simple_callback.h +libasiolink_la_SOURCES += dns_lookup.h +libasiolink_la_SOURCES += dns_server.h +libasiolink_la_SOURCES += dns_service.h dns_service.cc libasiolink_la_SOURCES += interval_timer.h interval_timer.cc -libasiolink_la_SOURCES += recursive_query.h recursive_query.cc +libasiolink_la_SOURCES += io_address.h io_address.cc +libasiolink_la_SOURCES += io_endpoint.h io_endpoint.cc libasiolink_la_SOURCES += io_error.h -libasiolink_la_SOURCES += io_socket.cc io_socket.h +libasiolink_la_SOURCES += io_fetch.h io_fetch.cc libasiolink_la_SOURCES += io_message.h -libasiolink_la_SOURCES += io_address.cc io_address.h -libasiolink_la_SOURCES += io_endpoint.cc io_endpoint.h +libasiolink_la_SOURCES += io_service.h io_service.cc +libasiolink_la_SOURCES += io_socket.h io_socket.cc +libasiolink_la_SOURCES += recursive_query.h recursive_query.cc +libasiolink_la_SOURCES += simple_callback.h +libasiolink_la_SOURCES += tcp_endpoint.h +libasiolink_la_SOURCES += tcp_server.h tcp_server.cc +libasiolink_la_SOURCES += tcp_socket.h libasiolink_la_SOURCES += udp_endpoint.h -libasiolink_la_SOURCES += udp_server.h udp_server.cc -libasiolink_la_SOURCES += udp_socket.h udp_socket.cc libasiolink_la_SOURCES += udp_query.h udp_query.cc -libasiolink_la_SOURCES += tcp_endpoint.h tcp_socket.h -libasiolink_la_SOURCES += tcp_server.h tcp_server.cc +libasiolink_la_SOURCES += udp_server.h udp_server.cc +libasiolink_la_SOURCES += udp_socket.h # Note: the ordering matters: -Wno-... must follow -Wextra (defined in # B10_CXXFLAGS) libasiolink_la_CXXFLAGS = $(AM_CXXFLAGS) diff --git a/src/lib/asiolink/asiolink.h b/src/lib/asiolink/asiolink.h index 9e402e32b0..03951ae9df 100644 --- a/src/lib/asiolink/asiolink.h +++ b/src/lib/asiolink/asiolink.h @@ -34,9 +34,6 @@ #include <asiolink/io_socket.h> #include <asiolink/io_error.h> -#include <asiolink/udp_endpoint.h> -#include <asiolink/udp_socket.h> - /// \namespace asiolink /// \brief A wrapper interface for the ASIO library. /// diff --git a/src/lib/asiolink/io_address.cc b/src/lib/asiolink/io_address.cc index 990524acfb..70e837456d 100644 --- a/src/lib/asiolink/io_address.cc +++ b/src/lib/asiolink/io_address.cc @@ -20,7 +20,10 @@ #include <asio.hpp> -#include <asiolink/asiolink.h> +#include <exceptions/exceptions.h> +#include <asiolink/io_address.h> +#include <asiolink/io_error.h> + using namespace asio; using asio::ip::udp; diff --git a/src/lib/asiolink/io_address.h b/src/lib/asiolink/io_address.h index 98e6fe883a..0d2787f95f 100644 --- a/src/lib/asiolink/io_address.h +++ b/src/lib/asiolink/io_address.h @@ -12,8 +12,8 @@ // OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR // PERFORMANCE OF THIS SOFTWARE. -#ifndef __IOADDRESS_H -#define __IOADDRESS_H 1 +#ifndef __IO_ADDRESS_H +#define __IO_ADDRESS_H 1 // IMPORTANT NOTE: only very few ASIO headers files can be included in // this file. In particular, asio.hpp should never be included here. @@ -120,7 +120,7 @@ private: }; } // asiolink -#endif // __IOADDRESS_H +#endif // __IO_ADDRESS_H // Local Variables: // mode: c++ diff --git a/src/lib/asiolink/io_asio_socket.h b/src/lib/asiolink/io_asio_socket.h new file mode 100644 index 0000000000..885a95cb35 --- /dev/null +++ b/src/lib/asiolink/io_asio_socket.h @@ -0,0 +1,304 @@ +// Copyright (C) 2010 Internet Systems Consortium, Inc. ("ISC") +// +// Permission to use, copy, modify, and/or distribute this software for any +// purpose with or without fee is hereby granted, provided that the above +// copyright notice and this permission notice appear in all copies. +// +// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH +// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY +// AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT, +// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM +// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE +// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR +// PERFORMANCE OF THIS SOFTWARE. + +#ifndef __IO_ASIO_SOCKET_H +#define __IO_ASIO_SOCKET_H 1 + +// IMPORTANT NOTE: only very few ASIO headers files can be included in +// this file. In particular, asio.hpp should never be included here. +// See the description of the namespace below. +#include <unistd.h> // for some network system calls + +#include <functional> +#include <string> + +#include <exceptions/exceptions.h> +#include <coroutine.h> + +#include <asiolink/io_error.h> +#include <asiolink/io_socket.h> +#include <asiolink/io_completion_cb.h> + +using namespace asio; + +namespace asiolink { + +/// \brief Socket not open +/// +/// Thrown on an attempt to do read/write to a socket that is not open. +class SocketNotOpen : public IOError { +public: + SocketNotOpen(const char* file, size_t line, const char* what) : + IOError(file, line, what) {} +}; + + + +/// Forward declaration of an IOEndpoint +class IOEndpoint; + + +/// \brief I/O Socket with asynchronous operations +/// +/// This class is a wrapper for the ASIO socket classes such as +/// \c ip::tcp::socket and \c ip::udp::socket. +/// +/// This is the basic IOSocket with additional operations - open, send, receive +/// and close. Depending on how the asiolink code develops, it may be a +/// temporary class: its main use is to add the template parameter needed for +/// the derived classes UDPSocket and TCPSocket but without changing the +/// signature of the more basic IOSocket class. +/// +/// We may revisit this decision when we generalize the wrapper and more +/// modules use it. Also, at that point we may define a separate (visible) +/// derived class for testing purposes rather than providing factory methods +/// (i.e., getDummy variants below). +/// +/// TODO: Check if IOAsioSocket class is still needed +/// +/// \param C Template parameter identifying type of the callback object. + +template <typename C> +class IOAsioSocket : public IOSocket { + /// + /// \name Constructors and Destructor + /// + /// Note: The copy constructor and the assignment operator are + /// intentionally defined as private, making this class non-copyable. + //@{ +private: + IOAsioSocket(const IOAsioSocket<C>& source); + IOAsioSocket& operator=(const IOAsioSocket<C>& source); +protected: + /// \brief The default constructor. + /// + /// This is intentionally defined as \c protected as this base class + /// should never be instantiated (except as part of a derived class). + IOAsioSocket() {} +public: + /// The destructor. + virtual ~IOAsioSocket() {} + //@} + + /// \brief Return the "native" representation of the socket. + /// + /// In practice, this is the file descriptor of the socket for + /// UNIX-like systems so the current implementation simply uses + /// \c int as the type of the return value. + /// We may have to need revisit this decision later. + /// + /// In general, the application should avoid using this method; + /// it essentially discloses an implementation specific "handle" that + /// can change the internal state of the socket (consider the + /// application closes it, for example). + /// But we sometimes need to perform very low-level operations that + /// requires the native representation. Passing the file descriptor + /// to a different process is one example. + /// This method is provided as a necessary evil for such limited purposes. + /// + /// This method never throws an exception. + /// + /// \return The native representation of the socket. This is the socket + /// file descriptor for UNIX-like systems. + virtual int getNative() const = 0; + + /// \brief Return the transport protocol of the socket. + /// + /// Currently, it returns \c IPPROTO_UDP for UDP sockets, and + /// \c IPPROTO_TCP for TCP sockets. + /// + /// This method never throws an exception. + /// + /// \return IPPROTO_UDP for UDP sockets + /// \return IPPROTO_TCP for TCP sockets + virtual int getProtocol() const = 0; + + /// \brief Open AsioSocket + /// + /// A call that is a no-op on UDP sockets, this opens a connection to the + /// system identified by the given endpoint. + /// + /// \param endpoint Pointer to the endpoint object. This is ignored for + /// a UDP socket (the target is specified in the send call), but should + /// be of type TCPEndpoint for a TCP connection. + /// \param callback I/O Completion callback, called when the connect has + /// completed. In the stackless coroutines model, this will be the + /// method containing the call to this function, allowing the operation to + /// resume after the socket open has completed. + /// + /// \return true if an asynchronous operation was started and the caller + /// should yield and wait for completion, false if not. (i.e. The UDP + /// derived class will return false, the TCP class will return true). This + /// optimisation avoids the overhead required to post a callback to the + /// I/O Service queue where nothing is done. + virtual bool open(const IOEndpoint* endpoint, C& callback) = 0; + + /// \brief Send Asynchronously + /// + /// This corresponds to async_send_to() for UDP sockets and async_send() + /// for TCP. In both cases an endpoint argument is supplied indicating the + /// target of the send - this is ignored for TCP. + /// + /// \param data Data to send + /// \param length Length of data to send + /// \param endpoint Target of the send + /// \param callback Callback object. + virtual void asyncSend(const void* data, size_t length, + const IOEndpoint* endpoint, C& callback) = 0; + + /// \brief Receive Asynchronously + /// + /// This correstponds to async_receive_from() for UDP sockets and + /// async_receive() for TCP. In both cases, an endpoint argument is + /// supplied to receive the source of the communication. For TCP it will + /// be filled in with details of the connection. + /// + /// \param data Buffer to receive incoming message + /// \param length Length of the data buffer + /// \param cumulative Amount of data that should already be in the buffer. + /// \param endpoint Source of the communication + /// \param callback Callback object + virtual void asyncReceive(void* data, size_t length, size_t cumulative, + IOEndpoint* endpoint, C& callback) = 0; + + /// \brief Checks if the data received is complete. + /// + /// This applies to TCP receives, where the data is a byte stream and a + /// receive is not guaranteed to receive the entire message. DNS messages + /// over TCP are prefixed by a two-byte count field. This method takes the + /// amount received so far and the amount received in this I/O and checks + /// if the message is complete, returning the appropriate indication. As + /// a side-effect, it also updates the amount received. + /// + /// For a UDP receive, all the data is received in one I/O, so this is + /// effectively a no-op (although it does update the amount received). + /// + /// \param data Data buffer containing data to date + /// \param length Amount of data received in last asynchronous I/O + /// \param cumulative On input, amount of data received before the last + /// I/O. On output, the total amount of data received to date. + /// + /// \return true if the receive is complete, false if another receive is + /// needed. + virtual bool receiveComplete(void* data, size_t length, + size_t& cumulative) = 0; + + /// \brief Cancel I/O On AsioSocket + virtual void cancel() = 0; + + /// \brief Close socket + virtual void close() = 0; +}; + + +#include "io_socket.h" + +/// \brief The \c DummyAsioSocket class is a concrete derived class of +/// \c IOAsioSocket that is not associated with any real socket. +/// +/// This main purpose of this class is tests, where it may be desirable to +/// instantiate an \c IOAsioSocket object without involving system resource +/// allocation such as real network sockets. +/// +/// \param C Template parameter identifying type of the callback object. + +template <typename C> +class DummyAsioSocket : public IOAsioSocket<C> { +private: + DummyAsioSocket(const DummyAsioSocket<C>& source); + DummyAsioSocket& operator=(const DummyAsioSocket<C>& source); +public: + /// \brief Constructor from the protocol number. + /// + /// The protocol must validly identify a standard network protocol. + /// For example, to specify TCP \c protocol must be \c IPPROTO_TCP. + /// + /// \param protocol The network protocol number for the socket. + DummyAsioSocket(const int protocol) : protocol_(protocol) {} + + /// \brief A dummy derived method of \c IOAsioSocket::getNative(). + /// + /// \return Always returns -1 as the object is not associated with a real + /// (native) socket. + virtual int getNative() const { return (-1); } + + /// \brief A dummy derived method of \c IOAsioSocket::getProtocol(). + /// + /// \return Protocol socket was created with + virtual int getProtocol() const { return (protocol_); } + + + /// \brief Open AsioSocket + /// + /// A call that is a no-op on UDP sockets, this opens a connection to the + /// system identified by the given endpoint. + /// + /// \param endpoint Unused + /// \param callback Unused. + ///false indicating that the operation completed synchronously. + virtual bool open(const IOEndpoint*, C&) { + return (false); + } + + /// \brief Send Asynchronously + /// + /// Must be supplied as it is abstract in the base class. + /// + /// \param data Unused + /// \param length Unused + /// \param endpoint Unused + /// \param callback Unused + virtual void asyncSend(const void*, size_t, const IOEndpoint*, C&) { + } + + /// \brief Receive Asynchronously + /// + /// Must be supplied as it is abstract in the base class. + /// + /// \param data Unused + /// \param length Unused + /// \param cumulative Unused + /// \param endpoint Unused + /// \param callback Unused + virtual void asyncReceive(void* data, size_t, size_t, IOEndpoint*, C&) { } + /// \brief Checks if the data received is complete. + /// + /// \param data Unused + /// \param length Unused + /// \param cumulative Unused + /// + /// \return Always true + virtual bool receiveComplete(void*, size_t, size_t&) { + return (true); + } + + /// \brief Cancel I/O On AsioSocket + /// + /// Must be supplied as it is abstract in the base class. + virtual void cancel() { + } + + /// \brief Close socket + /// + /// Must be supplied as it is abstract in the base class. + virtual void close() { + } + +private: + const int protocol_; +}; + +} // namespace asiolink + +#endif // __IO_ASIO_SOCKET_H diff --git a/src/lib/asiolink/io_completion_cb.h b/src/lib/asiolink/io_completion_cb.h index c6943af5b1..27b7407fa1 100644 --- a/src/lib/asiolink/io_completion_cb.h +++ b/src/lib/asiolink/io_completion_cb.h @@ -15,74 +15,33 @@ #ifndef __IO_COMPLETION_CB_H #define __IO_COMPLETION_CB_H +#include <iostream> + #include <asio/error.hpp> #include <asio/error_code.hpp> -#include <coroutine.h> namespace asiolink { /// \brief Asynchronous I/O Completion Callback /// -/// The stackless coroutine code requires that there be an "entry function" -/// containing the coroutine macros. When the coroutine yields, its state is -/// stored and when the "entry function" is called again, it jumps to the -/// line when processing left off last time. In BIND-10, that "entry function" -/// is the Boost asynchronous I/O completion callback - in essence operator(). -/// -/// This class solves the problem of circularity in class definitions. In -/// BIND10, classes such as IOFetch contain the coroutine code. These include -/// objects of classes such as IOSocket, whose signature has to include the -/// callback object - IOFetch. By abstracting the I/O completion callback into -/// this class, that circularity is broken. -/// -/// One more point: the asynchronous I/O functions take the callback object by -/// reference. But if a derived class object is passed as a reference to its -/// base class, "class slicing" takes place - the derived part of the class is -/// lost and only the base class functionality remains. By storing a pointer -/// to the true object and having the base class method call the derived class -/// method through that, the behaviour of class inheritance is restored. In -/// other words: -/// \code -/// class derived: public class base { -/// : -/// }; -/// derived drv; -/// -/// // Call with pointer to base class -/// void f(base* b, asio::error_code& ec, size_t length) { -/// b->operator()(ec, length); -/// } -/// -/// // Call with reference to base class -/// void g(base& b, asio::error_code& ec, size_t length) { -/// b.operator()(ec, length); -/// } -/// -/// void function xyz(derived *d, asio::error_code& ec, size_t length) { -/// f(d, ec, length); // Calls derived::operator() -/// g(*d, ec, length); // Also calls derived::operator() -/// } -/// \endcode - -class IOCompletionCallback : public coroutine { +/// The two socket classes (UDPSocket and TCPSocket) require that the I/O +/// completion callback function have an operator() method with the appropriate +/// signature. The classes are templates, any class with that method and +/// signature can be passed as the callback object - there is no need for a +/// base class defining the interface. However, some users of the socket +/// classes do not use the asynchronous I/O operations, yet have to supply a +/// template parameter. This is the reason for this class - it is the dummy +/// template parameter. + +class IOCompletionCallback { public: - /// \brief Constructor - IOCompletionCallback() : self_(this) - {} - - /// \brief Virtual Destructor - virtual ~IOCompletionCallback() + /// \brief Asynchronous I/O callback method + /// + /// \param error Unused + /// \param length Unused + void operator()(asio::error_code, size_t) {} - - /// \brief Callback Method - virtual void operator()(asio::error_code ec = asio::error_code(), - size_t length = 0) { - (*self_)(ec, length); - } - -private: - IOCompletionCallback* self_; ///< Pointer to real object }; } // namespace asiolink diff --git a/src/lib/asiolink/io_endpoint.cc b/src/lib/asiolink/io_endpoint.cc index 86e06070da..bf79f61868 100644 --- a/src/lib/asiolink/io_endpoint.cc +++ b/src/lib/asiolink/io_endpoint.cc @@ -20,7 +20,8 @@ #include <asio.hpp> -#include <asiolink/asiolink.h> +#include <asiolink/io_address.h> +#include <asiolink/io_error.h> #include <asiolink/tcp_endpoint.h> #include <asiolink/udp_endpoint.h> diff --git a/src/lib/asiolink/io_endpoint.h b/src/lib/asiolink/io_endpoint.h index 37f9087086..62b9e47942 100644 --- a/src/lib/asiolink/io_endpoint.h +++ b/src/lib/asiolink/io_endpoint.h @@ -12,8 +12,8 @@ // OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR // PERFORMANCE OF THIS SOFTWARE. -#ifndef __IOENDPOINT_H -#define __IOENDPOINT_H 1 +#ifndef __IO_ENDPOINT_H +#define __IO_ENDPOINT_H 1 // IMPORTANT NOTE: only very few ASIO headers files can be included in // this file. In particular, asio.hpp should never be included here. @@ -115,7 +115,7 @@ public: }; } // asiolink -#endif // __IOENDPOINT_H +#endif // __IO_ENDPOINT_H // Local Variables: // mode: c++ diff --git a/src/lib/asiolink/io_fetch.cc b/src/lib/asiolink/io_fetch.cc index 5ab64794ec..1a5c04d562 100644 --- a/src/lib/asiolink/io_fetch.cc +++ b/src/lib/asiolink/io_fetch.cc @@ -18,95 +18,38 @@ #include <sys/socket.h> #include <netinet/in.h> -#include <asio.hpp> -#include <asio/deadline_timer.hpp> -#include <asio/ip/address.hpp> - -#include <boost/shared_ptr.hpp> #include <boost/bind.hpp> -#include <boost/date_time/posix_time/posix_time_types.hpp> -#include <dns/buffer.h> #include <dns/message.h> #include <dns/messagerenderer.h> -#include <log/dummylog.h> #include <dns/opcode.h> #include <dns/rcode.h> +#include <log/dummylog.h> -#include <asiolink.h> -#include <coroutine.h> -#include <internal/udpdns.h> -#include <internal/tcpdns.h> -#include <internal/iofetch.h> +#include <asio.hpp> +#include <asiolink/io_fetch.h> using namespace asio; -using asio::ip::udp; -using asio::ip::tcp; -using isc::log::dlog; - -using namespace std; using namespace isc::dns; +using namespace isc::log; +using namespace std; namespace asiolink { -// Constructor for the IOFetchData member - -/// \brief Constructor -/// -/// Just fills in the data members of the IOFetchData structure -/// -/// \param io_service I/O Service object to handle the asynchronous -/// operations. -/// \param question DNS question to send to the upstream server. -/// \param address IP address of upstream server -/// \param port Port to use for the query -/// \param buffer Output buffer into which the response (in wire format) -/// is written (if a response is received). -/// \param callback Callback object containing the callback to be called -/// when we terminate. The caller is responsible for managing this -/// object and deleting it if necessary. -/// \param timeout Timeout for the fetch (in ms). The default value of -/// -1 indicates no timeout. -/// \param protocol Protocol to use for the fetch. The default is UDP - -IOFetch::IOFetchData::IOFetchData(IOService& io_service, - const isc::dns::Question& query, const IOAddress& address, uint16_t port, - isc::dns::OutputBufferPtr buff, Callback* cb, int wait, int protocol) - : - socket((protocol == IPPROTO_UDP) ? - static_cast<IOSocket*>(new UDPSocket(io_service, address)) : - static_cast<IOSocket*>(new TCPSocket(io_service, address)) - ), - remote((protocol == IPPROTO_UDP) ? - static_cast<IOEndpoint*>(new UDPEndpoint(address, port)) : - static_cast<IOEndpoint*>(new TCPEndpoint(address, port)) - ), - question(query), - buffer(buff), - msgbuf(new OutputBuffer(512)), // TODO: Why this number? - data(new char[IOFetch::MAX_LENGTH]), - callback(cb), - rcv_amount(0), - stopped(false), - timer(io_service.get_io_service()), - timeout(wait) -{ -} - - - /// IOFetch Constructor - just initialize the private data -IOFetch::IOFetch(IOService& io_service, const Question& question, - const IOAddress& address, uint16_t port, OutputBufferPtr buffer, - Callback *callback, int timeout, int protocol) : - data_(new IOFetch::IOFetchData(io_service, question, address, port, - buffer, callback, timeout, protocol) - ) + +IOFetch::IOFetch(int protocol, IOService& service, + const isc::dns::Question& question, const IOAddress& address, uint16_t port, + isc::dns::OutputBufferPtr& buff, Callback* cb, int wait) + : + data_(new IOFetch::IOFetchData(protocol, service, question, address, + port, buff, cb, wait)) { } /// The function operator is implemented with the "stackless coroutine" /// pattern; see internal/coroutine.h for details. + void IOFetch::operator()(error_code ec, size_t length) { if (ec || data_->stopped) { @@ -114,6 +57,7 @@ IOFetch::operator()(error_code ec, size_t length) { } CORO_REENTER (this) { + /// Generate the upstream query and render it to wire format /// This is done in a different scope to allow inline variable /// declarations. @@ -130,7 +74,7 @@ IOFetch::operator()(error_code ec, size_t length) { msg.toWire(renderer); // As this is a new fetch, clear the amount of data received - data_->rcv_amount = 0; + data_->cumulative = 0; dlog("Sending " + msg.toText() + " to " + data_->remote->getAddress().toText()); @@ -148,22 +92,30 @@ IOFetch::operator()(error_code ec, size_t length) { // Open a connection to the target system. For speed, if the operation // was completed synchronously (i.e. UDP operation) we bypass the yield. - bool asynch = data_->socket->open(data->remote.get(), *this); - if (asynch) { + if (data_->socket->open(data_->remote.get(), *this)) { CORO_YIELD; } - // Begin an asynchronous send, and then yield. When the + // Begin an asynchronous send, and then yield. When the send completes // send completes, we will resume immediately after this point. - CORO_YIELD data_->socket->async_send(data_->msgbuf->getData(), + CORO_YIELD data_->socket->asyncSend(data_->msgbuf->getData(), data_->msgbuf->getLength(), data_->remote.get(), *this); - /// Begin an asynchronous receive, and yield. When the receive - /// completes, we will resume immediately after this point. - CORO_YIELD data_->socket->async_receive(data_->data.get(), - static_cast<size_t>(MAX_LENGTH), data_->remote.get(), *this); - - // The message is not rendered yet, so we can't print it easilly + // Now receive the response. Since TCP may not receive the entire + // message in one operation, we need to loop until we have received + // it. (This can't be done within the asyncReceive() method because + // each I/O operation will be done asynchronously and between each one + // we need to yield ... and we *really* don't want to set up another + // coroutine within that method.) So after each receive (and yield), + // we check if the operation is complete and if not, loop to read again. + do { + CORO_YIELD data_->socket->asyncReceive(data_->data.get(), + static_cast<size_t>(MAX_LENGTH), data_->cumulative, + data_->remote.get(), *this); + } while (!data_->socket->receiveComplete(data_->data.get(), length, + data_->cumulative)); + + // The message is not rendered yet, so we can't print it easily dlog("Received response from " + data_->remote->getAddress().toText()); /// Copy the answer into the response buffer. (TODO: If the @@ -188,6 +140,7 @@ IOFetch::operator()(error_code ec, size_t length) { // As the function may be entered multiple times as things wind down, the // stopped_ flag checks if stop() has already been called. If it has, // subsequent calls are no-ops. + void IOFetch::stop(Result result) { if (!data_->stopped) { @@ -203,15 +156,23 @@ IOFetch::stop(Result result) { default: ; } - data_->stopped = true; - data_->socket->cancel(); // Cancel outstanding I/O - data_->socket->close(); // ... and close the socket - data_->timer.cancel(); // Cancel timeout timer + // Stop requested, cancel and I/O's on the socket and shut it down, + // and cancel the timer. + data_->socket->cancel(); + data_->socket->close(); + + data_->timer.cancel(); + + // Execute the I/O completion callback (if present). if (data_->callback) { - (*(data_->callback))(result); // Call callback + (*(data_->callback))(result); } + + // Mark that stop() has now been called. + data_->stopped = true; } } } // namespace asiolink + diff --git a/src/lib/asiolink/io_fetch.h b/src/lib/asiolink/io_fetch.h index 00f276c0cc..69af83014e 100644 --- a/src/lib/asiolink/io_fetch.h +++ b/src/lib/asiolink/io_fetch.h @@ -12,40 +12,44 @@ // OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR // PERFORMANCE OF THIS SOFTWARE. -#ifndef __IOFETCH_H -#define __IOFETCH_H 1 - -#include <netinet/in.h> +#ifndef __IO_FETCH_H +#define __IO_FETCH_H 1 #include <config.h> -#include <asio.hpp> +#include <netinet/in.h> +#include <sys/socket.h> +#include <unistd.h> // for some IPC/network system calls + #include <boost/shared_array.hpp> #include <boost/shared_ptr.hpp> +#include <boost/date_time/posix_time/posix_time_types.hpp> #include <asio/deadline_timer.hpp> +#include <coroutine.h> + #include <dns/buffer.h> #include <dns/question.h> -#include <asiolink/asiolink.h> -#include <asiolink/ioaddress.h> -#include <asiolink/iocompletioncb.h> -#include <asiolink/iocompletioncb.h> - - -#include <asiolink/iosocket.h> -#include <asiolink/ioendpoint.h> -#include <coroutine.h> - +#include <asiolink/io_asio_socket.h> +#include <asiolink/io_endpoint.h> +#include <asiolink/io_service.h> +#include <asiolink/tcp_socket.h> +#include <asiolink/tcp_endpoint.h> +#include <asiolink/udp_socket.h> +#include <asiolink/udp_endpoint.h> namespace asiolink { + /// \brief Upstream Fetch Processing /// /// IOFetch is the class used to send upstream fetches and to handle responses. -/// It is more or less transport-agnostic, although the -class IOFetch : public IOCompletionCallback { +/// +/// \param E Endpoint type to use. + +class IOFetch : public coroutine { public: /// \brief Result of Upstream Fetch @@ -56,8 +60,10 @@ public: enum Result { SUCCESS = 0, ///< Success, fetch completed TIME_OUT, ///< Failure, fetch timed out - STOPPED ///< Control code, fetch has been stopped + STOPPED, ///< Control code, fetch has been stopped + NOTSET ///< For testing, indicates value not set }; + // The next enum is a "trick" to allow constants to be defined in a class // declaration. @@ -65,8 +71,10 @@ public: enum { MAX_LENGTH = 4096 ///< Maximum size of receive buffer }; + /// \brief I/O Fetch Callback /// + /// TODO: change documentation /// Callback object for when the fetch itself has completed. Note that this /// is different to the IOCompletionCallback; that is used to signal the /// completion of an asynchronous I/O call. The IOFetch::Callback is called @@ -94,9 +102,9 @@ public: /// /// The data for IOFetch is held in a separate struct pointed to by a /// shared_ptr object. This is because the IOFetch object will be copied - /// often (it is used as a coroutine and passed as callback to many async_*() - /// functions) and we want keep the same data). Organising the data this - /// way keeps copying to a minimum. + /// often (it is used as a coroutine and passed as callback to many + /// async_*() functions) and we want keep the same data). Organising the + /// data in this way keeps copying to a minimum. struct IOFetchData { // The next two members are shared pointers to a base class because what @@ -104,15 +112,15 @@ public: // TCP, which is not known until construction of the IOFetch. Use of // a shared pointer here is merely to ensure deletion when the data // object is deleted. - boost::shared_ptr<IOSocket> socket; ///< Socket to use for I/O + boost::shared_ptr<IOAsioSocket<IOFetch> > socket; + ///< Socket to use for I/O boost::shared_ptr<IOEndpoint> remote; ///< Where the fetch was sent - isc::dns::Question question; ///< Question to be asked + isc::dns::OutputBufferPtr msgbuf; ///< Wire buffer for question isc::dns::OutputBufferPtr buffer; ///< Received data held here - isc::dns::OutputBufferPtr msgbuf; ///< ... and here - boost::shared_array<char> data; ///< Temporary array for the data - Callback* callback; ///< Called on I/O Completion - size_t rcv_amount; ///< Received amount + boost::shared_array<char> data; ///< Temporary array for data + IOFetch::Callback* callback; ///< Called on I/O Completion + size_t cumulative; ///< Cumulative received amount bool stopped; ///< Have we stopped running? asio::deadline_timer timer; ///< Timer to measure timeouts int timeout; ///< Timeout in ms @@ -121,7 +129,8 @@ public: /// /// Just fills in the data members of the IOFetchData structure /// - /// \param io_service I/O Service object to handle the asynchronous + /// \param protocol either IPPROTO_UDP or IPPROTO_TCP + /// \param service I/O Service object to handle the asynchronous /// operations. /// \param query DNS question to send to the upstream server. /// \param address IP address of upstream server @@ -131,38 +140,60 @@ public: /// \param cb Callback object containing the callback to be called /// when we terminate. The caller is responsible for managing this /// object and deleting it if necessary. - /// \param wait Timeout for the fetch (in ms). The default value of - /// -1 indicates no timeout. - /// \param protocol Protocol to use for the fetch. The default is UDP - - IOFetchData(IOService& io_service, const isc::dns::Question& query, - const IOAddress& address, uint16_t port, - isc::dns::OutputBufferPtr buff, Callback* cb, int wait = -1, - int protocol = IPPROTO_UDP); + /// \param wait Timeout for the fetch (in ms). + IOFetchData(int protocol, IOService& service, + const isc::dns::Question& query, const IOAddress& address, + uint16_t port, isc::dns::OutputBufferPtr& buff, Callback* cb, + int wait) + : + socket((protocol == IPPROTO_UDP) ? + static_cast<IOAsioSocket<IOFetch>*>( + new UDPSocket<IOFetch>(service)) : + static_cast<IOAsioSocket<IOFetch>*>( + new TCPSocket<IOFetch>(service)) + ), + remote((protocol == IPPROTO_UDP) ? + static_cast<IOEndpoint*>(new UDPEndpoint(address, port)) : + static_cast<IOEndpoint*>(new TCPEndpoint(address, port)) + ), + question(query), + msgbuf(new isc::dns::OutputBuffer(512)), + buffer(buff), + data(new char[IOFetch::MAX_LENGTH]), + callback(cb), + cumulative(0), + stopped(false), + timer(service.get_io_service()), + timeout(wait) + {} }; /// \brief Constructor. /// /// Creates the object that will handle the upstream fetch. /// - /// \param io_service I/O Service object to handle the asynchronous + /// TODO: Need to randomise the source port + /// + /// \param protocol Fetch protocol, either IPPROTO_UDP or IPPROTO_TCP + /// \param service I/O Service object to handle the asynchronous /// operations. /// \param question DNS question to send to the upstream server. - /// \param address IP address of upstream server - /// \param port Port to use for the query - /// \param buffer Output buffer into which the response (in wire format) + /// \param buff Output buffer into which the response (in wire format) /// is written (if a response is received). - /// \param callback Callback object containing the callback to be called + /// \param cb Callback object containing the callback to be called /// when we terminate. The caller is responsible for managing this /// object and deleting it if necessary. - /// \param timeout Timeout for the fetch (in ms). The default value of + /// \param address IP address of upstream server + /// \param port Port to which to connect on the upstream server + /// (default = 53) + /// \param wait Timeout for the fetch (in ms). The default value of /// -1 indicates no timeout. - /// \param protocol Protocol to use for the fetch. The default is UDP - IOFetch(IOService& io_service, const isc::dns::Question& question, - const IOAddress& address, uint16_t port, - isc::dns::OutputBufferPtr buffer, Callback* callback, - int timeout = -1, int protocol = IPPROTO_UDP); + IOFetch(int protocol, IOService& service, + const isc::dns::Question& question, const IOAddress& address, + uint16_t port, isc::dns::OutputBufferPtr& buff, Callback* cb, + int wait = -1); + // The default constructor and copy constructor are correct for this method. /// \brief Coroutine entry point /// @@ -174,7 +205,6 @@ public: void operator()(asio::error_code ec = asio::error_code(), size_t length = 0); - /// \brief Terminate query /// /// This method can be called at any point. It terminates the current @@ -184,10 +214,10 @@ public: void stop(Result reason = STOPPED); private: - boost::shared_ptr<IOFetchData> data_; ///< Private data -}; + boost::shared_ptr<IOFetchData> data_; ///< Private data -} +}; +} // namespace asiolink -#endif // __IOFETCH_H +#endif // __IO_FETCH_H diff --git a/src/lib/asiolink/io_message.h b/src/lib/asiolink/io_message.h index 5ea99141d8..532f4492d9 100644 --- a/src/lib/asiolink/io_message.h +++ b/src/lib/asiolink/io_message.h @@ -12,8 +12,8 @@ // OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR // PERFORMANCE OF THIS SOFTWARE. -#ifndef __IOMESSAGE_H -#define __IOMESSAGE_H 1 +#ifndef __IO_MESSAGE_H +#define __IO_MESSAGE_H 1 // IMPORTANT NOTE: only very few ASIO headers files can be included in // this file. In particular, asio.hpp should never be included here. @@ -97,7 +97,7 @@ private: } // asiolink -#endif // __IOMESSAGE_H +#endif // __IO_MESSAGE_H // Local Variables: // mode: c++ diff --git a/src/lib/asiolink/io_socket.cc b/src/lib/asiolink/io_socket.cc index 11b0194e1e..fb325e9172 100644 --- a/src/lib/asiolink/io_socket.cc +++ b/src/lib/asiolink/io_socket.cc @@ -14,10 +14,9 @@ // OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR // PERFORMANCE OF THIS SOFTWARE. -#include <asio.hpp> - #include "io_socket.h" +#include <asio.hpp> using namespace asio; @@ -44,76 +43,11 @@ public: /// \brief A dummy derived method of \c IOSocket::getNative(). /// - /// \return Always returns -1 as the object is not associated with a real - /// (native) socket. + /// This version of method always returns -1 as the object is not + /// associated with a real (native) socket. virtual int getNative() const { return (-1); } - /// \brief A dummy derived method of \c IOSocket::getProtocol(). - /// - /// \return Protocol socket was created with virtual int getProtocol() const { return (protocol_); } - - - /// \brief Open Socket - /// - /// A call that is a no-op on UDP sockets, this opens a connection to the - /// system identified by the given endpoint. - /// - /// \param endpoint Unused - /// \param callback Unused. - ///false indicating that the operation completed synchronously. - virtual bool open(const IOEndpoint*, IOCompletionCallback&) { - return (false); - } - - /// \brief Send Asynchronously - /// - /// Must be supplied as it is abstract in the base class. - /// - /// \param data Unused - /// \param length Unused - /// \param endpoint Unused - /// \param callback Unused - virtual void asyncSend(const void*, size_t, const IOEndpoint*, - IOCompletionCallback&) { - } - - /// \brief Receive Asynchronously - /// - /// Must be supplied as it is abstract in the base class. - /// - /// \param data Unused - /// \param length Unused - /// \param cumulative Unused - /// \param endpoint Unused - /// \param callback Unused - virtual void asyncReceive(void* data, size_t, size_t, IOEndpoint*, - IOCompletionCallback&) { - } - - /// \brief Checks if the data received is complete. - /// - /// \param data Unused - /// \param length Unused - /// \param cumulative Unused - /// - /// \return Always true - virtual bool receiveComplete(void*, size_t, size_t&) { - return (true); - } - - /// \brief Cancel I/O On Socket - /// - /// Must be supplied as it is abstract in the base class. - virtual void cancel() { - } - - /// \brief Close socket - /// - /// Must be supplied as it is abstract in the base class. - virtual void close() { - } - private: const int protocol_; }; diff --git a/src/lib/asiolink/io_socket.h b/src/lib/asiolink/io_socket.h index 4fb68f8af0..bebc8b6bf8 100644 --- a/src/lib/asiolink/io_socket.h +++ b/src/lib/asiolink/io_socket.h @@ -12,8 +12,8 @@ // OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR // PERFORMANCE OF THIS SOFTWARE. -#ifndef __IOSOCKET_H -#define __IOSOCKET_H 1 +#ifndef __IO_SOCKET_H +#define __IO_SOCKET_H 1 // IMPORTANT NOTE: only very few ASIO headers files can be included in // this file. In particular, asio.hpp should never be included here. @@ -24,16 +24,9 @@ #include <string> #include <exceptions/exceptions.h> -#include <coroutine.h> - -#include <asiolink/io_completion_cb.h> namespace asiolink { -/// Forward declaration of an IOEndpoint -class IOEndpoint; - - /// \brief The \c IOSocket class is an abstract base class to represent /// various types of network sockets. /// @@ -102,82 +95,6 @@ public: /// \return IPPROTO_TCP for TCP sockets virtual int getProtocol() const = 0; - /// \brief Open Socket - /// - /// A call that is a no-op on UDP sockets, this opens a connection to the - /// system identified by the given endpoint. - /// - /// \param endpoint Pointer to the endpoint object. This is ignored for - /// a UDP socket (the target is specified in the send call), but should - /// be of type TCPEndpoint for a TCP connection. - /// \param callback I/O Completion callback, called when the connect has - /// completed. In the stackless coroutines model, this will be the - /// method containing the call to this function, allowing the operation to - /// resume after the socket open has completed. - /// - /// \return true if an asynchronous operation was started and the caller - /// should yield and wait for completion, false if not. (i.e. The UDP - /// derived class will return false, the TCP class will return true). This - /// optimisation avoids the overhead required to post a callback to the - /// I/O Service queue where nothing is done. - virtual bool open(const IOEndpoint* endpoint, - IOCompletionCallback& callback) = 0; - - /// \brief Send Asynchronously - /// - /// This corresponds to async_send_to() for UDP sockets and async_send() - /// for TCP. In both cases an endpoint argument is supplied indicating the - /// target of the send - this is ignored for TCP. - /// - /// \param data Data to send - /// \param length Length of data to send - /// \param endpoint Target of the send - /// \param callback Callback object. - virtual void asyncSend(const void* data, size_t length, - const IOEndpoint* endpoint, IOCompletionCallback& callback) = 0; - - /// \brief Receive Asynchronously - /// - /// This correstponds to async_receive_from() for UDP sockets and - /// async_receive() for TCP. In both cases, an endpoint argument is - /// supplied to receive the source of the communication. For TCP it will - /// be filled in with details of the connection. - /// - /// \param data Buffer to receive incoming message - /// \param length Length of the data buffer - /// \param cumulative Amount of data that should already be in the buffer. - /// \param endpoint Source of the communication - /// \param callback Callback object - virtual void asyncReceive(void* data, size_t length, size_t cumulative, - IOEndpoint* endpoint, IOCompletionCallback& callback) = 0; - - /// \brief Checks if the data received is complete. - /// - /// This applies to TCP receives, where the data is a byte stream and a - /// receive is not guaranteed to receive the entire message. DNS messages - /// over TCP are prefixed by a two-byte count field. This method takes the - /// amount received so far and the amount received in this I/O and checks - /// if the message is complete, returning the appropriate indication. As - /// a side-effect, it also updates the amount received. - /// - /// For a UDP receive, all the data is received in one I/O, so this is - /// effectively a no-op (although it does update the amount received). - /// - /// \param data Data buffer containing data to date - /// \param length Amount of data received in last asynchronous I/O - /// \param cumulative On input, amount of data received before the last - /// I/O. On output, the total amount of data received to date. - /// - /// \return true if the receive is complete, false if another receive is - /// needed. - virtual bool receiveComplete(void* data, size_t length, size_t& cumulative) = 0; - - /// \brief Cancel I/O On Socket - virtual void cancel() = 0; - - /// \brief Close socket - virtual void close() = 0; - /// \brief Return a non-usable "dummy" UDP socket for testing. /// /// This is a class method that returns a "mock" of UDP socket. @@ -202,9 +119,6 @@ public: static IOSocket& getDummyTCPSocket(); }; -} // asiolink -#endif // __IOSOCKET_H +} // namespace asiolink -// Local Variables: -// mode: c++ -// End: +#endif // __IO_SOCKET_H diff --git a/src/lib/asiolink/tcp_server.cc b/src/lib/asiolink/tcp_server.cc index 3928bc1e92..768f6e8860 100644 --- a/src/lib/asiolink/tcp_server.cc +++ b/src/lib/asiolink/tcp_server.cc @@ -20,9 +20,9 @@ #include <log/dummylog.h> +#include <asiolink/io_completion_cb.h> #include <asiolink/tcp_endpoint.h> #include <asiolink/tcp_socket.h> - #include <asiolink/tcp_server.h> @@ -115,7 +115,15 @@ TCPServer::operator()(error_code ec, size_t length) { // that would quickly generate an IOMessage object without // all these calls to "new".) peer_.reset(new TCPEndpoint(socket_->remote_endpoint())); - iosock_.reset(new TCPSocket(*socket_)); + + // The TCP socket class has been extended with asynchronous functions + // and takes as a template parameter a completion callback class. As + // TCPServer does not use these extended functions (only those defined + // in the IOSocket base class) - but needs a TCPSocket to get hold of + // the underlying Boost TCP socket - use "IOCompletionCallback" - + // a basic callback class: it is not used but provides the appropriate + // signature. + iosock_.reset(new TCPSocket<IOCompletionCallback>(*socket_)); io_message_.reset(new IOMessage(data_.get(), length, *iosock_, *peer_)); bytes_ = length; diff --git a/src/lib/asiolink/tcp_socket.h b/src/lib/asiolink/tcp_socket.h index abcc3d8c3d..c45501ec7d 100644 --- a/src/lib/asiolink/tcp_socket.h +++ b/src/lib/asiolink/tcp_socket.h @@ -19,55 +19,78 @@ #error "asio.hpp must be included before including this, see asiolink.h as to why" #endif -#include <asiolink/io_socket.h> +#include <log/dummylog.h> +#include <netinet/in.h> +#include <sys/socket.h> +#include <unistd.h> // for some IPC/network system calls + +#include <iostream> +#include <cstddef> + +#include <config.h> + +#include <asiolink/io_asio_socket.h> +#include <asiolink/io_endpoint.h> +#include <asiolink/io_service.h> +#include <asiolink/tcp_endpoint.h> namespace asiolink { /// \brief The \c TCPSocket class is a concrete derived class of /// \c IOSocket that represents a TCP socket. /// -/// In the current implementation, an object of this class is always -/// instantiated within the wrapper routines. Applications are expected to -/// get access to the object via the abstract base class, \c IOSocket. -/// This design may be changed when we generalize the wrapper interface. -class TCPSocket : public IOSocket { +/// Other notes about \c TCPSocket applies to this class, too. +/// +/// \param C Callback type +template <typename C> +class TCPSocket : public IOAsioSocket<C> { private: - TCPSocket(const TCPSocket& source); - TCPSocket& operator=(const TCPSocket& source); + /// \brief Class is non-copyable + TCPSocket(const TCPSocket&); + TCPSocket& operator=(const TCPSocket&); + public: + enum { + MAX_SIZE = 4096 // Send and receive size + }; + /// \brief Constructor from an ASIO TCP socket. /// - /// \param socket The ASIO representation of the TCP socket. - TCPSocket(asio::ip::tcp::socket& socket) : socket_(socket) {} + /// \param socket The ASIO representation of the TCP socket. It + /// is assumed that the caller will open and close the socket, so + /// these operations are a no-op for that socket. + TCPSocket(asio::ip::tcp::socket& socket); + + /// \brief Constructor + /// + /// Used when the TCPSocket is being asked to manage its own internal + /// socket. It is assumed that open() and close() will not be used. + /// + /// \param service I/O Service object used to manage the socket. + TCPSocket(IOService& service); + + /// \brief Destructor + virtual ~TCPSocket(); - int getNative() const { return (socket_.native()); } - int getProtocol() const { return (IPPROTO_TCP); } + virtual int getNative() const { return (socket_.native()); } + virtual int getProtocol() const { return (IPPROTO_TCP); } /// \brief Open Socket /// - /// A call that is a no-op on UDP sockets, this opens a connection to the - /// system identified by the given endpoint. + /// Opens the TCP socket. In the model for transport-layer agnostic I/O, + /// an "open" operation includes a connection to the remote end (which + /// may take time). This does not happen for TCP, so the method returns + /// "false" to indicate that the operation completed synchronously. /// - /// \param endpoint Pointer to the endpoint object. This is ignored for - /// a UDP socket (the target is specified in the send call), but should - /// be of type TCPEndpoint for a TCP connection. - /// \param callback I/O Completion callback, called when the connect has - /// completed. In the stackless coroutines model, this will be the - /// method containing the call to this function, allowing the operation to - /// resume after the socket open has completed. + /// \param endpoint Endpoint to which the socket will connect to. + /// \param callback Unused. /// - /// \return true if an asynchronous operation was started and the caller - /// should yield and wait for completion, false if not. (i.e. The UDP - /// derived class will return false, the TCP class will return true). This - /// optimisation avoids the overhead required to post a callback to the - /// I/O Service queue where nothing is done. - virtual bool open(const IOEndpoint*, IOCompletionCallback&) { - return false; - } + /// \return false to indicate that the "operation" completed synchronously. + virtual bool open(const IOEndpoint* endpoint, C&); /// \brief Send Asynchronously /// - /// This corresponds to async_send_to() for UDP sockets and async_send() + /// This corresponds to async_send_to() for TCP sockets and async_send() /// for TCP. In both cases an endpoint argument is supplied indicating the /// target of the send - this is ignored for TCP. /// @@ -75,13 +98,12 @@ public: /// \param length Length of data to send /// \param endpoint Target of the send /// \param callback Callback object. - virtual void asyncSend(const void*, size_t, - const IOEndpoint*, IOCompletionCallback&) { - } + virtual void asyncSend(const void* data, size_t length, + const IOEndpoint* endpoint, C& callback); /// \brief Receive Asynchronously /// - /// This correstponds to async_receive_from() for UDP sockets and + /// This correstponds to async_receive_from() for TCP sockets and /// async_receive() for TCP. In both cases, an endpoint argument is /// supplied to receive the source of the communication. For TCP it will /// be filled in with details of the connection. @@ -89,18 +111,19 @@ public: /// \param data Buffer to receive incoming message /// \param length Length of the data buffer /// \param cumulative Amount of data that should already be in the buffer. + /// (This is ignored - every UPD receive fills the buffer from the start.) /// \param endpoint Source of the communication /// \param callback Callback object - virtual void asyncReceive(void* data, size_t, size_t, IOEndpoint*, - IOCompletionCallback&) { - } + virtual void asyncReceive(void* data, size_t length, size_t cumulative, + IOEndpoint* endpoint, C& callback); /// \brief Checks if the data received is complete. /// - /// Checks that the total data received is the amount expected by the - /// two-byte header to the message. + /// As all the data is received in one I/O, so this is, this is effectively + /// a no-op (although it does update the amount of data received). /// - /// \param data Data buffer containing data to date + /// \param data Data buffer containing data to date. (This is ignored + /// for TCP receives.) /// \param length Amount of data received in last asynchronous I/O /// \param cumulative On input, amount of data received before the last /// I/O. On output, the total amount of data received to date. @@ -113,17 +136,147 @@ public: } /// \brief Cancel I/O On Socket - virtual void cancel() { - } + virtual void cancel(); /// \brief Close socket - virtual void close() { - } + virtual void close(); + private: - asio::ip::tcp::socket& socket_; + // Two variables to hold the socket - a socket and a pointer to it. This + // handles the case where a socket is passed to the TCPSocket on + // construction, or where it is asked to manage its own socket. + asio::ip::tcp::socket* socket_ptr_; ///< Pointer to own socket + asio::ip::tcp::socket& socket_; ///< Socket + bool isopen_; ///< true when socket is open }; +// Constructor - caller manages socket + +template <typename C> +TCPSocket<C>::TCPSocket(asio::ip::tcp::socket& socket) : + socket_ptr_(NULL), socket_(socket), isopen_(true) +{ +} + +// Constructor - create socket on the fly + +template <typename C> +TCPSocket<C>::TCPSocket(IOService& service) : + socket_ptr_(new asio::ip::tcp::socket(service.get_io_service())), + socket_(*socket_ptr_), isopen_(false) +{ +} + +// Destructor. Only delete the socket if we are managing it. + +template <typename C> +TCPSocket<C>::~TCPSocket() +{ + delete socket_ptr_; +} + +// Open the socket. Throws an error on failure +// TODO: Make the open more resilient + +template <typename C> bool +TCPSocket<C>::open(const IOEndpoint* endpoint, C&) { + + // Ignore opens on already-open socket. Don't throw a failure because + // of uncertainties as to what precedes whan when using asynchronous I/O. + // At also allows us a treat a passed-in socket as a self-managed socket. + + if (!isopen_) { + if (endpoint->getFamily() == AF_INET) { + socket_.open(asio::ip::tcp::v4()); + } + else { + socket_.open(asio::ip::tcp::v6()); + } + isopen_ = true; + + // TODO: Complete TCPSocket::open() + + } + return (false); +} + +// Send a message. Should never do this if the socket is not open, so throw +// an exception if this is the case. + +template <typename C> void +TCPSocket<C>::asyncSend(const void* data, size_t length, + const IOEndpoint* endpoint, C& callback) +{ + if (isopen_) { + + // Upconvert to a TCPEndpoint. We need to do this because although + // IOEndpoint is the base class of TCPEndpoint and TCPEndpoint, it + // doing cont contain a method for getting at the underlying endpoint + // type - those are in the derived class and the two classes differ on + // return type. + + assert(endpoint->getProtocol() == IPPROTO_TCP); + const TCPEndpoint* tcp_endpoint = + static_cast<const TCPEndpoint*>(endpoint); + std::cerr << "TCPSocket::asyncSend(): sending to " << + tcp_endpoint->getAddress().toText() << + ", port " << tcp_endpoint->getPort() << "\n"; + + // TODO: Complete TCPSocket::asyncSend() + + } else { + isc_throw(SocketNotOpen, + "attempt to send on a TCP socket that is not open"); + } +} + +// Receive a message. Note that the "cumulative" argument is ignored - every TCP +// receive is put into the buffer beginning at the start - there is no concept +// receiving a subsequent part of a message. Same critera as before concerning +// the need for the socket to be open. + +template <typename C> void +TCPSocket<C>::asyncReceive(void* data, size_t length, size_t, + IOEndpoint* endpoint, C& callback) +{ + if (isopen_) { + + // Upconvert the endpoint again. + assert(endpoint->getProtocol() == IPPROTO_TCP); + const TCPEndpoint* tcp_endpoint = + static_cast<const TCPEndpoint*>(endpoint); + std::cerr << "TCPSocket::asyncReceive(): receiving from " << + tcp_endpoint->getAddress().toText() << + ", port " << tcp_endpoint->getPort() << "\n"; + + // TODO: Complete TCPSocket::asyncReceive() + + } else { + isc_throw(SocketNotOpen, + "attempt to receive from a TCP socket that is not open"); + } +} + +// Cancel I/O on the socket. No-op if the socket is not open. +template <typename C> void +TCPSocket<C>::cancel() { + if (isopen_) { + socket_.cancel(); + } +} + +// Close the socket down. Can only do this if the socket is open and we are +// managing it ourself. + +template <typename C> void +TCPSocket<C>::close() { + if (isopen_ && socket_ptr_) { + socket_.close(); + isopen_ = false; + } +} + +} // namespace asiolink -} // namespace asiolink #endif // __TCP_SOCKET_H diff --git a/src/lib/asiolink/tests/Makefile.am b/src/lib/asiolink/tests/Makefile.am index 7580065f1f..6077acb50a 100644 --- a/src/lib/asiolink/tests/Makefile.am +++ b/src/lib/asiolink/tests/Makefile.am @@ -15,10 +15,12 @@ CLEANFILES = *.gcno *.gcda TESTS = if HAVE_GTEST TESTS += run_unittests -run_unittests_SOURCES = $(top_srcdir)/src/lib/dns/tests/unittest_util.h +run_unittests_SOURCES = run_unittests.cc +run_unittests_SOURCES += $(top_srcdir)/src/lib/dns/tests/unittest_util.h run_unittests_SOURCES += $(top_srcdir)/src/lib/dns/tests/unittest_util.cc run_unittests_SOURCES += io_address_unittest.cc run_unittests_SOURCES += io_endpoint_unittest.cc +run_unittests_SOURCES += io_fetch_unittest.cc run_unittests_SOURCES += io_socket_unittest.cc run_unittests_SOURCES += io_service_unittest.cc run_unittests_SOURCES += interval_timer_unittest.cc @@ -26,15 +28,18 @@ run_unittests_SOURCES += recursive_query_unittest.cc run_unittests_SOURCES += udp_endpoint_unittest.cc run_unittests_SOURCES += udp_query_unittest.cc run_unittests_SOURCES += udp_socket_unittest.cc -run_unittests_SOURCES += run_unittests.cc + run_unittests_CPPFLAGS = $(AM_CPPFLAGS) $(GTEST_INCLUDES) -run_unittests_LDFLAGS = $(AM_LDFLAGS) $(GTEST_LDFLAGS) $(LOG4CXX_LDFLAGS) -run_unittests_LDADD = $(GTEST_LDADD) + +run_unittests_LDADD = $(GTEST_LDADD) run_unittests_LDADD += $(SQLITE_LIBS) run_unittests_LDADD += $(top_builddir)/src/lib/dns/libdns++.la run_unittests_LDADD += $(top_builddir)/src/lib/exceptions/libexceptions.la run_unittests_LDADD += $(top_builddir)/src/lib/asiolink/libasiolink.la run_unittests_LDADD += $(top_builddir)/src/lib/log/liblog.la + +run_unittests_LDFLAGS = $(AM_LDFLAGS) $(GTEST_LDFLAGS) $(LOG4CXX_LDFLAGS) + # Note: the ordering matters: -Wno-... must follow -Wextra (defined in # B10_CXXFLAGS) run_unittests_CXXFLAGS = $(AM_CXXFLAGS) diff --git a/src/lib/asiolink/tests/io_fetch_unittest.cc b/src/lib/asiolink/tests/io_fetch_unittest.cc new file mode 100644 index 0000000000..0026a1df10 --- /dev/null +++ b/src/lib/asiolink/tests/io_fetch_unittest.cc @@ -0,0 +1,189 @@ +// Copyright (C) 2011 Internet Systems Consortium, Inc. ("ISC") +// +// Permission to use, copy, modify, and/or distribute this software for any +// purpose with or without fee is hereby granted, provided that the above +// copyright notice and this permission notice appear in all copies. +// +// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH +// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY +// AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT, +// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM +// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE +// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR +// PERFORMANCE OF THIS SOFTWARE. + +#include <gtest/gtest.h> +#include <boost/bind.hpp> +#include <cstdlib> +#include <string> + +#include <string.h> + +#include <asio.hpp> + +#include <dns/buffer.h> +#include <dns/question.h> +#include <dns/message.h> +#include <dns/messagerenderer.h> +#include <dns/opcode.h> +#include <dns/name.h> +#include <dns/rcode.h> + +#include <asiolink/io_fetch.h> +#include <asiolink/io_completion_cb.h> +#include <asiolink/io_service.h> + +using namespace asio; +using namespace isc::dns; +using asio::ip::udp; + +namespace asiolink { + +const asio::ip::address TEST_HOST(asio::ip::address::from_string("127.0.0.1")); +const uint16_t TEST_PORT(5301); +// FIXME Shouldn't we send something that is real message? +const char TEST_DATA[] = "TEST DATA"; + +/// \brief Test fixture for the asiolink::IOFetch. +class IOFetchTest : public virtual ::testing::Test, public virtual IOFetch::Callback +{ +public: + IOService service_; ///< Service to run the query + IOFetch::Result expected_; ///< Expected result of the callback + bool run_; ///< Did the callback run already? + Question question_; ///< What to ask + OutputBufferPtr buff_; ///< Buffer to hold result + IOFetch udp_fetch_; ///< For UDP query test + //IOFetch tcp_fetch_; ///< For TCP query test + + // The next member is the buffer iin which the "server" (implemented by the + // response handler method) receives the question sent by the fetch object. + char server_buff_[512]; ///< Server buffer + + /// \brief Constructor + IOFetchTest() : + service_(), + expected_(IOFetch::NOTSET), + run_(false), + question_(Name("example.net"), RRClass::IN(), RRType::A()), + buff_(new OutputBuffer(512)), + udp_fetch_(IPPROTO_UDP, service_, question_, IOAddress(TEST_HOST), + TEST_PORT, buff_, this, 100) + // tcp_fetch_(service_, question_, IOAddress(TEST_HOST), TEST_PORT, + // buff_, this, 100, IPPROTO_UDP) + { } + + /// \brief Fetch completion callback + /// + /// This is the callback's operator() method which is called when the fetch + /// is complete. Check that the data received is the wire format of the + /// question, then send back an arbitrary response. + void operator()(IOFetch::Result result) { + EXPECT_EQ(expected_, result); // Check correct result returned + EXPECT_FALSE(run_); // Check it is run only once + run_ = true; // Note success + service_.stop(); // ... and exit run loop + } + + /// \brief Response handler, pretending to be remote DNS server + /// + /// This checks that the data sent is what we expected to receive, and + /// sends back a test answer. + void respond(udp::endpoint* remote, udp::socket* socket, + asio::error_code ec = asio::error_code(), size_t length = 0) { + + // Construct the data buffer for question we expect to receive. + OutputBuffer msgbuf(512); + Message msg(Message::RENDER); + msg.setQid(0); + msg.setOpcode(Opcode::QUERY()); + msg.setRcode(Rcode::NOERROR()); + msg.setHeaderFlag(Message::HEADERFLAG_RD); + msg.addQuestion(question_); + MessageRenderer renderer(msgbuf); + msg.toWire(renderer); + + // The QID in the incoming data is random so set it to 0 for the + // data comparison check. (It was set to 0 when the buffer containing + // the expected data was constructed above.) + server_buff_[0] = server_buff_[1] = 0; + + // Check that lengths are identical. + EXPECT_EQ(msgbuf.getLength(), length); + EXPECT_TRUE(memcmp(msgbuf.getData(), server_buff_, length) == 0); + + // ... and return a message back. + socket->send_to(asio::buffer(TEST_DATA, sizeof TEST_DATA), *remote); + } +}; + + +/// Test that when we run the query and stop it after it was run, +/// it returns "stopped" correctly. +/// +/// That is why stop() is posted to the service_ as well instead +/// of calling it. +TEST_F(IOFetchTest, UdpStop) { + expected_ = IOFetch::STOPPED; + + // Post the query + service_.get_io_service().post(udp_fetch_); + + // Post query_.stop() (yes, the boost::bind thing is just + // query_.stop()). + service_.get_io_service().post( + boost::bind(&IOFetch::stop, udp_fetch_, IOFetch::STOPPED)); + + // Run both of them. run() returns when everything in the I/O service + // queue has completed. + service_.run(); + EXPECT_TRUE(run_); +} + +// Test that when we queue the query to service_ and call stop() before it gets +// executed, it acts sanely as well (eg. has the same result as running stop() +// after - calls the callback). +TEST_F(IOFetchTest, UdpPrematureStop) { + expected_ = IOFetch::STOPPED; + + // Stop before it is started + udp_fetch_.stop(); + service_.get_io_service().post(udp_fetch_); + + service_.run(); + EXPECT_TRUE(run_); +} + +// Test that it will timeout when no answer arrives. +TEST_F(IOFetchTest, UdpTimeout) { + expected_ = IOFetch::TIME_OUT; + + service_.get_io_service().post(udp_fetch_); + service_.run(); + EXPECT_TRUE(run_); +} + +// Test that it will succeed when we fake an answer and stores the same data we +// send. This is done through a real socket on the loopback address. +TEST_F(IOFetchTest, UdpReceive) { + expected_ = IOFetch::SUCCESS; + + udp::socket socket(service_.get_io_service(), udp::v4()); + socket.set_option(socket_base::reuse_address(true)); + socket.bind(udp::endpoint(TEST_HOST, TEST_PORT)); + + udp::endpoint remote; + socket.async_receive_from(asio::buffer(server_buff_, sizeof(server_buff_)), + remote, + boost::bind(&IOFetchTest::respond, this, &remote, &socket, _1, _2)); + service_.get_io_service().post(udp_fetch_); + service_.run(); + + socket.close(); + + EXPECT_TRUE(run_); + ASSERT_EQ(sizeof TEST_DATA, buff_->getLength()); + EXPECT_EQ(0, memcmp(TEST_DATA, buff_->getData(), sizeof TEST_DATA)); +} + +} // namespace asiolink diff --git a/src/lib/asiolink/tests/io_socket_unittest.cc b/src/lib/asiolink/tests/io_socket_unittest.cc index a964319be6..6538550bc8 100644 --- a/src/lib/asiolink/tests/io_socket_unittest.cc +++ b/src/lib/asiolink/tests/io_socket_unittest.cc @@ -15,6 +15,8 @@ #include <config.h> #include <gtest/gtest.h> +#include <netinet/in.h> + #include <asio.hpp> #include <asiolink/io_socket.h> diff --git a/src/lib/asiolink/tests/udp_socket_unittest.cc b/src/lib/asiolink/tests/udp_socket_unittest.cc index 6950c6e865..7332d29ffe 100644 --- a/src/lib/asiolink/tests/udp_socket_unittest.cc +++ b/src/lib/asiolink/tests/udp_socket_unittest.cc @@ -52,14 +52,12 @@ #include <asio.hpp> -#include <asiolink/io_completion_cb.h> #include <asiolink/io_service.h> #include <asiolink/udp_endpoint.h> #include <asiolink/udp_socket.h> using namespace asio; using namespace asiolink; -using asio::ip::udp; using namespace std; namespace { @@ -77,7 +75,7 @@ const char INBOUND_DATA[] = "Returned data from server to client"; /// and the operator() method is called when when an asynchronous I/O /// completes. The arguments to the completion callback are stored for later /// retrieval. -class UDPCallback : public IOCompletionCallback { +class UDPCallback { public: struct PrivateData { @@ -187,33 +185,38 @@ private: TEST(UDPSocket, SequenceTest) { // Common objects. - IOAddress server_address(SERVER_ADDRESS); // Address of target server - UDPEndpoint endpoint(server_address, SERVER_PORT); // Endpoint of target server IOService service; // Service object for async control + // Server + IOAddress server_address(SERVER_ADDRESS); // Address of target server + UDPCallback server_cb("Server"); // Server callback + UDPEndpoint server_endpoint( // Endpoint describing server + server_address, SERVER_PORT); + UDPEndpoint server_remote_endpoint; // Address where server received message from + // The client - the UDPSocket being tested - UDPSocket client(service); // Socket under test + UDPSocket<UDPCallback> client(service);// Socket under test UDPCallback client_cb("Client"); // Async I/O callback function + UDPEndpoint client_remote_endpoint; // Where client receives message from size_t client_cumulative = 0; // Cumulative data received // The server - with which the client communicates. For convenience, we // use the same io_service, and use the endpoint object created for // the client to send to as the endpoint object in the constructor. - UDPCallback server_cb("Server"); - udp::socket server(service.get_io_service(), endpoint.getASIOEndpoint()); + asio::ip::udp::socket server(service.get_io_service(), + server_endpoint.getASIOEndpoint()); server.set_option(socket_base::reuse_address(true)); // Assertion to ensure that the server buffer is large enough - char data[UDPSocket::MAX_SIZE]; + char data[UDPSocket<UDPCallback>::MAX_SIZE]; ASSERT_GT(sizeof(data), sizeof(OUTBOUND_DATA)); // Open the client socket - the operation should be synchronous - EXPECT_FALSE(client.open(&endpoint, client_cb)); + EXPECT_FALSE(client.open(&server_endpoint, client_cb)); // Issue read on the server. Completion callback should not have run. server_cb.setCalled(false); server_cb.setCode(42); // Answer to Life, the Universe and Everything! - UDPEndpoint server_remote_endpoint; server.async_receive_from(buffer(data, sizeof(data)), server_remote_endpoint.getASIOEndpoint(), server_cb); EXPECT_FALSE(server_cb.getCalled()); @@ -222,7 +225,7 @@ TEST(UDPSocket, SequenceTest) { // be called until we call the io_service.run() method. client_cb.setCalled(false); client_cb.setCode(7); // Arbitrary number - client.asyncSend(OUTBOUND_DATA, sizeof(OUTBOUND_DATA), &endpoint, client_cb); + client.asyncSend(OUTBOUND_DATA, sizeof(OUTBOUND_DATA), &server_endpoint, client_cb); EXPECT_FALSE(client_cb.getCalled()); // Execute the two callbacks. @@ -244,7 +247,6 @@ TEST(UDPSocket, SequenceTest) { client_cb.setLength(12345); // Arbitrary number client_cb.setCalled(false); client_cb.setCode(32); // Arbitrary number - UDPEndpoint client_remote_endpoint; // To receive address of remote system client.asyncReceive(data, sizeof(data), client_cumulative, &client_remote_endpoint, client_cb); @@ -256,8 +258,8 @@ TEST(UDPSocket, SequenceTest) { server_remote_endpoint.getASIOEndpoint(), server_cb); // Expect two callbacks to run - service.run_one(); - service.run_one(); + service.get_io_service().poll(); + //service.run_one(); EXPECT_TRUE(client_cb.getCalled()); EXPECT_EQ(0, client_cb.getCode()); diff --git a/src/lib/asiolink/udp_server.cc b/src/lib/asiolink/udp_server.cc index 876433a5fd..58186e9007 100644 --- a/src/lib/asiolink/udp_server.cc +++ b/src/lib/asiolink/udp_server.cc @@ -20,10 +20,10 @@ #include <log/dummylog.h> +#include <asiolink/io_completion_cb.h> #include <asiolink/udp_endpoint.h> -#include <asiolink/udp_socket.h> - #include <asiolink/udp_server.h> +#include <asiolink/udp_socket.h> using namespace asio; using asio::ip::udp; @@ -203,7 +203,17 @@ UDPServer::operator()(error_code ec, size_t length) { // that would quickly generate an IOMessage object without // all these calls to "new".) data_->peer_.reset(new UDPEndpoint(*data_->sender_)); - data_->iosock_.reset(new UDPSocket(*data_->socket_)); + + // The TCP socket class has been extended with asynchronous functions + // and takes as a template parameter a completion callback class. As + // TCPServer does not use these extended functions (only those defined + // in the IOSocket base class) - but needs a TCPSocket to get hold of + // the underlying Boost TCP socket - use "IOCompletionCallback" - + // a basic callback class: it is not used but provides the appropriate + // signature. + data_->iosock_.reset( + new UDPSocket<IOCompletionCallback>(*data_->socket_)); + data_->io_message_.reset(new IOMessage(data_->data_.get(), data_->bytes_, *data_->iosock_, *data_->peer_)); diff --git a/src/lib/asiolink/udp_socket.cc b/src/lib/asiolink/udp_socket.cc deleted file mode 100644 index d1bd9aaa8a..0000000000 --- a/src/lib/asiolink/udp_socket.cc +++ /dev/null @@ -1,131 +0,0 @@ -// Copyright (C) 2010 Internet Systems Consortium, Inc. ("ISC") -// -// Permission to use, copy, modify, and/or distribute this software for any -// purpose with or without fee is hereby granted, provided that the above -// copyright notice and this permission notice appear in all copies. -// -// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH -// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY -// AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT, -// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM -// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE -// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR -// PERFORMANCE OF THIS SOFTWARE. - -#include <config.h> -#include <iostream> -#include <unistd.h> // for some IPC/network system calls -#include <sys/socket.h> -#include <netinet/in.h> - -#include <boost/bind.hpp> - -#include <asio.hpp> -#include <asio/deadline_timer.hpp> - -#include <boost/shared_ptr.hpp> -#include <boost/date_time/posix_time/posix_time_types.hpp> - -#include <dns/buffer.h> -#include <dns/message.h> -#include <dns/messagerenderer.h> -#include <log/dummylog.h> -#include <dns/opcode.h> -#include <dns/rcode.h> - -#include <coroutine.h> -#include <asiolink/asiolink.h> - -using namespace asio; -using asio::ip::udp; - -using namespace std; -using namespace isc::dns; - -namespace asiolink { - -// Constructor - create socket on the fly - -UDPSocket::UDPSocket(IOService& service) : - socket_ptr_(new asio::ip::udp::socket(service.get_io_service())), - socket_(*socket_ptr_) -{ -} - -// Destructor - -UDPSocket::~UDPSocket() -{ - delete socket_ptr_; -} - -// Open the socket. Throws an error on failure -// TODO: Make the open more resolient - -bool -UDPSocket::open(const IOEndpoint* endpoint, IOCompletionCallback&) { - if (endpoint->getFamily() == AF_INET) { - socket_.open(asio::ip::udp::v4()); - } - else { - socket_.open(asio::ip::udp::v6()); - } - - // Ensure it can send and receive 4K buffers. - socket_.set_option(asio::socket_base::send_buffer_size(MAX_SIZE)); - socket_.set_option(asio::socket_base::receive_buffer_size(MAX_SIZE)); -; - // Allow reuse of an existing port/address - socket_.set_option(asio::socket_base::reuse_address(true)); - - return (false); -} - -// Send a message. - -void -UDPSocket::asyncSend(const void* data, size_t length, - const IOEndpoint* endpoint, IOCompletionCallback& callback) -{ - // Upconverting. Not nice, but we have the problem that in the abstract - // layer we are given an IOEndpoint. For UDP code it is a UDPEndpoint - // and for TCP code a TCPEndpoint. However the member that we are - // after - the asio endpoint - is different for UPD and TCP and there is - // no common ancestor. Hence the promotion here. - assert(endpoint->getProtocol() == IPPROTO_UDP); - const UDPEndpoint* udp_endpoint = static_cast<const UDPEndpoint*>(endpoint); - - socket_.async_send_to(buffer(data, length), udp_endpoint->getASIOEndpoint(), - callback); -} - -// Receive a message. Note that the "cumulative" argument is ignored - every UDP -// receive is put into the buffer beginning at the start - there is no concept -// receiving a subsequent part of a message. - -void -UDPSocket::asyncReceive(void* data, size_t length, size_t, IOEndpoint* endpoint, - IOCompletionCallback& callback) -{ - // Upconvert the endpoint again. - assert(endpoint->getProtocol() == IPPROTO_UDP); - UDPEndpoint* udp_endpoint = static_cast<UDPEndpoint*>(endpoint); - - socket_.async_receive_from(buffer(data, length), - udp_endpoint->getASIOEndpoint(), callback); -} - -// Cancel I/O on the socket -void -UDPSocket::cancel() { - socket_.cancel(); -} - -// Close the socket down - -void -UDPSocket::close() { - socket_.close(); -} - -} // namespace asiolink diff --git a/src/lib/asiolink/udp_socket.h b/src/lib/asiolink/udp_socket.h index 9b1af8786f..200de255ee 100644 --- a/src/lib/asiolink/udp_socket.h +++ b/src/lib/asiolink/udp_socket.h @@ -19,9 +19,19 @@ #error "asio.hpp must be included before including this, see asiolink.h as to why" #endif +#include <log/dummylog.h> +#include <netinet/in.h> +#include <sys/socket.h> +#include <unistd.h> // for some IPC/network system calls + #include <cstddef> -#include <asiolink/io_socket.h> + +#include <config.h> + +#include <asiolink/io_asio_socket.h> +#include <asiolink/io_endpoint.h> #include <asiolink/io_service.h> +#include <asiolink/udp_endpoint.h> namespace asiolink { @@ -29,7 +39,10 @@ namespace asiolink { /// \c IOSocket that represents a UDP socket. /// /// Other notes about \c TCPSocket applies to this class, too. -class UDPSocket : public IOSocket { +/// +/// \param C Callback type +template <typename C> +class UDPSocket : public IOAsioSocket<C> { private: /// \brief Class is non-copyable UDPSocket(const UDPSocket&); @@ -42,15 +55,17 @@ public: /// \brief Constructor from an ASIO UDP socket. /// - /// \param socket The ASIO representation of the UDP socket. - UDPSocket(asio::ip::udp::socket& socket) : - socket_ptr_(NULL), socket_(socket) - {} + /// \param socket The ASIO representation of the UDP socket. It + /// is assumed that the caller will open and close the socket, so + /// these operations are a no-op for that socket. + UDPSocket(asio::ip::udp::socket& socket); /// \brief Constructor /// /// Used when the UDPSocket is being asked to manage its own internal - /// socket. + /// socket. It is assumed that open() and close() will not be used. + /// + /// \param service I/O Service object used to manage the socket. UDPSocket(IOService& service); /// \brief Destructor @@ -70,7 +85,7 @@ public: /// \param callback Unused. /// /// \return false to indicate that the "operation" completed synchronously. - virtual bool open(const IOEndpoint* endpoint, IOCompletionCallback&); + virtual bool open(const IOEndpoint* endpoint, C&); /// \brief Send Asynchronously /// @@ -83,7 +98,7 @@ public: /// \param endpoint Target of the send /// \param callback Callback object. virtual void asyncSend(const void* data, size_t length, - const IOEndpoint* endpoint, IOCompletionCallback& callback); + const IOEndpoint* endpoint, C& callback); /// \brief Receive Asynchronously /// @@ -99,7 +114,7 @@ public: /// \param endpoint Source of the communication /// \param callback Callback object virtual void asyncReceive(void* data, size_t length, size_t cumulative, - IOEndpoint* endpoint, IOCompletionCallback& callback); + IOEndpoint* endpoint, C& callback); /// \brief Checks if the data received is complete. /// @@ -130,9 +145,133 @@ private: // Two variables to hold the socket - a socket and a pointer to it. This // handles the case where a socket is passed to the UDPSocket on // construction, or where it is asked to manage its own socket. - asio::ip::udp::socket* socket_ptr_; ///< Pointer to the socket + asio::ip::udp::socket* socket_ptr_; ///< Pointer to own socket asio::ip::udp::socket& socket_; ///< Socket + bool isopen_; ///< true when socket is open }; -} // namespace asiolink +// Constructor - caller manages socket + +template <typename C> +UDPSocket<C>::UDPSocket(asio::ip::udp::socket& socket) : + socket_ptr_(NULL), socket_(socket), isopen_(true) +{ +} + +// Constructor - create socket on the fly + +template <typename C> +UDPSocket<C>::UDPSocket(IOService& service) : + socket_ptr_(new asio::ip::udp::socket(service.get_io_service())), + socket_(*socket_ptr_), isopen_(false) +{ +} + +// Destructor. Only delete the socket if we are managing it. + +template <typename C> +UDPSocket<C>::~UDPSocket() +{ + delete socket_ptr_; +} + +// Open the socket. Throws an error on failure +// TODO: Make the open more resilient + +template <typename C> bool +UDPSocket<C>::open(const IOEndpoint* endpoint, C&) { + + // Ignore opens on already-open socket. Don't throw a failure because + // of uncertainties as to what precedes whan when using asynchronous I/O. + // At also allows us a treat a passed-in socket as a self-managed socket. + + if (!isopen_) { + if (endpoint->getFamily() == AF_INET) { + socket_.open(asio::ip::udp::v4()); + } + else { + socket_.open(asio::ip::udp::v6()); + } + isopen_ = true; + + // Ensure it can send and receive 4K buffers. + socket_.set_option(asio::socket_base::send_buffer_size(MAX_SIZE)); + socket_.set_option(asio::socket_base::receive_buffer_size(MAX_SIZE)); + ; + // Allow reuse of an existing port/address + socket_.set_option(asio::socket_base::reuse_address(true)); + } + return (false); +} + +// Send a message. Should never do this if the socket is not open, so throw +// an exception if this is the case. + +template <typename C> void +UDPSocket<C>::asyncSend(const void* data, size_t length, + const IOEndpoint* endpoint, C& callback) +{ + if (isopen_) { + + // Upconvert to a UDPEndpoint. We need to do this because although + // IOEndpoint is the base class of UDPEndpoint and TCPEndpoint, it + // doing cont contain a method for getting at the underlying endpoint + // type - those are in the derived class and the two classes differ on + // return type. + + assert(endpoint->getProtocol() == IPPROTO_UDP); + const UDPEndpoint* udp_endpoint = + static_cast<const UDPEndpoint*>(endpoint); + socket_.async_send_to(buffer(data, length), + udp_endpoint->getASIOEndpoint(), callback); + } else { + isc_throw(SocketNotOpen, + "attempt to send on a UDP socket that is not open"); + } +} + +// Receive a message. Note that the "cumulative" argument is ignored - every UDP +// receive is put into the buffer beginning at the start - there is no concept +// receiving a subsequent part of a message. Same critera as before concerning +// the need for the socket to be open. + +template <typename C> void +UDPSocket<C>::asyncReceive(void* data, size_t length, size_t, + IOEndpoint* endpoint, C& callback) +{ + if (isopen_) { + + // Upconvert the endpoint again. + assert(endpoint->getProtocol() == IPPROTO_UDP); + UDPEndpoint* udp_endpoint = static_cast<UDPEndpoint*>(endpoint); + + socket_.async_receive_from(buffer(data, length), + udp_endpoint->getASIOEndpoint(), callback); + } else { + isc_throw(SocketNotOpen, + "attempt to receive from a UDP socket that is not open"); + } +} + +// Cancel I/O on the socket. No-op if the socket is not open. +template <typename C> void +UDPSocket<C>::cancel() { + if (isopen_) { + socket_.cancel(); + } +} + +// Close the socket down. Can only do this if the socket is open and we are +// managing it ourself. + +template <typename C> void +UDPSocket<C>::close() { + if (isopen_ && socket_ptr_) { + socket_.close(); + isopen_ = false; + } +} + +} // namespace asiolink + #endif // __UDP_SOCKET_H |