1.启动程序
1.1 开启单线程接受用户请求
//开启一个线程 获取是否有用户连接
g_App.ptrSelectorThread = new commonnet::SelectorThread(g_App.ptrTimer);
//开启任务 处理任务 handleRequest
RequestHandler requestHandler;
g_App.ptrServer = new commonutil::MWThreadServer(ptrProperties->getProperty("Local.AllIP"),
ptrProperties->getPropertyAsInt("tfb_usermgr.Port"),
10,
g_App.ptrSelectorThread,
&requestHandler);
g_App.ptrServer->startWork(); //开始工作 处理任务
g_App.ptrServer->join(); //等待信号停止任务
g_App.ptrSelectorThread->destroy(); //停止接单
g_App.ptrTimer->destroy();
g_App.ptrSelectorThread->joinWithThread();
1.2 监听端口 以及工作线程处理任务
//(new commonnet::TcpAcceptor(strIP, nPort,10) 新建一个tcp连接获取消息,
//参数10为 listen参数。(::listen(fd, backlog) == SOCKET_ERROR)
//服务器监听时,在每次处理一个客户端的连接时是需要一定时间的,这个时间非常的短(也许只有1ms 或者还不到),
//但这个时间还是存在的。而这个backlog 存在的意义就是:在这段时间里面除了第一个连接请求是正在进行处理以外,
//其他的连接请求都在请求队列中等待,而如果超过了队列的最大等待个数时,其他的请求将被忽略或者将不会被处理。
commonutil::MWThreadServer::MWThreadServer(const std::string &strIP, int nPort, int nThreads, commonnet::SelectorThreadPtr &selector, Handler *handler)
:m_ptrListenAcceptor(new commonnet::TcpAcceptor(strIP, nPort,10)),
m_ptrSelectorThread(selector),
m_handler(handler),
m_nThreads(nThreads),
m_vecWorkerThreadPtr(nThreads),
m_vecWorkerThreadControl(nThreads),
m_bContinue(false),
m_nWaitCount(0)
{
m_ptrListenAcceptor->listen();
commonnet::SocketReadyCallbackPtr ptrCallback = new ListenHandler(m_ptrListenAcceptor, m_ptrSelectorThread, this);
//线程注册到 底层 监听 //当listen返回时,会调用 ptrCallback 也就是 ListenHandler的socketReady
m_ptrSelectorThread->_register(m_ptrListenAcceptor->fd(), ptrCallback, commonnet::NeedRead, 0);
}
//startWork在main函数主动调用
void commonutil::MWThreadServer::startWork()
{
m_bContinue = true;
for (int i = 0; i < m_nThreads; ++i) {
//开启工作线程,处理客户端的请求
WorkerThreadPtr ptrWorkerThread = new WorkerThread(i, this, m_handler);
m_vecWorkerThreadPtr[i] = ptrWorkerThread;
m_vecWorkerThreadControl[i] = ptrWorkerThread->start();
}
}
//添加消息保存 这个调用在 ClientHandler中
void commonutil::MWThreadServer::PushPackage(const commonnet::TransceiverPtr& ptrTransceiver, const commonnet::BasicStreamPtr& ptrBasicStream)
{
commonutil::Monitor<commonutil::Mutex>::Lock autolock(m_monDataItem);
commonnet::TransceiverBasicStream data;
data.ptrTransceiver = ptrTransceiver;
data.ptrBasicStream = ptrBasicStream;
m_lstTransceiverBasicStreamPtr.push_back(data);
if (m_nWaitCount > 0) {
m_monDataItem.notify();
}
}
//消费消息 注意加锁控制 这个调用在WorkerThread
bool commonutil::MWThreadServer::PopPackage(commonnet::TransceiverPtr& ptrTransceiver, commonnet::BasicStreamPtr& ptrBasicStream)
{
while (m_bContinue) {
commonutil::Monitor<commonutil::Mutex>::Lock autolock(m_monDataItem);
if (m_lstTransceiverBasicStreamPtr.size() > 0) {
commonnet::TransceiverBasicStream data = m_lstTransceiverBasicStreamPtr.front();
ptrTransceiver = data.ptrTransceiver;
ptrBasicStream = data.ptrBasicStream;
m_lstTransceiverBasicStreamPtr.pop_front();
return true;
}
++m_nWaitCount;
m_monDataItem.wait();
--m_nWaitCount;
}
return false;
}
1.3 listen之后的accept功能,保存用户数据跟连接标识符
commonutil::MWThreadServer::ListenHandler::ListenHandler(const commonnet::AcceptorPtr &ptrAcceptor,
const commonnet::SelectorThreadPtr & ptrSelectorThread,
commonutil::MWThreadServer *server)
:m_ptrAcceptor(ptrAcceptor),
m_ptrSelectorThread(ptrSelectorThread),
m_server(server)
{
}
commonnet::SocketStatus commonutil::MWThreadServer::ListenHandler::socketReady()
{
//listen后直接 accept
commonnet::TransceiverPtr ptrTransceiver = m_ptrAcceptor->accept();
commonnet::SocketReadyCallbackPtr ptrCallback = new ClientHandler(ptrTransceiver, m_ptrSelectorThread, m_server);
//当accept成功后 会调用 ClientHandler 的socketReady
m_ptrSelectorThread->_register(ptrTransceiver->fd(), ptrCallback, commonnet::NeedRead, 100);
return commonnet::NeedRead;
}
commonutil::MWThreadServer::ClientHandler::ClientHandler(const commonnet::TransceiverPtr &ptrTransceiver,
const commonnet::SelectorThreadPtr &ptrSelectorThread,
commonutil::MWThreadServer *server)
:m_ptrTransceiver(ptrTransceiver),
m_ptrSelectorThread(ptrSelectorThread),
m_server(server)
{
}
commonutil::MWThreadServer::ClientHandler::~ClientHandler()
{
m_ptrTransceiver->close();
}
commonnet::SocketStatus commonutil::MWThreadServer::ClientHandler::socketReady()
{
commonnet::BasicStreamPtr ptrBasicStream;
try {
//直接socket读取出来数据
m_ptrTransceiver->read(ptrBasicStream, false);
ptrBasicStream->i = ptrBasicStream->b.begin();
m_server->PushPackage(m_ptrTransceiver, ptrBasicStream);
return commonnet::NeedRead;
} catch (...) {
//读取信息失败的话,直接把这个socket丢弃
m_ptrSelectorThread->unregister(this);
return commonnet::Finished;
}
}
1.4 工作线程消费信息
commonutil::MWThreadServer::WorkerThread::WorkerThread(int nIndex, MWThreadServer* server, Handler *handler)
: commonutil::Thread("WorkerThread"),
m_bContinue(true),
m_nIndex(nIndex),
m_server(server),
m_handler(handler)
{
}
commonutil::MWThreadServer::WorkerThread::~WorkerThread()
{
}
//消费消息
void commonutil::MWThreadServer::WorkerThread::run()
{
commonnet::TransceiverPtr ptrTransceiver;
commonnet::BasicStreamPtr ptrBasicStream;
while (m_bContinue) {
if (!m_server->PopPackage(ptrTransceiver, ptrBasicStream)) {
break;
}
m_handler->handleRequest(ptrTransceiver, ptrBasicStream);
}
}
2.关闭程序
信号 产生方式 对进程的影响
-
sighup
内核驱动发现终端(或伪终端)关闭给终端对应的控制进程(bash)发 SIGHUP bash 收到 SIGHUP 后,会给各个作业(包括前后台)发送 SIGHUP,然后自己退出前后台的各个作业收到来自 bash 的 SIGHUP 后退出(如果程序会处理 SIGHUP,就不会退出) -
sigint
通过ctrl+c将会对当进程发送此信号信号被当前进程树接收到,也就是说,不仅当前进程会收到信号,它的子进程也会收到 -
sigterm
kill命令不加参数就是发送这个信号只有当前进程收到信号,子进程不会收到。如果当前进程被kill了,那么它的子进程的父进程将会是init,也就是pid为1的进程
2.1 新建线程 检测是否有 直到获取到ctrl+c 或者kill信号
commonutil::CtrlCHandlerCallback callback = _handler->getCallback();
int main(){
commonutil::CtrlCHandler ctrlCHandler;
---{
commonutil::CtrlCHandler::CtrlCHandler(commonutil::CtrlCHandlerCallback callback)
{
StaticMutex::Lock lock(globalMutex);
if (_handler != 0) {
throw CtrlCHandlerException(__FILE__, __LINE__);
} else {
_callback = callback;
_handler = this;
lock.release();
// We block these CTRL+C like signals in the main thread,
// and by default all other threads will inherit this signal
// mask.
sigset_t ctrlCLikeSignals;
sigemptyset(&ctrlCLikeSignals);
sigaddset(&ctrlCLikeSignals, SIGHUP);
sigaddset(&ctrlCLikeSignals, SIGINT);
sigaddset(&ctrlCLikeSignals, SIGTERM);
int rc = pthread_sigmask(SIG_BLOCK, &ctrlCLikeSignals, 0);
assert(rc == 0);
// Joinable thread
rc = pthread_create(&_tid, 0, sigwaitThread, 0);
assert(rc == 0);
}
}
}---
ctrlCHandler.setCallback(InterruptedCallback);
}
2.2 监听命令,停止任务
static void* sigwaitThread(void*)
{
sigset_t ctrlCLikeSignals;
sigemptyset(&ctrlCLikeSignals);
sigaddset(&ctrlCLikeSignals, SIGHUP);
sigaddset(&ctrlCLikeSignals, SIGINT);
sigaddset(&ctrlCLikeSignals, SIGTERM);
for (;;) {
int signal = 0;
int rc = sigwait(&ctrlCLikeSignals, &signal);
//
// Some sigwait() implementations incorrectly return EINTR
// when interrupted by an unblocked caught signal
//
if (rc == EINTR) { //这里注意下
continue;
}
assert(rc == 0);
rc = pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, 0);
assert(rc == 0);
//这里会调用停止处理任务
commonutil::CtrlCHandlerCallback callback = _handler->getCallback();
if (callback != 0) {
callback(signal);
}
rc = pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
assert(rc == 0);
}
return 0;
}
static void InterruptedCallback(int signal)
{
g_App.ptrServer->stopWork();
---{
//停止工作线程
void commonutil::MWThreadServer::stopWork()
{
m_bContinue = false; //
m_ptrListenAcceptor->close();
for (int i = 0; i < m_nThreads; ++i) {
m_vecWorkerThreadPtr[i]->Stop();
---
inline void Stop() {
m_bContinue = false;
}
---
}
for (int i = 0; i < m_nThreads; ++i) {
commonutil::Monitor<commonutil::Mutex>::Lock autolock(m_monDataItem);
m_monDataItem.notify();
}
}
}---
}
2.3 停止接收客户端的连接
//前面我们看了怎么停止 处理 任务 , 主任务也不接受 客户端发过来的数据了
//还要把获取连接的线程暂停掉 ptrSelectorThread
int main(int argc, char* argv[])
{
commonutil::CtrlCHandler ctrlCHandler;
ctrlCHandler.setCallback(InterruptedCallback); //设置了回调函数,等到信号发生 会回调 InterruptedCallback 函数
g_App.ptrTimer = new commonutil::Timer();
g_App.ptrSelectorThread = new commonnet::SelectorThread(g_App.ptrTimer);
// ptrSelectorThread 主线程
---{
}---
RequestHandler requestHandler;
g_App.ptrServer = new commonutil::MWThreadServer(ptrProperties->getProperty("Local.AllIP"),
ptrProperties->getPropertyAsInt("T0Service.Port"),
10,
g_App.ptrSelectorThread,
&requestHandler);
g_App.ptrServer->startWork();
g_App.ptrServer->join(); //直到获取到ctrl+c 或者kill信号,会不再获取新的连接请求 。
//stopwork 后 join 退出
g_App.ptrSelectorThread->destroy();
---{
void commonnet::SelectorThread::destroy()
{
Lock sync(*this);
assert(!_destroyed);
_destroyed = true;
_selector.setInterrupt();
}
}---
g_App.ptrTimer->destroy();
g_App.ptrSelectorThread->joinWithThread(); //等到主线程结束任务
---{
void commonnet::SelectorThread::joinWithThread()
{
assert(_destroyed);
if (_thread)
{
_thread->getThreadControl().join();
---{
commonutil::ThreadControl commonutil::Thread::getThreadControl() const
{
commonutil::Mutex::Lock lock(_stateMutex);
if (!_started) {
throw ThreadNotStartedException(__FILE__, __LINE__);
}
return ThreadControl(_thread);
}
}---
}
}
}---
}
void commonutil::MWThreadServer::stopWork()
{
m_bContinue = false;
m_ptrListenAcceptor->close();
for (int i = 0; i < m_nThreads; ++i) {
// 不接受客户端发来的数据
m_vecWorkerThreadPtr[i]->Stop();
}
for (int i = 0; i < m_nThreads; ++i) {
commonutil::Monitor<commonutil::Mutex>::Lock autolock(m_monDataItem);
m_monDataItem.notify();
}
}
//处理用户连接的任务也暂停
// isInterrupted():测试线程 Thread 对象 是否已经是中断状态,但不清除状态标志
void commonnet::SelectorThread::run()
{
while (true)
{
try
{
_selector.select();
}
catch(...)
{
//commonutil::Error out;
//out << "exception in selector thread:\n" << ex;
Lock sync(*this);
//判断主线程是否结束
if (_selector.isInterrupted())
{
if (_selector.processInterrupt())
{
continue;
}
//
// There are two possiblities for an interrupt:
//
// 1. The selector thread has been destroyed.
// 2. A callback is being finished (closed).
//
//
// Thread destroyed?
//
if (_destroyed)
{
break;
}
}
//没事就等待1毫秒
commonutil::ThreadControl::sleep(commonutil::Time::milliSeconds(1));
continue;
}
扫描关注我
(转载本站文章请注明作者和出处 Undefined)