Добавил возможность останавливать сервер

This commit is contained in:
Selim Mustafaev 2014-12-28 22:48:09 +03:00
parent d360fa0252
commit abbd6b72d5
8 changed files with 171 additions and 44 deletions

View File

@ -16,14 +16,18 @@ private:
std::queue<std::function<void()>> _queue;
std::mutex _mutex;
std::condition_variable _cv;
std::condition_variable _wait_cv;
std::vector<std::thread> _workers;
bool _enabled;
std::atomic<std::size_t> _tasksCount;
public:
threadpool();
threadpool(std::size_t concurrency);
~threadpool();
void wait_all();
template <typename R> std::future<R> add_task(std::function<R()> func)
{
std::lock_guard<std::mutex> lock(_mutex);
@ -31,6 +35,7 @@ public:
std::future<R> fut = pt_ptr->get_future();
_queue.push([pt_ptr] { (*pt_ptr)(); });
_tasksCount++;
_cv.notify_one();
return fut;
}

View File

@ -1,10 +1,11 @@
#include <iostream>
#include "threadpool.h"
threadpool::threadpool() : threadpool(std::thread::hardware_concurrency())
{
}
threadpool::threadpool(std::size_t concurrency) : _concurrency(concurrency), _enabled(true)
threadpool::threadpool(std::size_t concurrency) : _concurrency(concurrency), _enabled(true), _tasksCount(0)
{
for (std::size_t i = 0; i < _concurrency; ++i)
{
@ -38,5 +39,13 @@ void threadpool::worker_func()
lock.unlock();
func();
_tasksCount--;
_wait_cv.notify_one();
}
}
void threadpool::wait_all()
{
std::unique_lock<std::mutex> lock(_mutex);
_wait_cv.wait(lock, [this] { return _tasksCount == 0; });
}

View File

