Commit 27c6367d by Ben Clayton

Squashed 'third_party/marl/' changes from 539094011..748d3c161

748d3c161 Add usage recommendations to README.md 2939fcfed Replace marl::parallelize() 4b4600208 Fix stupid typo in front page code samples. 742bba9c8 Fix clang-tidy and go-vet warnings. git-subtree-dir: third_party/marl git-subtree-split: 748d3c16165c7dcff018273cfaf9d8f26a33333a
parent 8787897e
...@@ -242,6 +242,7 @@ if(MARL_BUILD_TESTS) ...@@ -242,6 +242,7 @@ if(MARL_BUILD_TESTS)
${MARL_SRC_DIR}/marl_test.h ${MARL_SRC_DIR}/marl_test.h
${MARL_SRC_DIR}/memory_test.cpp ${MARL_SRC_DIR}/memory_test.cpp
${MARL_SRC_DIR}/osfiber_test.cpp ${MARL_SRC_DIR}/osfiber_test.cpp
${MARL_SRC_DIR}/parallelize_test.cpp
${MARL_SRC_DIR}/pool_test.cpp ${MARL_SRC_DIR}/pool_test.cpp
${MARL_SRC_DIR}/scheduler_test.cpp ${MARL_SRC_DIR}/scheduler_test.cpp
${MARL_SRC_DIR}/ticket_test.cpp ${MARL_SRC_DIR}/ticket_test.cpp
......
...@@ -33,31 +33,31 @@ int main() { ...@@ -33,31 +33,31 @@ int main() {
constexpr int numTasks = 10; constexpr int numTasks = 10;
// Create an event that is manually reset. // Create an event that is manually reset.
marl::Event sayHellow(marl::Event::Mode::Manual); marl::Event sayHello(marl::Event::Mode::Manual);
// Create a WaitGroup with an initial count of numTasks. // Create a WaitGroup with an initial count of numTasks.
marl::WaitGroup saidHellow(numTasks); marl::WaitGroup saidHello(numTasks);
// Schedule some tasks to run asynchronously. // Schedule some tasks to run asynchronously.
for (int i = 0; i < numTasks; i++) { for (int i = 0; i < numTasks; i++) {
// Each task will run on one of the 4 worker threads. // Each task will run on one of the 4 worker threads.
marl::schedule([=] { // All marl primitives are capture-by-value. marl::schedule([=] { // All marl primitives are capture-by-value.
// Decrement the WaitGroup counter when the task has finished. // Decrement the WaitGroup counter when the task has finished.
defer(saidHellow.done()); defer(saidHello.done());
printf("Task %d waiting to say hello...\n", i); printf("Task %d waiting to say hello...\n", i);
// Blocking in a task? // Blocking in a task?
// The scheduler will find something else for this thread to do. // The scheduler will find something else for this thread to do.
sayHellow.wait(); sayHello.wait();
printf("Hello from task %d!\n", i); printf("Hello from task %d!\n", i);
}); });
} }
sayHellow.signal(); // Unblock all the tasks. sayHello.signal(); // Unblock all the tasks.
saidHellow.wait(); // Wait for all tasks to complete. saidHello.wait(); // Wait for all tasks to complete.
printf("All tasks said hello.\n"); printf("All tasks said hello.\n");
...@@ -123,6 +123,81 @@ set(MARL_GOOGLETEST_DIR <path-to-googletest>) # defaults to ${MARL_THIR ...@@ -123,6 +123,81 @@ set(MARL_GOOGLETEST_DIR <path-to-googletest>) # defaults to ${MARL_THIR
add_subdirectory(${MARL_DIR}) add_subdirectory(${MARL_DIR})
``` ```
### Usage Recommendations
#### Capture marl synchronization primitives by value
All marl synchronization primitives aside from `marl::ConditionVariable` should be lambda-captured by **value**:
```c++
marl::Event event;
marl::schedule([=]{ // [=] Good, [&] Bad.
event.signal();
});
```
Internally, these primitives hold a shared pointer to the primitive state. By capturing by value we avoid common issues where the primitive may be destructed before the last reference is used.
#### Create one instance of `marl::Scheduler`, use it for the lifetime of the process.
`marl::Scheduler::setWorkerThreadCount()` is an expensive operation as it spawns a number of hardware threads. \
Destructing the `marl::Scheduler` requires waiting on all tasks to complete.
Multiple `marl::Scheduler`s may fight each other for hardware thread utilization.
For these reasons, it is recommended to create a single `marl::Scheduler` for the lifetime of your process.
For example:
```c++
int main() {
marl::Scheduler scheduler;
scheduler.bind();
scheduler.setWorkerThreadCount(marl::Thread::numLogicalCPUs());
defer(scheduler.unbind());
return do_program_stuff();
}
```
#### Bind the scheduler to externally created threads
In order to call `marl::schedule()` the scheduler must be bound to the calling thread. Failure to bind the scheduler to the thread before calling `marl::schedule()` will result in undefined behavior.
`marl::Scheduler` may be simultaneously bound to any number of threads, and the scheduler can be retrieved from a bound thread with `marl::Scheduler::get()`.
A typical way to pass the scheduler from one thread to another would be:
```c++
std::thread spawn_new_thread() {
// Grab the scheduler from the currently running thread.
marl::Scheduler* scheduler = marl::Scheduler::get();
// Spawn the new thread.
return std::thread([=] {
// Bind the scheduler to the new thread.
scheduler->bind();
defer(scheduler->unbind());
// You can now safely call `marl::schedule()`
run_thread_logic();
});
}
```
Always remember to unbind the scheduler before terminating the thread. Forgetting to unbind will result in the `marl::Scheduler` destructor blocking indefinitely.
#### Don't use externally blocking calls in marl tasks
The `marl::Scheduler` internally holds a number of worker threads which will execute the scheduled tasks. If a marl task becomes blocked on a marl synchronization primitive, marl can yield from the blocked task and continue execution of other scheduled tasks.
Calling a non-marl blocking function on a marl worker thread will prevent that worker thread from being able to switch to execute other tasks until the blocking function has returned. Examples of these non-marl blocking functions include: [`std::mutex::lock()`](https://en.cppreference.com/w/cpp/thread/mutex/lock), [`std::condition_variable::wait()`](https://en.cppreference.com/w/cpp/thread/condition_variable/wait), [`accept()`](http://man7.org/linux/man-pages/man2/accept.2.html).
Short blocking calls are acceptable, such as a mutex lock to access a data structure. However be careful that you do not use a marl blocking call with a `std::mutex` lock held - the marl task may yield with the lock held, and block other tasks from re-locking the mutex. This sort of situation may end up with a deadlock.
If you need to make a blocking call from a marl worker thread, you may wish to use [`marl::blocking_call()`](https://github.com/google/marl/blob/master/include/marl/blockingcall.h), which will spawn a new thread for performing the call, allowing the marl worker to continue processing other scheduled tasks.
--- ---
Note: This is not an officially supported Google product Note: This is not an officially supported Google product
...@@ -32,31 +32,31 @@ int main() { ...@@ -32,31 +32,31 @@ int main() {
constexpr int numTasks = 10; constexpr int numTasks = 10;
// Create an event that is manually reset. // Create an event that is manually reset.
marl::Event sayHellow(marl::Event::Mode::Manual); marl::Event sayHello(marl::Event::Mode::Manual);
// Create a WaitGroup with an initial count of numTasks. // Create a WaitGroup with an initial count of numTasks.
marl::WaitGroup saidHellow(numTasks); marl::WaitGroup saidHello(numTasks);
// Schedule some tasks to run asynchronously. // Schedule some tasks to run asynchronously.
for (int i = 0; i < numTasks; i++) { for (int i = 0; i < numTasks; i++) {
// Each task will run on one of the 4 worker threads. // Each task will run on one of the 4 worker threads.
marl::schedule([=] { // All marl primitives are capture-by-value. marl::schedule([=] { // All marl primitives are capture-by-value.
// Decrement the WaitGroup counter when the task has finished. // Decrement the WaitGroup counter when the task has finished.
defer(saidHellow.done()); defer(saidHello.done());
printf("Task %d waiting to say hello...\n", i); printf("Task %d waiting to say hello...\n", i);
// Blocking in a task? // Blocking in a task?
// The scheduler will find something else for this thread to do. // The scheduler will find something else for this thread to do.
sayHellow.wait(); sayHello.wait();
printf("Hello from task %d!\n", i); printf("Hello from task %d!\n", i);
}); });
} }
sayHellow.signal(); // Unblock all the tasks. sayHello.signal(); // Unblock all the tasks.
saidHellow.wait(); // Wait for all tasks to complete. saidHello.wait(); // Wait for all tasks to complete.
printf("All tasks said hello.\n"); printf("All tasks said hello.\n");
......
// Copyright 2020 The Marl Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef marl_parallelize_h
#define marl_parallelize_h
#include "scheduler.h"
#include "waitgroup.h"
namespace marl {
namespace detail {

// Terminal overload of parallelizeChain(): selected once every callable has
// been scheduled, so there is nothing left to do.
// This is a non-template function defined in a header, so it must be declared
// inline; otherwise each translation unit that includes this header emits its
// own external definition and the program fails to link.
inline void parallelizeChain(WaitGroup*) {}

// Schedules f as a task that calls wg->done() once f has returned, then
// recurses to schedule the remaining callables in l.
// The scheduled task captures by value: it holds its own copy of f and a copy
// of the wg pointer, so the WaitGroup pointed to by wg must stay alive until
// every scheduled task has called done().
template <typename F, typename... L>
void parallelizeChain(WaitGroup* wg, F&& f, L&&... l) {
  schedule([=] {
    f();
    wg->done();
  });
  parallelizeChain(wg, std::forward<L>(l)...);
}

}  // namespace detail
// parallelize() schedules all the function parameters and waits for them to
// complete. These functions may execute concurrently.
// Each function must take no parameters.
template <typename... FUNCTIONS>
inline void parallelize(FUNCTIONS&&... functions) {
WaitGroup wg(sizeof...(FUNCTIONS));
detail::parallelizeChain(&wg, functions...);
wg.wait();
}
} // namespace marl
#endif // marl_parallelize_h
...@@ -255,7 +255,7 @@ class Scheduler { ...@@ -255,7 +255,7 @@ class Scheduler {
// take() returns the next fiber that has exceeded its timeout, or nullptr // take() returns the next fiber that has exceeded its timeout, or nullptr
// if there are no fibers that have yet exceeded their timeouts. // if there are no fibers that have yet exceeded their timeouts.
inline Fiber* take(const TimePoint& timepoint); inline Fiber* take(const TimePoint& timeout);
// next() returns the timepoint of the next fiber to timeout. // next() returns the timepoint of the next fiber to timeout.
// next() can only be called if operator bool() returns true. // next() can only be called if operator bool() returns true.
......
// Copyright 2019 The Marl Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef marl_util_h
#define marl_util_h
#include "scheduler.h"
#include "waitgroup.h"
namespace marl {
// parallelize() is used to split a number of work items into N smaller batches
// which can be processed in parallel with the function f().
// numTotal is the total number of work items to process.
// numPerTask is the maximum number of work items to process per call to f().
// numPerTask is used as a divisor and so must be greater than zero.
// There will always be at least one call to f(), even when numTotal is zero
// (in which case f() is called once with a count of zero).
// F must be a function with the signature:
// void(COUNTER taskIndex, COUNTER first, COUNTER count)
// COUNTER is any integer type.
template <typename F, typename COUNTER>
inline void parallelize(COUNTER numTotal, COUNTER numPerTask, const F& f) {
  // Number of batches, rounding up so a trailing partial batch is included.
  const COUNTER numTasks =
      static_cast<COUNTER>((numTotal + numPerTask - 1) / numPerTask);

  // Batch 0 runs on the calling fiber, so only the remaining batches are
  // scheduled and waited on. When numTotal is zero there are no batches at
  // all; clamp to zero so the wait group is never constructed from a negative
  // (or wrapped) count, which would leave wait() with nothing to release it.
  WaitGroup wg(numTasks > 1 ? numTasks - 1 : 0);

  // The task index is kept as a COUNTER so that all arithmetic and the
  // std::min() call below operate on a single integer type.
  for (COUNTER task = 1; task < numTasks; task++) {
    schedule([=] {
      const COUNTER first = static_cast<COUNTER>(task * numPerTask);
      // The final batch may be shorter than numPerTask.
      const COUNTER count = static_cast<COUNTER>(
          std::min<COUNTER>(static_cast<COUNTER>(first + numPerTask),
                            numTotal) -
          first);
      f(task, first, count);
      wg.done();
    });
  }

  // Run the first chunk on this fiber to reduce the amount of time spent
  // waiting.
  f(static_cast<COUNTER>(0), static_cast<COUNTER>(0),
    std::min<COUNTER>(numPerTask, numTotal));
  wg.wait();
}
} // namespace marl
#endif // marl_util_h
// Copyright 2020 The Marl Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "marl_test.h"
#include "marl/parallelize.h"
// Checks that every callable passed to marl::parallelize() has run by the time
// the call returns.
TEST_P(WithBoundScheduler, Parallelize) {
  bool ranFirst = false;
  bool ranSecond = false;
  bool ranThird = false;

  // Each callable flips its own flag.
  marl::parallelize([&] { ranFirst = true; },
                    [&] { ranSecond = true; },
                    [&] { ranThird = true; });

  ASSERT_TRUE(ranFirst);
  ASSERT_TRUE(ranSecond);
  ASSERT_TRUE(ranThird);
}
...@@ -120,7 +120,7 @@ void Scheduler::unbind() { ...@@ -120,7 +120,7 @@ void Scheduler::unbind() {
"singleThreadedWorker not found"); "singleThreadedWorker not found");
MARL_ASSERT(it->second.get() == worker, "worker is not bound?"); MARL_ASSERT(it->second.get() == worker, "worker is not bound?");
bound->singleThreadedWorkers.byTid.erase(it); bound->singleThreadedWorkers.byTid.erase(it);
if (bound->singleThreadedWorkers.byTid.size() == 0) { if (bound->singleThreadedWorkers.byTid.empty()) {
bound->singleThreadedWorkers.unbind.notify_one(); bound->singleThreadedWorkers.unbind.notify_one();
} }
} }
...@@ -140,7 +140,7 @@ Scheduler::~Scheduler() { ...@@ -140,7 +140,7 @@ Scheduler::~Scheduler() {
marl::lock lock(singleThreadedWorkers.mutex); marl::lock lock(singleThreadedWorkers.mutex);
lock.wait(singleThreadedWorkers.unbind, lock.wait(singleThreadedWorkers.unbind,
[this]() REQUIRES(singleThreadedWorkers.mutex) { [this]() REQUIRES(singleThreadedWorkers.mutex) {
return singleThreadedWorkers.byTid.size() == 0; return singleThreadedWorkers.byTid.empty();
}); });
} }
...@@ -149,9 +149,9 @@ Scheduler::~Scheduler() { ...@@ -149,9 +149,9 @@ Scheduler::~Scheduler() {
setWorkerThreadCount(0); setWorkerThreadCount(0);
} }
void Scheduler::setThreadInitializer(const std::function<void()>& func) { void Scheduler::setThreadInitializer(const std::function<void()>& init) {
marl::lock lock(threadInitFuncMutex); marl::lock lock(threadInitFuncMutex);
threadInitFunc = func; threadInitFunc = init;
} }
const std::function<void()>& Scheduler::getThreadInitializer() { const std::function<void()>& Scheduler::getThreadInitializer() {
...@@ -299,15 +299,15 @@ const char* Scheduler::Fiber::toString(State state) { ...@@ -299,15 +299,15 @@ const char* Scheduler::Fiber::toString(State state) {
// Scheduler::WaitingFibers // Scheduler::WaitingFibers
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
Scheduler::WaitingFibers::operator bool() const { Scheduler::WaitingFibers::operator bool() const {
return fibers.size() > 0; return !fibers.empty();
} }
Scheduler::Fiber* Scheduler::WaitingFibers::take(const TimePoint& timepoint) { Scheduler::Fiber* Scheduler::WaitingFibers::take(const TimePoint& timeout) {
if (!*this) { if (!*this) {
return nullptr; return nullptr;
} }
auto it = timeouts.begin(); auto it = timeouts.begin();
if (timepoint < it->timepoint) { if (timeout < it->timepoint) {
return nullptr; return nullptr;
} }
auto fiber = it->fiber; auto fiber = it->fiber;
...@@ -324,9 +324,9 @@ Scheduler::TimePoint Scheduler::WaitingFibers::next() const { ...@@ -324,9 +324,9 @@ Scheduler::TimePoint Scheduler::WaitingFibers::next() const {
return timeouts.begin()->timepoint; return timeouts.begin()->timepoint;
} }
void Scheduler::WaitingFibers::add(const TimePoint& timepoint, Fiber* fiber) { void Scheduler::WaitingFibers::add(const TimePoint& timeout, Fiber* fiber) {
timeouts.emplace(Timeout{timepoint, fiber}); timeouts.emplace(Timeout{timeout, fiber});
bool added = fibers.emplace(fiber, timepoint).second; bool added = fibers.emplace(fiber, timeout).second;
(void)added; (void)added;
MARL_ASSERT(added, "WaitingFibers::add() fiber already waiting"); MARL_ASSERT(added, "WaitingFibers::add() fiber already waiting");
} }
...@@ -474,13 +474,13 @@ void Scheduler::Worker::suspend( ...@@ -474,13 +474,13 @@ void Scheduler::Worker::suspend(
work.numBlockedFibers++; work.numBlockedFibers++;
if (work.fibers.size() > 0) { if (!work.fibers.empty()) {
// There's another fiber that has become unblocked, resume that. // There's another fiber that has become unblocked, resume that.
work.num--; work.num--;
auto to = take(work.fibers); auto to = take(work.fibers);
ASSERT_FIBER_STATE(to, Fiber::State::Queued); ASSERT_FIBER_STATE(to, Fiber::State::Queued);
switchToFiber(to); switchToFiber(to);
} else if (idleFibers.size() > 0) { } else if (!idleFibers.empty()) {
// There's an old fiber we can reuse, resume that. // There's an old fiber we can reuse, resume that.
auto to = take(idleFibers); auto to = take(idleFibers);
ASSERT_FIBER_STATE(to, Fiber::State::Idle); ASSERT_FIBER_STATE(to, Fiber::State::Idle);
...@@ -518,7 +518,7 @@ void Scheduler::Worker::enqueue(Fiber* fiber) { ...@@ -518,7 +518,7 @@ void Scheduler::Worker::enqueue(Fiber* fiber) {
break; break;
} }
notify = work.notifyAdded; notify = work.notifyAdded;
work.fibers.push_back(std::move(fiber)); work.fibers.push_back(fiber);
MARL_ASSERT(!work.waiting.contains(fiber), MARL_ASSERT(!work.waiting.contains(fiber),
"fiber is unexpectedly in the waiting list"); "fiber is unexpectedly in the waiting list");
setFiberState(fiber, Fiber::State::Queued); setFiberState(fiber, Fiber::State::Queued);
...@@ -552,8 +552,7 @@ bool Scheduler::Worker::steal(Task& out) { ...@@ -552,8 +552,7 @@ bool Scheduler::Worker::steal(Task& out) {
if (!work.mutex.try_lock()) { if (!work.mutex.try_lock()) {
return false; return false;
} }
if (work.tasks.size() == 0 || if (work.tasks.empty() || work.tasks.front().is(Task::Flags::SameThread)) {
work.tasks.front().is(Task::Flags::SameThread)) {
work.mutex.unlock(); work.mutex.unlock();
return false; return false;
} }
...@@ -668,12 +667,12 @@ void Scheduler::Worker::runUntilIdle() { ...@@ -668,12 +667,12 @@ void Scheduler::Worker::runUntilIdle() {
ASSERT_FIBER_STATE(currentFiber, Fiber::State::Running); ASSERT_FIBER_STATE(currentFiber, Fiber::State::Running);
MARL_ASSERT(work.num == work.fibers.size() + work.tasks.size(), MARL_ASSERT(work.num == work.fibers.size() + work.tasks.size(),
"work.num out of sync"); "work.num out of sync");
while (work.fibers.size() > 0 || work.tasks.size() > 0) { while (!work.fibers.empty() || !work.tasks.empty()) {
// Note: we cannot take and store on the stack more than a single fiber // Note: we cannot take and store on the stack more than a single fiber
// or task at a time, as the Fiber may yield and these items may get // or task at a time, as the Fiber may yield and these items may get
// held on suspended fiber stack. // held on suspended fiber stack.
while (work.fibers.size() > 0) { while (!work.fibers.empty()) {
work.num--; work.num--;
auto fiber = take(work.fibers); auto fiber = take(work.fibers);
// Sanity checks, // Sanity checks,
...@@ -690,7 +689,7 @@ void Scheduler::Worker::runUntilIdle() { ...@@ -690,7 +689,7 @@ void Scheduler::Worker::runUntilIdle() {
changeFiberState(currentFiber, Fiber::State::Idle, Fiber::State::Running); changeFiberState(currentFiber, Fiber::State::Idle, Fiber::State::Running);
} }
if (work.tasks.size() > 0) { if (!work.tasks.empty()) {
work.num--; work.num--;
auto task = take(work.tasks); auto task = take(work.tasks);
work.mutex.unlock(); work.mutex.unlock();
......
...@@ -69,7 +69,6 @@ func Parse(s string) (Benchmark, error) { ...@@ -69,7 +69,6 @@ func Parse(s string) (Benchmark, error) {
case nil: case nil:
return b, nil return b, nil
case errWrongFormat: case errWrongFormat:
break
default: default:
return Benchmark{}, err return Benchmark{}, err
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment