本文共 7659 字,大约阅读时间需要 25 分钟。
概述
一般的多线程任务大多是避免主线程阻塞(界面卡死),开销线程的次数少。现在有一个光伏监控系统用于采集光伏板的发电功率,每次接收完网络数据包就会进行数据库的写操作。为了不占用主 GUI 线程,这一过程都在新线程里完成。但是成百上千的光伏板时时刻刻都会传送数据过来,如果每一次的执行都完整的创建线程-执行-销毁线程,可见这对于资源的消耗是何等之高。根据《Qt 多线程编程之敲开 QThread 类的大门》中的「3.1 开多少个线程比较合适?」所讲“频繁的切换线程会使性能降低”,尤其是资源密集型操作,如需要 CPU 进行大量的运算任务。为了提高效率,在不增加资源(如 CPU 核数)的情况下,如何利用现有资源成了唯一的解决思路。线程池就是在这样的背景下诞生的。
任务的统一封装形式:QRunnable
为什么要统一封装形式?从直观的角度来看,线程池里有若干个线程,我们只要将执行的任务“扔”到这个“池子”里就可以了。在 Qt 中,所有需要线程池的任务都用 QRunnable 统一封装起来,为什么要这样做而不是类似 QObject::moveToThread() 的方式呢?其实这样做只是为了方便管理而已。上百个任务如果都是统一的类,那就可以用一个数组来管理了。
除此以外,能用到线程池的任务,基本上功能单一且并不需要和其他线程进行信号槽的通信。所以基于这样的场景,Qt 没有将 QRunnable 设计成 QObject 的子类,也是最大限度地精简“任务”负担。
如何使用 QRunnable?
QRunnable()
virtual ~QRunnable() bool autoDelete() const void setAutoDelete(bool autoDelete) virtual void run() = 0 使用 QRunnable 非常简单,继承后重写 run() 函数,然后“扔”给线程池即可。成员函数也就 autoDelete() 这么一种,用于设置对象的所有权。默认设置为 true,线程结束后会自动删除该对象,也就是说扔到线程里就不用管何时去删除的问题了。简单的使用示例如下代码所示:class HelloWorldTask : public QRunnable{ void run() override { qDebug() << "Hello world"; }};
HelloWorldTask *task = new HelloWorldTask();QThreadPool::globalInstance->start(task);
2.3 如何使用信号槽通信?
因为 QRunnable 不是 QObject 的子类,因此不能使用信号槽这种元对象系统特性。如果使用场景需要信号槽通信,解决办法就是采用 QObject、QRunnable 双继承形式,例如下列代码:
class WorkRunnable : public QObject, public QRunnable{ Q_OBJECTpublic: WorkRunnable();protected: void run();signals: void result(const QString &str);public slots: void doSomething(const QString &str);}
采用这种方法需要注意一点,就是 QObject 必须放在 QRunnable 前面,列表初始化时也同样如此。
线程池的本质是什么?
线程池这个概念不属于操作系统的范畴,存粹就是设计者自己写的一套管理工具。从感性的角度想,既然线程池称为“池子”,那这个“池子”里起码有「线程」和「任务」两种数据。因此 QThreadPool 类有着两种私有成员变量:QRunnable 和 QThread。但是为了便于管理很多的任务和线程,线程池将这两种数据进行了二次封装并用容器存储了起来,而线程池的本质就是协调两种容器之间数据的交互。线程池中的两种数据之一:任务
首先,适用于线程复用的任务大多都是简单的任务,而线程池为了管理这些数量多的任务,将它们按照优先级进行了分类,同一优先级的任务会放到一个组中,在 QThreadPool 源码中以 QueuePage 类来表示一组任务,从以下代码可以看出最多存储255个任务。QueuePage 有一个私有变量 m_priority 用于表示该组的优先级。
class QueuePage {public: enum { MaxPageSize = 256 }; QueuePage(QRunnable *runnable, int pri) : m_priority(pri) { push(runnable); }private: int m_priority = 0; int m_firstIndex = 0; int m_lastIndex = -1; QRunnable *m_entries[MaxPageSize];};
其次,不同优先级的任务组会放到 QVector 容器中,源码如下所示:
class Q_CORE_EXPORT QThreadPoolPrivate : public QObjectPrivate{public: QVectorqueue; // 任务队列};
正如上述源码所示,如果任务没有统一的形式,怎么管理就是个比较棘手的事。所以将执行的任务统一成 QRunnable 的形式管理起来非常方便。
线程池中的两种数据之一:线程
因为 QThread 类无法满足被线程池管理的需求,因此线程池中的线程也进行了二次封装,由继承于 QThread 的 QThreadPoolThread 类表示。该类在 QThread 的基础上扩展了两个私有变量:QRunnable * 和 QThreadPoolPrivate *。一个用于存储当前线程要执行的任务,一个用于存储线程池的指针。如下代码所示:
class QThreadPoolThread : public QThread{public: QThreadPoolThread(QThreadPoolPrivate *manager); void run() override; void registerThreadInactive(); QWaitCondition runnableReady; QThreadPoolPrivate *manager; QRunnable *runnable;};
我们知道线程启动后会先执行 run() 函数,在 QThreadPoolThread 的 run() 函数中,线程执行完一个任务后会从任务列表中获取优先级靠前的任务组,然后从该任务组中循环取得任务来执行。
线程池如何处理这两种数据?
如果仅仅是使用的话,我们只要将 QRunnable 任务“丢”进线程池中就可以了。在线程池内部,提交的任务会经历重重关卡,最终找到属于自己的归属。一般来说,任务的最终归属有4种:开辟的新线程中、被扔到人物列表里、插入即将启动的线程中、利用执行完任务的线程中。无论哪一种归属,都会在一个线程中被执行。
既然我们的目的是减少线程开销,那么提交任务后肯定先看看线程池中是否有线程,因为默认的线程池里是空的,既没有任务也没有线程。没有线程的话,那就开辟一个新的线程,然后执行我们的任务。这是任务的第一个归属。
如果有线程,那自然要利用已有的线程。但是我们知道并不是开越多的线程越好,这时候如果工作的线程过多,我们还是没法利用已有的线程来执行我们的任务,那么我们只能将任务存入任务列表中,等待被这些活跃的线程“临幸”。这是任务的第二个归属。
这里产生一个问题,有多少个工作线程才算过多?在 tooManaThreadActive() 源码中我们可以找到答案,如下列代码。假如我们有一台双核超线程 CPU 的台式机,一般最大线程数量(maxThreadCount)为 2*2=4 个。那么下列代码的含义就是:在工作线程数量大于4个的基础上,如果仍然大于 reservedThreads + 1 个,那么就属于线程过多。从这里我们可以看出,如果设置的“保留线程数”小的话,对线程池不会有什么影响,只有大于 maxThreadCount -1 时才会判定该线程池中的工作线程过多。也就是说条件为:activeThreadCount > maxThreadCount 并且 activeThreadCount > reservedThreads + 1。
bool QThreadPoolPrivate::tooManyThreadsActive() const{ const int activeThreadCount = this->activeThreadCount(); return activeThreadCount > maxThreadCount && (activeThreadCount - reservedThreads) > 1;}
这里产生第二个问题,线程过多了会有什么影响?因为每一个线程都会循环的从任务列表中取任务来执行,每次执行完一个任务后线程都会有个“小动作”,即检查下当前线程池是否工作线程过多。毕竟执行任务的期间,我们还有可能在线程池里开辟新的线程来执行任务。如果工作线程过多,那么该线程(QT和readPoolThread)会存入 expiredThreads 队列中;如果不多,则会存入 waitingThreads 队列中。如果仍然不清楚 waitingThreads 和 expiredThreads 的区别,可以这样理解:所有任务都执行完的线程会存入 waitingThreads 容器中,而由于工作线程过多导致无法再继续执行任务的线程会存入 expiredThreads 容器中。至此,线程的归属我们也清楚了,参见 QThreadPool 线程池类的私有变量:
class Q_CORE_EXPORT QThreadPoolPrivate : public QObjectPrivate{ Q_DECLARE_PUBLIC(QThreadPool) friend class QThreadPoolThread;public: QThreadPoolPrivate(); QListallThreads; QQueue waitingThreads; QQueue expiredThreads;};
现在我们发现工作线程也不是很多,那就从 waitingThreads 容器中取一个线程来执行我们的任务。这是任务的第三个归属。
如果 waitingThreads 容器是空的,那就从 expiredThreads 容器中取一个线程来执行我们的任务。这是任务的第四个归属。 如果连 expiredThreads 容器也是空的,那最终只能和第一个归属一样,开辟一个新的线程来执行任务。 以上就是提交新 QRunnable 后的处理流程,理解起来不是很难。看源码弄懂这个过程对开阔思维很有帮助,希望读者能静下心来细细品味。.如何使用 QThreadPool
上节的内容如果没有看懂,没有关系,只要会用 QThreadPool 类就可以了,接下来我们讲讲如何使用 QThreadPool。所有的函数基本上是针对线程池、线程、任务这三部分而设计的。线程池相关
全局对象
因为每个 Qt 程序或者说每个进程都有一个全局 QThreadPool 对象,所以 globalInstance() 函数用于获取该对象的指针,那么用的时候直接用该静态函数即可,而无需显式的创建一个 QThreadPool 对象。如下:
#include#include #include #include class Task : public QRunnable{public: void run() { qDebug() << "Hello"; }};int main(int argc, char *argv[]){ QCoreApplication a(argc, argv); Task *task = new Task(); QThreadPool::globalInstance()->start(task); return a.exec();}
当然,并不是我们非得使用全局线程池,显式的创建一个局部线程池也是可以的。例如:
#include#include #include #include
class Task : public QRunnable{public: void run() { // 每个任务执行10秒,每一秒都输出当前线程 for (int i = 0; i < 10; ++i) { qDebug() << QThread::currentThread(); QThread::sleep(1); } }};
int main(int argc, char *argv[]){ QCoreApplication a(argc, argv); QThreadPool pool; // 每隔1秒插入一个任务 for (int i = 0; i < 200; ++i) { Task *task = new Task(); pool.start(task); QThread::sleep(1); } return a.exec();}
工作线程数
获取线程池中工作线程的数量可以调用 activeThreadCount() 函数来获取。 最大线程数 最大线程数的目的之一是判断线程池是否超负荷运行的依据,在上文「线程池如何处理这两种数据?」中已经说明。我们可以用 maxThreadCount() 函数来获取最大线程数,用 setMaxThreadCount() 函数设置最大线程数。线程相关
栈大小
QThreadPool 可以设置里面线程的栈大小,和《Qt 多线程编程之敲开 QThread 类的大门》的「3.2设置栈大小」内容一样。调用stackSize() 来获取设置的栈大小,调用 setStackSize() 来设置新线程的栈大小。
超时时间
这个 expiryTimeout 时间指的是执行完任务后线程保留的时间,默认是30秒。也就是说在30秒内如果线程还没有去执行新的任务,那么该线程才会被销毁。线程池设置超时时间只会对新线程有效果,对已经运行的线程产生作用。调用 setExpiryTimeout() 函数可以设置。
保留线程·
关于保留线程的作用,已经在上文「3.3线程池如何处理这两种数据?」中阐述,这里不再赘述。调用 reserveThread() 函数可以使 reservedThreads 计数加一,调用 releaseThread() 函数则将其计数减1。
执行
线程池没有“启动”一说,调用 start() 仅仅是将任务丢进线程池而已,线程池内部会根据自身的状况来决定如何处理这个任务。
任务相关
QThreadPool 还设计了一个阻塞函数 waitForDone() 用于等待所有任务被执行完,只有所有任务都执行完后才会继续执行该函数下面的代码。当然也可以将时间传入该函数的参数用于超时判断。
QThreadPool 类中的 start() 和 tryStart() 这两个函数比较相似,所不同的是:调用 start() 函数,如果线程池中没有可用的线程时,会把待执行的任务存放到任务列表中;而调用 tryStart() 函数,没有可用线程时会返回 false,并不会对任务做任何处理。
删除某个任务可以调用 tryTake() 函数,而清空所有任务可以调用 clear() 函数。
所有函数
QRunnable 类QRunnable()
virtual ~QRunnable() bool autoDelete() const void setAutoDelete(bool autoDelete) virtual void run() = 0 QThreadPool 类线程池
全局对象:[static]QThreadPool * globalInstance() 活跃的线程数:int activeThreadCount() const 最大线程数 int maxThreadCount() const void setMaxThreadCount(int maxThreadCount) 线程 栈大小 uint stackSize() const void setStackSize(uint stackSize) 超时时间 int expiryTimeout() const void setExpiryTimeout(int expiryTimeout) 保留线程 保留线程:void reserveThread() 释放线程:void releaseThread() 执行 启动:void start(QRunnable *runnable, int priority = 0) 任务 等待所有runnable完成:bool waitForDone(int msecs = -1) 尝试启用已有线程并立即运行runnable:bool tryStart(QRunnable *runnable) 删除runnable:bool tryTake(QRunnable *runnable) 删除未启动的runnable:void clear()转载地址:http://cwemi.baihongyu.com/