C++多线程基础-线程/进程
1.多线程同步方式
Linux下提供了多种方式来处理线程同步,最常用的是互斥锁、 读写锁、条件变量和信号量。
1.互斥锁(mutex)
通过锁机制实现线程间的同步。
初始化锁。在Linux下,线程的互斥量数据类型是pthread_mutex_t。在使用前,要对它进行初始化。
静态分配:pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
动态分配:int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutex_attr_t *mutexattr);
加锁。对共享资源的访问,要对互斥量进行加锁,如果互斥量已经上了锁,调用线程会阻塞,直到互斥量被解锁。
int pthread_mutex_lock(pthread_mutex *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
解锁。在完成了对共享资源的访问后,要对互斥量进行解锁。
int pthread_mutex_unlock(pthread_mutex_t *mutex);
销毁锁。锁在是使用完成后,需要进行销毁以释放资源。
int pthread_mutex_destroy(pthread_mutex *mutex);
2.条件变量(cond)
跟互斥锁不同,条件变量是用来等待而不是用来上锁的。·条件变量用来自动阻塞一个线程·,
直到某特殊情况发生为止。通常条件变量和互斥锁·同时使用·。条件变量分为两部分: 条件和变量。
条件本身是由互斥量保护的。线程在改变条件状态前·先要锁住互斥量·。
条件变量使我们可以睡眠等待某种条件出现。条件变量是利用线程间共享的全局变量进行同步的一种机制,
主要包括两个动作:一个线程等待”条件变量的条件成立”而挂起; 另一个线程使”条件成立”(给出条件成立信号)。条件的检测是在互斥锁的保护下进行的。
如果一个条件为假,一个线程自动阻塞,并释放等待状态改变的互斥锁。
如果另一个线程改变了条件,它发信号给关联的条件变量,唤醒一个或多个等待它的线程,
重新获得互斥锁,重新评价条件。
如果两进程共享可读写的内存,条件变量可以被用来实现这两进程间的线程同步。
初始化条件变量。
静态态初始化,pthread_cond_t cond = PTHREAD_COND_INITIALIER;
动态初始化,int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr);
等待条件成立。释放锁,同时阻塞等待条件变量为真才行。timewait()设置等待时间,仍未signal,返回ETIMEOUT(加锁保证只有一个线程wait)
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
int pthread_cond_timewait(pthread_cond_t *cond,pthread_mutex *mutex,const timespec *abstime);
激活条件变量。pthread_cond_signal,pthread_cond_broadcast(激活所有等待线程)
int pthread_cond_signal(pthread_cond_t *cond);
//激活一个等待该条件的线程(存在多个等待线程时按入队顺序激活其中一个)
int pthread_cond_broadcast(pthread_cond_t *cond);
//解除所有线程的阻塞
清除条件变量。无线程等待,否则返回EBUSY
int pthread_cond_destroy(pthread_cond_t *cond);
//只有在没有线程在该条件变量上等待的时候才能销毁这个条件变量,否则返回EBUSY
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
{
if(没有条件信号)
{
(1)pthread_mutex_unlock (mutex); // 因为用户在函数外面已经加锁了(这是使用约定),但是在没有信号的情况下为了让其他线程也能等待cond,必须解锁。
(2) 阻塞当前线程,等待条件信号(当然应该是类似于中断触发的方式等待,而不是软件轮询的方式等待)... 有信号就继续执行后面。
(3) pthread_mutex_lock (mutex); // 因为用户在函数外面要解锁(这也是使用约定),所以要与1呼应加锁,保证用户感觉依然是自己加锁、自己解锁。
}
...
}
3.信号量(sem)
如同进程一样,线程也可以通过信号量来实现通信,虽然是轻量级的。信号量函数的名字都以”sem_“打头。线程使用的基本信号量函数有四个。
信号量初始化。
int sem_init (sem_t *sem , int pshared, unsigned int value);
这是对由sem指定的信号量进行初始化,设置好它的共享选项(linux 只支持为0,即表示它是当前进程的局部信号量),然后给它一个初始值VALUE。
等待信号量。给信号量减1,然后等待直到信号量的值大于0。
int sem_wait(sem_t *sem);
释放信号量。信号量值加1。并通知其他等待线程。
int sem_post(sem_t *sem);
销毁信号量。我们用完信号量后都它进行清理。归还占有的一切资源。
int sem_destroy(sem_t *sem);
4.读写锁(reader-writer lock)
读写锁与互斥锁非常相似。r、RW lock有三种状态:
共享读取锁(shared-read), 互斥写入锁(exclusive-write lock), 打开(unlock)。
后两种状态与之前的互斥锁两种状态完全相同。
一个unlock的RW lock可以被某个线程获取R锁或者W锁。
如果被一个线程获得R锁,RW lock可以被其它线程继续获得R锁,而不必等待该线程释放R锁。但是,如果此时有其它线程想要获得W锁,它必须等到所有持有共享读取锁的线程释放掉各自的R锁。
如果一个锁被一个线程获得W锁,那么其它线程,无论是想要获取R锁还是W锁,都必须等待该线程释放W锁。
2.进程通信
现在linux使用的进程间通信方式:
管道(pipe)和有名管道(FIFO)- 信号(signal)
消息队列- 共享内存
- 信号量
套接字(socket)
消息队列的使用
msg_queue MsgSendQueue;
msg_queue MsgRecvQueue;
if ( MsgSendQueue.open_queue(m_iMsqKeyVal) < 0 )
{
pCommLog->add_log("打开发送队列失败键值=%ld", m_iMsqKeyVal);
pCommLog->write_log();
exit(1);
}
if ( MsgRecvQueue.open_queue(m_iMsqKeyVal+1) < 0 )
{
pCommLog->add_log("打开发送队列失败键值=%ld", m_iMsqKeyVal+1);
pCommLog->write_log();
exit(1);
}
下面开始从发送队列接收数据,与银行后台长连接发送报文(注意要上互斥锁),之后接收数据返回给写队列
while (true)
{
memset(&MsqBuff, 0, sizeof(struct mymsgbuf));
pCommLog->write_log();
pCommLog->add_log("开始接收消息队列数据");
if ( MsgSendQueue.recv_message(0, &MsqBuff) < 0 )
{
continue;
}
memset(sLenBuff, 0, sizeof(sLenBuff));
memcpy(sLenBuff, MsqBuff.sBuffer, 4);
nLen = atoi(sLenBuff);
pCommLog->add_log("接收消息队列数据成功,发送数据到主机");
pCommLog->add_log_hex(string(MsqBuff.sBuffer, nLen+4));
if ((iRet = tcpclient_threadpool::TcpClientLongComm.status()) == 0)
{
//写数据上锁,防止socket操作冲突
TcpSendMutex.lock();
iRet = tcpclient_threadpool::TcpClientLongComm.write(MsqBuff.sBuffer, nLen+4, 30);
TcpSendMutex.unlock();
}
...
}
3.线程同步示例分析
项目示例–条件变量与互斥锁组合
说明一下传递的参数相当于是全局变量:
static list<ks_socket *> socketList;
1.开始循环接受客户端请求并建立连接
while (true)
{
newSocket = m_pSockServer->accept();
if (newSocket == NULL)
{
continue;
}
//终端建立连接成功,线程同步上锁
mutex_lock(ks_tcpthreadpool::tcpThreadPool.thread_mutex);
//将连接对象插入静态链表"通知交易线程"
ks_tcpthreadpool::socketList.push_back(newSocket);
//通知正在等待TCP交易线程
cond_signal(ks_tcpthreadpool::tcpThreadPool.thread_cond);
mutex_unlock(ks_tcpthreadpool::tcpThreadPool.thread_mutex);
}
2.交易线程开始交易
while(true)
{
mutex_lock(ks_tcpthreadpool::tcpThreadPool.thread_mutex);
//如果socket对象链表为空,则等待监听线程建立连接
while(ks_tcpthreadpool::socketList.empty())
{
cond_wait(ks_tcpthreadpool::tcpThreadPool.thread_cond, \
ks_tcpthreadpool::tcpThreadPool.thread_mutex);
}
//cout << "交易线程开始" << endl;
//获取已连接的客户端套接字指针对象
pSocket = ks_tcpthreadpool::socketList.front();
ks_tcpthreadpool::socketList.pop_front();
++ks_tcpthreadpool::tcpThreadPool.busy_threads;
mutex_unlock(ks_tcpthreadpool::tcpThreadPool.thread_mutex);
string strDateTime;
ks_tools::get_datetime(strDateTime, "YYYYMMDD HH24:MI:SS:LL");
pTcpLog->add_log("---------------------------------------------------------");
pTcpLog->add_log("[%s] 终端开始接入", strDateTime.c_str());
//调用交易流程部分
//cout << "开始调用交易函数" << endl;
thread_server(*this);
//交易已处理完成,关闭套接字,并释放对象内存
pSocket->close();
delete pSocket;
ks_tools::get_datetime(strDateTime, "YYYYMMDD HH24:MI:SS:LL");
pTcpLog->add_log("[%s] 断开终端连接", strDateTime.c_str());
pTcpLog->add_log("---------------------------------------------------------\n\n");
pTcpLog->write_log();
--ks_tcpthreadpool::tcpThreadPool.busy_threads;
}
4.附录
扫描关注我
(转载本站文章请注明作者和出处 Undefined)