Skip to content

Commit

Permalink
add basic UnboundedQueue
Browse files Browse the repository at this point in the history
Summary: a very basic, coarsely locked UnboundedQueue for when you just gotta have unbounded behavior

Reviewed By: yfeldblum

Differential Revision: D5669658

fbshipit-source-id: 6827ed09d5ee4425757e756c1e30534d53b462c9
  • Loading branch information
James Sedgwick authored and facebook-github-bot committed Aug 27, 2017
1 parent 4a7fbd4 commit 0a4ae65
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 0 deletions.
59 changes: 59 additions & 0 deletions wangle/concurrent/UnboundedBlockingQueue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (c) 2017, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*
*/

#pragma once

#include <queue>

#include <folly/LifoSem.h>
#include <folly/Synchronized.h>
#include <wangle/concurrent/BlockingQueue.h>

namespace wangle {

// Warning: this is effectively just a std::deque wrapped in a single mutex
// We are aiming to add a more performant concurrent unbounded queue in the
// future, but this class is available if you must have an unbounded queue
// and can tolerate any contention.
template <class T>
class UnboundedBlockingQueue : public BlockingQueue<T> {
public:
virtual ~UnboundedBlockingQueue() {}

void add(T item) override {
queue_.wlock()->push(std::move(item));
sem_.post();
}

T take() override {
while (true) {
{
auto ulockedQueue = queue_.ulock();
if (!ulockedQueue->empty()) {
auto wlockedQueue = ulockedQueue.moveFromUpgradeToWrite();
T item = std::move(wlockedQueue->front());
wlockedQueue->pop();
return item;
}
}
sem_.wait();
}
}

size_t size() override {
return queue_.rlock()->size();
}

private:
folly::LifoSem sem_;
folly::Synchronized<std::queue<T>> queue_;
};

} // namespace wangle
46 changes: 46 additions & 0 deletions wangle/concurrent/test/UnboundedBlockingQueueTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2017, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*
*/

#include <folly/portability/GTest.h>
#include <folly/Baton.h>
#include <wangle/concurrent/UnboundedBlockingQueue.h>
#include <thread>

using namespace wangle;

TEST(UnboundedQueuee, push_pop) {
UnboundedBlockingQueue<int> q;
q.add(42);
EXPECT_EQ(42, q.take());
}
TEST(UnboundedBlockingQueue, size) {
UnboundedBlockingQueue<int> q;
EXPECT_EQ(0, q.size());
q.add(42);
EXPECT_EQ(1, q.size());
q.take();
EXPECT_EQ(0, q.size());
}

TEST(UnboundedBlockingQueue, concurrent_push_pop) {
UnboundedBlockingQueue<int> q;
folly::Baton<> b1, b2;
std::thread t([&] {
b1.post();
EXPECT_EQ(42, q.take());
EXPECT_EQ(0, q.size());
b2.post();
});
b1.wait();
q.add(42);
b2.wait();
EXPECT_EQ(0, q.size());
t.join();
}

0 comments on commit 0a4ae65

Please sign in to comment.