Boost高性能并发无锁队列指南:boost::lockfree::queue
liuian 2025-04-24 03:25 505 浏览
1. 库的介绍
boost::lockfree::queue是Boost C++库中lockfree模块的一部分,它提供了一个线程安全的无锁队列实现。无锁队列允许多个线程在不使用互斥锁的情况下并发地访问共享数据结构,从而避免了传统锁带来的线程阻塞和上下文切换开销。
该组件位于Boost库的boost/lockfree/queue.hpp头文件中,要使用它需要先安装Boost库(1.53.0版本以上),并包含相应的头文件:
#include <boost/lockfree/queue.hpp>
#include <iostream>
#include <thread>
#include <vector>2. 主要功能与特点
2.1 主要功能
- 线程安全:多线程环境下无需额外同步机制
- 先进先出(FIFO)队列:保证数据按顺序处理
- 多生产者多消费者(MPMC)支持:适用于复杂并发场景
- 无阻塞操作:入队和出队操作不会阻塞线程
- 容量配置:支持固定大小和动态大小队列
2.2 特点
- 高性能:相比互斥锁实现,在高并发情况下性能显著提升
- 无死锁风险:不使用锁,因此不存在死锁问题
- 适合实时系统:操作延迟低且可预测
- 内存一致性:提供良好的内存序保证
- ABA问题的解决:内部实现解决了无锁编程中的ABA问题
3. 应用场景
boost::lockfree::queue特别适合以下场景:
- 高性能计算:需要线程间快速数据交换的场景
- 实时系统:对延迟敏感的应用程序
- 消息传递系统:作为线程间通信的媒介
- 生产者-消费者模式:一方生产数据,另一方消费数据
- 事件处理系统:处理高频率事件流
- 游戏引擎:需要低延迟线程通信的游戏场景
4. 详细功能模块与代码示例
4.1 基本用法
#include <boost/lockfree/queue.hpp>
#include <iostream>
int main()
{
// 创建一个容量为100的固定大小无锁队列
boost::lockfree::queue<int> queue(100);
// 入队操作
int value = 42;
bool success = queue.push(value);
if (success) {
std::cout << "成功将 " << value << " 入队\n";
} else {
std::cout << "入队失败,队列可能已满\n";
}
// 出队操作
int result;
if (queue.pop(result)) {
std::cout << "成功出队: " << result << "\n";
} else {
std::cout << "出队失败,队列可能为空\n";
}
return 0;
}4.2 固定大小与动态大小队列
#include <boost/lockfree/queue.hpp>
#include <iostream>
int main()
{
// 固定大小队列 - 构造时指定容量
boost::lockfree::queue<int> fixed_queue(100);
// 动态大小队列 - 使用模板参数指定
boost::lockfree::queue<int, boost::lockfree::capacity<0>> dynamic_queue;
// 或使用fixed_sized标志禁用动态大小
boost::lockfree::queue<int, boost::lockfree::fixed_sized<false>> another_dynamic_queue;
// 检查队列是否为固定大小
std::cout << "固定队列是固定大小: " << fixed_queue.is_lock_free() << std::endl;
std::cout << "动态队列是固定大小: " << dynamic_queue.is_lock_free() << std::endl;
return 0;
}值得注意的是,动态大小队列内部使用了节点分配器,可能导致在某些操作中发生内存分配,这可能影响实时性能。
4.3 多生产者多消费者模式
这是boost::lockfree::queue最常见的使用场景:
#include <boost/lockfree/queue.hpp>
#include <iostream>
#include <thread>
#include <vector>
#include <atomic>
boost::lockfree::queue<int> queue(1000);
std::atomic<bool> done(false);
std::atomic<int> produced_count(0);
std::atomic<int> consumed_count(0);
void producer(int id)
{
for (int i = 0; i < 1000; ++i) {
int value = id * 10000 + i;
while (!queue.push(value)) {
// 队列满时,让出CPU时间片
std::this_thread::yield();
}
produced_count.fetch_add(1);
}
}
void consumer()
{
int value;
while (!done || !queue.empty()) {
if (queue.pop(value)) {
consumed_count.fetch_add(1);
// 处理value,这里只是简单打印
if (consumed_count % 1000 == 0) {
std::cout << "已消费: " << consumed_count << " 项\n";
}
} else {
std::this_thread::yield();
}
}
}
int main()
{
// 创建生产者线程
std::vector<std::thread> producers;
for (int i = 0; i < 4; ++i) {
producers.push_back(std::thread(producer, i));
}
// 创建消费者线程
std::vector<std::thread> consumers;
for (int i = 0; i < 2; ++i) {
consumers.push_back(std::thread(consumer));
}
// 等待所有生产者完成
for (auto& t : producers) {
t.join();
}
// 通知消费者所有生产已完成
done = true;
// 等待所有消费者完成
for (auto& t : consumers) {
t.join();
}
std::cout << "生产项总数: " << produced_count << std::endl;
std::cout << "消费项总数: " << consumed_count << std::endl;
return 0;
}4.4 批量操作
boost::lockfree::queue提供了批量入队和出队操作,可以提高性能:
#include <boost/lockfree/queue.hpp>
#include <iostream>
#include <vector>
int main()
{
boost::lockfree::queue<int> queue(100);
// 准备批量入队的数据
std::vector<int> items_to_push = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
// 批量入队
size_t pushed = queue.push(items_to_push.begin(), items_to_push.end());
std::cout << "成功入队 " << pushed << " 个元素\n";
// 批量出队
std::vector<int> results(10);
size_t popped = queue.pop(results.begin(), results.end());
std::cout << "成功出队 " << popped << " 个元素: ";
for (size_t i = 0; i < popped; ++i) {
std::cout << results[i] << " ";
}
std::cout << std::endl;
return 0;
}4.5 消费者遍历
可以使用consume_one和consume_all函数结合回调函数处理队列中的元素:
#include <boost/lockfree/queue.hpp>
#include <iostream>
#include <functional>
int main()
{
boost::lockfree::queue<int> queue(100);
// 添加一些元素
for (int i = 0; i < 10; ++i) {
queue.push(i);
}
// 使用consume_one处理单个元素
bool consumed = queue.consume_one([](int value) {
std::cout << "consume_one处理元素: " << value << std::endl;
});
std::cout << "consume_one " << (consumed ? "成功" : "失败") << std::endl;
// 使用consume_all处理所有元素
size_t consumed_count = queue.consume_all([](int value) {
std::cout << "consume_all处理元素: " << value << std::endl;
});
std::cout << "consume_all处理了 " << consumed_count << " 个元素\n";
return 0;
}4.6 队列容量与状态查询
boost::lockfree::queue提供了查询队列状态的方法:
#include <boost/lockfree/queue.hpp>
#include <iostream>
int main()
{
boost::lockfree::queue<int> queue(10);
// 填充队列
for (int i = 0; i < 5; ++i) {
queue.push(i);
}
// 检查队列是否为空
std::cout << "队列是否为空: " << (queue.empty() ? "是" : "否") << std::endl;
// 获取队列当前大小(近似值)
// 注意:在并发环境中这个值只是一个估计
std::cout << "队列大小估计: " << queue.read_available() << std::endl;
// 清空队列
queue.consume_all([](int){});
std::cout << "清空后队列是否为空: " << (queue.empty() ? "是" : "否") << std::endl;
return 0;
}4.7 高级配置选项
boost::lockfree::queue提供了多种配置选项来满足不同需求:
#include <boost/lockfree/queue.hpp>
#include <iostream>
#include <boost/pool/pool_alloc.hpp>
// 自定义分配器
typedef boost::fast_pool_allocator<int> pool_allocator;
int main()
{
// 使用自定义分配器的队列
boost::lockfree::queue<int, boost::lockfree::allocator<pool_allocator>> custom_alloc_queue(100);
// 配置固定大小
boost::lockfree::queue<int, boost::lockfree::fixed_sized<true>> fixed_queue(100);
// 自定义内存对齐
boost::lockfree::queue<int, boost::lockfree::alignment<16>> aligned_queue(100);
// 组合多个选项
boost::lockfree::queue<
int,
boost::lockfree::capacity<1000>, // 固定容量
boost::lockfree::fixed_sized<true>, // 固定大小
boost::lockfree::allocator<pool_allocator> // 自定义分配器
> advanced_queue;
// 测试队列功能
for (int i = 0; i < 10; ++i) {
custom_alloc_queue.push(i);
fixed_queue.push(i);
aligned_queue.push(i);
advanced_queue.push(i);
}
int value;
while (custom_alloc_queue.pop(value)) {
std::cout << "自定义分配器队列元素: " << value << std::endl;
}
return 0;
}4.8 性能优化与最佳实践
在使用boost::lockfree::queue时,以下最佳实践可以帮助您获得最佳性能:
#include <boost/lockfree/queue.hpp>
#include <iostream>
#include <thread>
#include <vector>
#include <chrono>
#include <atomic>
// 性能测试示例
void performance_test()
{
// 使用合适的队列大小,避免频繁的内存分配
constexpr size_t QUEUE_SIZE = 10000;
boost::lockfree::queue<int, boost::lockfree::fixed_sized<true>> queue(QUEUE_SIZE);
std::atomic<bool> start{false};
std::atomic<int> ready_producers{0};
std::atomic<int> ready_consumers{0};
std::atomic<bool> done{false};
// 生产者
auto producer = [&](int id, int items) {
ready_producers++;
while (!start.load(std::memory_order_acquire)) {
std::this_thread::yield(); // 等待开始信号
}
for (int i = 0; i < items; ++i) {
int value = id * 1000000 + i;
// 使用批量入队来提高性能
if (i % 100 == 0 && i > 0) {
std::vector<int> batch;
for (int j = 0; j < 100; ++j) {
batch.push_back(value - 100 + j);
}
queue.push(batch.begin(), batch.end());
} else {
// 当队列满时进行指数退避
int retry = 0;
while (!queue.push(value)) {
if (++retry > 10) {
std::this_thread::sleep_for(std::chrono::microseconds(1 << std::min(retry, 10)));
} else {
std::this_thread::yield();
}
}
}
}
};
// 消费者
auto consumer = [&]() {
ready_consumers++;
while (!start.load(std::memory_order_acquire)) {
std::this_thread::yield(); // 等待开始信号
}
std::vector<int> batch(100);
int value;
while (!done || !queue.empty()) {
// 尝试批量出队
size_t popped = queue.pop(batch.begin(), batch.end());
if (popped > 0) {
// 处理批量数据
continue;
}
// 单个出队
if (queue.pop(value)) {
// 处理单个元素
} else {
// 智能退避,避免CPU空转
std::this_thread::yield();
}
}
};
// 创建线程
constexpr int NUM_PRODUCERS = 4;
constexpr int NUM_CONSUMERS = 4;
constexpr int ITEMS_PER_PRODUCER = 100000;
std::vector<std::thread> producers;
std::vector<std::thread> consumers;
for (int i = 0; i < NUM_PRODUCERS; ++i) {
producers.emplace_back(producer, i, ITEMS_PER_PRODUCER);
}
for (int i = 0; i < NUM_CONSUMERS; ++i) {
consumers.emplace_back(consumer);
}
// 等待所有线程就绪
while (ready_producers < NUM_PRODUCERS || ready_consumers < NUM_CONSUMERS) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
// 开始计时
auto start_time = std::chrono::high_resolution_clock::now();
// 发出开始信号
start.store(true, std::memory_order_release);
// 等待生产者完成
for (auto& t : producers) {
t.join();
}
// 标记生产者已完成
done = true;
// 等待消费者完成
for (auto& t : consumers) {
t.join();
}
auto end_time = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
std::cout << "处理 " << NUM_PRODUCERS * ITEMS_PER_PRODUCER
<< " 项数据耗时: " << duration.count() << " 毫秒" << std::endl;
std::cout << "每秒处理约 "
<< (NUM_PRODUCERS * ITEMS_PER_PRODUCER * 1000.0 / duration.count())
<< " 项" << std::endl;
}
int main()
{
performance_test();
return 0;
}4.9 与其他Boost组件结合使用
boost::lockfree::queue可以与其他Boost组件结合使用,实现更复杂的功能:
#include <boost/lockfree/queue.hpp>
#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
#include <iostream>
#include <thread>
#include <functional>
// 任务队列类示例
class TaskQueue {
private:
boost::lockfree::queue<std::function<void()>*> task_queue{1000};
boost::asio::io_context io_context;
std::unique_ptr<boost::asio::io_context::work> work;
std::vector<std::thread> worker_threads;
std::atomic<bool> running{false};
public:
TaskQueue(int num_threads = 4) : work(std::make_unique<boost::asio::io_context::work>(io_context)) {
running = true;
// 启动工作线程
for (int i = 0; i < num_threads; ++i) {
worker_threads.emplace_back([this]() {
while (running) {
// 尝试从队列中获取任务
std::function<void()>* task = nullptr;
if (task_queue.pop(task)) {
if (task) {
// 执行任务
(*task)();
delete task;
}
} else {
// 没有任务时处理IO事件
io_context.poll_one();
std::this_thread::yield();
}
}
});
}
}
~TaskQueue() {
stop();
}
// 提交任务
template<typename F>
bool submit(F&& task) {
auto* task_ptr = new std::function<void()>(std::forward<F>(task));
bool success = task_queue.push(task_ptr);
if (!success) {
delete task_ptr;
}
return success;
}
// 定时任务
template<typename F>
void schedule_after(int milliseconds, F&& task) {
auto timer = std::make_shared<boost::asio::steady_timer>(io_context);
timer->expires_after(std::chrono::milliseconds(milliseconds));
timer->async_wait([timer, task = std::forward<F>(task)](const boost::system::error_code& ec) {
if (!ec) {
task();
}
});
}
// 停止队列处理
void stop() {
if (running) {
running = false;
work.reset();
io_context.stop();
for (auto& thread : worker_threads) {
if (thread.joinable()) {
thread.join();
}
}
worker_threads.clear();
// 清空剩余任务
std::function<void()>* task = nullptr;
while (task_queue.pop(task)) {
delete task;
}
}
}
};
// 使用示例
int main() {
TaskQueue task_queue(4);
// 提交普通任务
for (int i = 0; i < 10; ++i) {
task_queue.submit([i]() {
std::cout << "执行任务 " << i << " 在线程 "
<< std::this_thread::get_id() << std::endl;
});
}
// 提交延迟任务
task_queue.schedule_after(1000, []() {
std::cout << "1秒后执行的定时任务" << std::endl;
});
// 等待任务完成
std::this_thread::sleep_for(std::chrono::seconds(2));
return 0;
}5. 注意事项与限制
使用boost::lockfree::queue时需要注意以下几点:
- 内存一致性:无锁队列依赖特定的内存序来保证正确性,不当使用可能导致难以发现的并发问题。
- 固定大小限制:固定大小队列可能会因队列满而拒绝新元素,必须有处理这种情况的策略。
- 动态内存分配:动态大小队列可能在运行时进行内存分配,这可能不适合对延迟敏感的应用程序。
- ABA问题:虽然boost::lockfree::queue内部处理了ABA问题,但了解这一问题有助于理解实现细节。
- 原子操作开销:无锁队列虽然避免了锁的开销,但原子操作本身也有一定代价,在低竞争环境下可能不如简单的锁实现快。
- 对齐要求:某些平台上的原子操作可能需要特定的内存对齐,boost::lockfree::queue会自动处理这些要求。
6. 总结
boost::lockfree::queue是一个强大的无锁队列实现,能够在多线程环境中提供高性能的数据交换。它无需使用互斥锁,因此避免了与锁相关的多种问题,特别适合对延迟敏感或高并发的应用场景。
相关推荐
-
- 驱动网卡(怎么从新驱动网卡)
-
网卡一般是指为电脑主机提供有线无线网络功能的适配器。而网卡驱动指的就是电脑连接识别这些网卡型号的桥梁。网卡只有打上了网卡驱动才能正常使用。并不是说所有的网卡一插到电脑上面就能进行数据传输了,他都需要里面芯片组的驱动文件才能支持他进行数据传输...
-
2026-01-30 00:37 liuian
- win10更新助手装系统(微软win10更新助手)
-
1、点击首页“系统升级”的按钮,给出弹框,告诉用户需要上传IMEI码才能使用升级服务。同时给出同意和取消按钮。华为手机助手2、点击同意,则进入到“系统升级”功能华为手机助手华为手机助手3、在检测界面,...
- windows11专业版密钥最新(windows11专业版激活码永久)
-
Windows11专业版的正版密钥,我们是对windows的激活所必备的工具。该密钥我们可以通过微软商城或者通过计算机的硬件供应商去购买获得。获得了windows11专业版的正版密钥后,我...
-
- 手机删过的软件恢复(手机删除过的软件怎么恢复)
-
操作步骤:1、首先,我们需要先打开手机。然后在许多图标中找到带有[文件管理]文本的图标,然后单击“文件管理”进入页面。2、进入页面后,我们将在顶部看到一行文本:手机,最新信息,文档,视频,图片,音乐,收藏,最后是我们正在寻找的[更多],单击...
-
2026-01-29 23:55 liuian
- 一键ghost手动备份系统步骤(一键ghost 备份)
-
步骤1、首先把装有一键GHOST装系统的U盘插在电脑上,然后打开电脑马上按F2或DEL键入BIOS界面,然后就选择BOOT打USDHDD模式选择好,然后按F10键保存,电脑就会马上重启。 步骤...
- 怎么创建局域网(怎么创建局域网打游戏)
-
1、购买路由器一台。进入路由器把dhcp功能打开 2、购买一台交换机。从路由器lan端口拉出一条网线查到交换机的任意一个端口上。 3、两台以上电脑。从交换机任意端口拉出网线插到电脑上(电脑设置...
- 精灵驱动器官方下载(精灵驱动手机版下载)
-
是的。驱动精灵是一款集驱动管理和硬件检测于一体的、专业级的驱动管理和维护工具。驱动精灵为用户提供驱动备份、恢复、安装、删除、在线更新等实用功能。1、全新驱动精灵2012引擎,大幅提升硬件和驱动辨识能力...
- 一键还原系统步骤(一键还原系统有哪些)
-
1、首先需要下载安装一下Windows一键还原程序,在安装程序窗口中,点击“下一步”,弹出“用户许可协议”窗口,选择“我同意该许可协议的条款”,并点击“下一步”。 2、在弹出的“准备安装”窗口中,可...
- 电脑加速器哪个好(电脑加速器哪款好)
-
我认为pp加速器最好用,飞速土豆太懒,急速酷六根本不工作。pp加速器什么网页都加速,太任劳任怨了!以上是个人观点,具体性能请自己试。ps:我家电脑性能很好。迅游加速盒子是可以加速电脑的。因为有过之...
- 任何u盘都可以做启动盘吗(u盘必须做成启动盘才能装系统吗)
-
是的,需要注意,U盘的大小要在4G以上,最好是8G以上,因为启动盘里面需要装系统,内存小的话,不能用来安装系统。内存卡或者U盘或者移动硬盘都可以用来做启动盘安装系统。普通的U盘就可以,不过最好U盘...
- u盘怎么恢复文件(u盘文件恢复的方法)
-
开360安全卫士,点击上面的“功能大全”。点击文件恢复然后点击“数据”下的“文件恢复”功能。选择驱动接着选择需要恢复的驱动,选择接入的U盘。点击开始扫描选好就点击中间的“开始扫描”,开始扫描U盘数据。...
- 系统虚拟内存太低怎么办(系统虚拟内存占用过高什么原因)
-
1.检查系统虚拟内存使用情况,如果发现有大量的空闲内存,可以尝试释放一些不必要的进程,以释放内存空间。2.如果系统虚拟内存使用率较高,可以尝试增加系统虚拟内存的大小,以便更多的应用程序可以使用更多...
-
- 剪贴板权限设置方法(剪贴板访问权限)
-
1、首先打开iphone手机,触碰并按住单词或图像直到显示选择选项。2、其次,然后选取“拷贝”或“剪贴板”。3、勾选需要的“权限”,最后选择开启,即可完成苹果剪贴板权限设置。仅参考1.打开苹果手机设置按钮,点击【通用】。2.点击【键盘】,再...
-
2026-01-29 21:37 liuian
- 平板系统重装大师(平板重装win系统)
-
如果你的平板开不了机,但可以连接上电脑,那就能好办,楼主下载安装个平板刷机王到你的个人电脑上,然后连接你的平板,平板刷机王会自动识别你的平板,平板刷机王上有你平板的我刷机包,楼主点击下载一个,下载完成...
- 联想官网售后服务网点(联想官网售后服务热线)
-
联想3c服务中心是联想旗下的官方售后,是基于互联网O2O模式开发的全新服务平台。可以为终端用户提供多品牌手机、电脑以及其他3C类产品的维修、保养和保险服务。根据客户需求层次,联想服务针对个人及家庭客户...
- 一周热门
- 最近发表
- 标签列表
-
- python判断字典是否为空 (50)
- crontab每周一执行 (48)
- aes和des区别 (43)
- bash脚本和shell脚本的区别 (35)
- canvas库 (33)
- dataframe筛选满足条件的行 (35)
- gitlab日志 (33)
- lua xpcall (36)
- blob转json (33)
- python判断是否在列表中 (34)
- python html转pdf (36)
- 安装指定版本npm (37)
- idea搜索jar包内容 (33)
- css鼠标悬停出现隐藏的文字 (34)
- linux nacos启动命令 (33)
- gitlab 日志 (36)
- adb pull (37)
- python判断元素在不在列表里 (34)
- python 字典删除元素 (34)
- vscode切换git分支 (35)
- python bytes转16进制 (35)
- grep前后几行 (34)
- hashmap转list (35)
- c++ 字符串查找 (35)
- mysql刷新权限 (34)
