summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFrancis Dupont <fdupont@isc.org>2020-09-06 15:58:58 +0200
committerTomek Mrugalski <thomson@klub.com.pl>2020-10-22 19:19:48 +0200
commitadef8f55650fb18e0d5f2ebf58252272fafcd613 (patch)
treecbb64bd2ddac210e7315023a81892897cc01fb08
parent[#1445] Reverted Changelog reformat. (diff)
downloadkea-adef8f55650fb18e0d5f2ebf58252272fafcd613.tar.xz
kea-adef8f55650fb18e0d5f2ebf58252272fafcd613.zip
[#1306] Added statistic collect
-rw-r--r--src/lib/util/tests/thread_pool_unittest.cc17
-rw-r--r--src/lib/util/thread_pool.h70
2 files changed, 86 insertions, 1 deletions
diff --git a/src/lib/util/tests/thread_pool_unittest.cc b/src/lib/util/tests/thread_pool_unittest.cc
index c39f61e785..4821385347 100644
--- a/src/lib/util/tests/thread_pool_unittest.cc
+++ b/src/lib/util/tests/thread_pool_unittest.cc
@@ -450,6 +450,11 @@ TEST_F(ThreadPoolTest, startAndStop) {
ASSERT_EQ(thread_pool.count(), 0);
// the thread count should be 0
ASSERT_EQ(thread_pool.size(), 0);
+
+ /// statistics
+ std::cout << "stat10: " << thread_pool.getQueueStat(10) << std::endl;
+ std::cout << "stat100: " << thread_pool.getQueueStat(100) << std::endl;
+ std::cout << "stat1000: " << thread_pool.getQueueStat(1000) << std::endl;
}
/// @brief test ThreadPool max queue size
@@ -526,4 +531,16 @@ TEST_F(ThreadPoolTest, addFront) {
EXPECT_EQ(thread_pool.count(), items_count);
}
+/// @brief test ThreadPool get queue statistics.
+TEST_F(ThreadPoolTest, getQueueStat) {
+ ThreadPool<CallBack> thread_pool;
+ EXPECT_THROW(thread_pool.getQueueStat(0), InvalidParameter);
+ EXPECT_THROW(thread_pool.getQueueStat(1), InvalidParameter);
+ EXPECT_THROW(thread_pool.getQueueStat(-10), InvalidParameter);
+ EXPECT_THROW(thread_pool.getQueueStat(10000), InvalidParameter);
+ EXPECT_NO_THROW(thread_pool.getQueueStat(10));
+ EXPECT_NO_THROW(thread_pool.getQueueStat(100));
+ EXPECT_NO_THROW(thread_pool.getQueueStat(1000));
+}
+
} // namespace
diff --git a/src/lib/util/thread_pool.h b/src/lib/util/thread_pool.h
index 07cc230211..b1e8ca3e4a 100644
--- a/src/lib/util/thread_pool.h
+++ b/src/lib/util/thread_pool.h
@@ -12,6 +12,7 @@
#include <boost/shared_ptr.hpp>
#include <atomic>
+#include <cmath>
#include <condition_variable>
#include <list>
#include <mutex>
@@ -28,6 +29,16 @@ namespace util {
/// @tparam Container a 'queue like' container
template <typename WorkItem, typename Container = std::deque<boost::shared_ptr<WorkItem>>>
struct ThreadPool {
+ /// @brief Rounding value for 10 packet statistic.
+ static const double CEXP10;
+
+ /// @brief Rounding value for 100 packet statistic.
+ static const double CEXP100;
+
+ /// @brief Rounding value for 1000 packet statistic.
+ static const double CEXP1000;
+
+ /// @brief Type of shared pointers to work items.
typedef typename boost::shared_ptr<WorkItem> WorkItemPtr;
/// @brief Constructor
@@ -120,6 +131,15 @@ struct ThreadPool {
return (threads_.size());
}
+ /// @brief get queue length statistic
+ ///
+ /// @param which select the statistic (10, 100 or 1000)
+ /// @return the queue length statistic
+ /// @throw InvalidParameter if which is not 10 and 100 and 1000.
+ double getQueueStat(size_t which) {
+ return (queue_.getQueueStat(which));
+ }
+
private:
/// @brief start all the threads
///
@@ -173,7 +193,9 @@ private:
/// @brief Constructor
///
/// Creates the thread pool queue in 'disabled' state
- ThreadPoolQueue() : enabled_(false), max_queue_size_(0) {
+ ThreadPoolQueue()
+ : enabled_(false), max_queue_size_(0),
+ stat10(0.), stat100(0.), stat1000(0.) {
}
/// @brief Destructor
@@ -263,6 +285,7 @@ private:
/// If the queue is 'enabled', this function returns the first element in
/// the queue or blocks the calling thread if there are no work items
/// available.
+ /// Before a work item is returned statistics are updated.
///
/// @return the first work item from the queue or an empty element.
Item pop() {
@@ -272,6 +295,10 @@ private:
if (!enabled_) {
return (Item());
}
+ size_t length = queue_.size();
+ stat10 = stat10 * CEXP10 + (1 - CEXP10) * length;
+ stat100 = stat100 * CEXP100 + (1 - CEXP100) * length;
+ stat1000 = stat1000 * CEXP1000 + (1 - CEXP1000) * length;
Item item = queue_.front();
queue_.pop_front();
return (item);
@@ -287,6 +314,26 @@ private:
return (queue_.size());
}
+ /// @brief get queue length statistic
+ ///
+ /// @param which select the statistic (10, 100 or 1000)
+ /// @return the queue length statistic
+ /// @throw InvalidParameter if which is not 10 and 100 and 1000.
+ double getQueueStat(size_t which) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ switch (which) {
+ case 10:
+ return (stat10);
+ case 100:
+ return (stat100);
+ case 1000:
+ return (stat1000);
+ default:
+ isc_throw(InvalidParameter, "supported statistic for "
+ << "10/100/1000 only, not " << which);
+ }
+ }
+
/// @brief clear remove all work items
///
/// Removes all queued work items
@@ -342,6 +389,15 @@ private:
/// @brief maximum number of work items in the queue
/// (0 means unlimited)
size_t max_queue_size_;
+
+ /// @brief queue length statistic for 10 packets
+ double stat10;
+
+ /// @brief queue length statistic for 100 packets
+ double stat100;
+
+ /// @brief queue length statistic for 1000 packets
+ double stat1000;
};
/// @brief run function of each thread
@@ -365,6 +421,18 @@ private:
ThreadPoolQueue<WorkItemPtr, Container> queue_;
};
+/// Initialize the 10 packet rounding to exp(-.1)
+template <typename W, typename C>
+ const double ThreadPool<W, C>::CEXP10 = std::exp(-.1);
+
+/// Initialize the 100 packet rounding to exp(-.01)
+template <typename W, typename C>
+const double ThreadPool<W, C>::CEXP100 = std::exp(-.01);
+
+/// Initialize the 1000 packet rounding to exp(-.1)
+template <typename W, typename C>
+const double ThreadPool<W, C>::CEXP1000 = std::exp(-.001);
+
} // namespace util
} // namespace isc