diff --git a/include/threadpool.h b/include/threadpool.h index c218425..f27dfc1 100644 --- a/include/threadpool.h +++ b/include/threadpool.h @@ -16,14 +16,18 @@ private: std::queue> _queue; std::mutex _mutex; std::condition_variable _cv; + std::condition_variable _wait_cv; std::vector _workers; bool _enabled; + std::atomic _tasksCount; public: threadpool(); threadpool(std::size_t concurrency); ~threadpool(); + void wait_all(); + template std::future add_task(std::function func) { std::lock_guard lock(_mutex); @@ -31,6 +35,7 @@ public: std::future fut = pt_ptr->get_future(); _queue.push([pt_ptr] { (*pt_ptr)(); }); + _tasksCount++; _cv.notify_one(); return fut; } diff --git a/src/threadpool.cpp b/src/threadpool.cpp index f58cb22..9035c80 100644 --- a/src/threadpool.cpp +++ b/src/threadpool.cpp @@ -1,10 +1,11 @@ +#include #include "threadpool.h" threadpool::threadpool() : threadpool(std::thread::hardware_concurrency()) { } -threadpool::threadpool(std::size_t concurrency) : _concurrency(concurrency), _enabled(true) +threadpool::threadpool(std::size_t concurrency) : _concurrency(concurrency), _enabled(true), _tasksCount(0) { for (std::size_t i = 0; i < _concurrency; ++i) { @@ -38,5 +39,13 @@ void threadpool::worker_func() lock.unlock(); func(); + _tasksCount--; + _wait_cv.notify_one(); } } + +void threadpool::wait_all() +{ + std::unique_lock lock(_mutex); + _wait_cv.wait(lock, [this] { return _tasksCount == 0; }); +} diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index aebf0f5..2d214fd 100755 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -62,7 +62,7 @@ project(${TEST_SERVER_PROJECT_NAME} CXX) aux_source_directory(${SRC_DIR}/server TEST_SERVER_SRC) add_executable(${TEST_SERVER_PROJECT_NAME} ${TEST_SERVER_SRC}) add_dependencies(${TEST_SERVER_PROJECT_NAME} boost cpputil) -target_link_libraries(${TEST_SERVER_PROJECT_NAME} cpputil ${CMAKE_SOURCE_DIR}/lib/libboost_system.a ${CMAKE_SOURCE_DIR}/lib/libboost_thread.a ${CMAKE_SOURCE_DIR}/lib/libboost_coroutine.a ${CMAKE_SOURCE_DIR}/lib/libboost_context.a) +target_link_libraries(${TEST_SERVER_PROJECT_NAME} cpputil ${CMAKE_SOURCE_DIR}/lib/libboost_system.a ${CMAKE_SOURCE_DIR}/lib/libboost_coroutine.a ${CMAKE_SOURCE_DIR}/lib/libboost_context.a ${CMAKE_SOURCE_DIR}/lib/libboost_thread.a) install(TARGETS ${TEST_PROJECT_NAME} RUNTIME DESTINATION bin diff --git a/test/server/main.cpp b/test/server/main.cpp index 979796c..c5f8906 100644 --- a/test/server/main.cpp +++ b/test/server/main.cpp @@ -1,35 +1,34 @@ #include "session.h" - -#include -#include - -using tcp = boost::asio::ip::tcp; - -void start_server(std::string address, std::string port) -{ - boost::asio::io_service io_service; - tcp::resolver resolver(io_service); - tcp::resolver::query query(address, port); - tcp::acceptor acceptor(io_service, *resolver.resolve(query)); - - boost::asio::spawn(io_service, [&](boost::asio::yield_context yield){ - for (;;) - { - session* s = new session(io_service); - acceptor.async_accept(s->socket(), yield); - s->start(); - } - }); - - io_service.run(); -} +#include "server.h" int main() { -// server srv("0.0.0.0", "12345"); -// srv.run(); + server srv("0.0.0.0", "12345"); - start_server("0.0.0.0", "12345"); + std::thread t([&]{ + std::this_thread::sleep_for(std::chrono::seconds(10)); + srv.stop(); + }); + t.detach(); + srv.run(); + std::cout << "after run" << std::endl; + +/* + threadpool pool; + + pool.add_task([]{ + std::this_thread::sleep_for(std::chrono::seconds(5)); + std::cout << "first task completed" << std::endl; + }); + + pool.add_task([]{ + std::this_thread::sleep_for(std::chrono::seconds(10)); + std::cout << "second task completed" << std::endl; + }); + + pool.wait_all(); + std::cout << "all tasks completed" << std::endl; +*/ return 0; } \ No newline at end of file diff --git a/test/server/server.cpp b/test/server/server.cpp new file mode 100644 index 0000000..caac96c --- /dev/null +++ b/test/server/server.cpp @@ -0,0 +1,50 @@ +#include "server.h" +#include + +server::server(std::string address, std::string port) +{ + tcp::resolver resolver(_io_service); + tcp::resolver::query query(address, port); + _acceptor = new tcp::acceptor(_io_service, *resolver.resolve(query)); +} + +void server::run() +{ + boost::asio::spawn(_io_service, [&](boost::asio::yield_context yield) { + try + { + for (;;) + { + session *s = new session(_io_service, &_pool); + _acceptor->async_accept(s->socket(), yield); + _sessions.insert(s); + s->start(); + } + } + catch (boost::system::system_error& ex) + { + std::cout << "server exception: " << ex.what() << std::endl; + } + }); + + _io_service.run(); + std::cout << "after ioservice run" << std::endl; + std::for_each(_sessions.begin(), _sessions.end(), [](session* s){ delete s; }); + _sessions.clear(); +} + +server::~server() +{ + std::cout << "server destructor" << std::endl; + delete _acceptor; +} + +void server::stop() +{ + _io_service.post([&]{ + _acceptor->close(); + std::for_each(_sessions.begin(), _sessions.end(), [](session* s){ s->stop(); }); + _pool.wait_all(); + std::cout << "stop exited" << std::endl; + }); +} diff --git a/test/server/server.h b/test/server/server.h new file mode 100644 index 0000000..9c673e5 --- /dev/null +++ b/test/server/server.h @@ -0,0 +1,30 @@ +#ifndef _SERVER_H_ +#define _SERVER_H_ + +#include "threadpool.h" +#include "session.h" +#include +#include + +class server +{ +private: + typedef boost::asio::ip::tcp tcp; + +private: + boost::asio::io_service _io_service; + tcp::acceptor* _acceptor; + threadpool _pool; + std::set _sessions; + +public: + server() = delete; + server(server&) = delete; + server(std::string address, std::string port); + void run(); + void stop(); + ~server(); +}; + +#endif // _SERVER_H_ + diff --git a/test/server/session.cpp b/test/server/session.cpp index 820aa2c..168bf9c 100644 --- a/test/server/session.cpp +++ b/test/server/session.cpp @@ -1,25 +1,45 @@ #include "session.h" #include #include +#include -session::session(boost::asio::io_service &io_service): _socket(io_service) +session::session(boost::asio::io_service &io_service, threadpool* pool): _socket(io_service), _pool(pool) { } void session::start() { boost::asio::spawn(_socket.get_io_service(), [this](boost::asio::yield_context yield){ - for(;;) - { - std::size_t bytes = _socket.async_read_some(boost::asio::buffer(_data, 128), yield); + try + { + for (;;) { + std::size_t bytes = _socket.async_read_some(boost::asio::buffer(_data, 128), yield); - int n = async_call([this]{ - int n = std::atoi(_data); - return n*n; - }, yield); + std::cout << "before async call" << std::endl; + int n = async_call([this] { + std::cout << "async call start waiting" << std::endl; + std::this_thread::sleep_for(std::chrono::seconds(15)); + std::cout << "async call ends waiting" << std::endl; + int n = std::atoi(_data); + return n * n; + }, yield); - _socket.async_write_some(boost::asio::buffer(std::to_string(n) + "\n"), yield); - } + std::cout << "before write: " << n << std::endl; + _socket.async_write_some(boost::asio::buffer(std::to_string(n) + "\n"), yield); + std::cout << "after write" << std::endl; + } + } + catch (boost::system::system_error& ex) + { + std::cout << "exception: " << ex.code().value() << " - " << ex.what() << std::endl; + + if(ex.code().value() == 2) // end of file + { + //FIXME: Удалять сессию должен сервер + std::cout << "deleting session" << std::endl; + delete this; + } + } }); } @@ -27,3 +47,13 @@ boost::asio::ip::tcp::socket& session::socket() { return _socket; } + +void session::stop() +{ + _socket.close(); +} + +session::~session() +{ + std::cout << "session destructor called" << std::endl; +} diff --git a/test/server/session.h b/test/server/session.h index 8bffe52..7b67c6a 100644 --- a/test/server/session.h +++ b/test/server/session.h @@ -3,6 +3,7 @@ #include "threadpool.h" #include +#include class session { @@ -12,21 +13,24 @@ private: private: tcp::socket _socket; char _data[128]; - threadpool _pool; + threadpool* _pool; public: session() = delete; session(session&) = delete; - session(boost::asio::io_service& io_service); + session(boost::asio::io_service& io_service, threadpool* pool); + ~session(); void start(); + void stop(); tcp::socket& socket(); template R async_call(std::function func, CompletionToken&& token) { - typename boost::asio::handler_type::type handler(std::forward(token)); - boost::asio::async_result result(handler); + using namespace boost::asio; + BOOST_ASIO_HANDLER_TYPE(CompletionToken, void(R)) handler(std::forward(token)); + async_result result(handler); - _pool.add_task([this, handler, &func]{ + _pool->add_task([this, handler, &func]{ _socket.get_io_service().dispatch(std::bind(handler, func())); });