@ -62,7 +62,7 @@ project(${TEST_SERVER_PROJECT_NAME} CXX)
aux_source_directory(${SRC_DIR}/server TEST_SERVER_SRC)
add_executable(${TEST_SERVER_PROJECT_NAME} ${TEST_SERVER_SRC})
add_dependencies(${TEST_SERVER_PROJECT_NAME} boost cpputil)
target_link_libraries(${TEST_SERVER_PROJECT_NAME} cpputil ${CMAKE_SOURCE_DIR}/lib/libboost_system.a ${CMAKE_SOURCE_DIR}/lib/libboost_thread.a ${CMAKE_SOURCE_DIR}/lib/libboost_coroutine.a ${CMAKE_SOURCE_DIR}/lib/libboost_context.a)
target_link_libraries(${TEST_SERVER_PROJECT_NAME} cpputil ${CMAKE_SOURCE_DIR}/lib/libboost_system.a ${CMAKE_SOURCE_DIR}/lib/libboost_coroutine.a ${CMAKE_SOURCE_DIR}/lib/libboost_context.a ${CMAKE_SOURCE_DIR}/lib/libboost_thread.a)
install(TARGETS ${TEST_PROJECT_NAME}
RUNTIME DESTINATION bin

View File

@ -1,35 +1,34 @@
#include "session.h"
#include <boost/asio/spawn.hpp>
#include <iostream>
using tcp = boost::asio::ip::tcp;
void start_server(std::string address, std::string port)
{
boost::asio::io_service io_service;
tcp::resolver resolver(io_service);
tcp::resolver::query query(address, port);
tcp::acceptor acceptor(io_service, *resolver.resolve(query));
boost::asio::spawn(io_service, [&](boost::asio::yield_context yield){
for (;;)
{
session* s = new session(io_service);
acceptor.async_accept(s->socket(), yield);
s->start();
}
});
io_service.run();
}
#include "server.h"
int main()
{
// server srv("0.0.0.0", "12345");
// srv.run();
server srv("0.0.0.0", "12345");
start_server("0.0.0.0", "12345");
std::thread t([&]{
std::this_thread::sleep_for(std::chrono::seconds(10));
srv.stop();
});
t.detach();
srv.run();
std::cout << "after run" << std::endl;
/*
threadpool pool;
pool.add_task<void>([]{
std::this_thread::sleep_for(std::chrono::seconds(5));
std::cout << "first task completed" << std::endl;
});
pool.add_task<void>([]{
std::this_thread::sleep_for(std::chrono::seconds(10));
std::cout << "second task completed" << std::endl;
});
pool.wait_all();
std::cout << "all tasks completed" << std::endl;
*/
return 0;
}

50
test/server/server.cpp Normal file
View File

@ -0,0 +1,50 @@
#include "server.h"
#include <boost/asio/spawn.hpp>
server::server(std::string address, std::string port)
{
tcp::resolver resolver(_io_service);
tcp::resolver::query query(address, port);
_acceptor = new tcp::acceptor(_io_service, *resolver.resolve(query));
}
void server::run()
{
boost::asio::spawn(_io_service, [&](boost::asio::yield_context yield) {
try
{
for (;;)
{
session *s = new session(_io_service, &_pool);
_acceptor->async_accept(s->socket(), yield);
_sessions.insert(s);
s->start();
}
}
catch (boost::system::system_error& ex)
{
std::cout << "server exception: " << ex.what() << std::endl;
}
});
_io_service.run();
std::cout << "after ioservice run" << std::endl;
std::for_each(_sessions.begin(), _sessions.end(), [](session* s){ delete s; });
_sessions.clear();
}
server::~server()
{
std::cout << "server destructor" << std::endl;
delete _acceptor;
}
void server::stop()
{
_io_service.post([&]{
_acceptor->close();
std::for_each(_sessions.begin(), _sessions.end(), [](session* s){ s->stop(); });
_pool.wait_all();
std::cout << "stop exited" << std::endl;
});
}

30
test/server/server.h Normal file
View File

@ -0,0 +1,30 @@
#ifndef _SERVER_H_
#define _SERVER_H_
#include "threadpool.h"
#include "session.h"
#include <boost/asio.hpp>
#include <set>
class server
{
private:
typedef boost::asio::ip::tcp tcp;
private:
boost::asio::io_service _io_service;
tcp::acceptor* _acceptor;
threadpool _pool;
std::set<session*> _sessions;
public:
server() = delete;
server(server&) = delete;
server(std::string address, std::string port);
void run();
void stop();
~server();
};
#endif // _SERVER_H_

View File

@ -1,24 +1,44 @@
#include "session.h"
#include <iostream>
#include <boost/asio/spawn.hpp>
#include <thread>
session::session(boost::asio::io_service &io_service): _socket(io_service)
session::session(boost::asio::io_service &io_service, threadpool* pool): _socket(io_service), _pool(pool)
{
}
void session::start()
{
boost::asio::spawn(_socket.get_io_service(), [this](boost::asio::yield_context yield){
for(;;)
try
{
for (;;) {
std::size_t bytes = _socket.async_read_some(boost::asio::buffer(_data, 128), yield);
std::cout << "before async call" << std::endl;
int n = async_call<int>([this] {
std::cout << "async call start waiting" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(15));
std::cout << "async call ends waiting" << std::endl;
int n = std::atoi(_data);
return n * n;
}, yield);
std::cout << "before write: " << n << std::endl;
_socket.async_write_some(boost::asio::buffer(std::to_string(n) + "\n"), yield);
std::cout << "after write" << std::endl;
}
}
catch (boost::system::system_error& ex)
{
std::cout << "exception: " << ex.code().value() << " - " << ex.what() << std::endl;
if(ex.code().value() == 2) // end of file
{
//FIXME: Удалять сессию должен сервер
std::cout << "deleting session" << std::endl;
delete this;
}
}
});
}
@ -27,3 +47,13 @@ boost::asio::ip::tcp::socket& session::socket()
{
return _socket;
}
void session::stop()
{
_socket.close();
}
session::~session()
{
std::cout << "session destructor called" << std::endl;
}

View File

@ -3,6 +3,7 @@
#include "threadpool.h"
#include <boost/asio.hpp>
#include <iostream>
class session
{
@ -12,21 +13,24 @@ private:
private:
tcp::socket _socket;
char _data[128];
threadpool _pool;
threadpool* _pool;
public:
session() = delete;
session(session&) = delete;
session(boost::asio::io_service& io_service);
session(boost::asio::io_service& io_service, threadpool* pool);
~session();
void start();
void stop();
tcp::socket& socket();
template <typename R, typename CompletionToken> R async_call(std::function<R()> func, CompletionToken&& token)
{
typename boost::asio::handler_type<CompletionToken, void(R)>::type handler(std::forward<CompletionToken>(token));
boost::asio::async_result<decltype(handler)> result(handler);
using namespace boost::asio;
BOOST_ASIO_HANDLER_TYPE(CompletionToken, void(R)) handler(std::forward<CompletionToken>(token));
async_result<decltype(handler)> result(handler);
_pool.add_task<void>([this, handler, &func]{
_pool->add_task<void>([this, handler, &func]{
_socket.get_io_service().dispatch(std::bind(handler, func()));
});