Commit 3decd1af by Ben Clayton

Merge changes I9862d8e2,Ie1024655

* changes: Update Marl to 16e1dc37c Squashed 'third_party/marl/' changes from ca8408f68..16e1dc37c
parents 6c11cf2a bf61e323
...@@ -105,13 +105,7 @@ endif(NOT MSVC) ...@@ -105,13 +105,7 @@ endif(NOT MSVC)
########################################################### ###########################################################
# OS libraries # OS libraries
########################################################### ###########################################################
if(CMAKE_SYSTEM_NAME MATCHES "Windows") find_package(Threads REQUIRED)
set(MARL_OS_LIBS Kernel32)
elseif(CMAKE_SYSTEM_NAME MATCHES "Linux")
set(MARL_OS_LIBS pthread)
elseif(CMAKE_SYSTEM_NAME MATCHES "Darwin")
set(MARL_OS_LIBS)
endif()
########################################################### ###########################################################
# Functions # Functions
...@@ -152,7 +146,7 @@ function(marl_set_target_options target) ...@@ -152,7 +146,7 @@ function(marl_set_target_options target)
target_link_libraries(${target} "-fsanitize=thread") target_link_libraries(${target} "-fsanitize=thread")
endif() endif()
target_include_directories(${target} PUBLIC ${MARL_INCLUDE_DIR}) target_include_directories(${target} PUBLIC $<BUILD_INTERFACE:${MARL_INCLUDE_DIR}>)
endfunction(marl_set_target_options) endfunction(marl_set_target_options)
########################################################### ###########################################################
...@@ -167,12 +161,19 @@ set_target_properties(marl PROPERTIES ...@@ -167,12 +161,19 @@ set_target_properties(marl PROPERTIES
marl_set_target_options(marl) marl_set_target_options(marl)
target_link_libraries(marl "${MARL_OS_LIBS}") target_link_libraries(marl PUBLIC Threads::Threads)
# install # install
if(MARL_INSTALL) if(MARL_INSTALL)
include(CMakePackageConfigHelpers)
include(GNUInstallDirs) include(GNUInstallDirs)
configure_package_config_file(
${CMAKE_CURRENT_SOURCE_DIR}/cmake/marl-config.cmake.in
${CMAKE_CURRENT_BINARY_DIR}/marl-config.cmake
INSTALL_DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/marl
)
install(DIRECTORY ${MARL_INCLUDE_DIR}/marl install(DIRECTORY ${MARL_INCLUDE_DIR}/marl
DESTINATION ${CMAKE_INSTALL_INCLUDEDIR} DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}
USE_SOURCE_PERMISSIONS USE_SOURCE_PERMISSIONS
...@@ -187,10 +188,14 @@ if(MARL_INSTALL) ...@@ -187,10 +188,14 @@ if(MARL_INSTALL)
) )
install(EXPORT marl-targets install(EXPORT marl-targets
FILE marl-config.cmake FILE marl-targets.cmake
NAMESPACE marl:: NAMESPACE marl::
DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/marl DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/marl
) )
install(FILES ${CMAKE_CURRENT_BINARY_DIR}/marl-config.cmake
DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/marl
)
endif(MARL_INSTALL) endif(MARL_INSTALL)
# tests # tests
......
# Copyright 2020 The Marl Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
@PACKAGE_INIT@
include(CMakeFindDependencyMacro)
find_dependency(Threads)
if(NOT TARGET marl::marl)
include(${CMAKE_CURRENT_LIST_DIR}/marl-targets.cmake)
endif()
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
<svg xmlns="http://www.w3.org/2000/svg" xmlns:xl="http://www.w3.org/1999/xlink" xmlns:dc="http://purl.org/dc/elements/1.1/" version="1.1" viewBox="724 1659.5 441.5 408.5" width="441.5" height="408.5">
<defs>
<font-face font-family="Courier New" font-size="16" panose-1="2 7 3 9 2 2 5 2 4 4" units-per-em="1000" underline-position="-232.91016" underline-thickness="41.015625" slope="0" x-height="422.85156" cap-height="571.28906" ascent="832.5195" descent="-300.29297" font-weight="400">
<font-face-src>
<font-face-name name="CourierNewPSMT"/>
</font-face-src>
</font-face>
<filter id="Shadow" filterUnits="userSpaceOnUse" x="724" y="1659.5">
<feOffset in="SourceAlpha" result="offset" dx="0" dy="2"/>
<feFlood flood-color="#919191" flood-opacity=".25" result="flood"/>
<feComposite in="flood" in2="offset" operator="in" result="color"/>
<feMerge>
<feMergeNode in="color"/>
<feMergeNode in="SourceGraphic"/>
</feMerge>
</filter>
<font-face font-family="Roboto" font-size="13" panose-1="2 0 0 0 0 0 0 0 0 0" units-per-em="1000" underline-position="-73.24219" underline-thickness="48.828125" slope="0" x-height="528.3203" cap-height="710.9375" ascent="927.7344" descent="-244.14062" font-weight="700">
<font-face-src>
<font-face-name name="Roboto-Bold"/>
</font-face-src>
</font-face>
<font-face font-family="Courier New" font-size="13" panose-1="2 7 6 9 2 2 5 2 4 4" units-per-em="1000" underline-position="-232.91016" underline-thickness="100.09766" slope="0" x-height="443.3594" cap-height="591.7969" ascent="832.5195" descent="-300.29297" font-weight="700">
<font-face-src>
<font-face-name name="CourierNewPS-BoldMT"/>
</font-face-src>
</font-face>
<font-face font-family="Roboto" font-size="12" panose-1="2 0 0 0 0 0 0 0 0 0" units-per-em="1000" underline-position="-73.24219" underline-thickness="48.828125" slope="0" x-height="528.3203" cap-height="710.9375" ascent="927.7344" descent="-244.14062" font-weight="400">
<font-face-src>
<font-face-name name="Roboto-Regular"/>
</font-face-src>
</font-face>
<font-face font-family="Roboto" font-size="11" panose-1="2 0 0 0 0 0 0 0 0 0" units-per-em="1000" underline-position="-73.24219" underline-thickness="48.828125" slope="0" x-height="528.3203" cap-height="710.9375" ascent="927.7344" descent="-244.14062" font-weight="400">
<font-face-src>
<font-face-name name="Roboto-Regular"/>
</font-face-src>
</font-face>
<marker orient="auto" overflow="visible" markerUnits="strokeWidth" id="FilledArrow_Marker" stroke-linejoin="miter" stroke-miterlimit="10" viewBox="-1 -3 5 6" markerWidth="5" markerHeight="6" color="#00aeef">
<g>
<path d="M 2.88 0 L 0 -1.08 L 0 1.08 Z" fill="currentColor" stroke="currentColor" stroke-width="1"/>
</g>
</marker>
</defs>
<metadata> Produced by OmniGraffle 7.12.1
<dc:date>2020-02-12 20:52:25 +0000</dc:date>
</metadata>
<g id="Canvas_1" stroke="none" stroke-opacity="1" fill="none" fill-opacity="1" stroke-dasharray="none">
<title>Canvas 1</title>
<g id="Canvas_1: Layer 1">
<title>Layer 1</title>
<g id="Graphic_259">
<rect x="745" y="1660" width="420" height="385" fill="#4751d4" fill-opacity=".04274277"/>
<path d="M 745 1660 L 1165 1660 L 1165 2045 L 745 2045 Z" stroke="gray" stroke-linecap="round" stroke-linejoin="round" stroke-dasharray="1.0,4.0" stroke-width="1"/>
<clipPath id="clip_path">
<rect x="0" y="0" width="420" height="385" fill="#4751d4" fill-opacity=".04274277"/>
</clipPath>
<text clip-path="url(#clip_path)" transform="translate(750 1665)" fill="black">
<tspan font-family="Courier New" font-size="16" font-weight="400" fill="black" x="27.371094" y="13">Worker::run() (Multi-Threaded-Worker)</tspan>
</text>
</g>
<g id="Graphic_260" filter="url(#Shadow)">
<path d="M 825 2025 L 895 2025 C 903.28 2025 910 2033.96 910 2045 C 910 2056.04 903.28 2065 895 2065 L 825 2065 C 816.72 2065 810 2056.04 810 2045 C 810 2033.96 816.72 2025 825 2025 Z" fill="#ffc7b1"/>
<path d="M 825 2025 L 895 2025 C 903.28 2025 910 2033.96 910 2045 C 910 2056.04 903.28 2065 895 2065 L 825 2065 C 816.72 2065 810 2056.04 810 2045 C 810 2033.96 816.72 2025 825 2025 Z" stroke="#235e00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2"/>
<text transform="translate(816 2037.5)" fill="#515556">
<tspan font-family="Roboto" font-size="13" font-weight="700" fill="#515556" x="28.946533" y="12">Done</tspan>
</text>
</g>
<g id="Graphic_261" filter="url(#Shadow)">
<path d="M 765 1851.75 L 765 1918.25 C 765 1926.116 756.04 1932.5 745 1932.5 C 733.96 1932.5 725 1926.116 725 1918.25 L 725 1851.75 C 725 1843.884 733.96 1837.5 745 1837.5 C 756.04 1837.5 765 1843.884 765 1851.75 Z" fill="#a7fee5"/>
<path d="M 765 1851.75 L 765 1918.25 C 765 1926.116 756.04 1932.5 745 1932.5 C 733.96 1932.5 725 1926.116 725 1918.25 L 725 1851.75 C 725 1843.884 733.96 1837.5 745 1837.5 C 756.04 1837.5 765 1843.884 765 1851.75 Z" stroke="#235e00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2"/>
<text transform="translate(752.5 1843.5) rotate(90)" fill="#515556">
<tspan font-family="Roboto" font-size="13" font-weight="700" fill="#515556" x="27.097168" y="12">Start</tspan>
</text>
</g>
<g id="Graphic_262" filter="url(#Shadow)">
<title>join</title>
<rect x="1e3" y="1725" width="140" height="40" fill="#c0c0ff"/>
<rect x="1e3" y="1725" width="140" height="40" stroke="#00aeef" stroke-linecap="round" stroke-linejoin="round" stroke-width="2"/>
<text transform="translate(1006 1737.5)" fill="#515556">
<tspan font-family="Courier New" font-size="13" font-weight="700" fill="#515556" x="13.291748" y="11">waitForWork()</tspan>
</text>
</g>
<g id="Graphic_263" filter="url(#Shadow)">
<path d="M 860 1855 L 925 1885 L 860 1915 L 795 1885 Z" fill="white"/>
<path d="M 860 1855 L 925 1885 L 860 1915 L 795 1885 Z" stroke="#fcc04d" stroke-linecap="round" stroke-linejoin="round" stroke-width="2"/>
<text transform="translate(801 1878)" fill="#515556">
<tspan font-family="Roboto" font-size="12" font-weight="400" fill="#515556" x="29.410156" y="11">Shutdown?</tspan>
</text>
</g>
<g id="Graphic_264" filter="url(#Shadow)">
<circle cx="860" cy="1960" r="15.0000239685285" fill="white"/>
<circle cx="860" cy="1960" r="15.0000239685285" stroke="#235e00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2"/>
<text transform="translate(851 1953.5)" fill="#515556">
<tspan font-family="Roboto" font-size="11" font-weight="400" fill="#515556" x=".12158203" y="10">Yes</tspan>
</text>
</g>
<g id="Graphic_268" filter="url(#Shadow)">
<circle cx="860" cy="1810" r="15.0000239685285" fill="white"/>
<circle cx="860" cy="1810" r="15.0000239685285" stroke="#b1001c" stroke-linecap="round" stroke-linejoin="round" stroke-width="2"/>
<text transform="translate(851 1803.5)" fill="#515556">
<tspan font-family="Roboto" font-size="11" font-weight="400" fill="#515556" x="1.9423828" y="10">No</tspan>
</text>
</g>
<g id="Line_269">
<line x1="860" y1="1853.8986" x2="860" y2="1835.06" marker-end="url(#FilledArrow_Marker)" stroke="#00aeef" stroke-linecap="square" stroke-linejoin="bevel" stroke-width="2"/>
</g>
<g id="Line_270">
<line x1="860" y1="1916.1014" x2="860" y2="1934.94" marker-end="url(#FilledArrow_Marker)" stroke="#00aeef" stroke-linecap="square" stroke-linejoin="bevel" stroke-width="2"/>
</g>
<g id="Line_271">
<path d="M 860 1795 L 860 1745 L 990.94 1745" marker-end="url(#FilledArrow_Marker)" stroke="#00aeef" stroke-linecap="square" stroke-linejoin="bevel" stroke-width="2"/>
</g>
<g id="Line_272">
<line x1="765" y1="1885" x2="783.5537" y2="1885" marker-end="url(#FilledArrow_Marker)" stroke="#00aeef" stroke-linecap="square" stroke-linejoin="bevel" stroke-width="2"/>
</g>
<g id="Line_275">
<line x1="860" y1="1975" x2="860" y2="2014.94" marker-end="url(#FilledArrow_Marker)" stroke="#00aeef" stroke-linecap="square" stroke-linejoin="bevel" stroke-width="2"/>
</g>
<g id="Graphic_277" filter="url(#Shadow)">
<title>join</title>
<rect x="1e3" y="1800" width="140" height="40" fill="#c0c0ff"/>
<rect x="1e3" y="1800" width="140" height="40" stroke="#00aeef" stroke-linecap="round" stroke-linejoin="round" stroke-width="2"/>
<text transform="translate(1006 1812.5)" fill="#515556">
<tspan font-family="Courier New" font-size="13" font-weight="700" fill="#515556" x="9.391113" y="11">runUntilIdle()</tspan>
</text>
</g>
<g id="Line_278">
<line x1="1070" y1="1765" x2="1070" y2="1789.94" marker-end="url(#FilledArrow_Marker)" stroke="#00aeef" stroke-linecap="square" stroke-linejoin="bevel" stroke-width="2"/>
</g>
<g id="Line_279">
<path d="M 1070 1840 L 1070 1885 L 934.06 1885" marker-end="url(#FilledArrow_Marker)" stroke="#00aeef" stroke-linecap="square" stroke-linejoin="bevel" stroke-width="2"/>
</g>
</g>
</g>
</svg>
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
#ifndef marl_blocking_call_h #ifndef marl_blocking_call_h
#define marl_blocking_call_h #define marl_blocking_call_h
#include "defer.h" #include "scheduler.h"
#include "waitgroup.h" #include "waitgroup.h"
#include <thread> #include <thread>
...@@ -32,10 +32,17 @@ class OnNewThread { ...@@ -32,10 +32,17 @@ class OnNewThread {
inline static RETURN_TYPE call(F&& f, Args&&... args) { inline static RETURN_TYPE call(F&& f, Args&&... args) {
RETURN_TYPE result; RETURN_TYPE result;
WaitGroup wg(1); WaitGroup wg(1);
auto scheduler = Scheduler::get();
auto thread = std::thread( auto thread = std::thread(
[&](Args&&... args) { [&, wg](Args&&... args) {
defer(wg.done()); if (scheduler != nullptr) {
scheduler->bind();
}
result = f(std::forward<Args>(args)...); result = f(std::forward<Args>(args)...);
if (scheduler != nullptr) {
Scheduler::unbind();
}
wg.done();
}, },
std::forward<Args>(args)...); std::forward<Args>(args)...);
wg.wait(); wg.wait();
...@@ -50,10 +57,17 @@ class OnNewThread<void> { ...@@ -50,10 +57,17 @@ class OnNewThread<void> {
template <typename F, typename... Args> template <typename F, typename... Args>
inline static void call(F&& f, Args&&... args) { inline static void call(F&& f, Args&&... args) {
WaitGroup wg(1); WaitGroup wg(1);
auto scheduler = Scheduler::get();
auto thread = std::thread( auto thread = std::thread(
[&](Args&&... args) { [&, wg](Args&&... args) {
defer(wg.done()); if (scheduler != nullptr) {
scheduler->bind();
}
f(std::forward<Args>(args)...); f(std::forward<Args>(args)...);
if (scheduler != nullptr) {
Scheduler::unbind();
}
wg.done();
}, },
std::forward<Args>(args)...); std::forward<Args>(args)...);
wg.wait(); wg.wait();
......
...@@ -43,7 +43,6 @@ class OSFiber; ...@@ -43,7 +43,6 @@ class OSFiber;
// A scheduler can be bound to one or more threads using the bind() method. // A scheduler can be bound to one or more threads using the bind() method.
// Once bound to a thread, that thread can call marl::schedule() to enqueue // Once bound to a thread, that thread can call marl::schedule() to enqueue
// work tasks to be executed asynchronously. // work tasks to be executed asynchronously.
// All threads must be unbound with unbind() before the scheduler is destructed.
// Scheduler are initially constructed in single-threaded mode. // Scheduler are initially constructed in single-threaded mode.
// Call setWorkerThreadCount() to spawn dedicated worker threads. // Call setWorkerThreadCount() to spawn dedicated worker threads.
class Scheduler { class Scheduler {
...@@ -56,8 +55,7 @@ class Scheduler { ...@@ -56,8 +55,7 @@ class Scheduler {
Scheduler(Allocator* allocator = Allocator::Default); Scheduler(Allocator* allocator = Allocator::Default);
// Destructor. // Destructor.
// Ensure that all threads are unbound before calling - failure to do so may // Blocks until the scheduler is unbound from all threads before returning.
// result in leaked memory.
~Scheduler(); ~Scheduler();
// get() returns the scheduler bound to the current thread. // get() returns the scheduler bound to the current thread.
...@@ -147,6 +145,39 @@ class Scheduler { ...@@ -147,6 +145,39 @@ class Scheduler {
const std::chrono::time_point<Clock, Duration>& timeout, const std::chrono::time_point<Clock, Duration>& timeout,
const Predicate& pred); const Predicate& pred);
// wait() suspends execution of this Fiber until the Fiber is woken up with
// a call to notify().
// While the Fiber is suspended, the scheduler thread may continue executing
// other tasks.
// wait() must only be called on the currently executing fiber.
//
// Warning: Unlike wait() overloads that take a lock and predicate, this
// form of wait() offers no safety for notify() signals that occur before
// the fiber is suspended, when signalling between different threads. In
// this scenario you may deadlock. For this reason, it is only ever
// recommended to use this overload if you can guarantee that the calls to
// wait() and notify() are made by the same thread.
//
// Use with extreme caution.
inline void wait();
// wait() suspends execution of this Fiber until the Fiber is woken up with
// a call to notify(), or sometime after the timeout is reached.
// While the Fiber is suspended, the scheduler thread may continue executing
// other tasks.
// wait() must only be called on the currently executing fiber.
//
// Warning: Unlike wait() overloads that take a lock and predicate, this
// form of wait() offers no safety for notify() signals that occur before
// the fiber is suspended, when signalling between different threads. For
// this reason, it is only ever recommended to use this overload if you can
// guarantee that the calls to wait() and notify() are made by the same
// thread.
//
// Use with extreme caution.
template <typename Clock, typename Duration>
inline bool wait(const std::chrono::time_point<Clock, Duration>& timeout);
// notify() reschedules the suspended Fiber for execution. // notify() reschedules the suspended Fiber for execution.
// notify() is usually only called when the predicate for one or more wait() // notify() is usually only called when the predicate for one or more wait()
// calls will likely return true. // calls will likely return true.
...@@ -283,13 +314,18 @@ class Scheduler { ...@@ -283,13 +314,18 @@ class Scheduler {
void stop(); void stop();
// wait() suspends execution of the current task until the predicate pred // wait() suspends execution of the current task until the predicate pred
// returns true. // returns true or the optional timeout is reached.
// See Fiber::wait() for more information. // See Fiber::wait() for more information.
_Requires_lock_held_(lock) _Requires_lock_held_(lock)
bool wait(Fiber::Lock& lock, bool wait(Fiber::Lock& lock,
const TimePoint* timeout, const TimePoint* timeout,
const Predicate& pred); const Predicate& pred);
// wait() suspends execution of the current task until the fiber is
// notified, or the optional timeout is reached.
// See Fiber::wait() for more information.
bool wait(const TimePoint* timeout);
// suspend() suspends the currenetly executing Fiber until the fiber is // suspend() suspends the currenetly executing Fiber until the fiber is
// woken with a call to enqueue(Fiber*), or automatically sometime after the // woken with a call to enqueue(Fiber*), or automatically sometime after the
// optional timeout. // optional timeout.
...@@ -314,8 +350,9 @@ class Scheduler { ...@@ -314,8 +350,9 @@ class Scheduler {
_Releases_lock_(work.mutex) _Releases_lock_(work.mutex)
void enqueueAndUnlock(Task&& task); void enqueueAndUnlock(Task&& task);
// flush() processes all pending tasks before returning. // runUntilShutdown() processes all tasks and fibers until there are no more
void flush(); // and shutdown is true, upon runUntilShutdown() returns.
void runUntilShutdown();
// steal() attempts to steal a Task from the worker for another worker. // steal() attempts to steal a Task from the worker for another worker.
// Returns true if a task was taken and assigned to out, otherwise false. // Returns true if a task was taken and assigned to out, otherwise false.
...@@ -333,10 +370,7 @@ class Scheduler { ...@@ -333,10 +370,7 @@ class Scheduler {
private: private:
// run() is the task processing function for the worker. // run() is the task processing function for the worker.
// If the worker was constructed in Mode::MultiThreaded, run() will // run() processes tasks until stop() is called.
// continue to process tasks until stop() is called.
// If the worker was constructed in Mode::SingleThreaded, run() call
// flush() and return.
_Requires_lock_held_(work.mutex) _Requires_lock_held_(work.mutex)
void run(); void run();
...@@ -377,15 +411,10 @@ class Scheduler { ...@@ -377,15 +411,10 @@ class Scheduler {
_Requires_lock_held_(work.mutex) _Requires_lock_held_(work.mutex)
inline void setFiberState(Fiber* fiber, Fiber::State to) const; inline void setFiberState(Fiber* fiber, Fiber::State to) const;
// numBlockedFibers() returns the number of fibers currently blocked and
// held externally.
inline size_t numBlockedFibers() const {
return workerFibers.size() - idleFibers.size();
}
// Work holds tasks and fibers that are enqueued on the Worker. // Work holds tasks and fibers that are enqueued on the Worker.
struct Work { struct Work {
std::atomic<uint64_t> num = {0}; // tasks.size() + fibers.size() std::atomic<uint64_t> num = {0}; // tasks.size() + fibers.size()
_Guarded_by_(mutex) uint64_t numBlockedFibers = 0;
_Guarded_by_(mutex) TaskQueue tasks; _Guarded_by_(mutex) TaskQueue tasks;
_Guarded_by_(mutex) FiberQueue fibers; _Guarded_by_(mutex) FiberQueue fibers;
_Guarded_by_(mutex) WaitingFibers waiting; _Guarded_by_(mutex) WaitingFibers waiting;
...@@ -454,9 +483,12 @@ class Scheduler { ...@@ -454,9 +483,12 @@ class Scheduler {
unsigned int numWorkerThreads = 0; unsigned int numWorkerThreads = 0;
std::array<Worker*, MaxWorkerThreads> workerThreads; std::array<Worker*, MaxWorkerThreads> workerThreads;
std::mutex singleThreadedWorkerMutex; struct SingleThreadedWorkers {
std::unordered_map<std::thread::id, Allocator::unique_ptr<Worker>> std::mutex mutex;
singleThreadedWorkers; std::condition_variable unbind;
std::unordered_map<std::thread::id, Allocator::unique_ptr<Worker>> byTid;
};
SingleThreadedWorkers singleThreadedWorkers;
}; };
_Requires_lock_held_(lock) _Requires_lock_held_(lock)
...@@ -471,6 +503,19 @@ bool Scheduler::Fiber::wait( ...@@ -471,6 +503,19 @@ bool Scheduler::Fiber::wait(
return worker->wait(lock, &tp, pred); return worker->wait(lock, &tp, pred);
} }
void Scheduler::Fiber::wait() {
worker->wait(nullptr);
}
template <typename Clock, typename Duration>
bool Scheduler::Fiber::wait(
const std::chrono::time_point<Clock, Duration>& timeout) {
using ToDuration = typename TimePoint::duration;
using ToClock = typename TimePoint::clock;
auto tp = std::chrono::time_point_cast<ToDuration, ToClock>(timeout);
return worker->wait(&tp);
}
Scheduler::Worker* Scheduler::Worker::getCurrent() { Scheduler::Worker* Scheduler::Worker::getCurrent() {
return Worker::current; return Worker::current;
} }
......
-----BEGIN PGP PUBLIC KEY BLOCK-----
mQINBFS+1SABEACnmkESkY7eZq0GhDjbkWpKmURGk9+ycsfAhA44NqUvf4tk1GPM
5SkJ/fYedYZJaDVhIp98fHgucD0O+vjOzghtgwtITusYjiPHPFBd/MN+MQqSEAP+
LUa/kjHLjgyXxKhFUIDGVaDWL5tKOA7/AQKl1TyJ8lz89NHQoUHFsF/hu10+qhJe
V65d32MXFehIUSvegh8DrPuExrliSiORO4HOhuc6151dWA4YBWVg4rX5kfKrGMMT
pTWnSSZtgoRhkKW2Ey8cmZUqPuUJIfWyeNVu1e4SFtAivLvu/Ymz2WBJcNA1ZlTr
RCOR5SIRgZ453pQnI/Bzna2nnJ/TV1gGJIGRahj/ini0cs2x1CILfS/YJQ3rWGGo
OxwG0BVmPk0cmLVtyTq8gUPwxcPUd6WcBKhot3TDMlrffZACnQwQjlVjk5S1dEEz
atUfpEuNitU9WOM4jr/gjv36ZNCOWm95YwLhsuci/NddBN8HXhyvs+zYTVZEXa2W
l/FqOdQsQqZBcJjjWckGKhESdd7934+cesGD3O8KaeSGxww7slJrS0+6QJ8oBoAB
P/WCn/y2AiY2syEKp3wYIGJyAbsm542zMZ4nc7pYfSu49mcyhQQICmqN5QvOyYUx
OSqwbAOUNtlOyeRLZNIKoXtTqWDEu5aEiDROTw6Rkq+dIcxPNgOLdeQ3HwARAQAB
tCFIYW5zIFdlbm5ib3JnIDxoYW5zQGNocm9taXVtLm9yZz6JAlQEEwEKAD4WIQS2
yPmCgrlE47DVwlMPwwQuNFrQXQUCXKW+LwIbAwUJDwUmjQULCQgHAgYVCgkICwIE
FgIDAQIeAQIXgAAKCRAPwwQuNFrQXXw+EACc4n7pYF89qmi6k4u1H5PLPcRVw4Ch
zY293N5JT8dM7c5Q0opPcgSS625SzAzEA8I3kRakFMsYZmJ7NFeFwIV7iJnaolft
iGCinbnB6bF8NnaEUOU0Pl4ByAuPiZqq8t5ORWUnZX/iRtOFEmCyRWHJPxCPFcJG
XCmQHTwnucePFdvNoIHN8vbkrHU32SUQ3iL4aEH92Y2s4D3WoNMW7g3b7srRynO1
pzrT+bhihrl1MAnR6FiS4lSjw7VaEon1PJyaxs6OYO2x/fEz+uUnNPYZGhHQDTQ8
DUyXNlXQ1mOOTMAwxg5JmqWfA2y1pmgJGpKe92t6vpVe9E90GBS9oCvSFXzItNg+
p+9ogNDxMWnT48fygCqDVpk/PLdlyuNAQfuvtcZb8h5y1bzcwwBGHWb9McG12Z/K
JpcWvSQe/eZ9uHcyj2+b7SQHIJL9eaBsyhgvv573PK62Rc8fze+HtwZMWMvw5Fsc
+q5pJ8JS8y3s/EZYJ8URQ00QWOL6DDN1ik0vjxZ6zf+dpK1/3jToSrTnsY5TxXAM
gxeoFVhAtccnoAYY2zp2Dp7JonGNqXrE8rjMe67QBWzVUADgWMlCvFZ4W7ZGcj9y
2XgA4DbOgJVsx3xAGA6FuEIV0UDwDo4WweWnD4Jo+KVC3nWGW8AjNQb9EAn33WlI
K/mivl/oxH2rx7kCDQRUvtUgARAA7EHGtB6wKGOsKoqNjk+dKxJil5vh+ui5ysLz
3wAXDYOA39nP5bvC1JNu3P8ZFwK6uPNm83ujasK42TSPT6zWyBlmbYF2V2VpsvL5
QX+RJbWtvmqF9dwYa5u7jw4x21J+iT2U5zRDUvgc2UYTiVQGRnOYjtiSp+X4HCub
2umLniDi5r08iKIcgCYyhkhxu04bUpoOvoKhdGT/eDZmIZTCGreMUauiIGwoRqnY
UnVuHk0mTYSDylXt8w4XuFRAoFms060g+7yEDlYSCS7dTdViNFIjdIOLpBecMv7E
fFqOJakq0XcmNmHzL8IJMPw/I/fhiN9m4WaR2yR7lx3HofRXZQKIfjnedyAVV1AN
eRjif7QxPOHLbG7QhVWcHFgNg2GL7cyNMcl30LjEyL237ki4S8MA+GB9mMOlBqQQ
/PqFWaCPSaUoiBGKUFEr3+Q7GTL260GkaTeMQkau7+Eo2WgU2ymhi1jrMBMCvwRw
6CgIVATSciS1yDfAX344ISdXbz9rtdnBRnsaX+p84e12vfvjCjyR3xHdXx3Yb2rn
DT+4JX001DR8ZZkM8Ohi3rCc8vqBm/+ckzyhlj67SsLbhbBJxkieJqvILgkcNqwC
GvZLYK2AK8GCyUrp/eAPXoofE9kwGlfvdPM5giEwQ/+9eBUltQPp1iG35T1zg6EQ
MmjCfR0AEQEAAYkCPAQYAQIAJgIbDBYhBLbI+YKCuUTjsNXCUw/DBC40WtBdBQJa
XfpLBQkPBSarAAoJEA/DBC40WtBdPX8P/1ilEM2BomXdhUO1Vmh5DCHsFDpQtlN5
cU+iBiQXaPdVaDyz1SYCziyD/hr70otJqe1eNf4kWxG/SVB7kav9WXxVDgsoRcF+
IaZKK+Mhnt6il13dg/bDoblPdIDh3YJB+yDiuck+dciPMo2JI6LfrzJue318vRja
vZqotOY/pjuKywNQ74nVNbVcebfj0k9HQeXhxO42dabgm5fabYIkRzlcGUMCFr2l
RWz4nkLYPRQUWTJ47N4k/DLrHkClYebzifwCOFBKm7WpErEpd3B6Lq2RBZYwe6L5
OBJj/MKSYP3+hjXkSLlq8nhaAhtMslShkyLvSuI+ZTxOGOnMDtL42TSDusw+r5eX
XCGMpT+7S52WysgmPOSHp+2opSYiRvFhOmOGcS6M2sSvmbZLpnrHfL0TlBqAExF3
FGF+T4dvIAJw/+n2tc7OXgzb3UOgp4AAfvQYeeIbHI2z2sCgyv+EPldb9avPd1wo
xzaznnkToxkgsTZmKiVxGf5tg4w9m1aVvH3y3y6ox/j2BjgUZAFkDA+CUyvHuaub
sdMiJdqFOFAY4mDqLMkMAPlHBIQaUBwvbxPwoC4zoIsuSGUF9DCIqxQE2eH2vzBX
eUH6lXQaEv7eLTvuBNh9kFHAvOMV2Gb3FQoRpnqs3UFf2XOLHh5I0rmeWfSNSrXr
sfYgf//ax/x3uQINBFylxXABEAC2Qt89UYDndAxNoCIJktuSBWh9BxC1JPPQtmLd
XTsG5vd2h63rBN64ZYTGuW2AQxGV24ngP8rv5F1QzSPY0UgOt25r7pS3+1MZbv+d
sZTtN4LWTXRdIVU+wcqKX1FZCGDSuGs5EpyElnKHxxGh7Wi0KFZMN64t83WPrbzq
aiKrpp9/QHMUtrNqPgUBNKvH8k5g/AGa21+fF1kRsUtmsZbre4IK9bakIjmAfNMA
ZA/YnJy0Ou06HcFWzkfTRLMrQHINUzOzNOhhXuYx3h4qSrvcJnqoGMJ9pZkOfrEJ
VPQexYq3hvL1jwMLdFKDozViUx520/7K8frusf+Df0RlucEVF4QjAV4RAuHBtrzP
LkH/0v6U3u1rX+5VMK8otud43cXcNet/cZ97jRm2rPzviRgYI9EljjD9vGPCIzmo
aJYs+eNJRIJGPqzVV+AELiH9Bc9jCad8XeECBsTCVNx+kEijKclQWr+3y610SXNY
JRKzlPBlMrqJ0U+/vNo59TUgZlwC8KdbiWtxEQ3JYFT7rHVH9cQeAlLXAE0yIfZK
+ss2HpIXgBvJ4nNyNBcFzoqF/iKBcH6yYRILNSGLEKOBnX3/XpAlvnOB1gcTSOQY
frNoXHpA7yzpGh1MeypdCeOqOicZZRF/xX1KR6YDC5YDOFM2paydDNS1ql0Wp0VW
WcIp1wARAQABiQI8BBgBCgAmFiEEtsj5goK5ROOw1cJTD8MELjRa0F0FAlylxXAC
GwwFCQlmAYAACgkQD8MELjRa0F3Quw/+MVB3lHyIORyth4q9KsTUUXBW11UtjKqq
SML0nMuNiqHefNd9P1+zVougyF002TfjkSnOpOoH2Uub3iCX0Cfyigo0rcjBXAvO
j9N9g8eL1xBenTdxYiiHvvIm0BadikfsdoqQebv3ONFda7eoQl689LqMKZ9ZEOxi
w7xQKcIPiNEt2WvBVv4mpEFx1pDbLZ/bUgbR3t7v/t6ijAVdIOjQvW/WPemyRTcB
7iJd68H6Uou/Ofy5EPUH4c/heyCw+eUUFnC9msDIvwtTbkz0Aaa7awbpoegFMz2L
LmSRMLybFn5lQTRR7TizzUvrprOx+UalbUASJS+TONZmVltz0eVVeJ3IHylUM/24
cBh2wXqR63osDCZZkXVxbN9AtyoezEVvg8+XhDLyXeh+o05A/lRjMA33BkwyoKzi
5nZb7iaVYWlKM8Zs6PrB8zq9ErDGcka7gikvUuJ2KLKjJqj19/6Z90oCtJQa9ifi
glN+ER3y4hLHFmKI6ns+GNf0FwpgwD7WD9XBQR9uxBPCrVjXXv4IT9rBidzXT8rK
iXYX9tHBHn2wAk28uJOtdDNcsOdOEqfdmIVfBXNv2df6r8ewEzpNd2MpEOZRW8mc
cn+5dkF+W2mGn8Vky04ewU2+Bo9rApv3zJ76s0Skt2c8axKKtLhHY/H5HPiLNC29
Qk8uiuyeUfE=
=H/uX
-----END PGP PUBLIC KEY BLOCK-----
\ No newline at end of file
# Format: //devtools/kokoro/config/proto/build.proto
# Location of the continuous bash script in Git.
build_file: "marl/kokoro/ubuntu/presubmit.sh"
env_vars {
key: "BUILD_SYSTEM"
value: "cmake"
}
env_vars {
key: "BUILD_TARGET_ARCH"
value: "x64"
}
env_vars {
key: "BUILD_TOOLCHAIN"
value: "clang"
}
env_vars {
key: "BUILD_SANITIZER"
value: "asan"
}
# Format: //devtools/kokoro/config/proto/build.proto
# Location of the continuous bash script in Git.
build_file: "marl/kokoro/ubuntu/presubmit.sh"
env_vars {
key: "BUILD_SYSTEM"
value: "cmake"
}
env_vars {
key: "BUILD_TARGET_ARCH"
value: "x64"
}
env_vars {
key: "BUILD_TOOLCHAIN"
value: "clang"
}
# Format: //devtools/kokoro/config/proto/build.proto
# Location of the continuous bash script in Git.
build_file: "marl/kokoro/ubuntu/presubmit.sh"
env_vars {
key: "BUILD_SYSTEM"
value: "cmake"
}
env_vars {
key: "BUILD_TARGET_ARCH"
value: "x64"
}
env_vars {
key: "BUILD_TOOLCHAIN"
value: "clang"
}
env_vars {
key: "BUILD_SANITIZER"
value: "tsan"
}
# Format: //devtools/kokoro/config/proto/build.proto
# Location of the continuous bash script in Git.
build_file: "marl/kokoro/ubuntu/presubmit.sh"
env_vars {
key: "BUILD_SYSTEM"
value: "cmake"
}
env_vars {
key: "BUILD_TARGET_ARCH"
value: "x86"
}
env_vars {
key: "BUILD_TOOLCHAIN"
value: "clang"
}
...@@ -11,4 +11,9 @@ env_vars { ...@@ -11,4 +11,9 @@ env_vars {
env_vars { env_vars {
key: "BUILD_TARGET_ARCH" key: "BUILD_TARGET_ARCH"
value: "x64" value: "x64"
} }
\ No newline at end of file
env_vars {
key: "BUILD_TOOLCHAIN"
value: "gcc"
}
...@@ -14,6 +14,11 @@ env_vars { ...@@ -14,6 +14,11 @@ env_vars {
} }
env_vars { env_vars {
key: "BUILD_TOOLCHAIN"
value: "gcc"
}
env_vars {
key: "BUILD_SANITIZER" key: "BUILD_SANITIZER"
value: "asan" value: "asan"
} }
...@@ -12,3 +12,8 @@ env_vars { ...@@ -12,3 +12,8 @@ env_vars {
key: "BUILD_TARGET_ARCH" key: "BUILD_TARGET_ARCH"
value: "x64" value: "x64"
} }
env_vars {
key: "BUILD_TOOLCHAIN"
value: "gcc"
}
...@@ -14,6 +14,11 @@ env_vars { ...@@ -14,6 +14,11 @@ env_vars {
} }
env_vars { env_vars {
key: "BUILD_TOOLCHAIN"
value: "gcc"
}
env_vars {
key: "BUILD_SANITIZER" key: "BUILD_SANITIZER"
value: "tsan" value: "tsan"
} }
# Format: //devtools/kokoro/config/proto/build.proto
# Location of the continuous bash script in Git.
build_file: "marl/kokoro/ubuntu/presubmit.sh"
env_vars {
key: "BUILD_SYSTEM"
value: "cmake"
}
env_vars {
key: "BUILD_TARGET_ARCH"
value: "x86"
}
env_vars {
key: "BUILD_TOOLCHAIN"
value: "gcc"
}
env_vars {
key: "BUILD_SANITIZER"
value: "asan"
}
# Format: //devtools/kokoro/config/proto/build.proto
# Location of the continuous bash script in Git.
build_file: "marl/kokoro/ubuntu/presubmit.sh"
env_vars {
key: "BUILD_SYSTEM"
value: "cmake"
}
env_vars {
key: "BUILD_TARGET_ARCH"
value: "x86"
}
env_vars {
key: "BUILD_TOOLCHAIN"
value: "gcc"
}
...@@ -4,22 +4,68 @@ set -e # Fail on any error. ...@@ -4,22 +4,68 @@ set -e # Fail on any error.
set -x # Display commands being run. set -x # Display commands being run.
BUILD_ROOT=$PWD BUILD_ROOT=$PWD
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}")" >/dev/null 2>&1 && pwd )"
UBUNTU_VERSION=`cat /etc/os-release | grep -oP "Ubuntu \K([0-9]+\.[0-9]+)"`
cd github/marl cd github/marl
git submodule update --init git submodule update --init
# Always update gcc so we get a newer standard library.
sudo add-apt-repository ppa:ubuntu-toolchain-r/test
sudo apt-get update
sudo apt-get install -y gcc-9-multilib g++-9-multilib linux-libc-dev:i386
sudo update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-9 100 --slave /usr/bin/g++ g++ /usr/bin/g++-9
sudo update-alternatives --set gcc "/usr/bin/gcc-9"
if [ "$BUILD_SYSTEM" == "cmake" ]; then if [ "$BUILD_SYSTEM" == "cmake" ]; then
mkdir build mkdir build
cd build cd build
if [ "$BUILD_TOOLCHAIN" == "clang" ]; then
# Download clang tar
CLANG_TAR="/tmp/clang-8.tar.xz"
curl -L "https://releases.llvm.org/8.0.0/clang+llvm-8.0.0-x86_64-linux-gnu-ubuntu-${UBUNTU_VERSION}.tar.xz" > ${CLANG_TAR}
# Verify clang tar
sudo apt-get install pgpgpg
gpg --import "${SCRIPT_DIR}/clang-8.pubkey.asc"
gpg --verify "${SCRIPT_DIR}/clang-8-ubuntu-${UBUNTU_VERSION}.sig" ${CLANG_TAR}
if [ $? -ne 0 ]; then
echo "clang download failed PGP check"
exit 1
fi
# Untar into tmp
CLANG_DIR=/tmp/clang-8
mkdir ${CLANG_DIR}
tar -xf ${CLANG_TAR} -C ${CLANG_DIR}
# Use clang as compiler
export CC="${CLANG_DIR}/clang+llvm-8.0.0-x86_64-linux-gnu-ubuntu-${UBUNTU_VERSION}/bin/clang"
export CXX="${CLANG_DIR}/clang+llvm-8.0.0-x86_64-linux-gnu-ubuntu-${UBUNTU_VERSION}/bin/clang++"
fi
extra_cmake_flags=""
if [ "$BUILD_TARGET_ARCH" == "x86" ]; then
extra_cmake_flags="-DCMAKE_CXX_FLAGS=-m32 -DCMAKE_C_FLAGS=-m32 -DCMAKE_ASM_FLAGS=-m32"
fi
build_and_run() { build_and_run() {
cmake .. -DMARL_BUILD_EXAMPLES=1 -DMARL_BUILD_TESTS=1 -DMARL_BUILD_BENCHMARKS=1 -DMARL_WARNINGS_AS_ERRORS=1 $1 cmake .. ${extra_cmake_flags} \
-DMARL_BUILD_EXAMPLES=1 \
-DMARL_BUILD_TESTS=1 \
-DMARL_BUILD_BENCHMARKS=1 \
-DMARL_WARNINGS_AS_ERRORS=1 \
$1
make --jobs=$(nproc) make --jobs=$(nproc)
./marl-unittests ./marl-unittests
./fractal ./fractal
./hello_task
./primes > /dev/null ./primes > /dev/null
./tasks_in_tasks
} }
if [ "$BUILD_SANITIZER" == "asan" ]; then if [ "$BUILD_SANITIZER" == "asan" ]; then
......
...@@ -61,3 +61,11 @@ TEST_P(WithBoundScheduler, BlockingCallIntReturn) { ...@@ -61,3 +61,11 @@ TEST_P(WithBoundScheduler, BlockingCallIntReturn) {
ASSERT_EQ(n.load(), 4950); ASSERT_EQ(n.load(), 4950);
} }
TEST_P(WithBoundScheduler, BlockingCallSchedulesTask) {
marl::WaitGroup wg(1);
marl::schedule([=] {
marl::blocking_call([=] { marl::schedule([=] { wg.done(); }); });
});
wg.wait();
}
...@@ -99,29 +99,31 @@ void Scheduler::bind() { ...@@ -99,29 +99,31 @@ void Scheduler::bind() {
MARL_ASSERT(bound == nullptr, "Scheduler already bound"); MARL_ASSERT(bound == nullptr, "Scheduler already bound");
bound = this; bound = this;
{ {
std::unique_lock<std::mutex> lock(singleThreadedWorkerMutex); std::unique_lock<std::mutex> lock(singleThreadedWorkers.mutex);
auto worker = auto worker =
allocator->make_unique<Worker>(this, Worker::Mode::SingleThreaded, -1); allocator->make_unique<Worker>(this, Worker::Mode::SingleThreaded, -1);
worker->start(); worker->start();
auto tid = std::this_thread::get_id(); auto tid = std::this_thread::get_id();
singleThreadedWorkers.emplace(tid, std::move(worker)); singleThreadedWorkers.byTid.emplace(tid, std::move(worker));
} }
} }
void Scheduler::unbind() { void Scheduler::unbind() {
MARL_ASSERT(bound != nullptr, "No scheduler bound"); MARL_ASSERT(bound != nullptr, "No scheduler bound");
Allocator::unique_ptr<Worker> worker; auto worker = Scheduler::Worker::getCurrent();
worker->stop();
{ {
std::unique_lock<std::mutex> lock(bound->singleThreadedWorkerMutex); std::unique_lock<std::mutex> lock(bound->singleThreadedWorkers.mutex);
auto tid = std::this_thread::get_id(); auto tid = std::this_thread::get_id();
auto it = bound->singleThreadedWorkers.find(tid); auto it = bound->singleThreadedWorkers.byTid.find(tid);
MARL_ASSERT(it != bound->singleThreadedWorkers.end(), MARL_ASSERT(it != bound->singleThreadedWorkers.byTid.end(),
"singleThreadedWorker not found"); "singleThreadedWorker not found");
worker = std::move(it->second); MARL_ASSERT(it->second.get() == worker, "worker is not bound?");
bound->singleThreadedWorkers.erase(tid); bound->singleThreadedWorkers.byTid.erase(it);
if (bound->singleThreadedWorkers.byTid.size() == 0) {
bound->singleThreadedWorkers.unbind.notify_one();
}
} }
worker->flush();
worker->stop();
bound = nullptr; bound = nullptr;
} }
...@@ -133,14 +135,12 @@ Scheduler::Scheduler(Allocator* allocator /* = Allocator::Default */) ...@@ -133,14 +135,12 @@ Scheduler::Scheduler(Allocator* allocator /* = Allocator::Default */)
} }
Scheduler::~Scheduler() { Scheduler::~Scheduler() {
#if MARL_DEBUG_ENABLED
{ {
std::unique_lock<std::mutex> lock(singleThreadedWorkerMutex); // Wait until all the single threaded workers have been unbound.
MARL_ASSERT(singleThreadedWorkers.size() == 0, std::unique_lock<std::mutex> lock(singleThreadedWorkers.mutex);
"Scheduler still bound on %d threads", singleThreadedWorkers.unbind.wait(
int(singleThreadedWorkers.size())); lock, [this] { return singleThreadedWorkers.byTid.size() == 0; });
} }
#endif // MARL_DEBUG_ENABLED
// Release all worker threads. // Release all worker threads.
// This will wait for all in-flight tasks to complete before returning. // This will wait for all in-flight tasks to complete before returning.
...@@ -211,12 +211,9 @@ void Scheduler::enqueue(Task&& task) { ...@@ -211,12 +211,9 @@ void Scheduler::enqueue(Task&& task) {
} }
} }
} else { } else {
auto tid = std::this_thread::get_id(); auto worker = Worker::getCurrent();
std::unique_lock<std::mutex> lock(singleThreadedWorkerMutex); MARL_ASSERT(worker, "singleThreadedWorker not found");
auto it = singleThreadedWorkers.find(tid); worker->enqueue(std::move(task));
MARL_ASSERT(it != singleThreadedWorkers.end(),
"singleThreadedWorker not found");
it->second->enqueue(std::move(task));
} }
} }
...@@ -398,20 +395,32 @@ void Scheduler::Worker::start() { ...@@ -398,20 +395,32 @@ void Scheduler::Worker::start() {
void Scheduler::Worker::stop() { void Scheduler::Worker::stop() {
switch (mode) { switch (mode) {
case Mode::MultiThreaded: case Mode::MultiThreaded: {
enqueue(Task([this] { shutdown = true; }, Task::Flags::SameThread)); enqueue(Task([this] { shutdown = true; }, Task::Flags::SameThread));
thread.join(); thread.join();
break; break;
}
case Mode::SingleThreaded: case Mode::SingleThreaded: {
std::unique_lock<std::mutex> lock(work.mutex);
shutdown = true;
runUntilShutdown();
Worker::current = nullptr; Worker::current = nullptr;
break; break;
}
default: default:
MARL_ASSERT(false, "Unknown mode: %d", int(mode)); MARL_ASSERT(false, "Unknown mode: %d", int(mode));
} }
} }
bool Scheduler::Worker::wait(const TimePoint* timeout) {
DBG_LOG("%d: WAIT(%d)", (int)id, (int)currentFiber->id);
{
std::unique_lock<std::mutex> lock(work.mutex);
suspend(timeout);
}
return timeout == nullptr || std::chrono::system_clock::now() < *timeout;
}
_Requires_lock_held_(waitLock) _Requires_lock_held_(waitLock)
bool Scheduler::Worker::wait(Fiber::Lock& waitLock, bool Scheduler::Worker::wait(Fiber::Lock& waitLock,
const TimePoint* timeout, const TimePoint* timeout,
...@@ -463,6 +472,8 @@ void Scheduler::Worker::suspend( ...@@ -463,6 +472,8 @@ void Scheduler::Worker::suspend(
// First wait until there's something else this worker can do. // First wait until there's something else this worker can do.
waitForWork(); waitForWork();
work.numBlockedFibers++;
if (work.fibers.size() > 0) { if (work.fibers.size() > 0) {
// There's another fiber that has become unblocked, resume that. // There's another fiber that has become unblocked, resume that.
work.num--; work.num--;
...@@ -480,6 +491,8 @@ void Scheduler::Worker::suspend( ...@@ -480,6 +491,8 @@ void Scheduler::Worker::suspend(
switchToFiber(createWorkerFiber()); switchToFiber(createWorkerFiber());
} }
work.numBlockedFibers--;
setFiberState(currentFiber, Fiber::State::Running); setFiberState(currentFiber, Fiber::State::Running);
} }
...@@ -552,39 +565,24 @@ bool Scheduler::Worker::steal(Task& out) { ...@@ -552,39 +565,24 @@ bool Scheduler::Worker::steal(Task& out) {
} }
_Requires_lock_held_(work.mutex) _Requires_lock_held_(work.mutex)
void Scheduler::Worker::flush() { void Scheduler::Worker::run() {
MARL_ASSERT(mode == Mode::SingleThreaded, if (mode == Mode::MultiThreaded) {
"flush() can only be used on a single-threaded worker"); MARL_NAME_THREAD("Thread<%.2d> Fiber<%.2d>", int(id), Fiber::current()->id);
std::unique_lock<std::mutex> lock(work.mutex); // This is the entry point for a multi-threaded worker.
runUntilIdle(); // Start with a regular condition-variable wait for work. This avoids
// starting the thread with a spinForWork().
work.wait([this] { return work.num > 0 || work.waiting || shutdown; });
}
ASSERT_FIBER_STATE(currentFiber, Fiber::State::Running);
runUntilShutdown();
switchToFiber(mainFiber.get());
} }
_Requires_lock_held_(work.mutex) _Requires_lock_held_(work.mutex)
void Scheduler::Worker::run() { void Scheduler::Worker::runUntilShutdown() {
switch (mode) { while (!shutdown || work.num > 0 || work.numBlockedFibers > 0U) {
case Mode::MultiThreaded: { waitForWork();
MARL_NAME_THREAD("Thread<%.2d> Fiber<%.2d>", int(id), runUntilIdle();
Fiber::current()->id);
work.wait([this] { return work.num > 0 || work.waiting || shutdown; });
while (!shutdown || work.num > 0 || numBlockedFibers() > 0U) {
waitForWork();
runUntilIdle();
}
Worker::current = nullptr;
switchToFiber(mainFiber.get());
break;
}
case Mode::SingleThreaded: {
ASSERT_FIBER_STATE(currentFiber, Fiber::State::Running);
while (!shutdown) {
runUntilIdle();
idleFibers.emplace(currentFiber);
switchToFiber(mainFiber.get());
}
break;
}
default:
MARL_ASSERT(false, "Unknown mode: %d", int(mode));
} }
} }
...@@ -604,7 +602,7 @@ void Scheduler::Worker::waitForWork() { ...@@ -604,7 +602,7 @@ void Scheduler::Worker::waitForWork() {
} }
work.wait([this] { work.wait([this] {
return work.num > 0 || (shutdown && numBlockedFibers() == 0U); return work.num > 0 || (shutdown && work.numBlockedFibers == 0U);
}); });
if (work.waiting) { if (work.waiting) {
enqueueFiberTimeouts(); enqueueFiberTimeouts();
......
...@@ -107,11 +107,14 @@ TEST_P(WithBoundScheduler, FibersResumeOnSameThread) { ...@@ -107,11 +107,14 @@ TEST_P(WithBoundScheduler, FibersResumeOnSameThread) {
TEST_P(WithBoundScheduler, FibersResumeOnSameStdThread) { TEST_P(WithBoundScheduler, FibersResumeOnSameStdThread) {
auto scheduler = marl::Scheduler::get(); auto scheduler = marl::Scheduler::get();
// on 32-bit OSs, excessive numbers of threads can run out of address space.
constexpr auto num_threads = sizeof(void*) > 4 ? 1000 : 100;
marl::WaitGroup fence(1); marl::WaitGroup fence(1);
marl::WaitGroup wg(1000); marl::WaitGroup wg(num_threads);
std::vector<std::thread> threads; std::vector<std::thread> threads;
for (int i = 0; i < 1000; i++) { for (int i = 0; i < num_threads; i++) {
threads.push_back(std::thread([=] { threads.push_back(std::thread([=] {
scheduler->bind(); scheduler->bind();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment