Commit 3360bef8 by Ben Clayton

Update Marl to 38c0c7a0f

Changes: 38c0c7a0f Add Scheduler::Config and deprecate old [gs]etters 17b12a448 Use CreateFiberEx instead of CreateFiber (#141) 7236bd098 Test build / warning fixes for iOS c716869a7 Enable FreeBSD support 00433d418 Add tests for using a marl::Scheduler without binding 6b4f6472e Fix CMake warning when not built as subproject 0591f0ef0 Kokoro Ubuntu: Switch to docker image bf9255db7 Optimization for marl::parallelize Commands: ./third_party/update-marl.sh --squash Bug: b/140546382 Change-Id: I5eaceebcbfef7636fd711d9e4dc7ae4a946e8a34
parents 497e6204 068a0c55
...@@ -20,6 +20,12 @@ project(Marl C CXX ASM) ...@@ -20,6 +20,12 @@ project(Marl C CXX ASM)
include(CheckCXXSourceCompiles) include(CheckCXXSourceCompiles)
# MARL_IS_SUBPROJECT is 1 if added via add_subdirectory() from another project.
get_directory_property(MARL_IS_SUBPROJECT PARENT_DIRECTORY)
if(MARL_IS_SUBPROJECT)
set(MARL_IS_SUBPROJECT 1)
endif()
########################################################### ###########################################################
# Options # Options
########################################################### ###########################################################
...@@ -100,8 +106,10 @@ check_cxx_source_compiles( ...@@ -100,8 +106,10 @@ check_cxx_source_compiles(
MARL_THREAD_SAFETY_ANALYSIS_SUPPORTED) MARL_THREAD_SAFETY_ANALYSIS_SUPPORTED)
set(CMAKE_REQUIRED_FLAGS ${SAVE_CMAKE_REQUIRED_FLAGS}) set(CMAKE_REQUIRED_FLAGS ${SAVE_CMAKE_REQUIRED_FLAGS})
# Export MARL_THREAD_SAFETY_ANALYSIS_SUPPORTED as this may be useful to parent projects if(MARL_IS_SUBPROJECT)
set(MARL_THREAD_SAFETY_ANALYSIS_SUPPORTED PARENT_SCOPE ${MARL_THREAD_SAFETY_ANALYSIS_SUPPORTED}) # Export MARL_THREAD_SAFETY_ANALYSIS_SUPPORTED as this may be useful to parent projects
set(MARL_THREAD_SAFETY_ANALYSIS_SUPPORTED PARENT_SCOPE ${MARL_THREAD_SAFETY_ANALYSIS_SUPPORTED})
endif()
########################################################### ###########################################################
# File lists # File lists
......
...@@ -41,3 +41,13 @@ cc_binary( ...@@ -41,3 +41,13 @@ cc_binary(
"//:marl", "//:marl",
], ],
) )
cc_binary(
name = "tasks_in_tasks",
srcs = [
"tasks_in_tasks.cpp",
],
deps = [
"//:marl",
],
)
// Copyright 2020 The Marl Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef marl_deprecated_h
#define marl_deprecated_h
// Deprecated marl::Scheduler methods:
// Scheduler(Allocator* allocator = Allocator::Default)
// getThreadInitializer(), setThreadInitializer()
// getWorkerThreadCount(), setWorkerThreadCount()
#ifndef MARL_ENABLE_DEPRECATED_SCHEDULER_GETTERS_SETTERS
#define MARL_ENABLE_DEPRECATED_SCHEDULER_GETTERS_SETTERS 1
#endif // MARL_ENABLE_DEPRECATED_SCHEDULER_GETTERS_SETTERS
#ifndef MARL_WARN_DEPRECATED
#define MARL_WARN_DEPRECATED 0
#endif // MARL_WARN_DEPRECATED
#if MARL_WARN_DEPRECATED
#define MARL_DEPRECATED(message) __attribute__((deprecated(message)))
#else
#define MARL_DEPRECATED(message)
#endif
#endif // marl_deprecated_h
...@@ -22,10 +22,10 @@ namespace marl { ...@@ -22,10 +22,10 @@ namespace marl {
namespace detail { namespace detail {
void parallelizeChain(WaitGroup&) {} inline void parallelizeChain(WaitGroup&) {}
template <typename F, typename... L> template <typename F, typename... L>
void parallelizeChain(WaitGroup& wg, F&& f, L&&... l) { inline void parallelizeChain(WaitGroup& wg, F&& f, L&&... l) {
schedule([=] { schedule([=] {
f(); f();
wg.done(); wg.done();
...@@ -35,13 +35,26 @@ void parallelizeChain(WaitGroup& wg, F&& f, L&&... l) { ...@@ -35,13 +35,26 @@ void parallelizeChain(WaitGroup& wg, F&& f, L&&... l) {
} // namespace detail } // namespace detail
// parallelize() schedules all the function parameters and waits for them to // parallelize() invokes all the function parameters, potentially concurrently,
// complete. These functions may execute concurrently. // and waits for them all to complete before returning.
//
// Each function must take no parameters. // Each function must take no parameters.
template <typename... FUNCTIONS> //
inline void parallelize(FUNCTIONS&&... functions) { // parallelize() does the following:
WaitGroup wg(sizeof...(FUNCTIONS)); // (1) Schedules the function parameters in the parameter pack fn.
detail::parallelizeChain(wg, functions...); // (2) Calls f0 on the current thread.
// (3) Once f0 returns, waits for the scheduled functions in fn to all
// complete.
// As the fn functions are scheduled before running f0, it is recommended to
// pass the function that'll take the most time as the first argument. That way
// you'll be more likely to avoid the cost of a fiber switch.
template <typename F0, typename... FN>
inline void parallelize(F0&& f0, FN&&... fn) {
WaitGroup wg(sizeof...(FN));
// Schedule all the functions in fn.
detail::parallelizeChain(wg, std::forward<FN>(fn)...);
// While we wait for fn to complete, run the first function on this thread.
f0();
wg.wait(); wg.wait();
} }
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#define marl_scheduler_h #define marl_scheduler_h
#include "debug.h" #include "debug.h"
#include "deprecated.h"
#include "memory.h" #include "memory.h"
#include "mutex.h" #include "mutex.h"
#include "task.h" #include "task.h"
...@@ -50,8 +51,33 @@ class Scheduler { ...@@ -50,8 +51,33 @@ class Scheduler {
public: public:
using TimePoint = std::chrono::system_clock::time_point; using TimePoint = std::chrono::system_clock::time_point;
using Predicate = std::function<bool()>; using Predicate = std::function<bool()>;
using ThreadInitializer = std::function<void(int workerId)>;
// Config holds scheduler configuration settings that can be passed to the
// Scheduler constructor.
struct Config {
// Per-worker-thread settings.
struct WorkerThread {
int count = 0;
ThreadInitializer initializer;
};
WorkerThread workerThread;
Scheduler(Allocator* allocator = Allocator::Default); // Memory allocator to use for the scheduler and internal allocations.
Allocator* allocator = Allocator::Default;
// allCores() returns a Config with a worker thread for each of the logical
// cpus available to the process.
static Config allCores();
// Fluent setters that return this Config so set calls can be chained.
inline Config& setAllocator(Allocator*);
inline Config& setWorkerThreadCount(int);
inline Config& setWorkerThreadInitializer(const ThreadInitializer&);
};
// Constructor.
Scheduler(const Config&);
// Destructor. // Destructor.
// Blocks until the scheduler is unbound from all threads before returning. // Blocks until the scheduler is unbound from all threads before returning.
...@@ -73,24 +99,36 @@ class Scheduler { ...@@ -73,24 +99,36 @@ class Scheduler {
// enqueue() queues the task for asynchronous execution. // enqueue() queues the task for asynchronous execution.
void enqueue(Task&& task); void enqueue(Task&& task);
// config() returns the Config that was used to build the schededuler.
const Config& config() const;
#if MARL_ENABLE_DEPRECATED_SCHEDULER_GETTERS_SETTERS
MARL_DEPRECATED("use Scheduler::Scheduler(const Config&)")
Scheduler(Allocator* allocator = Allocator::Default);
// setThreadInitializer() sets the worker thread initializer function which // setThreadInitializer() sets the worker thread initializer function which
// will be called for each new worker thread spawned. // will be called for each new worker thread spawned.
// The initializer will only be called on newly created threads (call // The initializer will only be called on newly created threads (call
// setThreadInitializer() before setWorkerThreadCount()). // setThreadInitializer() before setWorkerThreadCount()).
MARL_DEPRECATED("use Config::setWorkerThreadInitializer()")
void setThreadInitializer(const std::function<void()>& init); void setThreadInitializer(const std::function<void()>& init);
// getThreadInitializer() returns the thread initializer function set by // getThreadInitializer() returns the thread initializer function set by
// setThreadInitializer(). // setThreadInitializer().
const std::function<void()>& getThreadInitializer(); MARL_DEPRECATED("use config().workerThread.initializer")
std::function<void()> getThreadInitializer();
// setWorkerThreadCount() adjusts the number of dedicated worker threads. // setWorkerThreadCount() adjusts the number of dedicated worker threads.
// A count of 0 puts the scheduler into single-threaded mode. // A count of 0 puts the scheduler into single-threaded mode.
// Note: Currently the number of threads cannot be adjusted once tasks // Note: Currently the number of threads cannot be adjusted once tasks
// have been enqueued. This restriction may be lifted at a later time. // have been enqueued. This restriction may be lifted at a later time.
MARL_DEPRECATED("use Config::setWorkerThreadCount()")
void setWorkerThreadCount(int count); void setWorkerThreadCount(int count);
// getWorkerThreadCount() returns the number of worker threads. // getWorkerThreadCount() returns the number of worker threads.
MARL_DEPRECATED("use config().workerThread.count")
int getWorkerThreadCount(); int getWorkerThreadCount();
#endif // MARL_ENABLE_DEPRECATED_SCHEDULER_GETTERS_SETTERS
// Fibers expose methods to perform cooperative multitasking and are // Fibers expose methods to perform cooperative multitasking and are
// automatically created by the Scheduler. // automatically created by the Scheduler.
...@@ -451,18 +489,17 @@ class Scheduler { ...@@ -451,18 +489,17 @@ class Scheduler {
// The scheduler currently bound to the current thread. // The scheduler currently bound to the current thread.
static thread_local Scheduler* bound; static thread_local Scheduler* bound;
Allocator* const allocator; #if MARL_ENABLE_DEPRECATED_SCHEDULER_GETTERS_SETTERS
Config cfg;
std::function<void()> threadInitFunc;
mutex threadInitFuncMutex; mutex threadInitFuncMutex;
#else
const Config cfg;
#endif
std::array<std::atomic<int>, 8> spinningWorkers; std::array<std::atomic<int>, 8> spinningWorkers;
std::atomic<unsigned int> nextSpinningWorkerIdx = {0x8000000}; std::atomic<unsigned int> nextSpinningWorkerIdx = {0x8000000};
// TODO: Make this lot thread-safe so setWorkerThreadCount() can be called
// during execution of tasks.
std::atomic<unsigned int> nextEnqueueIndex = {0}; std::atomic<unsigned int> nextEnqueueIndex = {0};
unsigned int numWorkerThreads = 0;
std::array<Worker*, MaxWorkerThreads> workerThreads; std::array<Worker*, MaxWorkerThreads> workerThreads;
struct SingleThreadedWorkers { struct SingleThreadedWorkers {
...@@ -475,6 +512,28 @@ class Scheduler { ...@@ -475,6 +512,28 @@ class Scheduler {
SingleThreadedWorkers singleThreadedWorkers; SingleThreadedWorkers singleThreadedWorkers;
}; };
////////////////////////////////////////////////////////////////////////////////
// Scheduler::Config
////////////////////////////////////////////////////////////////////////////////
Scheduler::Config& Scheduler::Config::setAllocator(Allocator* alloc) {
allocator = alloc;
return *this;
}
Scheduler::Config& Scheduler::Config::setWorkerThreadCount(int count) {
workerThread.count = count;
return *this;
}
Scheduler::Config& Scheduler::Config::setWorkerThreadInitializer(
const ThreadInitializer& initializer) {
workerThread.initializer = initializer;
return *this;
}
////////////////////////////////////////////////////////////////////////////////
// Scheduler::Fiber
////////////////////////////////////////////////////////////////////////////////
template <typename Clock, typename Duration> template <typename Clock, typename Duration>
bool Scheduler::Fiber::wait( bool Scheduler::Fiber::wait(
marl::lock& lock, marl::lock& lock,
......
#!/bin/bash
set -e # Fail on any error.
. /bin/using.sh # Declare the bash `using` function for configuring toolchains.
set -x # Display commands being run.
cd github/marl
git submodule update --init
using gcc-9 # Always update gcc so we get a newer standard library.
if [ "$BUILD_SYSTEM" == "cmake" ]; then
using cmake-3.17.2
mkdir build
cd build
if [ "$BUILD_TOOLCHAIN" == "clang" ]; then
using clang-10.0.0
fi
EXTRA_CMAKE_FLAGS=""
if [ "$BUILD_TARGET_ARCH" == "x86" ]; then
EXTRA_CMAKE_FLAGS="-DCMAKE_CXX_FLAGS=-m32 -DCMAKE_C_FLAGS=-m32 -DCMAKE_ASM_FLAGS=-m32"
fi
if [ "$BUILD_SANITIZER" == "asan" ]; then
EXTRA_CMAKE_FLAGS="$EXTRA_CMAKE_FLAGS -DMARL_ASAN=1"
elif [ "$BUILD_SANITIZER" == "msan" ]; then
EXTRA_CMAKE_FLAGS="$EXTRA_CMAKE_FLAGS -DMARL_MSAN=1"
elif [ "$BUILD_SANITIZER" == "tsan" ]; then
EXTRA_CMAKE_FLAGS="$EXTRA_CMAKE_FLAGS -DMARL_TSAN=1"
fi
cmake .. ${EXTRA_CMAKE_FLAGS} \
-DMARL_BUILD_EXAMPLES=1 \
-DMARL_BUILD_TESTS=1 \
-DMARL_BUILD_BENCHMARKS=1 \
-DMARL_WARNINGS_AS_ERRORS=1
make --jobs=$(nproc)
./marl-unittests
./fractal
./hello_task
./primes > /dev/null
./tasks_in_tasks
elif [ "$BUILD_SYSTEM" == "bazel" ]; then
using bazel-3.1.0
bazel test //:tests --test_output=all
bazel run //examples:fractal
bazel run //examples:hello_task
bazel run //examples:primes > /dev/null
bazel run //examples:tasks_in_tasks
else
echo "Unknown build system: $BUILD_SYSTEM"
exit 1
fi
\ No newline at end of file
#!/bin/bash #!/bin/bash
set -e # Fail on any error. set -e # Fail on any error.
set -x # Display commands being run.
BUILD_ROOT=$PWD ROOT_DIR=`pwd`
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}")" >/dev/null 2>&1 && pwd )" SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}")" >/dev/null 2>&1 && pwd )"
UBUNTU_VERSION=`cat /etc/os-release | grep -oP "Ubuntu \K([0-9]+\.[0-9]+)"`
cd github/marl docker run --rm -i \
--volume "${ROOT_DIR}:${ROOT_DIR}" \
git submodule update --init --volume "${KOKORO_ARTIFACTS_DIR}:/mnt/artifacts" \
--workdir "${ROOT_DIR}" \
# Always update gcc so we get a newer standard library. --env BUILD_SYSTEM=$BUILD_SYSTEM \
sudo add-apt-repository ppa:ubuntu-toolchain-r/test --env BUILD_TOOLCHAIN=$BUILD_TOOLCHAIN \
sudo apt-get update --env BUILD_TARGET_ARCH=$BUILD_TARGET_ARCH \
sudo apt-get install -y gcc-9-multilib g++-9-multilib linux-libc-dev:i386 --env BUILD_SANITIZER=$BUILD_SANITIZER \
sudo update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-9 100 --slave /usr/bin/g++ g++ /usr/bin/g++-9 --entrypoint "${SCRIPT_DIR}/presubmit-docker.sh" \
sudo update-alternatives --set gcc "/usr/bin/gcc-9" "gcr.io/shaderc-build/radial-build:latest"
if [ "$BUILD_SYSTEM" == "cmake" ]; then
mkdir build
cd build
if [ "$BUILD_TOOLCHAIN" == "clang" ]; then
# Download clang tar
CLANG_TAR="/tmp/clang-8.tar.xz"
curl -L "https://releases.llvm.org/8.0.0/clang+llvm-8.0.0-x86_64-linux-gnu-ubuntu-${UBUNTU_VERSION}.tar.xz" > ${CLANG_TAR}
# Verify clang tar
sudo apt-get install pgpgpg
gpg --import "${SCRIPT_DIR}/clang-8.pubkey.asc"
gpg --verify "${SCRIPT_DIR}/clang-8-ubuntu-${UBUNTU_VERSION}.sig" ${CLANG_TAR}
if [ $? -ne 0 ]; then
echo "clang download failed PGP check"
exit 1
fi
# Untar into tmp
CLANG_DIR=/tmp/clang-8
mkdir ${CLANG_DIR}
tar -xf ${CLANG_TAR} -C ${CLANG_DIR}
# Use clang as compiler
export CC="${CLANG_DIR}/clang+llvm-8.0.0-x86_64-linux-gnu-ubuntu-${UBUNTU_VERSION}/bin/clang"
export CXX="${CLANG_DIR}/clang+llvm-8.0.0-x86_64-linux-gnu-ubuntu-${UBUNTU_VERSION}/bin/clang++"
fi
extra_cmake_flags=""
if [ "$BUILD_TARGET_ARCH" == "x86" ]; then
extra_cmake_flags="-DCMAKE_CXX_FLAGS=-m32 -DCMAKE_C_FLAGS=-m32 -DCMAKE_ASM_FLAGS=-m32"
fi
build_and_run() {
cmake .. ${extra_cmake_flags} \
-DMARL_BUILD_EXAMPLES=1 \
-DMARL_BUILD_TESTS=1 \
-DMARL_BUILD_BENCHMARKS=1 \
-DMARL_WARNINGS_AS_ERRORS=1 \
$1
make --jobs=$(nproc)
./marl-unittests
./fractal
./hello_task
./primes > /dev/null
./tasks_in_tasks
}
if [ "$BUILD_SANITIZER" == "asan" ]; then
build_and_run "-DMARL_ASAN=1"
elif [ "$BUILD_SANITIZER" == "msan" ]; then
build_and_run "-DMARL_MSAN=1"
elif [ "$BUILD_SANITIZER" == "tsan" ]; then
build_and_run "-DMARL_TSAN=1"
else
build_and_run
fi
elif [ "$BUILD_SYSTEM" == "bazel" ]; then
# Get bazel
curl -L -k -O -s https://github.com/bazelbuild/bazel/releases/download/0.29.1/bazel-0.29.1-installer-linux-x86_64.sh
mkdir $BUILD_ROOT/bazel
bash bazel-0.29.1-installer-linux-x86_64.sh --prefix=$BUILD_ROOT/bazel
rm bazel-0.29.1-installer-linux-x86_64.sh
# Build and run
$BUILD_ROOT/bazel/bin/bazel test //:tests --test_output=all
$BUILD_ROOT/bazel/bin/bazel run //examples:fractal
$BUILD_ROOT/bazel/bin/bazel run //examples:primes > /dev/null
else
echo "Unknown build system: $BUILD_SYSTEM"
exit 1
fi
\ No newline at end of file
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
#include <cstring> #include <cstring>
#if defined(__linux__) || defined(__APPLE__) #if defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__)
#include <sys/mman.h> #include <sys/mman.h>
#include <unistd.h> #include <unistd.h>
namespace { namespace {
......
...@@ -78,6 +78,7 @@ TEST_F(AllocatorTest, Create) { ...@@ -78,6 +78,7 @@ TEST_F(AllocatorTest, Create) {
allocator->destroy(s16); allocator->destroy(s16);
} }
#if GTEST_HAS_DEATH_TEST
TEST_F(AllocatorTest, Guards) { TEST_F(AllocatorTest, Guards) {
marl::Allocation::Request request; marl::Allocation::Request request;
request.alignment = 16; request.alignment = 16;
...@@ -88,3 +89,4 @@ TEST_F(AllocatorTest, Guards) { ...@@ -88,3 +89,4 @@ TEST_F(AllocatorTest, Guards) {
EXPECT_DEATH(ptr[-1] = 1, ""); EXPECT_DEATH(ptr[-1] = 1, "");
EXPECT_DEATH(ptr[marl::pageSize()] = 1, ""); EXPECT_DEATH(ptr[marl::pageSize()] = 1, "");
} }
#endif
...@@ -119,9 +119,9 @@ Allocator::unique_ptr<OSFiber> OSFiber::createFiber( ...@@ -119,9 +119,9 @@ Allocator::unique_ptr<OSFiber> OSFiber::createFiber(
out->context = {}; out->context = {};
out->target = func; out->target = func;
out->stack = allocator->allocate(request); out->stack = allocator->allocate(request);
marl_fiber_set_target(&out->context, out->stack.ptr, stackSize, marl_fiber_set_target(
reinterpret_cast<void (*)(void*)>(&OSFiber::run), &out->context, out->stack.ptr, static_cast<uint32_t>(stackSize),
out.get()); reinterpret_cast<void (*)(void*)>(&OSFiber::run), out.get());
return out; return out;
} }
......
...@@ -64,10 +64,10 @@ OSFiber::~OSFiber() { ...@@ -64,10 +64,10 @@ OSFiber::~OSFiber() {
Allocator::unique_ptr<OSFiber> OSFiber::createFiberFromCurrentThread( Allocator::unique_ptr<OSFiber> OSFiber::createFiberFromCurrentThread(
Allocator* allocator) { Allocator* allocator) {
auto out = allocator->make_unique<OSFiber>(); auto out = allocator->make_unique<OSFiber>();
out->fiber = ConvertThreadToFiber(nullptr); out->fiber = ConvertThreadToFiberEx(nullptr,FIBER_FLAG_FLOAT_SWITCH);
out->isFiberFromThread = true; out->isFiberFromThread = true;
MARL_ASSERT(out->fiber != nullptr, MARL_ASSERT(out->fiber != nullptr,
"ConvertThreadToFiber() failed with error 0x%x", "ConvertThreadToFiberEx() failed with error 0x%x",
int(GetLastError())); int(GetLastError()));
return out; return out;
} }
...@@ -77,9 +77,10 @@ Allocator::unique_ptr<OSFiber> OSFiber::createFiber( ...@@ -77,9 +77,10 @@ Allocator::unique_ptr<OSFiber> OSFiber::createFiber(
size_t stackSize, size_t stackSize,
const std::function<void()>& func) { const std::function<void()>& func) {
auto out = allocator->make_unique<OSFiber>(); auto out = allocator->make_unique<OSFiber>();
out->fiber = CreateFiber(stackSize, &OSFiber::run, out.get()); // stackSize is rounded up to the system's allocation granularity (typically 64 KB).
out->fiber = CreateFiberEx(stackSize - 1,stackSize,FIBER_FLAG_FLOAT_SWITCH,&OSFiber::run, out.get());
out->target = func; out->target = func;
MARL_ASSERT(out->fiber != nullptr, "CreateFiber() failed with error 0x%x", MARL_ASSERT(out->fiber != nullptr, "CreateFiberEx() failed with error 0x%x",
int(GetLastError())); int(GetLastError()));
return out; return out;
} }
......
...@@ -100,8 +100,8 @@ void Scheduler::bind() { ...@@ -100,8 +100,8 @@ void Scheduler::bind() {
bound = this; bound = this;
{ {
marl::lock lock(singleThreadedWorkers.mutex); marl::lock lock(singleThreadedWorkers.mutex);
auto worker = auto worker = cfg.allocator->make_unique<Worker>(
allocator->make_unique<Worker>(this, Worker::Mode::SingleThreaded, -1); this, Worker::Mode::SingleThreaded, -1);
worker->start(); worker->start();
auto tid = std::this_thread::get_id(); auto tid = std::this_thread::get_id();
singleThreadedWorkers.byTid.emplace(tid, std::move(worker)); singleThreadedWorkers.byTid.emplace(tid, std::move(worker));
...@@ -110,7 +110,7 @@ void Scheduler::bind() { ...@@ -110,7 +110,7 @@ void Scheduler::bind() {
void Scheduler::unbind() { void Scheduler::unbind() {
MARL_ASSERT(bound != nullptr, "No scheduler bound"); MARL_ASSERT(bound != nullptr, "No scheduler bound");
auto worker = Scheduler::Worker::getCurrent(); auto worker = Worker::getCurrent();
worker->stop(); worker->stop();
{ {
marl::lock lock(bound->singleThreadedWorkers.mutex); marl::lock lock(bound->singleThreadedWorkers.mutex);
...@@ -127,11 +127,17 @@ void Scheduler::unbind() { ...@@ -127,11 +127,17 @@ void Scheduler::unbind() {
bound = nullptr; bound = nullptr;
} }
Scheduler::Scheduler(Allocator* allocator /* = Allocator::Default */) Scheduler::Scheduler(const Config& config) : cfg(config), workerThreads{} {
: allocator(allocator), workerThreads{} {
for (size_t i = 0; i < spinningWorkers.size(); i++) { for (size_t i = 0; i < spinningWorkers.size(); i++) {
spinningWorkers[i] = -1; spinningWorkers[i] = -1;
} }
for (int i = 0; i < cfg.workerThread.count; i++) {
workerThreads[i] =
cfg.allocator->create<Worker>(this, Worker::Mode::MultiThreaded, i);
}
for (int i = 0; i < cfg.workerThread.count; i++) {
workerThreads[i]->start();
}
} }
Scheduler::~Scheduler() { Scheduler::~Scheduler() {
...@@ -146,17 +152,35 @@ Scheduler::~Scheduler() { ...@@ -146,17 +152,35 @@ Scheduler::~Scheduler() {
// Release all worker threads. // Release all worker threads.
// This will wait for all in-flight tasks to complete before returning. // This will wait for all in-flight tasks to complete before returning.
setWorkerThreadCount(0); for (int i = cfg.workerThread.count - 1; i >= 0; i--) {
workerThreads[i]->stop();
}
for (int i = cfg.workerThread.count - 1; i >= 0; i--) {
cfg.allocator->destroy(workerThreads[i]);
}
}
#if MARL_ENABLE_DEPRECATED_SCHEDULER_GETTERS_SETTERS
Scheduler::Scheduler(Allocator* allocator /* = Allocator::Default */)
: workerThreads{} {
cfg.allocator = allocator;
for (size_t i = 0; i < spinningWorkers.size(); i++) {
spinningWorkers[i] = -1;
}
} }
void Scheduler::setThreadInitializer(const std::function<void()>& init) { void Scheduler::setThreadInitializer(const std::function<void()>& init) {
marl::lock lock(threadInitFuncMutex); marl::lock lock(threadInitFuncMutex);
threadInitFunc = init; cfg.workerThread.initializer = [=](int) { init(); };
} }
const std::function<void()>& Scheduler::getThreadInitializer() { std::function<void()> Scheduler::getThreadInitializer() {
marl::lock lock(threadInitFuncMutex); marl::lock lock(threadInitFuncMutex);
return threadInitFunc; if (!cfg.workerThread.initializer) {
return {};
}
auto init = cfg.workerThread.initializer;
return [=]() { init(0); };
} }
void Scheduler::setWorkerThreadCount(int newCount) { void Scheduler::setWorkerThreadCount(int newCount) {
...@@ -169,33 +193,34 @@ void Scheduler::setWorkerThreadCount(int newCount) { ...@@ -169,33 +193,34 @@ void Scheduler::setWorkerThreadCount(int newCount) {
newCount, int(MaxWorkerThreads), int(MaxWorkerThreads)); newCount, int(MaxWorkerThreads), int(MaxWorkerThreads));
newCount = MaxWorkerThreads; newCount = MaxWorkerThreads;
} }
auto oldCount = numWorkerThreads; auto oldCount = cfg.workerThread.count;
for (int idx = oldCount - 1; idx >= newCount; idx--) { for (int idx = oldCount - 1; idx >= newCount; idx--) {
workerThreads[idx]->stop(); workerThreads[idx]->stop();
} }
for (int idx = oldCount - 1; idx >= newCount; idx--) { for (int idx = oldCount - 1; idx >= newCount; idx--) {
allocator->destroy(workerThreads[idx]); cfg.allocator->destroy(workerThreads[idx]);
} }
for (int idx = oldCount; idx < newCount; idx++) { for (int idx = oldCount; idx < newCount; idx++) {
workerThreads[idx] = workerThreads[idx] =
allocator->create<Worker>(this, Worker::Mode::MultiThreaded, idx); cfg.allocator->create<Worker>(this, Worker::Mode::MultiThreaded, idx);
} }
numWorkerThreads = newCount; cfg.workerThread.count = newCount;
for (int idx = oldCount; idx < newCount; idx++) { for (int idx = oldCount; idx < newCount; idx++) {
workerThreads[idx]->start(); workerThreads[idx]->start();
} }
} }
int Scheduler::getWorkerThreadCount() { int Scheduler::getWorkerThreadCount() {
return numWorkerThreads; return cfg.workerThread.count;
} }
#endif // MARL_ENABLE_DEPRECATED_SCHEDULER_GETTERS_SETTERS
void Scheduler::enqueue(Task&& task) { void Scheduler::enqueue(Task&& task) {
if (task.is(Task::Flags::SameThread)) { if (task.is(Task::Flags::SameThread)) {
Scheduler::Worker::getCurrent()->enqueue(std::move(task)); Worker::getCurrent()->enqueue(std::move(task));
return; return;
} }
if (numWorkerThreads > 0) { if (cfg.workerThread.count > 0) {
while (true) { while (true) {
// Prioritize workers that have recently started spinning. // Prioritize workers that have recently started spinning.
auto i = --nextSpinningWorkerIdx % spinningWorkers.size(); auto i = --nextSpinningWorkerIdx % spinningWorkers.size();
...@@ -203,7 +228,7 @@ void Scheduler::enqueue(Task&& task) { ...@@ -203,7 +228,7 @@ void Scheduler::enqueue(Task&& task) {
if (idx < 0) { if (idx < 0) {
// If a spinning worker couldn't be found, round-robin the // If a spinning worker couldn't be found, round-robin the
// workers. // workers.
idx = nextEnqueueIndex++ % numWorkerThreads; idx = nextEnqueueIndex++ % cfg.workerThread.count;
} }
auto worker = workerThreads[idx]; auto worker = workerThreads[idx];
...@@ -213,15 +238,23 @@ void Scheduler::enqueue(Task&& task) { ...@@ -213,15 +238,23 @@ void Scheduler::enqueue(Task&& task) {
} }
} }
} else { } else {
auto worker = Worker::getCurrent(); if (auto worker = Worker::getCurrent()) {
MARL_ASSERT(worker, "singleThreadedWorker not found"); worker->enqueue(std::move(task));
worker->enqueue(std::move(task)); } else {
MARL_FATAL(
"singleThreadedWorker not found. Did you forget to call "
"marl::Scheduler::bind()?");
}
} }
} }
const Scheduler::Config& Scheduler::config() const {
return cfg;
}
bool Scheduler::stealWork(Worker* thief, uint64_t from, Task& out) { bool Scheduler::stealWork(Worker* thief, uint64_t from, Task& out) {
if (numWorkerThreads > 0) { if (cfg.workerThread.count > 0) {
auto thread = workerThreads[from % numWorkerThreads]; auto thread = workerThreads[from % cfg.workerThread.count];
if (thread != thief) { if (thread != thief) {
if (thread->steal(out)) { if (thread->steal(out)) {
return true; return true;
...@@ -237,15 +270,22 @@ void Scheduler::onBeginSpinning(int workerId) { ...@@ -237,15 +270,22 @@ void Scheduler::onBeginSpinning(int workerId) {
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
// Fiber // Scheduler::Config
////////////////////////////////////////////////////////////////////////////////
Scheduler::Config Scheduler::Config::allCores() {
return Config().setWorkerThreadCount(Thread::numLogicalCPUs());
}
////////////////////////////////////////////////////////////////////////////////
// Scheduler::Fiber
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
Scheduler::Fiber::Fiber(Allocator::unique_ptr<OSFiber>&& impl, uint32_t id) Scheduler::Fiber::Fiber(Allocator::unique_ptr<OSFiber>&& impl, uint32_t id)
: id(id), impl(std::move(impl)), worker(Scheduler::Worker::getCurrent()) { : id(id), impl(std::move(impl)), worker(Worker::getCurrent()) {
MARL_ASSERT(worker != nullptr, "No Scheduler::Worker bound"); MARL_ASSERT(worker != nullptr, "No Scheduler::Worker bound");
} }
Scheduler::Fiber* Scheduler::Fiber::current() { Scheduler::Fiber* Scheduler::Fiber::current() {
auto worker = Scheduler::Worker::getCurrent(); auto worker = Worker::getCurrent();
return worker != nullptr ? worker->getCurrentFiber() : nullptr; return worker != nullptr ? worker->getCurrentFiber() : nullptr;
} }
...@@ -367,13 +407,13 @@ void Scheduler::Worker::start() { ...@@ -367,13 +407,13 @@ void Scheduler::Worker::start() {
thread = Thread(id, [=] { thread = Thread(id, [=] {
Thread::setName("Thread<%.2d>", int(id)); Thread::setName("Thread<%.2d>", int(id));
if (auto const& initFunc = scheduler->getThreadInitializer()) { if (auto const& initFunc = scheduler->cfg.workerThread.initializer) {
initFunc(); initFunc(id);
} }
Scheduler::bound = scheduler; Scheduler::bound = scheduler;
Worker::current = this; Worker::current = this;
mainFiber = Fiber::createFromCurrentThread(scheduler->allocator, 0); mainFiber = Fiber::createFromCurrentThread(scheduler->cfg.allocator, 0);
currentFiber = mainFiber.get(); currentFiber = mainFiber.get();
{ {
marl::lock lock(work.mutex); marl::lock lock(work.mutex);
...@@ -386,7 +426,7 @@ void Scheduler::Worker::start() { ...@@ -386,7 +426,7 @@ void Scheduler::Worker::start() {
case Mode::SingleThreaded: case Mode::SingleThreaded:
Worker::current = this; Worker::current = this;
mainFiber = Fiber::createFromCurrentThread(scheduler->allocator, 0); mainFiber = Fiber::createFromCurrentThread(scheduler->cfg.allocator, 0);
currentFiber = mainFiber.get(); currentFiber = mainFiber.get();
break; break;
...@@ -709,7 +749,7 @@ void Scheduler::Worker::runUntilIdle() { ...@@ -709,7 +749,7 @@ void Scheduler::Worker::runUntilIdle() {
Scheduler::Fiber* Scheduler::Worker::createWorkerFiber() { Scheduler::Fiber* Scheduler::Worker::createWorkerFiber() {
auto fiberId = static_cast<uint32_t>(workerFibers.size() + 1); auto fiberId = static_cast<uint32_t>(workerFibers.size() + 1);
DBG_LOG("%d: CREATE(%d)", (int)id, (int)fiberId); DBG_LOG("%d: CREATE(%d)", (int)id, (int)fiberId);
auto fiber = Fiber::create(scheduler->allocator, fiberId, FiberStackSize, auto fiber = Fiber::create(scheduler->cfg.allocator, fiberId, FiberStackSize,
[&]() REQUIRES(work.mutex) { run(); }); [&]() REQUIRES(work.mutex) { run(); });
auto ptr = fiber.get(); auto ptr = fiber.get();
workerFibers.push_back(std::move(fiber)); workerFibers.push_back(std::move(fiber));
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#include "marl_test.h" #include "marl_test.h"
#include "marl/defer.h" #include "marl/defer.h"
#include "marl/event.h"
#include "marl/waitgroup.h" #include "marl/waitgroup.h"
#include <atomic> #include <atomic>
...@@ -117,13 +118,12 @@ TEST_P(WithBoundScheduler, FibersResumeOnSameStdThread) { ...@@ -117,13 +118,12 @@ TEST_P(WithBoundScheduler, FibersResumeOnSameStdThread) {
for (int i = 0; i < num_threads; i++) { for (int i = 0; i < num_threads; i++) {
threads.push_back(std::thread([=] { threads.push_back(std::thread([=] {
scheduler->bind(); scheduler->bind();
defer(scheduler->unbind());
auto threadID = std::this_thread::get_id(); auto threadID = std::this_thread::get_id();
fence.wait(); fence.wait();
ASSERT_EQ(threadID, std::this_thread::get_id()); ASSERT_EQ(threadID, std::this_thread::get_id());
wg.done(); wg.done();
scheduler->unbind();
})); }));
} }
// just to try and get some tasks to yield. // just to try and get some tasks to yield.
...@@ -139,6 +139,7 @@ TEST_P(WithBoundScheduler, FibersResumeOnSameStdThread) { ...@@ -139,6 +139,7 @@ TEST_P(WithBoundScheduler, FibersResumeOnSameStdThread) {
TEST_F(WithoutBoundScheduler, TasksOnlyScheduledOnWorkerThreads) { TEST_F(WithoutBoundScheduler, TasksOnlyScheduledOnWorkerThreads) {
auto scheduler = std::unique_ptr<marl::Scheduler>(new marl::Scheduler()); auto scheduler = std::unique_ptr<marl::Scheduler>(new marl::Scheduler());
scheduler->bind(); scheduler->bind();
defer(scheduler->unbind());
scheduler->setWorkerThreadCount(8); scheduler->setWorkerThreadCount(8);
std::mutex mutex; std::mutex mutex;
std::unordered_set<std::thread::id> threads; std::unordered_set<std::thread::id> threads;
...@@ -155,6 +156,47 @@ TEST_F(WithoutBoundScheduler, TasksOnlyScheduledOnWorkerThreads) { ...@@ -155,6 +156,47 @@ TEST_F(WithoutBoundScheduler, TasksOnlyScheduledOnWorkerThreads) {
ASSERT_LE(threads.size(), 8U); ASSERT_LE(threads.size(), 8U);
ASSERT_EQ(threads.count(std::this_thread::get_id()), 0U); ASSERT_EQ(threads.count(std::this_thread::get_id()), 0U);
}
scheduler->unbind(); // Test that a marl::Scheduler *with dedicated worker threads* can be used
} // without first binding to the scheduling thread.
\ No newline at end of file TEST_F(WithoutBoundScheduler, ScheduleMTWWithNoBind) {
auto scheduler = std::unique_ptr<marl::Scheduler>(new marl::Scheduler());
scheduler->setWorkerThreadCount(8);
marl::WaitGroup wg;
for (int i = 0; i < 100; i++) {
wg.add(1);
marl::Event event;
scheduler->enqueue(marl::Task([event, wg] {
event.wait(); // Test that tasks can wait on other tasks.
wg.done();
}));
scheduler->enqueue(marl::Task([event, &scheduler] {
// Despite the main thread never binding the scheduler, the scheduler
// should be automatically bound to worker threads.
ASSERT_EQ(marl::Scheduler::get(), scheduler.get());
event.signal();
}));
}
// As the scheduler has not been bound to the main thread, the wait() call
// here will block **without** fiber yielding.
wg.wait();
}
// Test that a marl::Scheduler *without dedicated worker threads* cannot be used
// without first binding to the scheduling thread.
TEST_F(WithoutBoundScheduler, ScheduleSTWWithNoBind) {
auto scheduler = std::unique_ptr<marl::Scheduler>(new marl::Scheduler());
#if MARL_DEBUG_ENABLED && GTEST_HAS_DEATH_TEST
EXPECT_DEATH(scheduler->enqueue(marl::Task([] {})),
"Did you forget to call marl::Scheduler::bind");
#elif !MARL_DEBUG_ENABLED
scheduler->enqueue(marl::Task([] { FAIL() << "Should not be called"; }));
#endif
}
...@@ -31,6 +31,11 @@ ...@@ -31,6 +31,11 @@
#include <pthread.h> #include <pthread.h>
#include <unistd.h> #include <unistd.h>
#include <thread> #include <thread>
#elif defined(__FreeBSD__)
#include <pthread.h>
#include <pthread_np.h>
#include <unistd.h>
#include <thread>
#else #else
#include <pthread.h> #include <pthread.h>
#include <unistd.h> #include <unistd.h>
...@@ -211,6 +216,8 @@ void Thread::setName(const char* fmt, ...) { ...@@ -211,6 +216,8 @@ void Thread::setName(const char* fmt, ...) {
#if defined(__APPLE__) #if defined(__APPLE__)
pthread_setname_np(name); pthread_setname_np(name);
#elif defined(__FreeBSD__)
pthread_set_name_np(pthread_self(), name);
#elif !defined(__Fuchsia__) #elif !defined(__Fuchsia__)
pthread_setname_np(pthread_self(), name); pthread_setname_np(pthread_self(), name);
#endif #endif
...@@ -219,7 +226,7 @@ void Thread::setName(const char* fmt, ...) { ...@@ -219,7 +226,7 @@ void Thread::setName(const char* fmt, ...) {
} }
unsigned int Thread::numLogicalCPUs() { unsigned int Thread::numLogicalCPUs() {
return sysconf(_SC_NPROCESSORS_ONLN); return static_cast<unsigned int>(sysconf(_SC_NPROCESSORS_ONLN));
} }
#endif // OS #endif // OS
......
...@@ -22,14 +22,14 @@ TEST_F(WithoutBoundScheduler, WaitGroupDone) { ...@@ -22,14 +22,14 @@ TEST_F(WithoutBoundScheduler, WaitGroupDone) {
wg.done(); wg.done();
} }
#if MARL_DEBUG_ENABLED #if MARL_DEBUG_ENABLED && GTEST_HAS_DEATH_TEST
TEST_F(WithoutBoundScheduler, WaitGroupDoneTooMany) { TEST_F(WithoutBoundScheduler, WaitGroupDoneTooMany) {
marl::WaitGroup wg(2); // Should not require a scheduler. marl::WaitGroup wg(2); // Should not require a scheduler.
wg.done(); wg.done();
wg.done(); wg.done();
EXPECT_DEATH(wg.done(), "done\\(\\) called too many times"); EXPECT_DEATH(wg.done(), "done\\(\\) called too many times");
} }
#endif // MARL_DEBUG_ENABLED #endif // MARL_DEBUG_ENABLED && GTEST_HAS_DEATH_TEST
TEST_P(WithBoundScheduler, WaitGroup_OneTask) { TEST_P(WithBoundScheduler, WaitGroup_OneTask) {
marl::WaitGroup wg(1); marl::WaitGroup wg(1);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment