boost方案
boost提供了三种无锁方案
boost::lockfree::queue:
支持多个生产者和多个消费者线程的无锁队列。
boost::lockfree::stack:
支持多个生产者和多个消费者线程的无锁栈。
boost::lockfree::spsc_queue:
仅支持单个生产者和单个消费者线程的无锁队列。相比boost::lockfree::queue,其效率更高。
注:这些API内部是通过轻量级原子锁实现的lock-free,不是真正意义的无锁。我看到的资料中,貌似只有linux kernel中fifo实现了真正意义上的无锁,但是仅用于与单个消费者单个生产者的环境。
boost官方文档:
http://www.boost.org/doc/libs/1_60_0/doc/html/lockfree.html
queue容量和自增长的问题
可以设置初始容量,添加新元素时如果容量不够,则总容量可能自动增长:queue在当前操作系统上如果支持lock-free,则不会自动增长,如果不支持lock-free,才会自动增长。不同的操作系统其内存分配机制不同,这样会导致在某些操作系统上的queue不支持lockfree
boost::lockfree::spsc_queue<int, boost::lockfree::capacity<2>> q; printf("boost::lockfree:queue is lock free:%s", q.is_lock_free() ? "true" : "false"); //true //push的返回值:1,push成功;0,push失败。 size_t s1 = q.push(9); //1 size_t s2 = q.push(9); //1 size_t s3 = q.push(9); //0 boost::lockfree::queue<int, boost::lockfree::fixed_sized<true>, boost::lockfree::capacity<2>> q2; size_t s2_1 = q2.push(9); //1 size_t s2_2 = q2.push(9); //1 size_t s2_3 = q2.push(9); //0 boost::lockfree::queue<int, boost::lockfree::fixed_sized<false>, boost::lockfree::capacity<2>> q3; size_t s3_1 = q3.push(9); //1 size_t s3_2 = q3.push(9); //1 size_t s3_3 = q3.push(9); //0 size_t s3_4 = q3.push(9); //0
如果不需要考虑多线程或者自己实现同步,还有一种方案:boost::circular_buffer
http://www.boost.org/doc/libs/1_60_0/doc/html/circular_buffer.html
C++11 std::atomic方案
网上有人借用std::atomic实现的一套无锁队列,其内部实现参考了boost::lockfree::queue的设计思路,可以适用于多个消费者多个生产者线程。
A High Performance Lock Free Ring Queue
http://www.codeproject.com/Tips/754393/A-High-Performance-Lock-Free-Ring-Queue
下面代码我在原文基础上做了修改:最新的编译器已不支持std::atomic_flag在构造函数中初始化。
lfringqueue.h
#ifndef INCLUDED_UTILS_LFRINGQUEUE #define INCLUDED_UTILS_LFRINGQUEUE #define _ENABLE_ATOMIC_ALIGNMENT_FIX #define ATOMIC_FLAG_INIT 0 #pragma once #include <vector> #include <mutex> #include <thread> #include <atomic> #include <chrono> #include <cstring> #include <iostream> // Lock free ring queue template < typename _TyData, long _uiCount = 100000 > class lfringqueue { public: lfringqueue( long uiCount = _uiCount ) : m_lTailIterator(0), m_lHeadIterator(0), m_uiCount( uiCount ) { m_queue = new _TyData*[m_uiCount]; memset( m_queue, 0, sizeof(_TyData*) * m_uiCount ); } ~lfringqueue() { if ( m_queue ) delete [] m_queue; } bool enqueue( _TyData *pdata, unsigned int uiRetries = 1000 ) { if ( NULL == pdata ) { // Null enqueues are not allowed return false; } unsigned int uiCurrRetries = 0; while ( uiCurrRetries < uiRetries ) { // Release fence in order to prevent memory reordering // of any read or write with following write std::atomic_thread_fence(std::memory_order_release); long lHeadIterator = m_lHeadIterator; if ( NULL == m_queue[lHeadIterator] ) { long lHeadIteratorOrig = lHeadIterator; ++lHeadIterator; if ( lHeadIterator >= m_uiCount ) lHeadIterator = 0; // Don't worry if this CAS fails. It only means some thread else has // already inserted an item and set it. if ( std::atomic_compare_exchange_strong( &m_lHeadIterator, &lHeadIteratorOrig, lHeadIterator ) ) { // void* are always atomic (you wont set a partial pointer). m_queue[lHeadIteratorOrig] = pdata; if ( m_lEventSet.test_and_set( )) { m_bHasItem.test_and_set(); } return true; } } else { // The queue is full. Spin a few times to check to see if an item is popped off. ++uiCurrRetries; } } return false; } bool dequeue( _TyData **ppdata ) { if ( !ppdata ) { // Null dequeues are not allowed! return false; } bool bDone = false; bool bCheckQueue = true; while ( !bDone ) { // Acquire fence in order to prevent memory reordering // of any read or write with following read std::atomic_thread_fence(std::memory_order_acquire); //MemoryBarrier(); long lTailIterator = m_lTailIterator; _TyData *pdata = m_queue[lTailIterator]; //volatile _TyData *pdata = m_queue[lTailIterator]; if ( NULL != pdata ) { bCheckQueue = true; long lTailIteratorOrig = lTailIterator; ++lTailIterator; if ( lTailIterator >= m_uiCount ) lTailIterator = 0; //if ( lTailIteratorOrig == atomic_cas( (volatile long*)&m_lTailIterator, lTailIterator, lTailIteratorOrig )) if ( std::atomic_compare_exchange_strong( &m_lTailIterator, &lTailIteratorOrig, lTailIterator )) { // Sets of sizeof(void*) are always atomic (you wont set a partial pointer). m_queue[lTailIteratorOrig] = NULL; // Gets of sizeof(void*) are always atomic (you wont get a partial pointer). *ppdata = (_TyData*)pdata; return true; } } else { bDone = true; m_lEventSet.clear(); } } *ppdata = NULL; return false; } long countguess() const { long lCount = trycount(); if ( 0 != lCount ) return lCount; // If the queue is full then the item right before the tail item will be valid. If it // is empty then the item should be set to NULL. long lLastInsert = m_lTailIterator - 1; if ( lLastInsert < 0 ) lLastInsert = m_uiCount - 1; _TyData *pdata = m_queue[lLastInsert]; if ( pdata != NULL ) return m_uiCount; return 0; } long getmaxsize() const { return m_uiCount; } bool HasItem() { return m_bHasItem.test_and_set(); } void SetItemFlagBack() { m_bHasItem.clear(); } private: long trycount() const { long lHeadIterator = m_lHeadIterator; long lTailIterator = m_lTailIterator; if ( lTailIterator > lHeadIterator ) return m_uiCount - lTailIterator + lHeadIterator; // This has a bug where it returns 0 if the queue is full. return lHeadIterator - lTailIterator; } private: std::atomic<long> m_lHeadIterator; // enqueue index std::atomic<long> m_lTailIterator; // dequeue index _TyData **m_queue; // array of pointers to the data long m_uiCount; // size of the array std::atomic_flag m_lEventSet = ATOMIC_FLAG_INIT; // a flag to use whether we should change the item flag std::atomic_flag m_bHasItem = ATOMIC_FLAG_INIT; // a flag to indicate whether there is an item enqueued }; #endif //INCLUDED_UTILS_LFRINGQUEUE
测试:
/* * File: main.cpp * Author: Peng * * Created on February 22, 2014, 9:55 PM */ #include <cstdlib> #include "lfringqueue.h" #include <mutex> #include <stdio.h> #include <string> #include <set> #include <random> #include <chrono> #include <iostream> #include <ctime> #include <atomic> #include <sstream> #include <boost/thread/thread.hpp> #include <boost/lockfree/queue.hpp> #include <iostream> #include <boost/atomic.hpp> const long NUM_DATA = 10; const int NUM_ENQUEUE_THREAD = 1; const int NUM_DEQUEUE_THREAD = 1; const long NUM_ITEM = 1000000; using namespace std; class Data { public: Data( int i = 0 ) : m_iData(i) { stringstream ss; ss << i; m_szDataString = ss.str(); //sprintf( m_szDataString, "%l-d", i); } bool operator< ( const Data & aData) const { if ( m_iData < aData.m_iData) return true; else return false; } int& GetData() { return m_iData; } private: int m_iData; string m_szDataString; //char m_szDataString[MAX_DATA_SIZE]; }; Data DataArray[NUM_DATA]; constexpr long size = 0.5 * NUM_DATA; lfringqueue < Data, 1000> LockFreeQueue; boost::lockfree::queue<Data*> BoostQueue(1000); // Since there is a chance that the searched number cannot be found, so the function should return boolean bool BinarySearchNumberInSortedArray( Data datas[], int iStart, int iEnd, int SearchedNum, int &iFound ) { if ( iEnd - iStart <= 1 ) { if ( datas[iStart].GetData() == SearchedNum ) { iFound = iStart; return true; } else if ( datas[iEnd].GetData() == SearchedNum ) { iFound = iEnd; return true; } else return false; } int mid = 0.5 * ( iStart + iEnd ); if ( datas[mid].GetData() == SearchedNum ) { iFound = mid; return true; } if ( datas[mid].GetData() > SearchedNum ) { if ( mid - 1 >= 0) return BinarySearchNumberInSortedArray ( datas, iStart, mid - 1, SearchedNum, iFound); else return false; } else { if ( mid + 1 <= iEnd ) return BinarySearchNumberInSortedArray ( datas, mid + 1, iEnd, SearchedNum, iFound); else return false; } } bool GenerateRandomNumber_FindPointerToTheNumber_EnQueue() { std::uniform_int_distribution<int> dis(1, NUM_DATA); default_random_engine engine{}; for ( long i = 0; i < NUM_ITEM; i++ ) { int x = dis ( engine ); int iFoundIndex; if ( BinarySearchNumberInSortedArray(DataArray, 0, NUM_DATA - 1, x, iFoundIndex ) ) { Data* pData = &DataArray[iFoundIndex]; LockFreeQueue.enqueue( pData ); //BoostQueue.push( pData ); } } } bool Dequeue() { Data *pData; for ( long i = 0; i < NUM_ITEM; i ++) { while ( LockFreeQueue.dequeue( &pData ) ); //while ( BoostQueue.pop( pData ) ) ; } } int main(int argc, char** argv) { for ( int i = 1; i < NUM_DATA + 1; i++ ) { Data data(i); DataArray[i-1] = data; } std::thread PublishThread[NUM_ENQUEUE_THREAD]; std::thread ConsumerThread[NUM_DEQUEUE_THREAD]; std::chrono::duration<double> elapsed_seconds; for ( int i = 0; i < NUM_ENQUEUE_THREAD; i++ ) { PublishThread[i] = std::thread( GenerateRandomNumber_FindPointerToTheNumber_EnQueue ); } auto start = std::chrono::high_resolution_clock::now(); for ( int i = 0; i < NUM_DEQUEUE_THREAD; i++ ) { ConsumerThread[i] = std::thread{ Dequeue}; } for ( int i = 0; i < NUM_DEQUEUE_THREAD; i++ ) { ConsumerThread[i].join(); } auto end = std::chrono::high_resolution_clock::now(); elapsed_seconds = end - start; std::cout << "Enqueue and Dequeue 1 million item in:" << elapsed_seconds.count() << std::endl; for ( int i = 0; i < NUM_ENQUEUE_THREAD; i++ ) { PublishThread[i].join(); } return 0; }
相关推荐
atomic_queue 基于带有循环缓冲区的C ++ 14多生产者多消费者无锁队列。 这些队列遵循的主要设计原理是极简主义:原子操作的最基本要求,固定大小的缓冲区,值语义。 这些品质也有局限性: 最大队列大小必须在编译...
Coordination-resolved C-C bond length and C-1s binding energy: energy: from atomic carbon, graphene, nanotubes, to graphite and diamond,孙长庆,聂彦光,Formulation of the photoemission measurement of...
Chapter Five: The C++ Memory Model and Operations on Atomic Types Chapter Six: Designing Data Structures for Concurrency I: Lock-based Data Structures Chapter Seven: Designing Data Structures ...
名称:Atomic AI ---------------------------------------- 版本:4.1.9 作者:http://www.atomicreach.com/ 分类:实用工具 ---------------------------------------- 概述:你做了很多内容,我们做得很好。 描述...
boost159-atomic-1.59.0-2.el7.x86_64.rpm
Safe Memory Reclamation for Dynamic Lock-Free Objects Using Atomic Reads and WritesMaged M. Michael IBM Thomas J. Watson Research CenterP.O. Box 218 Yorktown Heights NY 10598 USAmagedm@us.ibm....
离线安装包,亲测可用
离线安装包,测试可用
安全栈表实现,C++11实现,使用atomic特性,可以多线程进行操作
eslint-plugin-atomic-design 安装 您首先需要安装 : $ npm i eslint --save-dev 接下来,安装eslint-plugin-atomic-design : $ npm install eslint-plugin-atomic-design --save-dev 注意:如果全局安装了...
Linux平台下C++(C++98、C++03、C++11)实现的线程池 ThreadPoolCpp98 最古老的做法,只使用了C++98语言规范,采用**面向对象的思路**,每一个任务都是一个子类对象; ThreadPoolCpp03 较新做法,使用C++03语言...
Here you will learn about algebraic types such as std::optional, vocabulary types such as std::function, smart pointers, and synchronization primitives such as std::atomic and std::mutex. In the ...
直接从官网下载的live555源码在centos 64位下编译报错,此为修改后版本。
gatsby new my-default-starter https://github.com/Borcioo/gatsby-starter-darkmode-xstyle-atomic 开始开发。 导航到新站点的目录并启动。 cd my-default-starter/ gatsby develop 打开源代码并开始编辑! ...
想要彻底理解C++11和C++14,不可止步...std::atomic和volatile有怎样的区别,它们分别用于什么场合,以及它们和C++的并发API有何联系 “旧”C++程序设计(即C++98)中的最佳实践要求在现代C++的软件开发中作出哪些...
在为它创建USB2.0适配器板之后,我使用OpenSCAD为我的Atomic PI设计和3D打印了一个外壳: 我写了有关整个过程的。 请注意,我创建的两个OpenSCAD文件存储在此存储库中。 我以前关于出色的35 $ SBC的文章: 用的为其...
基于原子操作的无锁hashtable源码
and universal references Techniques for writing clear, correct, effective lambda expressions How std::atomic differs from volatile, how each should be used, and how they relate to C++'s concurrency ...
这是Atomic Design方法的样板,其中使用了一些很酷的东西,例如Storybook,Flow和CSS Modules。 随时进行测试,更改并适应一切。 什么是原子设计? 在设计界广为人知,Atomic Design帮助构建一致,可靠和可重用的...
原子指令是特殊的硬件指令,以不可分的方式对一个或多个内存位置执行操作。无论其他处理器执行什么指令...但是,由于它们是低层的,并且只能对数据结构进行小的更新,因此使用它们来实现并行数据结构是一项艰巨的任务。