C++多线程基础

Linux 线程/进程 编程 2016/12/13

C++多线程基础-线程/进程

1.多线程同步方式

Linux下提供了多种方式来处理线程同步,最常用的是互斥锁读写锁条件变量信号量

1.互斥锁(mutex)

通过锁机制实现线程间的同步。

初始化锁。在Linux下,线程的互斥量数据类型是pthread_mutex_t。在使用前,要对它进行初始化。

静态分配:pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

动态分配:int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutex_attr_t *mutexattr);

加锁。对共享资源的访问,要对互斥量进行加锁,如果互斥量已经上了锁,调用线程会阻塞,直到互斥量被解锁。

int pthread_mutex_lock(pthread_mutex *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);

解锁。在完成了对共享资源的访问后,要对互斥量进行解锁。

int pthread_mutex_unlock(pthread_mutex_t *mutex);

销毁锁。锁在是使用完成后,需要进行销毁以释放资源。

int pthread_mutex_destroy(pthread_mutex *mutex);

2.条件变量(cond)

跟互斥锁不同,条件变量是用来等待而不是用来上锁的。·条件变量用来自动阻塞一个线程·, 直到某特殊情况发生为止。通常条件变量和互斥锁·同时使用·。条件变量分为两部分: 条件和变量条件本身是由互斥量保护的。线程在改变条件状态前·先要锁住互斥量·。 条件变量使我们可以睡眠等待某种条件出现。条件变量是利用线程间共享的全局变量进行同步的一种机制,

主要包括两个动作:一个线程等待”条件变量的条件成立”而挂起; 另一个线程使”条件成立”(给出条件成立信号)。条件的检测是在互斥锁的保护下进行的。

如果一个条件为假,一个线程自动阻塞,并释放等待状态改变的互斥锁。 如果另一个线程改变了条件,它发信号给关联的条件变量,唤醒一个或多个等待它的线程, 重新获得互斥锁,重新评价条件。

如果两进程共享可读写的内存,条件变量可以被用来实现这两进程间的线程同步。

初始化条件变量。

静态态初始化,pthread_cond_t cond = PTHREAD_COND_INITIALIER;

动态初始化,int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr);

等待条件成立。释放锁,同时阻塞等待条件变量为真才行。timewait()设置等待时间,仍未signal,返回ETIMEOUT(加锁保证只有一个线程wait)

int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
int pthread_cond_timewait(pthread_cond_t *cond,pthread_mutex *mutex,const timespec *abstime);

激活条件变量。pthread_cond_signal,pthread_cond_broadcast(激活所有等待线程)

int pthread_cond_signal(pthread_cond_t *cond);
//激活一个等待该条件的线程(存在多个等待线程时按入队顺序激活其中一个)  
int pthread_cond_broadcast(pthread_cond_t *cond); 
//解除所有线程的阻塞

清除条件变量。无线程等待,否则返回EBUSY

int pthread_cond_destroy(pthread_cond_t *cond);
//只有在没有线程在该条件变量上等待的时候才能销毁这个条件变量,否则返回EBUSY

int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
{
  if(没有条件信号)
  {
	 (1)pthread_mutex_unlock (mutex); // 因为用户在函数外面已经加锁了(这是使用约定),但是在没有信号的情况下为了让其他线程也能等待cond,必须解锁。
	 (2) 阻塞当前线程,等待条件信号(当然应该是类似于中断触发的方式等待,而不是软件轮询的方式等待)... 有信号就继续执行后面。
	 (3) pthread_mutex_lock (mutex); // 因为用户在函数外面要解锁(这也是使用约定),所以要与1呼应加锁,保证用户感觉依然是自己加锁、自己解锁。
  }      
  ...
}

3.信号量(sem)

如同进程一样,线程也可以通过信号量来实现通信,虽然是轻量级的。信号量函数的名字都以”sem_“打头。线程使用的基本信号量函数有四个。

信号量初始化。

int sem_init (sem_t *sem , int pshared, unsigned int value);

这是对由sem指定的信号量进行初始化,设置好它的共享选项(linux 只支持为0,即表示它是当前进程的局部信号量),然后给它一个初始值VALUE。

等待信号量。给信号量减1,然后等待直到信号量的值大于0。

int sem_wait(sem_t *sem);

释放信号量。信号量值加1。并通知其他等待线程。

int sem_post(sem_t *sem);

销毁信号量。我们用完信号量后都它进行清理。归还占有的一切资源。

int sem_destroy(sem_t *sem);

4.读写锁(reader-writer lock)

读写锁与互斥锁非常相似。r、RW lock有三种状态: 共享读取锁(shared-read), 互斥写入锁(exclusive-write lock), 打开(unlock)。 后两种状态与之前的互斥锁两种状态完全相同。

一个unlock的RW lock可以被某个线程获取R锁或者W锁。

如果被一个线程获得R锁,RW lock可以被其它线程继续获得R锁,而不必等待该线程释放R锁。但是,如果此时有其它线程想要获得W锁,它必须等到所有持有共享读取锁的线程释放掉各自的R锁。

如果一个锁被一个线程获得W锁,那么其它线程,无论是想要获取R锁还是W锁,都必须等待该线程释放W锁。


2.进程通信

现在linux使用的进程间通信方式:

  1. 管道(pipe)和有名管道(FIFO)
  2. 信号(signal)
  3. 消息队列
  4. 共享内存
  5. 信号量
  6. 套接字(socket)

消息队列的使用

msg_queue MsgSendQueue;
msg_queue MsgRecvQueue;

if ( MsgSendQueue.open_queue(m_iMsqKeyVal) < 0 )
{
	pCommLog->add_log("打开发送队列失败键值=%ld", m_iMsqKeyVal);
	pCommLog->write_log();
	exit(1);
}

if ( MsgRecvQueue.open_queue(m_iMsqKeyVal+1) < 0 )
{
	pCommLog->add_log("打开发送队列失败键值=%ld", m_iMsqKeyVal+1);
	pCommLog->write_log();
	exit(1);
}

下面开始从发送队列接收数据,与银行后台长连接发送报文(注意要上互斥锁),之后接收数据返回给写队列

while (true)
{
	memset(&MsqBuff, 0, sizeof(struct mymsgbuf));
	pCommLog->write_log();
	pCommLog->add_log("开始接收消息队列数据");
	
	if ( MsgSendQueue.recv_message(0, &MsqBuff) < 0 )
	{
		continue;
	}
	
	memset(sLenBuff, 0, sizeof(sLenBuff));
	memcpy(sLenBuff, MsqBuff.sBuffer, 4);
	nLen = atoi(sLenBuff);
	pCommLog->add_log("接收消息队列数据成功,发送数据到主机");
	pCommLog->add_log_hex(string(MsqBuff.sBuffer, nLen+4));

	if ((iRet = tcpclient_threadpool::TcpClientLongComm.status()) == 0)
	{
		//写数据上锁,防止socket操作冲突
		TcpSendMutex.lock();
	    iRet = tcpclient_threadpool::TcpClientLongComm.write(MsqBuff.sBuffer, nLen+4, 30);
	    TcpSendMutex.unlock();
    }
  ...
}

3.线程同步示例分析

项目示例–条件变量与互斥锁组合

说明一下传递的参数相当于是全局变量:

static list<ks_socket *> socketList;

1.开始循环接受客户端请求并建立连接

while (true)
{	
	newSocket = m_pSockServer->accept();
	if (newSocket == NULL)
	{
		continue;
	}
	//终端建立连接成功,线程同步上锁
	mutex_lock(ks_tcpthreadpool::tcpThreadPool.thread_mutex);
	//将连接对象插入静态链表"通知交易线程"
	ks_tcpthreadpool::socketList.push_back(newSocket);
	//通知正在等待TCP交易线程
	cond_signal(ks_tcpthreadpool::tcpThreadPool.thread_cond);
	mutex_unlock(ks_tcpthreadpool::tcpThreadPool.thread_mutex);
}

2.交易线程开始交易

while(true)	
{
	mutex_lock(ks_tcpthreadpool::tcpThreadPool.thread_mutex);
	//如果socket对象链表为空,则等待监听线程建立连接
	while(ks_tcpthreadpool::socketList.empty())
	{
		cond_wait(ks_tcpthreadpool::tcpThreadPool.thread_cond, \
			ks_tcpthreadpool::tcpThreadPool.thread_mutex);
	}
	//cout << "交易线程开始" << endl;
	//获取已连接的客户端套接字指针对象
	pSocket = ks_tcpthreadpool::socketList.front();
	ks_tcpthreadpool::socketList.pop_front();
	++ks_tcpthreadpool::tcpThreadPool.busy_threads;
	mutex_unlock(ks_tcpthreadpool::tcpThreadPool.thread_mutex);

	string strDateTime;
	ks_tools::get_datetime(strDateTime, "YYYYMMDD HH24:MI:SS:LL");		
	pTcpLog->add_log("---------------------------------------------------------");
	pTcpLog->add_log("[%s] 终端开始接入", strDateTime.c_str());
	//调用交易流程部分
	//cout << "开始调用交易函数" << endl;
	thread_server(*this);
	
	//交易已处理完成,关闭套接字,并释放对象内存
	pSocket->close();
	delete pSocket;
	
	ks_tools::get_datetime(strDateTime, "YYYYMMDD HH24:MI:SS:LL");	
	pTcpLog->add_log("[%s] 断开终端连接", strDateTime.c_str());
	pTcpLog->add_log("---------------------------------------------------------\n\n");		
	pTcpLog->write_log();
	--ks_tcpthreadpool::tcpThreadPool.busy_threads;
}

4.附录

  1. Linux下线程同步的几种方法
  2. Linux进程间通信
  3. Linux 线程属性函数总结
  4. 关于windows线程同步的四种方法
  5. 进程与线程的区别


扫描关注我

(转载本站文章请注明作者和出处 Undefined

Post Directory