Commit 6dd9ff1e by Ben Clayton

Squashed 'third_party/marl/' changes from 49e4e3141..14e4d862a

14e4d862a Scheduler: Fix issues with fiber lists a timeouts. 57f41915d SwiftShader build fixes. 791187298 Add Event::any(). ecd5ab322 Implement yields with timeouts, wait_for() / wait_until() 6ba730d94 Update README.md 8348be4f0 Implement page-based functions for Fuchsia 5e512cd0c Fix condition logic of assert in TasksOnlyScheduledOnWorkerThreads. 37ae48f40 Add marl::Event. a90725760 Update README.md (#49) git-subtree-dir: third_party/marl git-subtree-split: 14e4d862a959b831fd994a436e7c104c6fd19006
parent 193ce894
......@@ -172,6 +172,7 @@ if(MARL_BUILD_TESTS)
${MARL_SRC_DIR}/conditionvariable_test.cpp
${MARL_SRC_DIR}/containers_test.cpp
${MARL_SRC_DIR}/defer_test.cpp
${MARL_SRC_DIR}/event_test.cpp
${MARL_SRC_DIR}/marl_test.cpp
${MARL_SRC_DIR}/marl_test.h
${MARL_SRC_DIR}/memory_test.cpp
......
......@@ -10,17 +10,11 @@ Marl uses a combination of fibers and threads to allow efficient execution of ta
Marl supports Windows, macOS, Linux, Fuchsia and Android (arm, aarch64, ppc64 (ELFv2), x86 and x64).
Marl has no dependencies on other libraries (with exception on googletest for building the optional unit tests).
Marl is in early development and will have breaking API changes.
**More documentation and examples coming soon.**
Note: This is not an officially supported Google product
Marl has no dependencies on other libraries (with an exception on googletest for building the optional unit tests).
## Building
Marl contains a number of unit tests and examples which can be built using CMake.
Marl contains many unit tests and examples that can be built using CMake.
Unit tests require fetching the `googletest` external project, which can be done by typing the following in your terminal:
......@@ -66,3 +60,7 @@ You will also want to add the `marl` public headers to your project's include se
```cmake
target_include_directories($<target> PRIVATE "${MARL_DIR}/include") # replace <target> with the name of your project's target
```
---
Note: This is not an officially supported Google product
......@@ -15,13 +15,14 @@
#ifndef marl_condition_variable_h
#define marl_condition_variable_h
#include "containers.h"
#include "debug.h"
#include "defer.h"
#include "scheduler.h"
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <unordered_set>
namespace marl {
......@@ -43,11 +44,29 @@ class ConditionVariable {
// wait() blocks the current fiber or thread until the predicate is satisfied
// and the ConditionVariable is notified.
template <typename Predicate>
inline void wait(std::unique_lock<std::mutex>& lock, Predicate pred);
inline void wait(std::unique_lock<std::mutex>& lock, Predicate&& pred);
// wait_for() blocks the current fiber or thread until the predicate is
// satisfied, and the ConditionVariable is notified, or the timeout has been
// reached. Returns false if pred still evaluates to false after the timeout
// has been reached, otherwise true.
template <typename Rep, typename Period, typename Predicate>
bool wait_for(std::unique_lock<std::mutex>& lock,
const std::chrono::duration<Rep, Period>& duration,
Predicate&& pred);
// wait_until() blocks the current fiber or thread until the predicate is
// satisfied, and the ConditionVariable is notified, or the timeout has been
// reached. Returns false if pred still evaluates to false after the timeout
// has been reached, otherwise true.
template <typename Clock, typename Duration, typename Predicate>
bool wait_until(std::unique_lock<std::mutex>& lock,
const std::chrono::time_point<Clock, Duration>& timeout,
Predicate&& pred);
private:
std::mutex mutex;
containers::vector<Scheduler::Fiber*, 4> waiting;
std::unordered_set<Scheduler::Fiber*> waiting;
std::condition_variable condition;
std::atomic<int> numWaiting = {0};
std::atomic<int> numWaitingOnCondition = {0};
......@@ -58,12 +77,12 @@ void ConditionVariable::notify_one() {
return;
}
std::unique_lock<std::mutex> lock(mutex);
if (waiting.size() > 0) {
auto fiber = waiting.back();
waiting.pop_back();
for (auto fiber : waiting) {
fiber->schedule();
}
waiting.clear();
lock.unlock();
if (numWaitingOnCondition > 0) {
condition.notify_one();
}
......@@ -74,20 +93,20 @@ void ConditionVariable::notify_all() {
return;
}
std::unique_lock<std::mutex> lock(mutex);
while (waiting.size() > 0) {
auto fiber = waiting.back();
waiting.pop_back();
for (auto fiber : waiting) {
fiber->schedule();
}
waiting.clear();
lock.unlock();
if (numWaitingOnCondition > 0) {
condition.notify_all();
}
}
template <typename Predicate>
void ConditionVariable::wait(std::unique_lock<std::mutex>& dataLock,
Predicate pred) {
void ConditionVariable::wait(std::unique_lock<std::mutex>& lock,
Predicate&& pred) {
if (pred()) {
return;
}
......@@ -97,23 +116,71 @@ void ConditionVariable::wait(std::unique_lock<std::mutex>& dataLock,
// Yield to let other tasks run that can unblock this fiber.
while (!pred()) {
mutex.lock();
waiting.push_back(fiber);
waiting.emplace(fiber);
mutex.unlock();
dataLock.unlock();
lock.unlock();
fiber->yield();
dataLock.lock();
lock.lock();
}
} else {
// Currently running outside of the scheduler.
// Delegate to the std::condition_variable.
numWaitingOnCondition++;
condition.wait(dataLock, pred);
condition.wait(lock, pred);
numWaitingOnCondition--;
}
numWaiting--;
}
template <typename Rep, typename Period, typename Predicate>
bool ConditionVariable::wait_for(
std::unique_lock<std::mutex>& lock,
const std::chrono::duration<Rep, Period>& duration,
Predicate&& pred) {
return wait_until(lock, std::chrono::system_clock::now() + duration, pred);
}
template <typename Clock, typename Duration, typename Predicate>
bool ConditionVariable::wait_until(
std::unique_lock<std::mutex>& lock,
const std::chrono::time_point<Clock, Duration>& timeout,
Predicate&& pred) {
if (pred()) {
return true;
}
numWaiting++;
defer(numWaiting--);
if (auto fiber = Scheduler::Fiber::current()) {
// Currently executing on a scheduler fiber.
// Yield to let other tasks run that can unblock this fiber.
while (!pred()) {
mutex.lock();
waiting.emplace(fiber);
mutex.unlock();
lock.unlock();
fiber->yield_until(timeout);
lock.lock();
if (std::chrono::system_clock::now() >= timeout) {
mutex.lock();
waiting.erase(fiber);
mutex.unlock();
return false;
}
}
return true;
} else {
// Currently running outside of the scheduler.
// Delegate to the std::condition_variable.
numWaitingOnCondition++;
defer(numWaitingOnCondition--);
return condition.wait_until(lock, timeout, pred);
}
}
} // namespace marl
#endif // marl_condition_variable_h
// Copyright 2019 The Marl Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef marl_event_h
#define marl_event_h
#include "conditionvariable.h"
#include "containers.h"
#include "memory.h"
#include <chrono>
namespace marl {
// Event is a synchronization primitive used to block until a signal is raised.
class Event {
public:
enum class Mode {
// The event signal will be automatically reset when a call to wait()
// returns.
// A single call to signal() will only unblock a single (possibly
// future) call to wait().
Auto,
// While the event is in the signaled state, any calls to wait() will
// unblock without automatically reseting the signaled state.
// The signaled state can be reset with a call to clear().
Manual
};
inline Event(Mode mode = Mode::Auto,
bool initialState = false,
Allocator* allocator = Allocator::Default);
// signal() signals the event, possibly unblocking a call to wait().
inline void signal() const;
// clear() clears the signaled state.
inline void clear() const;
// wait() blocks until the event is signaled.
// If the event was constructed with the Auto Mode, then only one
// call to wait() will unblock before returning, upon which the signalled
// state will be automatically cleared.
inline void wait() const;
// wait_for() blocks until the event is signaled, or the timeout has been
// reached.
// If the timeout was reached, then wait_for() return false.
// If the event is signalled and event was constructed with the Auto Mode,
// then only one call to wait() will unblock before returning, upon which the
// signalled state will be automatically cleared.
template <typename Rep, typename Period>
inline bool wait_for(
const std::chrono::duration<Rep, Period>& duration) const;
// wait_until() blocks until the event is signaled, or the timeout has been
// reached.
// If the timeout was reached, then wait_for() return false.
// If the event is signalled and event was constructed with the Auto Mode,
// then only one call to wait() will unblock before returning, upon which the
// signalled state will be automatically cleared.
template <typename Clock, typename Duration>
inline bool wait_until(
const std::chrono::time_point<Clock, Duration>& timeout) const;
// test() returns true if the event is signaled, otherwise false.
// If the event is signalled and was constructed with the Auto Mode
// then the signalled state will be automatically cleared upon returning.
inline bool test() const;
// isSignalled() returns true if the event is signaled, otherwise false.
// Unlike test() the signal is not automatically cleared when the event was
// constructed with the Auto Mode.
// Note: No lock is held after bool() returns, so the event state may
// immediately change after returning. Use with caution.
inline bool isSignalled() const;
// any returns an event that is automatically signalled whenever any of the
// events in the list are signalled.
template <typename Iterator>
inline static Event any(Mode mode,
const Iterator& begin,
const Iterator& end);
// any returns an event that is automatically signalled whenever any of the
// events in the list are signalled.
// This overload defaults to using the Auto mode.
template <typename Iterator>
inline static Event any(const Iterator& begin, const Iterator& end);
private:
struct Shared {
inline Shared(Mode mode, bool initialState);
inline void signal();
inline void wait();
template <typename Rep, typename Period>
inline bool wait_for(const std::chrono::duration<Rep, Period>& duration);
template <typename Clock, typename Duration>
inline bool wait_until(
const std::chrono::time_point<Clock, Duration>& timeout);
std::mutex mutex;
ConditionVariable cv;
const Mode mode;
bool signalled;
containers::vector<std::shared_ptr<Shared>, 2> deps;
};
const std::shared_ptr<Shared> shared;
};
Event::Shared::Shared(Mode mode, bool initialState)
: mode(mode), signalled(initialState) {}
void Event::Shared::signal() {
std::unique_lock<std::mutex> lock(mutex);
if (signalled) {
return;
}
signalled = true;
if (mode == Mode::Auto) {
cv.notify_one();
} else {
cv.notify_all();
}
for (auto dep : deps) {
dep->signal();
}
}
void Event::Shared::wait() {
std::unique_lock<std::mutex> lock(mutex);
cv.wait(lock, [&] { return signalled; });
if (mode == Mode::Auto) {
signalled = false;
}
}
template <typename Rep, typename Period>
bool Event::Shared::wait_for(
const std::chrono::duration<Rep, Period>& duration) {
std::unique_lock<std::mutex> lock(mutex);
if (!cv.wait_for(lock, duration, [&] { return signalled; })) {
return false;
}
if (mode == Mode::Auto) {
signalled = false;
}
return true;
}
template <typename Clock, typename Duration>
bool Event::Shared::wait_until(
const std::chrono::time_point<Clock, Duration>& timeout) {
std::unique_lock<std::mutex> lock(mutex);
if (!cv.wait_until(lock, timeout, [&] { return signalled; })) {
return false;
}
if (mode == Mode::Auto) {
signalled = false;
}
return true;
}
Event::Event(Mode mode /* = Mode::Auto */,
bool initialState /* = false */,
Allocator* allocator /* = Allocator::Default */)
: shared(allocator->make_shared<Shared>(mode, initialState)) {}
void Event::signal() const {
shared->signal();
}
void Event::clear() const {
std::unique_lock<std::mutex> lock(shared->mutex);
shared->signalled = false;
}
void Event::wait() const {
shared->wait();
}
template <typename Rep, typename Period>
bool Event::wait_for(const std::chrono::duration<Rep, Period>& duration) const {
return shared->wait_for(duration);
}
template <typename Clock, typename Duration>
bool Event::wait_until(
const std::chrono::time_point<Clock, Duration>& timeout) const {
return shared->wait_until(timeout);
}
bool Event::test() const {
std::unique_lock<std::mutex> lock(shared->mutex);
if (!shared->signalled) {
return false;
}
if (shared->mode == Mode::Auto) {
shared->signalled = false;
}
return true;
}
bool Event::isSignalled() const {
std::unique_lock<std::mutex> lock(shared->mutex);
return shared->signalled;
}
template <typename Iterator>
Event Event::any(Mode mode, const Iterator& begin, const Iterator& end) {
Event any(mode, false);
for (auto it = begin; it != end; it++) {
auto s = it->shared;
std::unique_lock<std::mutex> lock(s->mutex);
if (s->signalled) {
any.signal();
}
s->deps.push_back(any.shared);
}
return any;
}
template <typename Iterator>
Event Event::any(const Iterator& begin, const Iterator& end) {
return any(Mode::Auto, begin, end);
}
} // namespace marl
#endif // marl_event_h
......@@ -21,12 +21,16 @@
#include <array>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <map>
#include <mutex>
#include <queue>
#include <set>
#include <thread>
#include <unordered_map>
#include <unordered_set>
namespace marl {
......@@ -45,6 +49,8 @@ class Scheduler {
class Worker;
public:
using TimePoint = std::chrono::system_clock::time_point;
Scheduler(Allocator* allocator = Allocator::Default);
~Scheduler();
......@@ -101,6 +107,14 @@ class Scheduler {
// yield() must only be called on the currently executing fiber.
void yield();
// yield_until() suspends execution of this Fiber, allowing the thread to
// work on other tasks. yield_until() may automatically resume sometime
// after timeout.
// yield_until() must only be called on the currently executing fiber.
template <typename Clock, typename Duration>
inline void yield_until(
const std::chrono::time_point<Clock, Duration>& timeout);
// schedule() reschedules the suspended Fiber for execution.
void schedule();
......@@ -113,6 +127,8 @@ class Scheduler {
Fiber(Allocator::unique_ptr<OSFiber>&&, uint32_t id);
void yield_until_sc(const TimePoint& timeout);
// switchTo() switches execution to the given fiber.
// switchTo() must only be called on the currently executing fiber.
void switchTo(Fiber*);
......@@ -143,10 +159,40 @@ class Scheduler {
// Maximum number of worker threads.
static constexpr size_t MaxWorkerThreads = 256;
// WaitingFibers holds all the fibers waiting on a timeout.
struct WaitingFibers {
// operator bool() returns true iff there are any wait fibers.
inline operator bool() const;
// take() returns the next fiber that has exceeded its timeout, or nullptr
// if there are no fibers that have yet exceeded their timeouts.
inline Fiber* take(const TimePoint& timepoint);
// next() returns the timepoint of the next fiber to timeout.
// next() can only be called if operator bool() returns true.
inline TimePoint next() const;
// add() adds another fiber and timeout to the list of waiting fibers.
inline void add(const TimePoint& timeout, Fiber* fiber);
// erase() removes the fiber from the waiting list.
inline void erase(Fiber* fiber);
private:
struct Timeout {
TimePoint timepoint;
Fiber* fiber;
inline bool operator<(const Timeout&) const;
};
std::set<Timeout> timeouts;
std::unordered_map<Fiber*, TimePoint> fibers;
};
// TODO: Implement a queue that recycles elements to reduce number of
// heap allocations.
using TaskQueue = std::queue<Task>;
using FiberQueue = std::queue<Fiber*>;
using FiberSet = std::unordered_set<Fiber*>;
// Workers executes Tasks on a single thread.
// Once a task is started, it may yield to other tasks on the same Worker.
......@@ -172,7 +218,9 @@ class Scheduler {
// yield() suspends execution of the current task, and looks for other
// tasks to start or continue execution.
void yield(Fiber* fiber);
// If timeout is not nullptr, yield may automatically resume the current
// task sometime after timeout.
void yield(Fiber* fiber, const TimePoint* timeout);
// enqueue(Fiber*) enqueues resuming of a suspended fiber.
void enqueue(Fiber* fiber);
......@@ -236,6 +284,10 @@ class Scheduler {
// frequently putting the thread to sleep and re-waking.
void spinForWork();
// enqueueFiberTimeouts() enqueues all the fibers that have finished
// waiting.
_Requires_lock_held_(lock) void enqueueFiberTimeouts();
// numBlockedFibers() returns the number of fibers currently blocked and
// held externally.
_Requires_lock_held_(lock) inline size_t numBlockedFibers() const {
......@@ -247,6 +299,7 @@ class Scheduler {
std::atomic<uint64_t> num = {0}; // tasks.size() + fibers.size()
TaskQueue tasks; // guarded by mutex
FiberQueue fibers; // guarded by mutex
WaitingFibers waiting; // guarded by mutex
std::condition_variable added;
std::mutex mutex;
};
......@@ -274,7 +327,7 @@ class Scheduler {
Fiber* currentFiber = nullptr;
std::thread thread;
Work work;
FiberQueue idleFibers; // Fibers that have completed which can be reused.
FiberSet idleFibers; // Fibers that have completed which can be reused.
std::vector<Allocator::unique_ptr<Fiber>>
workerFibers; // All fibers created by this worker.
FastRnd rng;
......@@ -312,6 +365,14 @@ class Scheduler {
singleThreadedWorkers;
};
template <typename Clock, typename Duration>
void Scheduler::Fiber::yield_until(
const std::chrono::time_point<Clock, Duration>& timeout) {
using ToDuration = typename TimePoint::duration;
using ToClock = typename TimePoint::clock;
yield_until_sc(std::chrono::time_point_cast<ToDuration, ToClock>(timeout));
}
Scheduler::Worker* Scheduler::Worker::getCurrent() {
return Worker::current;
}
......
// Copyright 2019 The Marl Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "marl/event.h"
#include "marl/waitgroup.h"
#include "marl_test.h"
namespace std {
namespace chrono {
template <typename Rep, typename Period>
std::ostream& operator<<(std::ostream& os, const duration<Rep, Period>& d) {
return os << chrono::duration_cast<chrono::microseconds>(d).count() << "ms";
}
} // namespace chrono
} // namespace std
TEST_P(WithBoundScheduler, EventIsSignalled) {
std::vector<marl::Event::Mode> modes = {marl::Event::Mode::Manual,
marl::Event::Mode::Auto};
for (auto mode : modes) {
auto event = marl::Event(mode);
ASSERT_EQ(event.isSignalled(), false);
event.signal();
ASSERT_EQ(event.isSignalled(), true);
ASSERT_EQ(event.isSignalled(), true);
event.clear();
ASSERT_EQ(event.isSignalled(), false);
}
}
TEST_P(WithBoundScheduler, EventAutoTest) {
auto event = marl::Event(marl::Event::Mode::Auto);
ASSERT_EQ(event.test(), false);
event.signal();
ASSERT_EQ(event.test(), true);
ASSERT_EQ(event.test(), false);
}
TEST_P(WithBoundScheduler, EventManualTest) {
auto event = marl::Event(marl::Event::Mode::Manual);
ASSERT_EQ(event.test(), false);
event.signal();
ASSERT_EQ(event.test(), true);
ASSERT_EQ(event.test(), true);
}
TEST_P(WithBoundScheduler, EventAutoWait) {
std::atomic<int> counter = {0};
auto event = marl::Event(marl::Event::Mode::Auto);
auto done = marl::Event(marl::Event::Mode::Auto);
for (int i = 0; i < 3; i++) {
marl::schedule([=, &counter] {
event.wait();
counter++;
done.signal();
});
}
ASSERT_EQ(counter.load(), 0);
event.signal();
done.wait();
ASSERT_EQ(counter.load(), 1);
event.signal();
done.wait();
ASSERT_EQ(counter.load(), 2);
event.signal();
done.wait();
ASSERT_EQ(counter.load(), 3);
}
TEST_P(WithBoundScheduler, EventManualWait) {
std::atomic<int> counter = {0};
auto event = marl::Event(marl::Event::Mode::Manual);
auto wg = marl::WaitGroup(3);
for (int i = 0; i < 3; i++) {
marl::schedule([=, &counter] {
event.wait();
counter++;
wg.done();
});
}
event.signal();
wg.wait();
ASSERT_EQ(counter.load(), 3);
}
TEST_P(WithBoundScheduler, EventSequence) {
std::vector<marl::Event::Mode> modes = {marl::Event::Mode::Manual,
marl::Event::Mode::Auto};
for (auto mode : modes) {
std::string sequence;
auto eventA = marl::Event(mode);
auto eventB = marl::Event(mode);
auto eventC = marl::Event(mode);
auto done = marl::Event(mode);
marl::schedule([=, &sequence] {
eventB.wait();
sequence += "B";
eventC.signal();
});
marl::schedule([=, &sequence] {
eventA.wait();
sequence += "A";
eventB.signal();
});
marl::schedule([=, &sequence] {
eventC.wait();
sequence += "C";
done.signal();
});
ASSERT_EQ(sequence, "");
eventA.signal();
done.wait();
ASSERT_EQ(sequence, "ABC");
}
}
TEST_P(WithBoundScheduler, EventWaitForUnblocked) {
auto event = marl::Event(marl::Event::Mode::Manual);
auto wg = marl::WaitGroup(1000);
for (int i = 0; i < 1000; i++) {
marl::schedule([=] {
defer(wg.done());
auto duration = std::chrono::seconds(10);
event.wait_for(duration);
});
}
event.signal(); // unblock
wg.wait();
}
TEST_P(WithBoundScheduler, EventWaitForTimeTaken) {
auto event = marl::Event(marl::Event::Mode::Auto);
auto wg = marl::WaitGroup(1000);
for (int i = 0; i < 1000; i++) {
marl::schedule([=] {
defer(wg.done());
auto duration = std::chrono::milliseconds(10);
auto start = std::chrono::system_clock::now();
auto triggered = event.wait_for(duration);
auto end = std::chrono::system_clock::now();
ASSERT_FALSE(triggered);
ASSERT_GE(end - start, duration);
});
}
wg.wait();
}
TEST_P(WithBoundScheduler, EventWaitUntilUnblocked) {
auto event = marl::Event(marl::Event::Mode::Manual);
auto wg = marl::WaitGroup(1000);
for (int i = 0; i < 1000; i++) {
marl::schedule([=] {
defer(wg.done());
auto duration = std::chrono::seconds(10);
auto start = std::chrono::system_clock::now();
event.wait_until(start + duration);
});
}
event.signal(); // unblock
wg.wait();
}
TEST_P(WithBoundScheduler, EventWaitUntilTimeTaken) {
auto event = marl::Event(marl::Event::Mode::Auto);
auto wg = marl::WaitGroup(1000);
for (int i = 0; i < 1000; i++) {
marl::schedule([=] {
defer(wg.done());
auto duration = std::chrono::milliseconds(10);
auto start = std::chrono::system_clock::now();
auto triggered = event.wait_until(start + duration);
auto end = std::chrono::system_clock::now();
ASSERT_FALSE(triggered);
ASSERT_GE(end - start, duration);
});
}
wg.wait();
}
// EventWaitStressTest spins up a whole lot of wait_fors(), unblocks them early,
// and then let's all the workers go to idle before repeating.
// This is testing to ensure that the scheduler handles timeouts correctly when
// they are early-unblocked. Specifically, this is to test that fibers are
// not double-placed into the idle or working lists.
TEST_P(WithBoundScheduler, EventWaitStressTest) {
auto event = marl::Event(marl::Event::Mode::Manual);
for (int i = 0; i < 10; i++) {
auto wg = marl::WaitGroup(1000);
for (int j = 0; j < 1000; j++) {
marl::schedule([=] {
defer(wg.done());
event.wait_for(std::chrono::milliseconds(100));
});
}
std::this_thread::sleep_for(std::chrono::milliseconds(50));
event.signal(); // unblock
wg.wait();
}
}
TEST_P(WithBoundScheduler, EventAny) {
for (int i = 0; i < 3; i++) {
std::vector<marl::Event> events = {
marl::Event(marl::Event::Mode::Auto),
marl::Event(marl::Event::Mode::Auto),
marl::Event(marl::Event::Mode::Auto),
};
auto any = marl::Event::any(events.begin(), events.end());
events[i].signal();
ASSERT_TRUE(any.isSignalled());
}
}
\ No newline at end of file
......@@ -50,6 +50,48 @@ inline void protectPage(void* addr) {
MARL_ASSERT(res == 0, "Failed to protect page at %p", addr);
}
} // anonymous namespace
#elif defined(__Fuchsia__)
#include <unistd.h>
#include <zircon/process.h>
#include <zircon/syscalls.h>
namespace {
// This was a static in pageSize(), but due to the following TSAN false-positive
// bug, this has been moved out to a global.
// https://gcc.gnu.org/bugzilla/show_bug.cgi?id=68338
const size_t kPageSize = sysconf(_SC_PAGESIZE);
inline size_t pageSize() {
return kPageSize;
}
inline void* allocatePages(size_t count) {
auto length = count * kPageSize;
zx_handle_t vmo;
if (zx_vmo_create(length, 0, &vmo) != ZX_OK) {
return nullptr;
}
zx_vaddr_t reservation;
zx_status_t status =
zx_vmar_map(zx_vmar_root_self(), ZX_VM_PERM_READ | ZX_VM_PERM_WRITE, 0,
vmo, 0, length, &reservation);
zx_handle_close(vmo);
(void)status;
MARL_ASSERT(status == ZX_OK, "Failed to allocate %d pages", int(count));
return reinterpret_cast<void*>(reservation);
}
inline void freePages(void* ptr, size_t count) {
auto length = count * kPageSize;
zx_status_t status = zx_vmar_unmap(zx_vmar_root_self(),
reinterpret_cast<zx_vaddr_t>(ptr), length);
(void)status;
MARL_ASSERT(status == ZX_OK, "Failed to free %d pages at %p", int(count),
ptr);
}
inline void protectPage(void* addr) {
zx_status_t status = zx_vmar_protect(
zx_vmar_root_self(), 0, reinterpret_cast<zx_vaddr_t>(addr), kPageSize);
(void)status;
MARL_ASSERT(status == ZX_OK, "Failed to protect page at %p", addr);
}
} // anonymous namespace
#elif defined(_WIN32)
#define WIN32_LEAN_AND_MEAN 1
#include <Windows.h>
......@@ -82,7 +124,6 @@ inline void protectPage(void* addr) {
}
} // anonymous namespace
#else
// TODO: Fuchsia support
#error "Page based allocation not implemented for this platform"
#endif
......
......@@ -43,6 +43,14 @@ inline T take(std::queue<T>& queue) {
return out;
}
template <typename T>
inline T take(std::unordered_set<T>& set) {
auto it = set.begin();
auto out = std::move(*it);
set.erase(it);
return out;
}
inline void nop() {
#if defined(_WIN32)
__nop();
......@@ -216,7 +224,12 @@ void Scheduler::Fiber::schedule() {
void Scheduler::Fiber::yield() {
MARL_SCOPED_EVENT("YIELD");
worker->yield(this);
worker->yield(this, nullptr);
}
void Scheduler::Fiber::yield_until_sc(const TimePoint& timeout) {
MARL_SCOPED_EVENT("YIELD_UNTIL");
worker->yield(this, &timeout);
}
void Scheduler::Fiber::switchTo(Fiber* to) {
......@@ -241,6 +254,60 @@ Scheduler::Fiber::createFromCurrentThread(Allocator* allocator, uint32_t id) {
}
////////////////////////////////////////////////////////////////////////////////
// Scheduler::WaitingFibers
////////////////////////////////////////////////////////////////////////////////
Scheduler::WaitingFibers::operator bool() const {
return fibers.size() > 0;
}
Scheduler::Fiber* Scheduler::WaitingFibers::take(const TimePoint& timepoint) {
if (!*this) {
return nullptr;
}
auto it = timeouts.begin();
if (timepoint < it->timepoint) {
return nullptr;
}
auto fiber = it->fiber;
timeouts.erase(it);
auto deleted = fibers.erase(fiber) != 0;
(void)deleted;
MARL_ASSERT(deleted, "WaitingFibers::take() maps out of sync");
return fiber;
}
Scheduler::TimePoint Scheduler::WaitingFibers::next() const {
MARL_ASSERT(*this,
"WaitingFibers::next() called when there' no waiting fibers");
return timeouts.begin()->timepoint;
}
void Scheduler::WaitingFibers::add(const TimePoint& timepoint, Fiber* fiber) {
timeouts.emplace(Timeout{timepoint, fiber});
bool added = fibers.emplace(fiber, timepoint).second;
(void)added;
MARL_ASSERT(added, "WaitingFibers::add() fiber already waiting");
}
void Scheduler::WaitingFibers::erase(Fiber* fiber) {
auto it = fibers.find(fiber);
if (it != fibers.end()) {
auto timeout = it->second;
auto erased = timeouts.erase(Timeout{timeout, fiber}) != 0;
(void)erased;
MARL_ASSERT(erased, "WaitingFibers::erase() maps out of sync");
fibers.erase(it);
}
}
bool Scheduler::WaitingFibers::Timeout::operator<(const Timeout& o) const {
if (timepoint != o.timepoint) {
return timepoint < o.timepoint;
}
return fiber < o.fiber;
}
////////////////////////////////////////////////////////////////////////////////
// Scheduler::Worker
////////////////////////////////////////////////////////////////////////////////
thread_local Scheduler::Worker* Scheduler::Worker::current = nullptr;
......@@ -296,15 +363,20 @@ void Scheduler::Worker::stop() {
}
}
void Scheduler::Worker::yield(Fiber* from) {
(void)from; // unreferenced parameter
void Scheduler::Worker::yield(
Fiber* from,
const std::chrono::system_clock::time_point* timeout) {
MARL_ASSERT(currentFiber == from,
"Attempting to call yield from a non-current fiber");
// Current fiber is yielding as it is blocked.
// First wait until there's something else this worker can do.
std::unique_lock<std::mutex> lock(work.mutex);
if (timeout != nullptr) {
work.waiting.add(*timeout, from);
}
// First wait until there's something else this worker can do.
waitForWork(lock);
if (work.fibers.size() > 0) {
......@@ -332,6 +404,7 @@ bool Scheduler::Worker::tryLock() {
void Scheduler::Worker::enqueue(Fiber* fiber) {
std::unique_lock<std::mutex> lock(work.mutex);
auto wasIdle = work.num == 0;
work.waiting.erase(fiber);
work.fibers.push(std::move(fiber));
work.num++;
lock.unlock();
......@@ -385,7 +458,8 @@ void Scheduler::Worker::run() {
Fiber::current()->id);
{
std::unique_lock<std::mutex> lock(work.mutex);
work.added.wait(lock, [this] { return work.num > 0 || shutdown; });
work.added.wait(
lock, [this] { return work.num > 0 || work.waiting || shutdown; });
while (!shutdown || work.num > 0 || numBlockedFibers() > 0U) {
waitForWork(lock);
runUntilIdle(lock);
......@@ -418,9 +492,25 @@ _Requires_lock_held_(lock) void Scheduler::Worker::waitForWork(
spinForWork();
lock.lock();
}
work.added.wait(lock, [this] {
return work.num > 0 || (shutdown && numBlockedFibers() == 0U);
});
if (work.waiting) {
work.added.wait_until(lock, work.waiting.next(), [this] {
return work.num > 0 || (shutdown && numBlockedFibers() == 0U);
});
enqueueFiberTimeouts();
} else {
work.added.wait(lock, [this] {
return work.num > 0 || (shutdown && numBlockedFibers() == 0U);
});
}
}
_Requires_lock_held_(lock) void Scheduler::Worker::enqueueFiberTimeouts() {
auto now = std::chrono::system_clock::now();
while (auto fiber = work.waiting.take(now)) {
work.fibers.push(fiber);
work.num++;
}
}
void Scheduler::Worker::spinForWork() {
......@@ -467,7 +557,11 @@ _Requires_lock_held_(lock) void Scheduler::Worker::runUntilIdle(
work.num--;
auto fiber = take(work.fibers);
lock.unlock();
idleFibers.push(currentFiber);
auto added = idleFibers.emplace(currentFiber).second;
(void)added;
MARL_ASSERT(added, "fiber already idle");
switchToFiber(fiber);
lock.lock();
}
......@@ -499,6 +593,8 @@ Scheduler::Fiber* Scheduler::Worker::createWorkerFiber() {
}
void Scheduler::Worker::switchToFiber(Fiber* to) {
MARL_ASSERT(to == mainFiber.get() || idleFibers.count(to) == 0,
"switching to idle fiber");
auto from = currentFiber;
currentFiber = to;
from->switchTo(to);
......
......@@ -150,7 +150,7 @@ TEST_F(WithoutBoundScheduler, TasksOnlyScheduledOnWorkerThreads) {
}
wg.wait();
ASSERT_EQ(threads.size(), 8U);
ASSERT_LE(threads.size(), 8U);
ASSERT_EQ(threads.count(std::this_thread::get_id()), 0U);
scheduler->unbind();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment