项目流程(1)

Undefined 深圳 2018/07/27

1.启动程序

1.1 开启单线程接受用户请求

//开启一个线程 获取是否有用户连接
g_App.ptrSelectorThread = new commonnet::SelectorThread(g_App.ptrTimer);

//开启任务 处理任务 handleRequest
RequestHandler requestHandler; 

g_App.ptrServer = new commonutil::MWThreadServer(ptrProperties->getProperty("Local.AllIP"),
				 ptrProperties->getPropertyAsInt("tfb_usermgr.Port"),
				 10,
				 g_App.ptrSelectorThread,
				 &requestHandler);
				 
g_App.ptrServer->startWork(); //开始工作 处理任务 
g_App.ptrServer->join(); //等待信号停止任务
g_App.ptrSelectorThread->destroy(); //停止接单
g_App.ptrTimer->destroy();
g_App.ptrSelectorThread->joinWithThread();				 
				 

1.2 监听端口 以及工作线程处理任务

//(new commonnet::TcpAcceptor(strIP, nPort,10)  新建一个tcp连接获取消息,
//参数10为 listen参数。(::listen(fd, backlog) == SOCKET_ERROR)
//服务器监听时,在每次处理一个客户端的连接时是需要一定时间的,这个时间非常的短(也许只有1ms 或者还不到),
//但这个时间还是存在的。而这个backlog 存在的意义就是:在这段时间里面除了第一个连接请求是正在进行处理以外,
//其他的连接请求都在请求队列中等待,而如果超过了队列的最大等待个数时,其他的请求将被忽略或者将不会被处理。

commonutil::MWThreadServer::MWThreadServer(const std::string &strIP, int nPort, int nThreads, commonnet::SelectorThreadPtr &selector, Handler *handler)
        :m_ptrListenAcceptor(new commonnet::TcpAcceptor(strIP, nPort,10)),
        m_ptrSelectorThread(selector),
        m_handler(handler),
        m_nThreads(nThreads),
        m_vecWorkerThreadPtr(nThreads),
        m_vecWorkerThreadControl(nThreads),
        m_bContinue(false),
        m_nWaitCount(0)
{
        m_ptrListenAcceptor->listen();
        commonnet::SocketReadyCallbackPtr ptrCallback = new ListenHandler(m_ptrListenAcceptor, m_ptrSelectorThread, this);
		//线程注册到 底层 监听 //当listen返回时,会调用 ptrCallback 也就是 ListenHandler的socketReady
        m_ptrSelectorThread->_register(m_ptrListenAcceptor->fd(), ptrCallback, commonnet::NeedRead, 0);
}

//startWork在main函数主动调用
void commonutil::MWThreadServer::startWork()
{
        m_bContinue = true;
        for (int i = 0; i < m_nThreads; ++i) {
				//开启工作线程,处理客户端的请求
                WorkerThreadPtr ptrWorkerThread = new WorkerThread(i, this, m_handler);
                m_vecWorkerThreadPtr[i] = ptrWorkerThread;
                m_vecWorkerThreadControl[i] = ptrWorkerThread->start();
        }

}
//添加消息保存 这个调用在 ClientHandler中
void commonutil::MWThreadServer::PushPackage(const commonnet::TransceiverPtr& ptrTransceiver, const commonnet::BasicStreamPtr& ptrBasicStream)
{
        commonutil::Monitor<commonutil::Mutex>::Lock autolock(m_monDataItem);
        commonnet::TransceiverBasicStream data;
        data.ptrTransceiver = ptrTransceiver;
        data.ptrBasicStream = ptrBasicStream;

        m_lstTransceiverBasicStreamPtr.push_back(data);

        if (m_nWaitCount > 0) {
                m_monDataItem.notify();
        }
}
//消费消息  注意加锁控制 这个调用在WorkerThread
bool commonutil::MWThreadServer::PopPackage(commonnet::TransceiverPtr& ptrTransceiver, commonnet::BasicStreamPtr& ptrBasicStream)
{
        while (m_bContinue) {
                commonutil::Monitor<commonutil::Mutex>::Lock autolock(m_monDataItem);
                if (m_lstTransceiverBasicStreamPtr.size() > 0) {
                        commonnet::TransceiverBasicStream data = m_lstTransceiverBasicStreamPtr.front();
                        ptrTransceiver = data.ptrTransceiver;
                        ptrBasicStream = data.ptrBasicStream;
                        m_lstTransceiverBasicStreamPtr.pop_front();
                        return true;
                }

                ++m_nWaitCount;
                m_monDataItem.wait();
                --m_nWaitCount;
        }
        return false;
}


1.3 listen之后的accept功能,保存用户数据跟连接标识符


commonutil::MWThreadServer::ListenHandler::ListenHandler(const commonnet::AcceptorPtr &ptrAcceptor, 
		   const commonnet::SelectorThreadPtr & ptrSelectorThread,
		   commonutil::MWThreadServer *server)
:m_ptrAcceptor(ptrAcceptor),
m_ptrSelectorThread(ptrSelectorThread),
m_server(server)
{

}

commonnet::SocketStatus commonutil::MWThreadServer::ListenHandler::socketReady()
{		
		//listen后直接 accept
        commonnet::TransceiverPtr ptrTransceiver = m_ptrAcceptor->accept();
        commonnet::SocketReadyCallbackPtr ptrCallback = new ClientHandler(ptrTransceiver, m_ptrSelectorThread, m_server);
		//当accept成功后 会调用  ClientHandler 的socketReady
        m_ptrSelectorThread->_register(ptrTransceiver->fd(), ptrCallback, commonnet::NeedRead, 100);
        return commonnet::NeedRead;
}


commonutil::MWThreadServer::ClientHandler::ClientHandler(const commonnet::TransceiverPtr &ptrTransceiver, 
		const commonnet::SelectorThreadPtr &ptrSelectorThread,
		commonutil::MWThreadServer *server)
        :m_ptrTransceiver(ptrTransceiver),
        m_ptrSelectorThread(ptrSelectorThread),
        m_server(server)
{

}
commonutil::MWThreadServer::ClientHandler::~ClientHandler()
{
        m_ptrTransceiver->close();
}

commonnet::SocketStatus commonutil::MWThreadServer::ClientHandler::socketReady()
{
        commonnet::BasicStreamPtr ptrBasicStream;
        try {
				//直接socket读取出来数据
                m_ptrTransceiver->read(ptrBasicStream, false);

                ptrBasicStream->i = ptrBasicStream->b.begin();
                m_server->PushPackage(m_ptrTransceiver, ptrBasicStream);

                return commonnet::NeedRead;
        } catch (...) {
				//读取信息失败的话,直接把这个socket丢弃
                m_ptrSelectorThread->unregister(this);
                return commonnet::Finished;
        }
}

1.4 工作线程消费信息

commonutil::MWThreadServer::WorkerThread::WorkerThread(int nIndex, MWThreadServer* server, Handler *handler)
        : commonutil::Thread("WorkerThread"),
        m_bContinue(true),
        m_nIndex(nIndex),
        m_server(server),
        m_handler(handler)
{
}

commonutil::MWThreadServer::WorkerThread::~WorkerThread()
{
}
//消费消息
void commonutil::MWThreadServer::WorkerThread::run()
{
        commonnet::TransceiverPtr ptrTransceiver;
        commonnet::BasicStreamPtr ptrBasicStream;
        while (m_bContinue) {
                if (!m_server->PopPackage(ptrTransceiver, ptrBasicStream)) {
                        break;
                }
                m_handler->handleRequest(ptrTransceiver, ptrBasicStream);
        }
}

2.关闭程序

信号 产生方式 对进程的影响

  • sighup 内核驱动发现终端(或伪终端)关闭 给终端对应的控制进程(bash)发 SIGHUP bash 收到 SIGHUP 后,会给各个作业(包括前后台)发送 SIGHUP,然后自己退出 前后台的各个作业收到来自 bash 的 SIGHUP 后退出(如果程序会处理 SIGHUP,就不会退出)

  • sigint 通过ctrl+c将会对当进程发送此信号 信号被当前进程树接收到,也就是说,不仅当前进程会收到信号,它的子进程也会收到

  • sigterm kill命令不加参数就是发送这个信号 只有当前进程收到信号,子进程不会收到。如果当前进程被kill了,那么它的子进程的父进程将会是init,也就是pid为1的进程

2.1 新建线程 检测是否有 直到获取到ctrl+c 或者kill信号

commonutil::CtrlCHandlerCallback callback = _handler->getCallback();

int main(){
	commonutil::CtrlCHandler ctrlCHandler;
	
	---{
		commonutil::CtrlCHandler::CtrlCHandler(commonutil::CtrlCHandlerCallback callback)
		{
			StaticMutex::Lock lock(globalMutex);
			if (_handler != 0) {
				throw CtrlCHandlerException(__FILE__, __LINE__);
			} else {
				_callback = callback;
				_handler = this;
				lock.release();
				
				// We block these CTRL+C like signals in the main thread,
				// and by default all other threads will inherit this signal
				// mask.
				
				sigset_t ctrlCLikeSignals;
				sigemptyset(&ctrlCLikeSignals);
				sigaddset(&ctrlCLikeSignals, SIGHUP);
				sigaddset(&ctrlCLikeSignals, SIGINT);
				sigaddset(&ctrlCLikeSignals, SIGTERM);
				int rc = pthread_sigmask(SIG_BLOCK, &ctrlCLikeSignals, 0);
				assert(rc == 0);

				// Joinable thread
				rc = pthread_create(&_tid, 0, sigwaitThread, 0);
				
				assert(rc == 0);
			}
		}
		
	}---
	
	ctrlCHandler.setCallback(InterruptedCallback);
}

2.2 监听命令,停止任务


static void* sigwaitThread(void*)
{
	sigset_t ctrlCLikeSignals;
	sigemptyset(&ctrlCLikeSignals);
	sigaddset(&ctrlCLikeSignals, SIGHUP); 
	sigaddset(&ctrlCLikeSignals, SIGINT);
	sigaddset(&ctrlCLikeSignals, SIGTERM);

	for (;;) {
		int signal = 0;
		int rc = sigwait(&ctrlCLikeSignals, &signal);

		//
		// Some sigwait() implementations incorrectly return EINTR
		// when interrupted by an unblocked caught signal
		//
		if (rc == EINTR) {   //这里注意下
			continue;
		}
		assert(rc == 0);
		
		rc = pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, 0);
		assert(rc == 0);
		
		//这里会调用停止处理任务
		commonutil::CtrlCHandlerCallback callback = _handler->getCallback();
		
		if (callback != 0) {
			callback(signal);
		}

		rc = pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0);
		assert(rc == 0);
	}
	return 0;
}

				

static void InterruptedCallback(int signal)
{
    g_App.ptrServer->stopWork();
	
	---{  
		//停止工作线程
		void commonutil::MWThreadServer::stopWork()
		{
				m_bContinue = false; // 
				m_ptrListenAcceptor->close();
				for (int i = 0; i < m_nThreads; ++i) {
						m_vecWorkerThreadPtr[i]->Stop();
						
						---
							inline void Stop() {
									m_bContinue = false;
							}
						---
						
				}
				for (int i = 0; i < m_nThreads; ++i) {
						commonutil::Monitor<commonutil::Mutex>::Lock autolock(m_monDataItem);
						m_monDataItem.notify();
				}
		}
		
	}---
}  
  

2.3 停止接收客户端的连接

//前面我们看了怎么停止 处理 任务 , 主任务也不接受 客户端发过来的数据了
//还要把获取连接的线程暂停掉 ptrSelectorThread
int main(int argc, char* argv[])
{
	commonutil::CtrlCHandler ctrlCHandler;
	ctrlCHandler.setCallback(InterruptedCallback); //设置了回调函数,等到信号发生  会回调 InterruptedCallback 函数
	

	g_App.ptrTimer = new commonutil::Timer();
	g_App.ptrSelectorThread = new commonnet::SelectorThread(g_App.ptrTimer);
	
	//  ptrSelectorThread 主线程
	---{
		
	
	}---

	RequestHandler requestHandler;
	g_App.ptrServer = new commonutil::MWThreadServer(ptrProperties->getProperty("Local.AllIP"),
										ptrProperties->getPropertyAsInt("T0Service.Port"),
										10,
										g_App.ptrSelectorThread,
										&requestHandler);
	g_App.ptrServer->startWork();
	g_App.ptrServer->join();  //直到获取到ctrl+c 或者kill信号,会不再获取新的连接请求 。
	//stopwork 后 join 退出
	g_App.ptrSelectorThread->destroy();
	
	---{
			void commonnet::SelectorThread::destroy()
			{
				Lock sync(*this);
				assert(!_destroyed);
				_destroyed = true;
				_selector.setInterrupt();
			}
	}---
	
	
	g_App.ptrTimer->destroy();
	g_App.ptrSelectorThread->joinWithThread();   //等到主线程结束任务
	
	---{
		
		void commonnet::SelectorThread::joinWithThread()
		{
			assert(_destroyed);
			if (_thread)
			{
				_thread->getThreadControl().join();
				
				---{
						commonutil::ThreadControl commonutil::Thread::getThreadControl() const
						{
							commonutil::Mutex::Lock lock(_stateMutex);
							if (!_started) {
								throw ThreadNotStartedException(__FILE__, __LINE__);
							}
							return ThreadControl(_thread);
						}
				}---
	
				
			}
		}

	}---
	
}


void commonutil::MWThreadServer::stopWork()
{
        m_bContinue = false;
		m_ptrListenAcceptor->close();
        for (int i = 0; i < m_nThreads; ++i) {
				//	不接受客户端发来的数据
                m_vecWorkerThreadPtr[i]->Stop();
        }
        for (int i = 0; i < m_nThreads; ++i) {
                commonutil::Monitor<commonutil::Mutex>::Lock autolock(m_monDataItem);
                m_monDataItem.notify();
        }
}

//处理用户连接的任务也暂停
// isInterrupted():测试线程 Thread 对象 是否已经是中断状态,但不清除状态标志
void commonnet::SelectorThread::run()
{
	while (true)
	{
		try
		{
			_selector.select();
		}
		catch(...)
		{
			//commonutil::Error out;
			//out << "exception in selector thread:\n" << ex;
			Lock sync(*this);
			//判断主线程是否结束
			if (_selector.isInterrupted())
			{
				if (_selector.processInterrupt())
				{
					continue;
				}

				//
				// There are two possiblities for an interrupt:
				//
				// 1. The selector thread has been destroyed.
				// 2. A callback is being finished (closed).
				//
				
				//
				// Thread destroyed?
				//
				if (_destroyed)
				{
					break;
				}
			}
			//没事就等待1毫秒
			commonutil::ThreadControl::sleep(commonutil::Time::milliSeconds(1));
			continue;
		}



扫描关注我

(转载本站文章请注明作者和出处 Undefined

Post Directory