Commit 569a9a43 by Ben Clayton

Update Marl to 94a361cf0

Contains a fix for an arm linker error (chromium:1058107) and scaling beyond 64 threads on Windows. Changes: 94a361cf0 thread.cpp Fix minor bug in getProcessorGroups() 0249a2624 Thread: Use WaitForObject for Thread::join() 62f209bbb arm: Annotate marl_fiber_swap as type %function 00f091e08 Update README.md 773d9f475 Add new example 'tasks_in_tasks' 3f69e73ce Scheduler: Replace use of std::thread with marl::Thread Commands: git subtree pull --prefix third_party/marl https://github.com/google/marl master --squash Bug: b/140546382 Bug: chromium:1058107 Change-Id: Ic5a96ad5f054a19f6a5a77e1f106c73ba60c0a78
parents 7e857092 3e2f8421
...@@ -264,4 +264,5 @@ if(MARL_BUILD_EXAMPLES) ...@@ -264,4 +264,5 @@ if(MARL_BUILD_EXAMPLES)
build_example(fractal) build_example(fractal)
build_example(hello_task) build_example(hello_task)
build_example(primes) build_example(primes)
build_example(tasks_in_tasks)
endif(MARL_BUILD_EXAMPLES) endif(MARL_BUILD_EXAMPLES)
...@@ -65,6 +65,12 @@ int main() { ...@@ -65,6 +65,12 @@ int main() {
} }
``` ```
## Benchmarks
Graphs of several microbenchmarks can be found [here](https://google.github.io/marl/benchmarks).
## Building ## Building
Marl contains many unit tests and examples that can be built using CMake. Marl contains many unit tests and examples that can be built using CMake.
...@@ -117,10 +123,6 @@ set(MARL_GOOGLETEST_DIR <path-to-googletest>) # defaults to ${MARL_THIR ...@@ -117,10 +123,6 @@ set(MARL_GOOGLETEST_DIR <path-to-googletest>) # defaults to ${MARL_THIR
add_subdirectory(${MARL_DIR}) add_subdirectory(${MARL_DIR})
``` ```
## Benchmarks
Graphs of several microbenchmarks can be found [here](https://google.github.io/marl/benchmarks).
--- ---
Note: This is not an officially supported Google product Note: This is not an officially supported Google product
// Copyright 2020 The Marl Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Example of a task that creates and waits on sub tasks.
#include "marl/defer.h"
#include "marl/scheduler.h"
#include "marl/waitgroup.h"
#include <cstdio>
int main() {
// Create a marl scheduler using the 4 hardware threads.
// Bind this scheduler to the main thread so we can call marl::schedule()
marl::Scheduler scheduler;
scheduler.bind();
scheduler.setWorkerThreadCount(4);
defer(scheduler.unbind()); // Automatically unbind before returning.
// marl::schedule() requires the scheduler to be bound to the current thread
// (see above). The scheduler ensures that tasks are run on a thread with the
// same scheduler automatically bound, so we don't need to call
// marl::Scheduler::bind() again below.
// Sequence of task events:
// __________________________________________________________
// | |
// | ---> [task B] ---- |
// | / \ |
// | [task A] -----> [task A: wait] -----> [task A: resume] |
// | \ / |
// | ---> [task C] ---- |
// |__________________________________________________________|
// Create a WaitGroup for waiting for task A to finish.
// This has an initial count of 1 (A)
marl::WaitGroup a_wg(1);
// Schedule task A
marl::schedule([=] {
defer(a_wg.done()); // Decrement a_wg when task A is done
printf("Hello from task A\n");
printf("Starting tasks B and C...\n");
// Create a WaitGroup for waiting on task B and C to finish.
// This has an initial count of 2 (B + C)
marl::WaitGroup bc_wg(2);
// Schedule task B
marl::schedule([=] {
defer(bc_wg.done()); // Decrement bc_wg when task B is done
printf("Hello from task B\n");
});
// Schedule task C
marl::schedule([=] {
defer(bc_wg.done()); // Decrement bc_wg when task C is done
printf("Hello from task C\n");
});
// Wait for tasks B and C to finish.
bc_wg.wait();
});
// Wait for task A (and so B and C) to finish.
a_wg.wait();
printf("Task A has finished\n");
}
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "debug.h" #include "debug.h"
#include "memory.h" #include "memory.h"
#include "sal.h" #include "sal.h"
#include "thread.h"
#include <array> #include <array>
#include <atomic> #include <atomic>
...@@ -411,7 +412,7 @@ class Scheduler { ...@@ -411,7 +412,7 @@ class Scheduler {
Scheduler* const scheduler; Scheduler* const scheduler;
Allocator::unique_ptr<Fiber> mainFiber; Allocator::unique_ptr<Fiber> mainFiber;
Fiber* currentFiber = nullptr; Fiber* currentFiber = nullptr;
std::thread thread; Thread thread;
Work work; Work work;
FiberSet idleFibers; // Fibers that have completed which can be reused. FiberSet idleFibers; // Fibers that have completed which can be reused.
std::vector<Allocator::unique_ptr<Fiber>> std::vector<Allocator::unique_ptr<Fiber>>
......
...@@ -15,12 +15,34 @@ ...@@ -15,12 +15,34 @@
#ifndef marl_thread_h #ifndef marl_thread_h
#define marl_thread_h #define marl_thread_h
#include <functional>
namespace marl { namespace marl {
// Thread contains static methods that abstract OS-specific thread / cpu // Thread provides an OS abstraction for threads of execution.
// Thread is used by marl instead of std::thread as Windows does not naturally
// scale beyond 64 logical threads on a single CPU, unless you use the Win32
// API.
// Thread alsocontains static methods that abstract OS-specific thread / cpu
// queries and control. // queries and control.
class Thread { class Thread {
public: public:
using Func = std::function<void()>;
Thread() = default;
Thread(Thread&&);
Thread& operator=(Thread&&);
// Start a new thread that calls func.
// logicalCpuHint is a hint to run the thread on the specified logical CPU.
// logicalCpuHint may be entirely ignored.
Thread(unsigned int logicalCpuHint, const Func& func);
~Thread();
// join() blocks until the thread completes.
void join();
// setName() sets the name of the currently executing thread for displaying // setName() sets the name of the currently executing thread for displaying
// in a debugger. // in a debugger.
static void setName(const char* fmt, ...); static void setName(const char* fmt, ...);
...@@ -28,6 +50,13 @@ class Thread { ...@@ -28,6 +50,13 @@ class Thread {
// numLogicalCPUs() returns the number of available logical CPU cores for // numLogicalCPUs() returns the number of available logical CPU cores for
// the system. // the system.
static unsigned int numLogicalCPUs(); static unsigned int numLogicalCPUs();
private:
Thread(const Thread&) = delete;
Thread& operator=(const Thread&) = delete;
class Impl;
Impl* impl = nullptr;
}; };
} // namespace marl } // namespace marl
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
.text .text
.global marl_fiber_swap .global marl_fiber_swap
.align 4 .align 4
.type marl_fiber_swap, %function
marl_fiber_swap: marl_fiber_swap:
// Save context 'from' // Save context 'from'
......
...@@ -361,7 +361,7 @@ Scheduler::Worker::Worker(Scheduler* scheduler, Mode mode, uint32_t id) ...@@ -361,7 +361,7 @@ Scheduler::Worker::Worker(Scheduler* scheduler, Mode mode, uint32_t id)
void Scheduler::Worker::start() { void Scheduler::Worker::start() {
switch (mode) { switch (mode) {
case Mode::MultiThreaded: case Mode::MultiThreaded:
thread = std::thread([=] { thread = Thread(id, [=] {
Thread::setName("Thread<%.2d>", int(id)); Thread::setName("Thread<%.2d>", int(id));
if (auto const& initFunc = scheduler->getThreadInitializer()) { if (auto const& initFunc = scheduler->getThreadInitializer()) {
......
...@@ -14,6 +14,8 @@ ...@@ -14,6 +14,8 @@
#include "marl/thread.h" #include "marl/thread.h"
#include "marl/debug.h"
#include "marl/defer.h"
#include "marl/trace.h" #include "marl/trace.h"
#include <cstdarg> #include <cstdarg>
...@@ -22,20 +24,135 @@ ...@@ -22,20 +24,135 @@
#if defined(_WIN32) #if defined(_WIN32)
#define WIN32_LEAN_AND_MEAN 1 #define WIN32_LEAN_AND_MEAN 1
#include <windows.h> #include <windows.h>
#include <cstdlib> // mbstowcs #include <cstdlib> // mbstowcs
#include <vector>
#elif defined(__APPLE__) #elif defined(__APPLE__)
#include <mach/thread_act.h> #include <mach/thread_act.h>
#include <pthread.h> #include <pthread.h>
#include <unistd.h> #include <unistd.h>
#include <thread>
#else #else
#include <pthread.h> #include <pthread.h>
#include <unistd.h> #include <unistd.h>
#include <thread>
#endif #endif
namespace marl { namespace marl {
#if defined(_WIN32) #if defined(_WIN32)
#define CHECK_WIN32(expr) \
do { \
auto res = expr; \
(void)res; \
MARL_ASSERT(res == TRUE, #expr " failed with error: %d", \
(int)GetLastError()); \
} while (false)
namespace {
struct ProcessorGroup {
unsigned int count; // number of logical processors in this group.
KAFFINITY affinity; // affinity mask.
};
const std::vector<ProcessorGroup>& getProcessorGroups() {
static std::vector<ProcessorGroup> groups = [] {
std::vector<ProcessorGroup> out;
SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX info[32] = {};
DWORD size = sizeof(info);
CHECK_WIN32(GetLogicalProcessorInformationEx(RelationGroup, info, &size));
DWORD count = size / sizeof(SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX);
for (DWORD i = 0; i < count; i++) {
if (info[i].Relationship == RelationGroup) {
auto groupCount = info[i].Group.ActiveGroupCount;
for (WORD groupIdx = 0; groupIdx < groupCount; groupIdx++) {
auto const& groupInfo = info[i].Group.GroupInfo[groupIdx];
out.emplace_back(ProcessorGroup{groupInfo.ActiveProcessorCount,
groupInfo.ActiveProcessorMask});
}
}
}
return out;
}();
return groups;
}
bool getGroupAffinity(unsigned int index, GROUP_AFFINITY* groupAffinity) {
auto& groups = getProcessorGroups();
for (size_t groupIdx = 0; groupIdx < groups.size(); groupIdx++) {
auto& group = groups[groupIdx];
if (index < group.count) {
for (int i = 0; i < sizeof(group.affinity) * 8; i++) {
if (group.affinity & (1ULL << i)) {
if (index == 0) {
groupAffinity->Group = static_cast<WORD>(groupIdx);
// Use the whole group's affinity, as the OS is then able to shuffle
// threads around based on external demands. Pinning these to a
// single core can cause up to 20% performance loss in benchmarking.
groupAffinity->Mask = group.affinity;
return true;
}
index--;
}
}
return false;
} else {
index -= group.count;
}
}
return false;
}
} // namespace
class Thread::Impl {
public:
Impl(const Func& func) : func(func) {}
static DWORD WINAPI run(void* self) {
reinterpret_cast<Impl*>(self)->func();
return 0;
}
Func func;
HANDLE handle;
};
Thread::Thread(unsigned int logicalCpu, const Func& func) {
SIZE_T size = 0;
InitializeProcThreadAttributeList(nullptr, 1, 0, &size);
MARL_ASSERT(size > 0,
"InitializeProcThreadAttributeList() did not give a size");
LPPROC_THREAD_ATTRIBUTE_LIST attributes =
reinterpret_cast<LPPROC_THREAD_ATTRIBUTE_LIST>(alloca(size));
CHECK_WIN32(InitializeProcThreadAttributeList(attributes, 1, 0, &size));
defer(DeleteProcThreadAttributeList(attributes));
GROUP_AFFINITY groupAffinity = {};
if (getGroupAffinity(logicalCpu, &groupAffinity)) {
CHECK_WIN32(UpdateProcThreadAttribute(
attributes, 0, PROC_THREAD_ATTRIBUTE_GROUP_AFFINITY, &groupAffinity,
sizeof(groupAffinity), nullptr, nullptr));
}
impl = new Impl(func);
impl->handle = CreateRemoteThreadEx(GetCurrentProcess(), nullptr, 0,
&Impl::run, impl, 0, attributes, nullptr);
}
Thread::~Thread() {
if (impl) {
CloseHandle(impl->handle);
delete impl;
}
}
void Thread::join() {
MARL_ASSERT(impl != nullptr, "join() called on unjoinable thread");
WaitForSingleObject(impl->handle, INFINITE);
}
void Thread::setName(const char* fmt, ...) { void Thread::setName(const char* fmt, ...) {
static auto setThreadDescription = static auto setThreadDescription =
reinterpret_cast<HRESULT(WINAPI*)(HANDLE, PCWSTR)>(GetProcAddress( reinterpret_cast<HRESULT(WINAPI*)(HANDLE, PCWSTR)>(GetProcAddress(
...@@ -57,25 +174,33 @@ void Thread::setName(const char* fmt, ...) { ...@@ -57,25 +174,33 @@ void Thread::setName(const char* fmt, ...) {
} }
unsigned int Thread::numLogicalCPUs() { unsigned int Thread::numLogicalCPUs() {
DWORD_PTR processAffinityMask = 1; unsigned int count = 0;
DWORD_PTR systemAffinityMask = 1; for (auto& group : getProcessorGroups()) {
count += group.count;
GetProcessAffinityMask(GetCurrentProcess(), &processAffinityMask,
&systemAffinityMask);
auto count = 0;
while (processAffinityMask > 0) {
if (processAffinityMask & 1) {
count++;
}
processAffinityMask >>= 1;
} }
return count; return count;
} }
#else #else
class Thread::Impl {
public:
template <typename F>
Impl(F&& func) : thread(func) {}
std::thread thread;
};
Thread::Thread(unsigned int /* logicalCpu */, const Func& func)
: impl(new Thread::Impl(func)) {}
Thread::~Thread() {
delete impl;
}
void Thread::join() {
impl->thread.join();
}
void Thread::setName(const char* fmt, ...) { void Thread::setName(const char* fmt, ...) {
char name[1024]; char name[1024];
va_list vararg; va_list vararg;
...@@ -96,6 +221,20 @@ unsigned int Thread::numLogicalCPUs() { ...@@ -96,6 +221,20 @@ unsigned int Thread::numLogicalCPUs() {
return sysconf(_SC_NPROCESSORS_ONLN); return sysconf(_SC_NPROCESSORS_ONLN);
} }
#endif #endif // OS
Thread::Thread(Thread&& rhs) : impl(rhs.impl) {
rhs.impl = nullptr;
}
Thread& Thread::operator=(Thread&& rhs) {
if (impl) {
delete impl;
impl = nullptr;
}
impl = rhs.impl;
rhs.impl = nullptr;
return *this;
}
} // namespace marl } // namespace marl
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment