Commit 95813a8a by Ben Clayton

Yarn: Add WaitGroup

WaitGroup is a synchronization primitive that holds an internal counter that can incremented, decremented and waited on until it reaches 0. WaitGroups can be used as a simple mechanism for waiting on a number of concurrently execute a number of tasks to complete. Bug: b/139010488 Change-Id: I086859b81509076de3dbce8a5fde656ab4e4e347 Reviewed-on: https://swiftshader-review.googlesource.com/c/SwiftShader/+/34816Tested-by: 's avatarBen Clayton <bclayton@google.com> Reviewed-by: 's avatarNicolas Capens <nicolascapens@google.com> Kokoro-Presubmit: kokoro <noreply+kokoro@google.com>
parent a74c7d9f
...@@ -14,6 +14,8 @@ ...@@ -14,6 +14,8 @@
#include "Yarn_test.hpp" #include "Yarn_test.hpp"
#include "Yarn/WaitGroup.hpp"
TEST(WithoutBoundScheduler, SchedulerConstructAndDestruct) TEST(WithoutBoundScheduler, SchedulerConstructAndDestruct)
{ {
auto scheduler = new yarn::Scheduler(); auto scheduler = new yarn::Scheduler();
...@@ -44,3 +46,69 @@ TEST_P(WithBoundScheduler, DestructWithPendingTasks) ...@@ -44,3 +46,69 @@ TEST_P(WithBoundScheduler, DestructWithPendingTasks)
yarn::schedule([] {}); yarn::schedule([] {});
} }
} }
TEST_P(WithBoundScheduler, DestructWithPendingFibers)
{
yarn::WaitGroup wg(1);
for (int i = 0; i < 10000; i++)
{
yarn::schedule([=] { wg.wait(); });
}
wg.done();
auto scheduler = yarn::Scheduler::get();
scheduler->unbind();
delete scheduler;
// Rebind a new scheduler so WithBoundScheduler::TearDown() is happy.
(new yarn::Scheduler())->bind();
}
TEST_P(WithBoundScheduler, FibersResumeOnSameYarnThread)
{
yarn::WaitGroup fence(1);
yarn::WaitGroup wg(1000);
for (int i = 0; i < 1000; i++)
{
yarn::schedule([=] {
auto threadID = std::this_thread::get_id();
fence.wait();
ASSERT_EQ(threadID, std::this_thread::get_id());
wg.done();
});
}
std::this_thread::sleep_for(std::chrono::milliseconds(10)); // just to try and get some tasks to yield.
fence.done();
wg.wait();
}
TEST_P(WithBoundScheduler, FibersResumeOnSameStdThread)
{
auto scheduler = yarn::Scheduler::get();
yarn::WaitGroup fence(1);
yarn::WaitGroup wg(1000);
std::vector<std::thread> threads;
for (int i = 0; i < 1000; i++)
{
threads.push_back(std::thread([=] {
scheduler->bind();
auto threadID = std::this_thread::get_id();
fence.wait();
ASSERT_EQ(threadID, std::this_thread::get_id());
wg.done();
scheduler->unbind();
}));
}
std::this_thread::sleep_for(std::chrono::milliseconds(10)); // just to try and get some tasks to yield.
fence.done();
wg.wait();
for (auto& thread : threads)
{
thread.join();
}
}
\ No newline at end of file
// Copyright 2019 The SwiftShader Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef yarn_waitgroup_hpp
#define yarn_waitgroup_hpp
#include "ConditionVariable.hpp"
#include "Debug.hpp"
#include <atomic>
#include <mutex>
namespace yarn {
// WaitGroup is a synchronization primitive that holds an internal counter that
// can incremented, decremented and waited on until it reaches 0.
// WaitGroups can be used as a simple mechanism for waiting on a number of
// concurrently execute a number of tasks to complete.
//
// Example:
//
// void runTasksConcurrently(int numConcurrentTasks)
// {
// // Construct the WaitGroup with an initial count of numConcurrentTasks.
// yarn::WaitGroup wg(numConcurrentTasks);
// for (int i = 0; i < numConcurrentTasks; i++)
// {
// // Schedule a task to be run asynchronously.
// // These may all be run concurrently.
// yarn::schedule([=] {
// // Once the task has finished, decrement the waitgroup counter
// // to signal that this has completed.
// defer(wg.done());
// doSomeWork();
// });
// }
// // Block until all tasks have completed.
// wg.wait();
// }
class WaitGroup
{
public:
// Constructs the WaitGroup with the specified initial count.
inline WaitGroup(unsigned int initialCount = 0);
// add() increments the internal counter by count.
inline void add(unsigned int count = 1) const;
// done() decrements the internal counter by one.
// Returns true if the internal count has reached zero.
inline bool done() const;
// wait() blocks until the WaitGroup counter reaches zero.
inline void wait() const;
private:
struct Data
{
std::atomic<unsigned int> count = { 0 };
ConditionVariable condition;
std::mutex mutex;
};
const std::shared_ptr<Data> data = std::make_shared<Data>();
};
inline WaitGroup::WaitGroup(unsigned int initialCount /* = 0 */)
{
data->count = initialCount;
}
void WaitGroup::add(unsigned int count /* = 1 */) const
{
data->count += count;
}
bool WaitGroup::done() const
{
YARN_ASSERT(data->count > 0, "yarn::WaitGroup::done() called too many times");
auto count = --data->count;
if (count == 0)
{
std::unique_lock<std::mutex> lock(data->mutex);
data->condition.notify_all();
return true;
}
return false;
}
void WaitGroup::wait() const
{
std::unique_lock<std::mutex> lock(data->mutex);
data->condition.wait(lock, [this]{ return data->count == 0; });
}
} // namespace yarn
#endif // yarn_waitgroup_hpp
// Copyright 2019 The SwiftShader Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "Yarn_test.hpp"
#include "Yarn/WaitGroup.hpp"
TEST(WithoutBoundScheduler, WaitGroupDone)
{
yarn::WaitGroup wg(2); // Should not require a scheduler.
wg.done();
wg.done();
}
#if YARN_DEBUG_ENABLED
TEST(WithoutBoundScheduler, WaitGroupDoneTooMany)
{
yarn::WaitGroup wg(2); // Should not require a scheduler.
wg.done();
wg.done();
EXPECT_DEATH(wg.done(), "done\\(\\) called too many times");
}
#endif // YARN_DEBUG_ENABLED
TEST_P(WithBoundScheduler, WaitGroup_OneTask)
{
yarn::WaitGroup wg(1);
std::atomic<int> counter = {0};
yarn::schedule([&counter, wg] {
counter++;
wg.done();
});
wg.wait();
ASSERT_EQ(counter.load(), 1);
}
TEST_P(WithBoundScheduler, WaitGroup_10Tasks)
{
yarn::WaitGroup wg(10);
std::atomic<int> counter = {0};
for (int i = 0; i < 10; i++)
{
yarn::schedule([&counter, wg] {
counter++;
wg.done();
});
}
wg.wait();
ASSERT_EQ(counter.load(), 10);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment