拆分引用计数如何在无锁堆栈中工作?
我正在阅读 C++ concurrency in action 2。它为无锁堆栈引入了拆分引用计数。
一种可能的技术涉及对每个节点使用不是一个而是两个引用计数:内部计数和外部计数。这些值的总和就是对节点的引用总数。外部计数与指向节点的指针一起保存,每次读取指针时都会增加。当阅读器完成节点时,它会减少内部计数。读取指针的简单操作将在完成时使外部计数增加一,内部计数减一。
上面的短语解释了拆分引用计数的简要概念。听起来外部计数总是增加而内部计数总是减少。
当不再需要外部计数/指针配对时(节点不再能从多个线程可访问的位置访问),内部计数增加外部计数值减一,并丢弃外部计数器。一旦内部计数等于零,就没有对节点的未完成引用,可以安全地删除它。使用原子操作更新共享数据仍然很重要。现在让我们看看一个无锁堆栈的实现,它使用这种技术来确保只有在安全的情况下才回收节点。
但是在上面的短语中,当节点不再可访问时,内部计数应该增加外部计数的值减一。我对这个解释感到非常困惑。
(1) 使用内部和外部计数的确切目的是什么?
(2) 为什么它需要两个引用计数而不是一个?
编辑:我从书中添加了下面的示例代码。
template <typename T>
class lock_free_stack {
private:
struct node;
struct counted_node_ptr {
int external_count;
node* ptr;
};
struct node {
std::shared_ptr<T> data;
std::atomic<int> internal_count;
counted_node_ptr next;
node(T const& data_)
: data(std::make_shared<T>(data_)), internal_count(0) {}
};
std::atomic<counted_node_ptr> head;
void increase_head_count(counted_node_ptr& old_counter) {
counted_node_ptr new_counter;
do {
new_counter = old_counter;
++new_counter.external_count;
} while (!head.compare_exchange_strong(old_counter, new_counter,
std::memory_order_acquire,
std::memory_order_relaxed));
old_counter.external_count = new_counter.external_count;
}
public:
~lock_free_stack() {
while (pop())
;
}
void push(T const& data) {
counted_node_ptr new_node;
new_node.ptr = new node(data);
new_node.external_count = 1;
new_node.ptr->next = head.load();
while (!head.atomic_compare_exchange_weak(new_node.ptr->next, new_node,
std::memory_order_release,
std::memory_order_relaxed))
;
}
std::shared_ptr<T> pop() {
counted_node_ptr old_head = head.load(std::memory_order_relaxed);
for (;;) {
increase_head_count(old_head);
node* const ptr = old_head.ptr;
if (!ptr) {
return std::shared_ptr<T>();
}
if (head.compare_exchange_strong(old_head, ptr->next,
std::memory_order_relaxed)) {
std::shared_ptr<T> res;
res.swap(ptr->data);
int const count_increase = old_head.external_count - 2;
if (ptr->internal_count.fetch_add(
count_increase, std::memory_order_release) == -count_increase) {
delete ptr;
}
return res;
} else if (ptr->internal_count.fetch_add(-1, std::memory_order_relaxed) ==
1) {
ptr->internal_count.load(std::memory_order_acquire);
delete ptr;
}
}
}
};