Boost无锁栈指南:boost::lockfree::stack
liuian 2025-04-24 03:25 116 浏览
1. 库的介绍
boost::lockfree::stack 是 Boost C++ 库中 lockfree 模块提供的线程安全无锁栈实现。无锁栈允许多个线程在不使用互斥锁的情况下并发地访问和修改共享数据结构,从而避免了传统锁机制带来的线程阻塞和上下文切换开销。
该组件位于 Boost 库的 boost/lockfree/stack.hpp 头文件中,要使用它需要先安装 Boost 库(1.53.0 版本以上),并包含相应的头文件:
#include <boost/lockfree/stack.hpp>
#include <iostream>
#include <thread>
#include <vector>2. 主要功能与特点
2.1 主要功能
- 线程安全:无需额外同步机制即可安全地在多线程环境中使用
- 后进先出(LIFO)栈:保证数据按栈的特性处理
- 多生产者多消费者(MPMC)支持:适用于复杂并发场景
- 无阻塞操作:入栈和出栈操作不会导致线程阻塞
- 容量配置:支持固定大小和动态大小栈实现
2.2 特点
- 高性能:在高并发情况下,性能显著优于基于互斥锁的实现
- 无死锁风险:由于不使用锁,因此不存在死锁问题
- 适合实时系统:操作延迟低且可预测,适合对时间敏感的应用
- 内存一致性:提供良好的内存序保证,确保线程间正确通信
- ABA问题的解决:内部实现已解决无锁编程中常见的 ABA 问题
3. 应用场景
boost::lockfree::stack 特别适合以下场景:
- 高性能计算:需要线程间快速数据交换,且出栈顺序与入栈顺序相反的场景
- 实时系统:对延迟敏感,要求预测性能的应用程序
- 任务调度系统:管理具有后进先出特性的任务执行
- 深度优先算法实现:需要栈结构支持的算法
- 内存管理:实现高效的对象池或内存池
- 撤销操作系统:需要记录最近操作以便撤销的应用
4. 详细功能模块与代码示例
4.1 基本用法
下面是使用 boost::lockfree::stack 的最基本示例:
#include <boost/lockfree/stack.hpp>
#include <iostream>
int main()
{
// 创建一个容量为100的固定大小无锁栈
boost::lockfree::stack<int> stack(100);
// 入栈操作
int value = 42;
bool success = stack.push(value);
if (success) {
std::cout << "成功将 " << value << " 入栈\n";
} else {
std::cout << "入栈失败,栈可能已满\n";
}
// 出栈操作
int result;
if (stack.pop(result)) {
std::cout << "成功出栈: " << result << "\n";
} else {
std::cout << "出栈失败,栈可能为空\n";
}
return 0;
}4.2 固定大小与动态大小栈
boost::lockfree::stack 支持两种容量模式:
#include <boost/lockfree/stack.hpp>
#include <iostream>
int main()
{
// 固定大小栈 - 构造时指定容量
boost::lockfree::stack<int> fixed_stack(100);
// 动态大小栈 - 使用模板参数指定
boost::lockfree::stack<int, boost::lockfree::capacity<0>> dynamic_stack;
// 或使用 fixed_sized 标志禁用固定大小
boost::lockfree::stack<int, boost::lockfree::fixed_sized<false>> another_dynamic_stack;
// 检查栈是否为无锁实现
std::cout << "固定栈是无锁的: " << fixed_stack.is_lock_free() << std::endl;
std::cout << "动态栈是无锁的: " << dynamic_stack.is_lock_free() << std::endl;
return 0;
}值得注意的是,动态大小栈内部使用了节点分配器,可能在某些操作中发生内存分配,这可能影响实时性能。
4.3 多生产者多消费者模式
以下是在多线程环境中使用 boost::lockfree::stack 的一个完整示例:
#include <boost/lockfree/stack.hpp>
#include <iostream>
#include <thread>
#include <vector>
#include <atomic>
#include <cstdlib>
boost::lockfree::stack<int> stack(1000);
std::atomic<bool> done(false);
std::atomic<unsigned> push_count(0);
std::atomic<unsigned> pop_count(0);
void producer(int id)
{
for (int i = 0; i < 1000; ++i) {
int value = id * 10000 + i;
while (!stack.push(value)) {
// 栈满时,让出CPU时间片
std::this_thread::yield();
}
++push_count;
}
}
void consumer()
{
int value;
while (!done || !stack.empty()) {
if (stack.pop(value)) {
++pop_count;
// 处理value,这里只是简单计数
if (pop_count % 1000 == 0) {
std::cout << "已消费: " << pop_count << " 项\n";
}
} else {
std::this_thread::yield();
}
}
}
int main()
{
// 创建生产者线程
std::vector<std::thread> producers;
for (int i = 0; i < 4; ++i) {
producers.push_back(std::thread(producer, i));
}
// 创建消费者线程
std::vector<std::thread> consumers;
for (int i = 0; i < 2; ++i) {
consumers.push_back(std::thread(consumer));
}
// 等待所有生产者完成
for (auto& t : producers) {
t.join();
}
// 通知消费者所有生产已完成
done = true;
// 等待所有消费者完成
for (auto& t : consumers) {
t.join();
}
std::cout << "入栈总数: " << push_count << std::endl;
std::cout << "出栈总数: " << pop_count << std::endl;
return 0;
}4.4 批量操作
boost::lockfree::stack 提供了批量入栈和出栈操作,可以提高性能:
#include <boost/lockfree/stack.hpp>
#include <iostream>
#include <vector>
#include <algorithm>
int main()
{
boost::lockfree::stack<int> stack(100);
// 准备批量入栈的数据
std::vector<int> items_to_push = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
// 批量入栈
size_t pushed = stack.push(items_to_push.begin(), items_to_push.end());
std::cout << "成功入栈 " << pushed << " 个元素\n";
// 批量出栈
std::vector<int> results(10);
size_t popped = stack.pop(results.begin(), results.end());
std::cout << "成功出栈 " << popped << " 个元素: ";
for (size_t i = 0; i < popped; ++i) {
std::cout << results[i] << " ";
}
std::cout << std::endl;
// 注意:由于栈的LIFO特性,出栈顺序与入栈相反
std::cout << "验证LIFO特性: ";
if (popped > 0 && results[0] == 10) {
std::cout << "符合预期,最后入栈的元素(10)最先出栈" << std::endl;
}
return 0;
}4.5 消费者遍历
可以使用 consume_one 和 consume_all 函数结合回调函数处理栈中的元素:
#include <boost/lockfree/stack.hpp>
#include <iostream>
#include <functional>
int main()
{
boost::lockfree::stack<int> stack(100);
// 添加一些元素
for (int i = 1; i <= 10; ++i) {
stack.push(i);
}
// 使用consume_one处理单个元素
bool consumed = stack.consume_one([](int value) {
std::cout << "consume_one处理元素: " << value << std::endl;
});
std::cout << "consume_one " << (consumed ? "成功" : "失败") << std::endl;
// 使用consume_all处理所有元素
size_t consumed_count = stack.consume_all([](int value) {
std::cout << "consume_all处理元素: " << value << std::endl;
});
std::cout << "consume_all处理了 " << consumed_count << " 个元素\n";
// 验证消费完毕后栈为空
std::cout << "栈现在是" << (stack.empty() ? "空的" : "非空的") << std::endl;
return 0;
}4.6 栈容量与状态查询
boost::lockfree::stack 提供了查询栈状态的方法:
#include <boost/lockfree/stack.hpp>
#include <iostream>
int main()
{
boost::lockfree::stack<int> stack(10);
// 填充栈
for (int i = 0; i < 5; ++i) {
stack.push(i);
}
// 检查栈是否为空
std::cout << "栈是否为空: " << (stack.empty() ? "是" : "否") << std::endl;
// 注意:无锁栈不提供直接获取栈大小的方法,因为这在无锁环境中开销很大
// 可以使用消费者循环计数
int count = 0;
stack.consume_all([&count](int) { ++count; });
std::cout << "栈中元素数量: " << count << std::endl;
// 重新填充栈
for (int i = 0; i < 5; ++i) {
stack.push(i);
}
// 检查栈是否已满 (无法直接检查,但可以尝试入栈)
bool is_full = !stack.push(100);
std::cout << "尝试入栈新元素: " << (is_full ? "栈可能已满" : "入栈成功") << std::endl;
// 清空栈
int value;
while (stack.pop(value)) {
std::cout << "出栈元素: " << value << std::endl;
}
std::cout << "清空后栈是否为空: " << (stack.empty() ? "是" : "否") << std::endl;
return 0;
}4.7 高级配置选项
boost::lockfree::stack 提供了多种配置选项来满足不同需求:
#include <boost/lockfree/stack.hpp>
#include <iostream>
#include <boost/pool/pool_alloc.hpp>
// 自定义分配器
typedef boost::fast_pool_allocator<int> pool_allocator;
struct ComplexType {
int id;
double value;
ComplexType(int i = 0, double v = 0.0) : id(i), value(v) {}
// 用于调试的输出函数
friend std::ostream& operator<<(std::ostream& os, const ComplexType& obj) {
return os << "ID:" << obj.id << ", Value:" << obj.value;
}
};
int main()
{
// 使用自定义分配器的栈
boost::lockfree::stack<int, boost::lockfree::allocator<pool_allocator>> custom_alloc_stack(100);
// 配置固定大小
boost::lockfree::stack<int, boost::lockfree::fixed_sized<true>> fixed_stack(100);
// 自定义内存对齐
boost::lockfree::stack<int, boost::lockfree::alignment<16>> aligned_stack(100);
// 组合多个选项
boost::lockfree::stack<
ComplexType,
boost::lockfree::capacity<1000>, // 固定容量
boost::lockfree::fixed_sized<true>, // 固定大小
boost::lockfree::allocator<pool_allocator> // 自定义分配器
> advanced_stack;
// 测试栈功能
for (int i = 0; i < 10; ++i) {
custom_alloc_stack.push(i);
fixed_stack.push(i);
aligned_stack.push(i);
advanced_stack.push(ComplexType(i, i * 1.5));
}
// 测试自定义类型栈
ComplexType complex_value;
while (advanced_stack.pop(complex_value)) {
std::cout << "从高级栈弹出: " << complex_value << std::endl;
}
return 0;
}4.8 性能优化与最佳实践
在使用 boost::lockfree::stack 时,以下最佳实践可以帮助您获得最佳性能:
#include <boost/lockfree/stack.hpp>
#include <iostream>
#include <thread>
#include <vector>
#include <chrono>
#include <atomic>
#include <random>
#include <algorithm>
// 性能测试示例
void performance_test()
{
// 使用合适的栈大小,避免频繁的内存分配
constexpr size_t STACK_SIZE = 10000;
boost::lockfree::stack<int, boost::lockfree::fixed_sized<true>> stack(STACK_SIZE);
std::atomic<bool> start{false};
std::atomic<int> ready_producers{0};
std::atomic<int> ready_consumers{0};
std::atomic<bool> done{false};
// 生产者
auto producer = [&](int id, int items) {
// 使用线程本地缓冲区收集一批数据,减少对共享栈的竞争
std::vector<int> local_buffer;
local_buffer.reserve(100); // 预分配空间
std::mt19937 rng(id); // 使用确定性随机数,避免不同线程生成相同数据
ready_producers++;
while (!start.load(std::memory_order_acquire)) {
std::this_thread::yield(); // 等待开始信号
}
for (int i = 0; i < items; ++i) {
int value = id * 1000000 + i;
// 将数据添加到本地缓冲区
local_buffer.push_back(value);
// 批量入栈,减少竞争
if (local_buffer.size() == 100) {
// 随机打乱顺序,减少假共享
std::shuffle(local_buffer.begin(), local_buffer.end(), rng);
size_t pushed = 0;
do {
// 尝试批量入栈
pushed += stack.push(local_buffer.begin() + pushed, local_buffer.end());
// 如果没有全部入栈成功,等待一小会再试
if (pushed < local_buffer.size()) {
std::this_thread::yield();
}
} while (pushed < local_buffer.size());
local_buffer.clear();
}
}
// 处理剩余元素
if (!local_buffer.empty()) {
size_t pushed = 0;
do {
pushed += stack.push(local_buffer.begin() + pushed, local_buffer.end());
if (pushed < local_buffer.size()) {
std::this_thread::yield();
}
} while (pushed < local_buffer.size());
}
};
// 消费者
auto consumer = [&]() {
// 使用本地批处理来减少竞争
std::vector<int> batch(100);
ready_consumers++;
while (!start.load(std::memory_order_acquire)) {
std::this_thread::yield(); // 等待开始信号
}
int backoff = 0; // 回退计数器
while (!done || !stack.empty()) {
// 尝试批量出栈
size_t popped = stack.pop(batch.begin(), batch.end());
if (popped > 0) {
// 处理批量数据
backoff = 0; // 重置回退
} else {
// 使用指数回退策略
if (++backoff > 10) {
std::this_thread::sleep_for(std::chrono::microseconds(1 << std::min(backoff - 10, 6)));
} else {
std::this_thread::yield();
}
}
}
};
// 创建线程
constexpr int NUM_PRODUCERS = 4;
constexpr int NUM_CONSUMERS = 4;
constexpr int ITEMS_PER_PRODUCER = 100000;
std::vector<std::thread> producers;
std::vector<std::thread> consumers;
for (int i = 0; i < NUM_PRODUCERS; ++i) {
producers.emplace_back(producer, i, ITEMS_PER_PRODUCER);
}
for (int i = 0; i < NUM_CONSUMERS; ++i) {
consumers.emplace_back(consumer);
}
// 等待所有线程就绪
while (ready_producers < NUM_PRODUCERS || ready_consumers < NUM_CONSUMERS) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
// 开始计时
auto start_time = std::chrono::high_resolution_clock::now();
// 发出开始信号
start.store(true, std::memory_order_release);
// 等待生产者完成
for (auto& t : producers) {
t.join();
}
// 标记生产者已完成
done = true;
// 等待消费者完成
for (auto& t : consumers) {
t.join();
}
auto end_time = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
std::cout << "处理 " << NUM_PRODUCERS * ITEMS_PER_PRODUCER
<< " 项数据耗时: " << duration.count() << " 毫秒" << std::endl;
std::cout << "每秒处理约 "
<< (NUM_PRODUCERS * ITEMS_PER_PRODUCER * 1000.0 / duration.count())
<< " 项" << std::endl;
}
int main()
{
performance_test();
return 0;
}4.9 与其他Boost组件结合使用
boost::lockfree::stack 可以与其他Boost组件结合使用,实现更复杂的功能:
#include <boost/lockfree/stack.hpp>
#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
#include <iostream>
#include <thread>
#include <functional>
#include <memory>
// 基于无锁栈的任务系统示例
class TaskSystem {
private:
struct Task {
std::function<void()> func;
// 构造函数
template<typename F>
Task(F&& f) : func(std::forward<F>(f)) {}
};
boost::lockfree::stack<Task*> task_stack{1000};
boost::asio::io_context io_context;
std::unique_ptr<boost::asio::io_context::work> work;
std::vector<std::thread> worker_threads;
std::atomic<bool> running{false};
public:
TaskSystem(int num_threads = 4) : work(std::make_unique<boost::asio::io_context::work>(io_context)) {
running = true;
// 启动工作线程
for (int i = 0; i < num_threads; ++i) {
worker_threads.emplace_back([this]() {
while (running) {
// 尝试从栈中获取任务
Task* task = nullptr;
if (task_stack.pop(task)) {
if (task) {
// 执行任务
task->func();
delete task;
}
} else {
// 没有任务时处理IO事件
io_context.poll_one();
std::this_thread::yield();
}
}
});
}
}
~TaskSystem() {
stop();
}
// 提交任务(后进先出 - LIFO顺序执行)
template<typename F>
bool submit(F&& task) {
auto* task_ptr = new Task(std::forward<F>(task));
bool success = task_stack.push(task_ptr);
if (!success) {
delete task_ptr;
}
return success;
}
// 延迟任务
template<typename F>
void schedule_after(int milliseconds, F&& task) {
auto timer = std::make_shared<boost::asio::steady_timer>(io_context);
timer->expires_after(std::chrono::milliseconds(milliseconds));
timer->async_wait([timer, task = std::forward<F>(task), this](const boost::system::error_code& ec) {
if (!ec) {
this->submit(task);
}
});
}
// 停止任务系统
void stop() {
if (running) {
running = false;
work.reset();
io_context.stop();
for (auto& thread : worker_threads) {
if (thread.joinable()) {
thread.join();
}
}
worker_threads.clear();
// 清空剩余任务
Task* task = nullptr;
while (task_stack.pop(task)) {
delete task;
}
}
}
};
// 使用示例
int main() {
TaskSystem task_system(4);
// 注意这是LIFO顺序,最后提交的任务会最先执行
for (int i = 0; i < 10; ++i) {
task_system.submit([i]() {
std::cout << "执行任务 " << i << " 在线程 "
<< std::this_thread::get_id() << std::endl;
});
}
// 提交延迟任务
task_system.schedule_after(1000, []() {
std::cout << "1秒后执行的延迟任务" << std::endl;
});
// 等待任务完成
std::this_thread::sleep_for(std::chrono::seconds(2));
return 0;
}5. 注意事项与限制
使用 boost::lockfree::stack 时需要注意以下几点:
- LIFO语义:栈是后进先出的,如果需要先进先出的行为,应该使用 boost::lockfree::queue。
- 内存一致性:无锁栈依赖特定的内存序来保证正确性,不当使用可能导致难以发现的并发问题。
- 固定大小限制:固定大小栈可能会因栈满而拒绝新元素,必须有处理这种情况的策略。
- 动态内存分配:动态大小栈可能在运行时进行内存分配,这可能不适合对延迟敏感的应用程序。
- ABA问题:虽然 boost::lockfree::stack 内部处理了ABA问题,但了解这一问题有助于理解实现细节。
- 性能考量:在低竞争环境下,无锁栈可能比简单互斥锁实现的栈开销更大,应根据实际场景选择。
- 不保证公平性:在高竞争情况下,无锁栈不保证线程公平性,可能会导致某些线程的操作被长时间延迟。
- 构造函数异常安全:元素类型的构造函数需要保证异常安全,否则可能导致内存泄漏。
6. 总结
boost::lockfree::stack 是一个功能强大的无锁栈实现,能够在多线程环境中提供高性能的LIFO数据结构。它避免了使用互斥锁,消除了死锁风险,并减少了线程竞争和上下文切换的开销,特别适合对性能和延迟敏感的应用场景。
相关推荐
- 华硕人工客服24小时吗(华硕售后人工客服)
-
华硕服务中心广东省惠州市惠东县城平深路(创富斜对面)惠东同心电脑城1L11(1.3km)笔记本电脑,平板电脑华硕服务中心广东省惠州市惠东县平山镇同心电脑城1F26(1.3km)笔记本电脑,平...
- 电脑音量小喇叭不见了(电脑声音喇叭图标不见了怎么办)
-
如果您电脑上的小喇叭(扬声器)不见了,可以尝试以下方法找回:1.检查设备管理器:在Windows下,右键点击“我的电脑”(或此电脑)->点击“属性”->点击“设备管理器”,查看“声音、视...
- 腾达路由器手机设置教程(腾达路由器手机设置教程视频)
-
用手机设置腾达路由器的方法如下:1在手机上打开浏览器,输入路由器背面的管理IP和用户及对应的密码2一般第一次打开,默认会跳出设置向导,准备好宽带用户名和密码,3按向导提示输入相应内容4在无线设置的安全...
- 自配电脑配置推荐(自配电脑配置推荐百度)
-
首先,像这类软件最低要求不高。最高没上限。纯粹看你的工程量大小。CPU有双核,内存有4G,就可以运行。但是实际体验肯定比较差,卡是肯德。渲染时间也会超长,一个小作品渲染几小时是正常的。稍微大点的工程也...
- 2025年平板性价比排行(2020年值得买的平板)
-
推荐台电P30S好。 基本配置:10.1英寸IPS广视角屏幕,1280*800分辨率,16:10的黄金显示比例,K9高压独立功放,支持3.5mm耳麦接口,联发科MT8183八核处理器,4GB...
- 2020显卡天梯图10月(2020显卡天梯图极速空间)
-
排行球队名称积分已赛胜平负进球失球净胜球 1?诺维奇城974629107753639 2?沃特福德91462710...
-
- 笔记本电脑无线网络连接(笔记本电脑无线网络连接不上怎么办)
-
一、笔记本电脑怎么连接wifi---win7系统笔记本连接wifi1、要先创建无线网络连接,将鼠标移到Win7的开始菜单,然后点击“控制面板”。2、然后点击“网络和Internet”。3、再打开“网络和共享中心”,这是Win7系统必有的功...
-
2025-12-22 05:55 liuian
- wind数据库(wind数据库官网)
-
先购买wind数据库,安装好wind取得使用权后,按照wind所给提示,输入账户和密码可使用wind数据库。Wind资讯金融终端是一个集实时行情、资料查询、数据浏览、研究分析、新闻资讯为一体的金融数据...
- 如何关闭360家庭防火墙(如果关闭360家庭防火墙)
-
关闭方法如下:1.打开手机360主界面之后,点击“安全防护中心”。2.点击第三列“入口防护”下方的“查看状态”按钮。3.在列出的功能项中找到“局域网防护”,直接点击后面的“关闭”按钮,关闭所有的“局域...
- 笔记本电脑型号配置怎么看(怎么查自己电脑的型号)
-
查电脑的配置和型号方法:方法一:1、右键单击“此电脑”,点击属性2、这里可以看到操作系统,CPU等大致信息3、点击设备管理器4、这里可以查看具体硬件的详细信方法二:1、首先打开电脑上的“控制面板”2、...
- pscs6序列号是什么
-
AdobePhotoshopCS6就二个版本(测试版和正式版)1、AdobePhotoshopCS6是AdobePhotoshop的第13代,是一个较为重大的版本更新。2、Photoshop在前几...
- win7桌面图片怎么设置(win7如何设置桌面图片)
-
1、首先用鼠标右键单击桌面的空白处。然后在弹出的菜单上选择“个性化”选项。这样就弹出了的个性化窗口上能显示看到“桌面背景”按钮。点击它即可。2、继续打开了选择“桌面背景”选项,然后在上面选择你想要设置...
- windows安卓下载(win安卓版)
-
2265安卓网是安全的,2265安卓网成立于2012年初,网站一直努力为各位安卓爱好者提供最新、最全的安卓游戏软件资源下载。经过几个月的努力、和广大安卓用户的支持、2265安卓网截至到2012年6月已...
- 一周热门
- 最近发表
- 标签列表
-
- python判断字典是否为空 (50)
- crontab每周一执行 (48)
- aes和des区别 (43)
- bash脚本和shell脚本的区别 (35)
- canvas库 (33)
- dataframe筛选满足条件的行 (35)
- gitlab日志 (33)
- lua xpcall (36)
- blob转json (33)
- python判断是否在列表中 (34)
- python html转pdf (36)
- 安装指定版本npm (37)
- idea搜索jar包内容 (33)
- css鼠标悬停出现隐藏的文字 (34)
- linux nacos启动命令 (33)
- gitlab 日志 (36)
- adb pull (37)
- python判断元素在不在列表里 (34)
- python 字典删除元素 (34)
- vscode切换git分支 (35)
- python bytes转16进制 (35)
- grep前后几行 (34)
- hashmap转list (35)
- c++ 字符串查找 (35)
- mysql刷新权限 (34)
