Commit 32b1d4b9 by Ben Clayton

Squashed 'third_party/marl/' changes from 38c0c7a0f..c51271125

c51271125 README.md: Repoint link from `master` to `main` da0a9c610 Add basic CMake rules for versioning marl 0e639c3c7 Benchmarks: Simplify tests further. 32af8bb50 Kokoro: Enable debug checks 8cf8dc033 Wrap all stl containers with a marl::StlAllocator 325b072b9 Support specifying worker thread affinities 49fe9a17f marl::containers: Add const methods 11f31bfbe Benchmarks: Warn if benchmarking with sanitizers bfdf613e7 Benchmarks: Add MARL_FULL_BENCHMARK flag 5e2383370 Benchmarks: Allow running with custom Config c277c61b0 marl::containers::vector fixes fcbe1f279 Add issue number to MARL_DEPRECATED() 1f010cad7 Warn about use of deprecated APIs d2e553bff Don't use deprecated scheduler methods git-subtree-dir: third_party/marl git-subtree-split: c51271125451c599efb9ec58b355a4c434296a8f
parent 068a0c55
......@@ -14,9 +14,15 @@
cmake_minimum_required(VERSION 3.0)
include(cmake/parse_version.cmake)
parse_version("${CMAKE_CURRENT_SOURCE_DIR}/VERSION" MARL)
set(CMAKE_CXX_STANDARD 11)
project(Marl C CXX ASM)
project(Marl
VERSION "${MARL_VERSION_MAJOR}.${MARL_VERSION_MINOR}.${MARL_VERSION_PATCH}"
LANGUAGES C CXX ASM
)
include(CheckCXXSourceCompiles)
......@@ -29,7 +35,7 @@ endif()
###########################################################
# Options
###########################################################
function (option_if_not_defined name description default)
function(option_if_not_defined name description default)
if(NOT DEFINED ${name})
option(${name} ${description} ${default})
endif()
......@@ -39,15 +45,18 @@ option_if_not_defined(MARL_WARNINGS_AS_ERRORS "Treat warnings as errors" OFF)
option_if_not_defined(MARL_BUILD_EXAMPLES "Build example applications" OFF)
option_if_not_defined(MARL_BUILD_TESTS "Build tests" OFF)
option_if_not_defined(MARL_BUILD_BENCHMARKS "Build benchmarks" OFF)
option_if_not_defined(MARL_BUILD_SHARED "Build marl as a shared / dynamic library (default static)" OFF)
option_if_not_defined(MARL_ASAN "Build marl with address sanitizer" OFF)
option_if_not_defined(MARL_MSAN "Build marl with memory sanitizer" OFF)
option_if_not_defined(MARL_TSAN "Build marl with thread sanitizer" OFF)
option_if_not_defined(MARL_INSTALL "Create marl install target" OFF)
option_if_not_defined(MARL_FULL_BENCHMARK "Run benchmarks for [0 .. numLogicalCPUs] with no stepping" OFF)
option_if_not_defined(MARL_DEBUG_ENABLED "Enable debug checks even in release builds" OFF)
###########################################################
# Directories
###########################################################
function (set_if_not_defined name value)
function(set_if_not_defined name value)
if(NOT DEFINED ${name})
set(${name} ${value} PARENT_SCOPE)
endif()
......@@ -147,7 +156,7 @@ find_package(Threads REQUIRED)
# Functions
###########################################################
function(marl_set_target_options target)
if (MARL_THREAD_SAFETY_ANALYSIS_SUPPORTED)
if(MARL_THREAD_SAFETY_ANALYSIS_SUPPORTED)
target_compile_options(${target} PRIVATE "-Wthread-safety")
endif()
......@@ -186,6 +195,10 @@ function(marl_set_target_options target)
target_link_libraries(${target} PUBLIC "-fsanitize=thread")
endif()
if(MARL_DEBUG_ENABLED)
target_compile_definitions(${target} PRIVATE "MARL_DEBUG_ENABLED=1")
endif()
target_include_directories(${target} PUBLIC $<BUILD_INTERFACE:${MARL_INCLUDE_DIR}>)
endfunction(marl_set_target_options)
......@@ -194,9 +207,16 @@ endfunction(marl_set_target_options)
###########################################################
# marl
add_library(marl STATIC ${MARL_LIST})
if(MARL_BUILD_SHARED) # Can also be controlled by BUILD_SHARED_LIBS
add_library(marl SHARED ${MARL_LIST})
else()
add_library(marl ${MARL_LIST})
endif()
set_target_properties(marl PROPERTIES
POSITION_INDEPENDENT_CODE 1
VERSION ${MARL_VERSION}
SOVERSION "${MARL_VERSION_MAJOR}"
)
marl_set_target_options(marl)
......@@ -253,6 +273,7 @@ if(MARL_BUILD_TESTS)
${MARL_SRC_DIR}/parallelize_test.cpp
${MARL_SRC_DIR}/pool_test.cpp
${MARL_SRC_DIR}/scheduler_test.cpp
${MARL_SRC_DIR}/thread_test.cpp
${MARL_SRC_DIR}/ticket_test.cpp
${MARL_SRC_DIR}/waitgroup_test.cpp
${MARL_GOOGLETEST_DIR}/googletest/src/gtest-all.cc
......@@ -293,6 +314,10 @@ if(MARL_BUILD_BENCHMARKS)
marl_set_target_options(marl-benchmarks)
target_compile_definitions(marl-benchmarks PRIVATE
"MARL_FULL_BENCHMARK=${MARL_FULL_BENCHMARK}"
)
target_link_libraries(marl-benchmarks PRIVATE benchmark::benchmark marl)
endif(MARL_BUILD_BENCHMARKS)
......
......@@ -23,11 +23,10 @@ Example:
#include <cstdio>
int main() {
// Create a marl scheduler using the 4 hardware threads.
// Create a marl scheduler using all the logical processors available to the process.
// Bind this scheduler to the main thread so we can call marl::schedule()
marl::Scheduler scheduler;
marl::Scheduler scheduler(marl::Scheduler::Config::allCores());
scheduler.bind();
scheduler.setWorkerThreadCount(4);
defer(scheduler.unbind()); // Automatically unbind before returning.
constexpr int numTasks = 10;
......@@ -140,7 +139,7 @@ Internally, these primitives hold a shared pointer to the primitive state. By ca
#### Create one instance of `marl::Scheduler`, use it for the lifetime of the process.
`marl::Scheduler::setWorkerThreadCount()` is an expensive operation as it spawn a number of hardware threads. \
The `marl::Scheduler` constructor can be expensive as it may spawn a number of hardware threads. \
Destructing the `marl::Scheduler` requires waiting on all tasks to complete.
Multiple `marl::Scheduler`s may fight each other for hardware thread utilization.
......@@ -151,9 +150,8 @@ For example:
```c++
int main() {
marl::Scheduler scheduler;
marl::Scheduler scheduler(marl::Scheduler::Config::allCores());
scheduler.bind();
scheduler.setWorkerThreadCount(marl::Thread::numLogicalCPUs());
defer(scheduler.unbind());
return do_program_stuff();
......@@ -196,7 +194,7 @@ Calling a non-marl blocking function on a marl worker thread will prevent that w
Short blocking calls are acceptable, such as a mutex lock to access a data structure. However be careful that you do not use a marl blocking call with a `std::mutex` lock held - the marl task may yield with the lock held, and block other tasks from re-locking the mutex. This sort of situation may end up with a deadlock.
If you need to make a blocking call from a marl worker thread, you may wish to use [`marl::blocking_call()`](https://github.com/google/marl/blob/master/include/marl/blockingcall.h), which will spawn a new thread for performing the call, allowing the marl worker to continue processing other scheduled tasks.
If you need to make a blocking call from a marl worker thread, you may wish to use [`marl::blocking_call()`](https://github.com/google/marl/blob/main/include/marl/blockingcall.h), which will spawn a new thread for performing the call, allowing the marl worker to continue processing other scheduled tasks.
---
......
0.0.0-dev
\ No newline at end of file
# Copyright 2020 The Marl Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# parse_version() reads and parses the version string from FILE, assigning the
# version string to ${PROJECT}_VERSION and the parsed version to
# ${PROJECT}_VERSION_MAJOR, ${PROJECT}_VERSION_MINOR, ${PROJECT}_VERSION_PATCH,
# and the optional ${PROJECT}_VERSION_FLAVOR.
#
# The version string take one of the forms:
# <major>.<minor>.<patch>
# <major>.<minor>.<patch>-<flavor>
function(parse_version FILE PROJECT)
configure_file(${FILE} "${CMAKE_CURRENT_BINARY_DIR}/VERSION") # Required to re-run cmake on version change
file(READ ${FILE} VERSION)
if(${VERSION} MATCHES "([0-9]+)\\.([0-9]+)\\.([0-9]+)(-[a-zA-Z0-9]+)?")
set(FLAVOR "")
if(NOT "${CMAKE_MATCH_4}" STREQUAL "")
string(SUBSTRING ${CMAKE_MATCH_4} 1 -1 FLAVOR)
endif()
set("${PROJECT}_VERSION" ${VERSION} PARENT_SCOPE)
set("${PROJECT}_VERSION_MAJOR" ${CMAKE_MATCH_1} PARENT_SCOPE)
set("${PROJECT}_VERSION_MINOR" ${CMAKE_MATCH_2} PARENT_SCOPE)
set("${PROJECT}_VERSION_PATCH" ${CMAKE_MATCH_3} PARENT_SCOPE)
set("${PROJECT}_VERSION_FLAVOR" ${FLAVOR} PARENT_SCOPE)
else()
message(FATAL_ERROR "Unable to parse version string '${VERSION}'")
endif()
endfunction()
......@@ -82,7 +82,7 @@ The scheduler holds a number of `marl::Scheduler::Worker`s. Each worker holds:
When a task is scheduled with a call to `marl::schedule()`, a worker is picked, and the task is placed on to the worker's `work.tasks` queue. The worker is picked using the following rules:
- If the scheduler has no dedicated worker threads (`marl::Scheduler::getWorkerThreadCount() == 0`), then the task is queued on to the [Single-Threaded-Worker](#single-threaded-workers) for the currently executing thread.
- If the scheduler has no dedicated worker threads (`marl::Scheduler::config().workerThreads.count == 0`), then the task is queued on to the [Single-Threaded-Worker](#single-threaded-workers) for the currently executing thread.
- Otherwise one of the [Multi-Threaded-Workers](#multi-threaded-workers) is picked. If any workers have entered a [spin-for-work](#marlschedulerworkerspinforwork) state, then these will be prioritized, otherwise a [Multi-Threaded-Worker](#multi-threaded-workers) is picked in a round-robin fashion.
### `marl::Scheduler::Worker::run()`
......@@ -188,15 +188,17 @@ The most significant difference is that the Multi-Threaded-Worker spawns a dedic
A single-threaded-worker (STW) is created for each thread that is bound with a call to `marl::Scheduler::bind()`.
If the scheduler has no dedicated worker threads (`marl::Scheduler::getWorkerThreadCount() == 0`), then scheduled tasks are queued on to the STW for the currently executing thread.
If the scheduler has no dedicated worker threads (`marl::Scheduler::config().workerThreads.count == 0`), then scheduled tasks are queued on to the STW for the currently executing thread.
Because in this mode there are no worker threads, the tasks queued on the STW are not automatically background executed. Instead, tasks are only executed whenever there's a call to [`marl::Scheduler::Worker::suspend()`](#marlschedulerworkersuspend).
The logic for [`suspend()`](#marlschedulerworkersuspend) is common for STWs and MTWs, spawning new fibers that call [`marl::Scheduler::Worker::run()`](#marlschedulerworkerrun) whenever all other fibers are blocked.
```c++
void SingleThreadedWorkerExample() {
marl::Scheduler scheduler;
scheduler.setWorkerThreadCount(0); // STW mode.
marl::Scheduler::Config cfg;
cfg.setWorkerThreadCount(0); // STW mode.
marl::Scheduler scheduler(cfg);
scheduler.bind();
defer(scheduler.unbind());
......@@ -219,7 +221,7 @@ void SingleThreadedWorkerExample() {
### Multi-Threaded-Workers
Multi-Threaded-Workers are created when `marl::Scheduler::setWorkerThreadCount()` is called with a positive number.
Multi-Threaded-Workers are created when the `marl::Scheduler` is constructed with a positive number of worker threads (`marl::Scheduler::Config::workerThread::count > 0`).
Each MTW is paired with a new `std::thread` that begins by calling `marl::Scheduler::Worker::run()`.
......
......@@ -149,8 +149,7 @@ constexpr float cy = 0.156f;
int main() {
// Create a marl scheduler using the full number of logical cpus.
// Bind this scheduler to the main thread so we can call marl::schedule()
marl::Scheduler scheduler;
scheduler.setWorkerThreadCount(marl::Thread::numLogicalCPUs());
marl::Scheduler scheduler(marl::Scheduler::Config::allCores());
scheduler.bind();
defer(scheduler.unbind()); // unbind before destructing the scheduler.
......
......@@ -24,9 +24,11 @@
int main() {
// Create a marl scheduler using the 4 hardware threads.
// Bind this scheduler to the main thread so we can call marl::schedule()
marl::Scheduler scheduler;
marl::Scheduler::Config cfg;
cfg.setWorkerThreadCount(4);
marl::Scheduler scheduler(cfg);
scheduler.bind();
scheduler.setWorkerThreadCount(4);
defer(scheduler.unbind()); // Automatically unbind before returning.
constexpr int numTasks = 10;
......
......@@ -20,6 +20,8 @@
#include "marl/thread.h"
#include "marl/ticket.h"
#include <vector>
#include <math.h>
// searchMax defines the upper limit on primes to find.
......@@ -42,8 +44,7 @@ bool isPrime(int i) {
int main() {
// Create a marl scheduler using the full number of logical cpus.
// Bind this scheduler to the main thread so we can call marl::schedule()
marl::Scheduler scheduler;
scheduler.setWorkerThreadCount(marl::Thread::numLogicalCPUs());
marl::Scheduler scheduler(marl::Scheduler::Config::allCores());
scheduler.bind();
defer(scheduler.unbind()); // unbind before destructing the scheduler.
......
......@@ -23,9 +23,11 @@
int main() {
// Create a marl scheduler using the 4 hardware threads.
// Bind this scheduler to the main thread so we can call marl::schedule()
marl::Scheduler scheduler;
marl::Scheduler::Config cfg;
cfg.setWorkerThreadCount(4);
marl::Scheduler scheduler(cfg);
scheduler.bind();
scheduler.setWorkerThreadCount(4);
defer(scheduler.unbind()); // Automatically unbind before returning.
// marl::schedule() requires the scheduler to be bound to the current thread
......
......@@ -22,18 +22,66 @@
#include <cstddef> // size_t
#include <utility> // std::move
#include <deque>
#include <map>
#include <set>
#include <unordered_map>
#include <unordered_set>
namespace marl {
namespace containers {
////////////////////////////////////////////////////////////////////////////////
// STL wrappers
// STL containers that use a marl::StlAllocator backed by a marl::Allocator.
// Note: These may be re-implemented to optimize for marl's usage cases.
// See: https://github.com/google/marl/issues/129
////////////////////////////////////////////////////////////////////////////////
template <typename T>
using deque = std::deque<T, StlAllocator<T>>;
template <typename K, typename V, typename C = std::less<K>>
using map = std::map<K, V, C, StlAllocator<std::pair<const K, V>>>;
template <typename K, typename C = std::less<K>>
using set = std::set<K, C, StlAllocator<K>>;
template <typename K,
typename V,
typename H = std::hash<K>,
typename E = std::equal_to<K>>
using unordered_map =
std::unordered_map<K, V, H, E, StlAllocator<std::pair<const K, V>>>;
template <typename K, typename H = std::hash<K>, typename E = std::equal_to<K>>
using unordered_set = std::unordered_set<K, H, E, StlAllocator<K>>;
// take() takes and returns the front value from the deque.
template <typename T>
inline T take(deque<T>& queue) {
auto out = std::move(queue.front());
queue.pop_front();
return out;
}
// take() takes and returns the first value from the unordered_set.
template <typename T, typename H, typename E>
inline T take(unordered_set<T, H, E>& set) {
auto it = set.begin();
auto out = std::move(*it);
set.erase(it);
return out;
}
////////////////////////////////////////////////////////////////////////////////
// vector<T, BASE_CAPACITY>
////////////////////////////////////////////////////////////////////////////////
// vector is a container of contiguously stored elements.
// Unlike std::vector, marl::containers::vector keeps the first BASE_CAPACITY
// elements internally, which will avoid dynamic heap allocations.
// Once the vector exceeds BASE_CAPACITY elements, vector will allocate storage
// from the heap.
// Unlike std::vector, marl::containers::vector keeps the first
// BASE_CAPACITY elements internally, which will avoid dynamic heap
// allocations. Once the vector exceeds BASE_CAPACITY elements, vector will
// allocate storage from the heap.
template <typename T, int BASE_CAPACITY>
class vector {
public:
......@@ -49,6 +97,8 @@ class vector {
inline ~vector();
inline vector& operator=(const vector&);
template <int BASE_CAPACITY_2>
inline vector<T, BASE_CAPACITY>& operator=(const vector<T, BASE_CAPACITY_2>&);
......@@ -60,21 +110,30 @@ class vector {
inline void pop_back();
inline T& front();
inline T& back();
inline const T& front() const;
inline const T& back() const;
inline T* begin();
inline T* end();
inline const T* begin() const;
inline const T* end() const;
inline T& operator[](size_t i);
inline const T& operator[](size_t i) const;
inline size_t size() const;
inline size_t cap() const;
inline void resize(size_t n);
inline void reserve(size_t n);
inline T* data();
inline const T* data() const;
Allocator* const allocator;
private:
using TStorage = typename marl::aligned_storage<sizeof(T), alignof(T)>::type;
vector(const vector&) = delete;
inline void free();
Allocator* const allocator;
size_t count = 0;
size_t capacity = BASE_CAPACITY;
TStorage buffer[BASE_CAPACITY];
......@@ -111,6 +170,18 @@ vector<T, BASE_CAPACITY>::~vector() {
}
template <typename T, int BASE_CAPACITY>
vector<T, BASE_CAPACITY>& vector<T, BASE_CAPACITY>::operator=(
const vector& other) {
free();
reserve(other.size());
count = other.size();
for (size_t i = 0; i < count; i++) {
new (&reinterpret_cast<T*>(elements)[i]) T(other[i]);
}
return *this;
}
template <typename T, int BASE_CAPACITY>
template <int BASE_CAPACITY_2>
vector<T, BASE_CAPACITY>& vector<T, BASE_CAPACITY>::operator=(
const vector<T, BASE_CAPACITY_2>& other) {
......@@ -171,6 +242,18 @@ T& vector<T, BASE_CAPACITY>::back() {
}
template <typename T, int BASE_CAPACITY>
const T& vector<T, BASE_CAPACITY>::front() const {
MARL_ASSERT(count > 0, "front() called on empty vector");
return reinterpret_cast<T*>(elements)[0];
}
template <typename T, int BASE_CAPACITY>
const T& vector<T, BASE_CAPACITY>::back() const {
MARL_ASSERT(count > 0, "back() called on empty vector");
return reinterpret_cast<T*>(elements)[count - 1];
}
template <typename T, int BASE_CAPACITY>
T* vector<T, BASE_CAPACITY>::begin() {
return reinterpret_cast<T*>(elements);
}
......@@ -181,6 +264,16 @@ T* vector<T, BASE_CAPACITY>::end() {
}
template <typename T, int BASE_CAPACITY>
const T* vector<T, BASE_CAPACITY>::begin() const {
return reinterpret_cast<T*>(elements);
}
template <typename T, int BASE_CAPACITY>
const T* vector<T, BASE_CAPACITY>::end() const {
return reinterpret_cast<T*>(elements) + count;
}
template <typename T, int BASE_CAPACITY>
T& vector<T, BASE_CAPACITY>::operator[](size_t i) {
MARL_ASSERT(i < count, "index %d exceeds vector size %d", int(i), int(count));
return reinterpret_cast<T*>(elements)[i];
......@@ -231,6 +324,16 @@ void vector<T, BASE_CAPACITY>::reserve(size_t n) {
}
template <typename T, int BASE_CAPACITY>
T* vector<T, BASE_CAPACITY>::data() {
return elements;
}
template <typename T, int BASE_CAPACITY>
const T* vector<T, BASE_CAPACITY>::data() const {
return elements;
}
template <typename T, int BASE_CAPACITY>
void vector<T, BASE_CAPACITY>::free() {
for (size_t i = 0; i < count; i++) {
reinterpret_cast<T*>(elements)[i].~T();
......@@ -238,6 +341,7 @@ void vector<T, BASE_CAPACITY>::free() {
if (allocation.ptr != nullptr) {
allocator->free(allocation);
allocation = {};
elements = nullptr;
}
}
......
......@@ -24,13 +24,23 @@
#endif // MARL_ENABLE_DEPRECATED_SCHEDULER_GETTERS_SETTERS
#ifndef MARL_WARN_DEPRECATED
#define MARL_WARN_DEPRECATED 0
#define MARL_WARN_DEPRECATED 1
#endif // MARL_WARN_DEPRECATED
#if MARL_WARN_DEPRECATED
#define MARL_DEPRECATED(message) __attribute__((deprecated(message)))
#if defined(_WIN32)
#define MARL_DEPRECATED(issue_num, message) \
__declspec(deprecated( \
message "\nSee: https://github.com/google/marl/issues/" #issue_num \
" for more information"))
#else
#define MARL_DEPRECATED(message)
#define MARL_DEPRECATED(issue_num, message) \
__attribute__((deprecated( \
message "\nSee: https://github.com/google/marl/issues/" #issue_num \
" for more information")))
#endif
#else
#define MARL_DEPRECATED(issue_num, message)
#endif
#endif // marl_deprecated_h
......@@ -27,6 +27,9 @@
namespace marl {
template <typename T>
struct StlAllocator;
// pageSize() returns the size in bytes of a virtual memory page for the host
// system.
size_t pageSize();
......@@ -36,6 +39,19 @@ inline T alignUp(T val, T alignment) {
return alignment * ((val + alignment - 1) / alignment);
}
// aligned_storage() is a replacement for std::aligned_storage that isn't busted
// on older versions of MSVC.
template <size_t SIZE, size_t ALIGNMENT>
struct aligned_storage {
struct alignas(ALIGNMENT) type {
unsigned char data[SIZE];
};
};
///////////////////////////////////////////////////////////////////////////////
// Allocation
///////////////////////////////////////////////////////////////////////////////
// Allocation holds the result of a memory allocation from an Allocator.
struct Allocation {
// Intended usage of the allocation. Used for allocation trackers.
......@@ -45,6 +61,7 @@ struct Allocation {
Create, // Allocator::create(), make_unique(), make_shared()
Vector, // marl::containers::vector<T>
List, // marl::containers::list<T>
Stl, // marl::StlAllocator
Count, // Not intended to be used as a usage type - used for upper bound.
};
......@@ -60,6 +77,10 @@ struct Allocation {
Request request; // Request used for the allocation.
};
///////////////////////////////////////////////////////////////////////////////
// Allocator
///////////////////////////////////////////////////////////////////////////////
// Allocator is an interface to a memory allocator.
// Marl provides a default implementation with Allocator::Default.
class Allocator {
......@@ -183,14 +204,9 @@ std::shared_ptr<T> Allocator::make_shared(ARGS&&... args) {
return std::shared_ptr<T>(reinterpret_cast<T*>(alloc.ptr), Deleter{this});
}
// aligned_storage() is a replacement for std::aligned_storage that isn't busted
// on older versions of MSVC.
template <size_t SIZE, size_t ALIGNMENT>
struct aligned_storage {
struct alignas(ALIGNMENT) type {
unsigned char data[SIZE];
};
};
///////////////////////////////////////////////////////////////////////////////
// TrackedAllocator
///////////////////////////////////////////////////////////////////////////////
// TrackedAllocator wraps an Allocator to track the allocations made.
class TrackedAllocator : public Allocator {
......@@ -280,6 +296,141 @@ void TrackedAllocator::free(const Allocation& allocation) {
return allocator->free(allocation);
}
///////////////////////////////////////////////////////////////////////////////
// StlAllocator
///////////////////////////////////////////////////////////////////////////////
// StlAllocator exposes an STL-compatible allocator wrapping a marl::Allocator.
template <typename T>
struct StlAllocator {
using value_type = T;
using pointer = T*;
using const_pointer = const T*;
using reference = T&;
using const_reference = const T&;
using size_type = size_t;
using difference_type = size_t;
// An equivalent STL allocator for a different type.
template <class U>
struct rebind {
typedef StlAllocator<U> other;
};
// Constructs an StlAllocator that will allocate using allocator.
// allocator must remain valid until this StlAllocator has been destroyed.
inline StlAllocator(Allocator* allocator);
template <typename U>
inline StlAllocator(const StlAllocator<U>& other);
// Returns the actual address of x even in presence of overloaded operator&.
inline pointer address(reference x) const;
inline const_pointer address(const_reference x) const;
// Allocates the memory for n objects of type T.
// Does not actually construct the objects.
inline T* allocate(std::size_t n);
// Deallocates the memory for n objects of type T.
inline void deallocate(T* p, std::size_t n);
// Returns the maximum theoretically possible number of T stored in this
// allocator.
inline size_type max_size() const;
// Copy constructs an object of type T at the address p.
inline void construct(pointer p, const_reference val);
// Constructs an object of type U at the address P forwarning all other
// arguments to the constructor.
template <typename U, typename... Args>
inline void construct(U* p, Args&&... args);
// Deconstructs the object at p. It does not free the memory.
inline void destroy(pointer p);
// Deconstructs the object at p. It does not free the memory.
template <typename U>
inline void destroy(U* p);
private:
inline Allocation::Request request(size_t n) const;
template <typename U>
friend struct StlAllocator;
Allocator* allocator;
};
template <typename T>
StlAllocator<T>::StlAllocator(Allocator* allocator) : allocator(allocator) {}
template <typename T>
template <typename U>
StlAllocator<T>::StlAllocator(const StlAllocator<U>& other) {
allocator = other.allocator;
}
template <typename T>
typename StlAllocator<T>::pointer StlAllocator<T>::address(reference x) const {
return &x;
}
template <typename T>
typename StlAllocator<T>::const_pointer StlAllocator<T>::address(
const_reference x) const {
return &x;
}
template <typename T>
T* StlAllocator<T>::allocate(std::size_t n) {
auto alloc = allocator->allocate(request(n));
return reinterpret_cast<T*>(alloc.ptr);
}
template <typename T>
void StlAllocator<T>::deallocate(T* p, std::size_t n) {
Allocation alloc;
alloc.ptr = p;
alloc.request = request(n);
allocator->free(alloc);
}
template <typename T>
typename StlAllocator<T>::size_type StlAllocator<T>::max_size() const {
return std::numeric_limits<size_type>::max() / sizeof(value_type);
}
template <typename T>
void StlAllocator<T>::construct(pointer p, const_reference val) {
new (p) T(val);
}
template <typename T>
template <typename U, typename... Args>
void StlAllocator<T>::construct(U* p, Args&&... args) {
::new ((void*)p) U(std::forward<Args>(args)...);
}
template <typename T>
void StlAllocator<T>::destroy(pointer p) {
((T*)p)->~T();
}
template <typename T>
template <typename U>
void StlAllocator<T>::destroy(U* p) {
p->~U();
}
template <typename T>
Allocation::Request StlAllocator<T>::request(size_t n) const {
Allocation::Request req = {};
req.size = sizeof(T) * n;
req.alignment = alignof(T);
req.usage = Allocation::Usage::Stl;
return req;
}
} // namespace marl
#endif // marl_memory_h
......@@ -363,7 +363,7 @@ class UnboundedPool : public Pool<T> {
Allocator* allocator;
marl::mutex mutex;
std::vector<Item*> items;
containers::vector<Item*, 4> items;
Item* free = nullptr;
};
......@@ -373,7 +373,7 @@ class UnboundedPool : public Pool<T> {
template <typename T, PoolPolicy POLICY>
UnboundedPool<T, POLICY>::Storage::Storage(Allocator* allocator)
: allocator(allocator) {}
: allocator(allocator), items(allocator) {}
template <typename T, PoolPolicy POLICY>
UnboundedPool<T, POLICY>::Storage::~Storage() {
......
......@@ -15,6 +15,7 @@
#ifndef marl_scheduler_h
#define marl_scheduler_h
#include "containers.h"
#include "debug.h"
#include "deprecated.h"
#include "memory.h"
......@@ -26,14 +27,8 @@
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <map>
#include <set>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <vector>
namespace marl {
......@@ -60,6 +55,7 @@ class Scheduler {
struct WorkerThread {
int count = 0;
ThreadInitializer initializer;
std::shared_ptr<Thread::Affinity::Policy> affinityPolicy;
};
WorkerThread workerThread;
......@@ -74,6 +70,8 @@ class Scheduler {
inline Config& setAllocator(Allocator*);
inline Config& setWorkerThreadCount(int);
inline Config& setWorkerThreadInitializer(const ThreadInitializer&);
inline Config& setWorkerThreadAffinityPolicy(
const std::shared_ptr<Thread::Affinity::Policy>&);
};
// Constructor.
......@@ -103,30 +101,30 @@ class Scheduler {
const Config& config() const;
#if MARL_ENABLE_DEPRECATED_SCHEDULER_GETTERS_SETTERS
MARL_DEPRECATED("use Scheduler::Scheduler(const Config&)")
MARL_DEPRECATED(139, "use Scheduler::Scheduler(const Config&)")
Scheduler(Allocator* allocator = Allocator::Default);
// setThreadInitializer() sets the worker thread initializer function which
// will be called for each new worker thread spawned.
// The initializer will only be called on newly created threads (call
// setThreadInitializer() before setWorkerThreadCount()).
MARL_DEPRECATED("use Config::setWorkerThreadInitializer()")
MARL_DEPRECATED(139, "use Config::setWorkerThreadInitializer()")
void setThreadInitializer(const std::function<void()>& init);
// getThreadInitializer() returns the thread initializer function set by
// setThreadInitializer().
MARL_DEPRECATED("use config().workerThread.initializer")
MARL_DEPRECATED(139, "use config().workerThread.initializer")
std::function<void()> getThreadInitializer();
// setWorkerThreadCount() adjusts the number of dedicated worker threads.
// A count of 0 puts the scheduler into single-threaded mode.
// Note: Currently the number of threads cannot be adjusted once tasks
// have been enqueued. This restriction may be lifted at a later time.
MARL_DEPRECATED("use Config::setWorkerThreadCount()")
MARL_DEPRECATED(139, "use Config::setWorkerThreadCount()")
void setWorkerThreadCount(int count);
// getWorkerThreadCount() returns the number of worker threads.
MARL_DEPRECATED("use config().workerThread.count")
MARL_DEPRECATED(139, "use config().workerThread.count")
int getWorkerThreadCount();
#endif // MARL_ENABLE_DEPRECATED_SCHEDULER_GETTERS_SETTERS
......@@ -288,6 +286,8 @@ class Scheduler {
// WaitingFibers holds all the fibers waiting on a timeout.
struct WaitingFibers {
inline WaitingFibers(Allocator*);
// operator bool() returns true iff there are any wait fibers.
inline operator bool() const;
......@@ -314,15 +314,15 @@ class Scheduler {
Fiber* fiber;
inline bool operator<(const Timeout&) const;
};
std::set<Timeout> timeouts;
std::unordered_map<Fiber*, TimePoint> fibers;
containers::set<Timeout, std::less<Timeout>> timeouts;
containers::unordered_map<Fiber*, TimePoint> fibers;
};
// TODO: Implement a queue that recycles elements to reduce number of
// heap allocations.
using TaskQueue = std::deque<Task>;
using FiberQueue = std::deque<Fiber*>;
using FiberSet = std::unordered_set<Fiber*>;
using TaskQueue = containers::deque<Task>;
using FiberQueue = containers::deque<Fiber*>;
using FiberSet = containers::unordered_set<Fiber*>;
// Workers executes Tasks on a single thread.
// Once a task is started, it may yield to other tasks on the same Worker.
......@@ -434,6 +434,8 @@ class Scheduler {
// Work holds tasks and fibers that are enqueued on the Worker.
struct Work {
inline Work(Allocator*);
std::atomic<uint64_t> num = {0}; // tasks.size() + fibers.size()
GUARDED_BY(mutex) uint64_t numBlockedFibers = 0;
GUARDED_BY(mutex) TaskQueue tasks;
......@@ -471,7 +473,7 @@ class Scheduler {
Thread thread;
Work work;
FiberSet idleFibers; // Fibers that have completed which can be reused.
std::vector<Allocator::unique_ptr<Fiber>>
containers::vector<Allocator::unique_ptr<Fiber>, 16>
workerFibers; // All fibers created by this worker.
FastRnd rng;
bool shutdown = false;
......@@ -503,8 +505,11 @@ class Scheduler {
std::array<Worker*, MaxWorkerThreads> workerThreads;
struct SingleThreadedWorkers {
inline SingleThreadedWorkers(Allocator*);
using WorkerByTid =
std::unordered_map<std::thread::id, Allocator::unique_ptr<Worker>>;
containers::unordered_map<std::thread::id,
Allocator::unique_ptr<Worker>>;
marl::mutex mutex;
GUARDED_BY(mutex) std::condition_variable unbind;
GUARDED_BY(mutex) WorkerByTid byTid;
......@@ -531,6 +536,12 @@ Scheduler::Config& Scheduler::Config::setWorkerThreadInitializer(
return *this;
}
Scheduler::Config& Scheduler::Config::setWorkerThreadAffinityPolicy(
const std::shared_ptr<Thread::Affinity::Policy>& policy) {
workerThread.affinityPolicy = policy;
return *this;
}
////////////////////////////////////////////////////////////////////////////////
// Scheduler::Fiber
////////////////////////////////////////////////////////////////////////////////
......
......@@ -17,26 +17,109 @@
#include <functional>
#include "containers.h"
namespace marl {
// Thread provides an OS abstraction for threads of execution.
// Thread is used by marl instead of std::thread as Windows does not naturally
// scale beyond 64 logical threads on a single CPU, unless you use the Win32
// API.
// Thread alsocontains static methods that abstract OS-specific thread / cpu
// queries and control.
class Thread {
public:
using Func = std::function<void()>;
// Core identifies a logical processor unit.
// How a core is identified varies by platform.
struct Core {
struct Windows {
uint8_t group; // Group number
uint8_t index; // Core within the processor group
};
struct Pthread {
uint16_t index; // Core number
};
union {
Windows windows;
Pthread pthread;
};
// Comparison functions
inline bool operator==(const Core&) const;
inline bool operator<(const Core&) const;
};
// Affinity holds the affinity mask for a thread - a description of what cores
// the thread is allowed to run on.
struct Affinity {
// supported is true if marl supports controlling thread affinity for this
// platform.
#if defined(_WIN32) || defined(__linux__) || defined(__FreeBSD__)
static constexpr bool supported = true;
#else
static constexpr bool supported = false;
#endif
// Policy is an interface that provides a get() method for returning an
// Affinity for the given thread by id.
class Policy {
public:
virtual ~Policy(){};
// anyOf() returns a Policy that returns an Affinity for a number of
// available cores in affinity.
//
// Windows requires that each thread is only associated with a
// single affinity group, so the Policy's returned affinity will contain
// cores all from the same group.
static std::shared_ptr<Policy> anyOf(
Affinity&& affinity,
Allocator* allocator = Allocator::Default);
// oneOf() returns a Policy that returns an affinity with a single enabled
// core from affinity. The single enabled core in the Policy's returned
// affinity is:
// affinity[threadId % affinity.count()]
static std::shared_ptr<Policy> oneOf(
Affinity&& affinity,
Allocator* allocator = Allocator::Default);
// get() returns the thread Affinity for the for the given thread by id.
virtual Affinity get(uint32_t threadId, Allocator* allocator) const = 0;
};
Affinity(Allocator*);
Affinity(Affinity&&);
Affinity(const Affinity&, Allocator* allocator);
// all() returns an Affinity with all the cores available to the process.
static Affinity all(Allocator* allocator = Allocator::Default);
Affinity(std::initializer_list<Core>, Allocator* allocator);
// count() returns the number of enabled cores in the affinity.
size_t count() const;
// operator[] returns the i'th enabled core from this affinity.
Core operator[](size_t index) const;
// add() adds the cores from the given affinity to this affinity.
// This affinity is returned to allow for fluent calls.
Affinity& add(const Affinity&);
// remove() removes the cores from the given affinity from this affinity.
// This affinity is returned to allow for fluent calls.
Affinity& remove(const Affinity&);
private:
Affinity(const Affinity&) = delete;
containers::vector<Core, 32> cores;
};
Thread() = default;
Thread(Thread&&);
Thread& operator=(Thread&&);
// Start a new thread that calls func.
// logicalCpuHint is a hint to run the thread on the specified logical CPU.
// logicalCpuHint may be entirely ignored.
Thread(unsigned int logicalCpuHint, const Func& func);
// Start a new thread using the given affinity that calls func.
Thread(Affinity&& affinity, Func&& func);
~Thread();
......@@ -59,6 +142,18 @@ class Thread {
Impl* impl = nullptr;
};
////////////////////////////////////////////////////////////////////////////////
// Thread::Core
////////////////////////////////////////////////////////////////////////////////
// Comparison functions
bool Thread::Core::operator==(const Core& other) const {
return pthread.index == other.pthread.index;
}
bool Thread::Core::operator<(const Core& other) const {
return pthread.index < other.pthread.index;
}
} // namespace marl
#endif // marl_thread_h
......@@ -13,7 +13,12 @@ if [ "$BUILD_SYSTEM" == "cmake" ]; then
mkdir build
cd build
cmake .. -DMARL_BUILD_EXAMPLES=1 -DMARL_BUILD_TESTS=1 -DMARL_BUILD_BENCHMARKS=1 -DMARL_WARNINGS_AS_ERRORS=1
cmake .. -DMARL_BUILD_EXAMPLES=1 \
-DMARL_BUILD_TESTS=1 \
-DMARL_BUILD_BENCHMARKS=1 \
-DMARL_WARNINGS_AS_ERRORS=1 \
-DMARL_DEBUG_ENABLED=1
make -j$(sysctl -n hw.logicalcpu)
./marl-unittests
......
......@@ -39,7 +39,8 @@ if [ "$BUILD_SYSTEM" == "cmake" ]; then
-DMARL_BUILD_EXAMPLES=1 \
-DMARL_BUILD_TESTS=1 \
-DMARL_BUILD_BENCHMARKS=1 \
-DMARL_WARNINGS_AS_ERRORS=1
-DMARL_WARNINGS_AS_ERRORS=1 \
-DMARL_DEBUG_ENABLED=1
make --jobs=$(nproc)
......
......@@ -20,7 +20,7 @@ cd %SRC%\build
if !ERRORLEVEL! neq 0 exit !ERRORLEVEL!
IF /I "%BUILD_SYSTEM%"=="cmake" (
cmake .. -G "%BUILD_GENERATOR%" "-DMARL_BUILD_TESTS=1" "-DMARL_BUILD_EXAMPLES=1" "-DMARL_BUILD_BENCHMARKS=1" "-DMARL_WARNINGS_AS_ERRORS=1"
cmake .. -G "%BUILD_GENERATOR%" "-DMARL_BUILD_TESTS=1" "-DMARL_BUILD_EXAMPLES=1" "-DMARL_BUILD_BENCHMARKS=1" "-DMARL_WARNINGS_AS_ERRORS=1" "-DMARL_DEBUG_ENABLED=1"
if !ERRORLEVEL! neq 0 exit !ERRORLEVEL!
%MSBUILD% /p:Configuration=%CONFIG% Marl.sln
if !ERRORLEVEL! neq 0 exit !ERRORLEVEL!
......
......@@ -132,6 +132,21 @@ TEST_F(ContainersVectorTest, CopyConstruct) {
vectorA[1] = "B";
vectorA[2] = "C";
marl::containers::vector<std::string, 4> vectorB(vectorA, allocator);
ASSERT_EQ(vectorB.size(), size_t(3));
ASSERT_EQ(vectorB[0], "A");
ASSERT_EQ(vectorB[1], "B");
ASSERT_EQ(vectorB[2], "C");
}
TEST_F(ContainersVectorTest, CopyConstructDifferentBaseCapacity) {
marl::containers::vector<std::string, 4> vectorA(allocator);
vectorA.resize(3);
vectorA[0] = "A";
vectorA[1] = "B";
vectorA[2] = "C";
marl::containers::vector<std::string, 2> vectorB(vectorA, allocator);
ASSERT_EQ(vectorB.size(), size_t(3));
ASSERT_EQ(vectorB[0], "A");
......@@ -139,6 +154,38 @@ TEST_F(ContainersVectorTest, CopyConstruct) {
ASSERT_EQ(vectorB[2], "C");
}
TEST_F(ContainersVectorTest, CopyAssignment) {
marl::containers::vector<std::string, 4> vectorA(allocator);
vectorA.resize(3);
vectorA[0] = "A";
vectorA[1] = "B";
vectorA[2] = "C";
marl::containers::vector<std::string, 4> vectorB(allocator);
vectorB = vectorA;
ASSERT_EQ(vectorB.size(), size_t(3));
ASSERT_EQ(vectorB[0], "A");
ASSERT_EQ(vectorB[1], "B");
ASSERT_EQ(vectorB[2], "C");
}
TEST_F(ContainersVectorTest, CopyAssignmentDifferentBaseCapacity) {
marl::containers::vector<std::string, 4> vectorA(allocator);
vectorA.resize(3);
vectorA[0] = "A";
vectorA[1] = "B";
vectorA[2] = "C";
marl::containers::vector<std::string, 2> vectorB(allocator);
vectorB = vectorA;
ASSERT_EQ(vectorB.size(), size_t(3));
ASSERT_EQ(vectorB[0], "A");
ASSERT_EQ(vectorB[1], "B");
ASSERT_EQ(vectorB[2], "C");
}
TEST_F(ContainersVectorTest, MoveConstruct) {
marl::containers::vector<std::string, 4> vectorA(allocator);
......
......@@ -14,20 +14,22 @@
#include "marl_bench.h"
#include "marl/containers.h"
#include "marl/event.h"
#include "benchmark/benchmark.h"
#include <vector>
BENCHMARK_DEFINE_F(Schedule, Event)(benchmark::State& state) {
run(state, [&](int numTasks) {
for (auto _ : state) {
std::vector<marl::Event> events(numTasks + 1);
marl::containers::vector<marl::Event, 1> events;
events.resize(numTasks + 1);
for (auto i = 0; i < numTasks; i++) {
marl::Event prev = events[i];
marl::Event next = events[i + 1];
marl::schedule([=] {
events[i].wait();
events[i + 1].signal();
prev.wait();
next.signal();
});
}
events.front().signal();
......@@ -69,4 +71,4 @@ BENCHMARK_DEFINE_F(Schedule, EventBaton)(benchmark::State& state) {
}
});
}
BENCHMARK_REGISTER_F(Schedule, EventBaton)->Apply(Schedule::args<1000000>);
BENCHMARK_REGISTER_F(Schedule, EventBaton)->Apply(Schedule::args<262144>);
......@@ -18,6 +18,8 @@
#include "marl_test.h"
#include <array>
namespace std {
namespace chrono {
template <typename Rep, typename Period>
......@@ -28,9 +30,7 @@ std::ostream& operator<<(std::ostream& os, const duration<Rep, Period>& d) {
} // namespace std
TEST_P(WithBoundScheduler, EventIsSignalled) {
std::vector<marl::Event::Mode> modes = {marl::Event::Mode::Manual,
marl::Event::Mode::Auto};
for (auto mode : modes) {
for (auto mode : {marl::Event::Mode::Manual, marl::Event::Mode::Auto}) {
auto event = marl::Event(mode);
ASSERT_EQ(event.isSignalled(), false);
event.signal();
......@@ -99,9 +99,7 @@ TEST_P(WithBoundScheduler, EventManualWait) {
}
TEST_P(WithBoundScheduler, EventSequence) {
std::vector<marl::Event::Mode> modes = {marl::Event::Mode::Manual,
marl::Event::Mode::Auto};
for (auto mode : modes) {
for (auto mode : {marl::Event::Mode::Manual, marl::Event::Mode::Auto}) {
std::string sequence;
auto eventA = marl::Event(mode);
auto eventB = marl::Event(mode);
......@@ -216,7 +214,7 @@ TEST_P(WithBoundScheduler, EventWaitStressTest) {
TEST_P(WithBoundScheduler, EventAny) {
for (int i = 0; i < 3; i++) {
std::vector<marl::Event> events = {
std::array<marl::Event, 3> events = {
marl::Event(marl::Event::Mode::Auto),
marl::Event(marl::Event::Mode::Auto),
marl::Event(marl::Event::Mode::Auto),
......
......@@ -14,7 +14,30 @@
#include "marl_bench.h"
BENCHMARK_MAIN();
#include "marl/sanitizers.h"
int main(int argc, char** argv) {
#if ADDRESS_SANITIZER_ENABLED
printf(
"***WARNING*** Marl built with address sanitizer enabled. "
"Timings will be affected\n");
#endif
#if MEMORY_SANITIZER_ENABLED
printf(
"***WARNING*** Marl built with memory sanitizer enabled. "
"Timings will be affected\n");
#endif
#if THREAD_SANITIZER_ENABLED
printf(
"***WARNING*** Marl built with thread sanitizer enabled. "
"Timings will be affected\n");
#endif
::benchmark::Initialize(&argc, argv);
if (::benchmark::ReportUnrecognizedArguments(argc, argv))
return 1;
::benchmark::RunSpecifiedBenchmarks();
return 0;
}
uint32_t Schedule::doSomeWork(uint32_t x) {
uint32_t q = x;
......
......@@ -17,35 +17,66 @@
#include "benchmark/benchmark.h"
// Define MARL_FULL_BENCHMARK to 1 if you want to run benchmarks for every
// available logical CPU core.
#ifndef MARL_FULL_BENCHMARK
#define MARL_FULL_BENCHMARK 0
#endif
class Schedule : public benchmark::Fixture {
public:
void SetUp(const ::benchmark::State&) {}
void TearDown(const ::benchmark::State&) {}
// run() creates a scheduler, sets the number of worker threads from the
// benchmark arguments, calls f, then unbinds and destructs the scheduler.
// run() creates a scheduler using the config cfg, sets the number of worker
// threads from the benchmark arguments, calls f, then unbinds and destructs
// the scheduler.
// F must be a function of the signature: void(int numTasks)
template <typename F>
void run(const ::benchmark::State& state, F&& f) {
marl::Scheduler scheduler;
scheduler.setWorkerThreadCount(numThreads(state));
void run(const ::benchmark::State& state,
marl::Scheduler::Config cfg,
F&& f) {
cfg.setWorkerThreadCount(numThreads(state));
marl::Scheduler scheduler(cfg);
scheduler.bind();
f(numTasks(state));
scheduler.unbind();
}
// args() sets up the benchmark to run from [1 .. NumTasks] tasks (in 8^n
// steps) across 0 worker threads to numLogicalCPUs.
// run() creates a scheduler, sets the number of worker threads from the
// benchmark arguments, calls f, then unbinds and destructs the scheduler.
// F must be a function of the signature: void(int numTasks)
template <typename F>
void run(const ::benchmark::State& state, F&& f) {
run(state, marl::Scheduler::Config{}, f);
}
// args() sets up the benchmark to run a number of tasks over a number of
// threads.
// If MARL_FULL_BENCHMARK is enabled, then NumTasks tasks will be run
// across from 0 to numLogicalCPUs worker threads.
// If MARL_FULL_BENCHMARK is not enabled, then NumTasks tasks will be run
// across [0 .. numLogicalCPUs] worker threads in 2^n steps.
template <int NumTasks = 0x40000>
static void args(benchmark::internal::Benchmark* b) {
b->ArgNames({"tasks", "threads"});
for (unsigned int tasks = 1U; tasks <= NumTasks; tasks *= 8) {
for (unsigned int threads = 0U; threads <= marl::Thread::numLogicalCPUs();
++threads) {
b->Args({tasks, threads});
}
b->Args({NumTasks, 0});
auto numLogicalCPUs = marl::Thread::numLogicalCPUs();
#if MARL_FULL_BENCHMARK
for (unsigned int threads = 1U; threads <= numLogicalCPUs; threads++) {
b->Args({NumTasks, threads});
}
#else
for (unsigned int threads = 1U; threads <= numLogicalCPUs; threads *= 2) {
b->Args({NumTasks, threads});
}
if ((numLogicalCPUs & (numLogicalCPUs - 1)) != 0) {
// numLogicalCPUs is not a power-of-two. Also test with numLogicalCPUs.
b->Args({NumTasks, numLogicalCPUs});
}
#endif
}
// numThreads() return the number of threads in the benchmark run from the
......
......@@ -54,9 +54,12 @@ class WithBoundScheduler : public testing::TestWithParam<SchedulerParams> {
auto& params = GetParam();
auto scheduler = new marl::Scheduler(allocator);
marl::Scheduler::Config cfg;
cfg.setAllocator(allocator);
cfg.setWorkerThreadCount(params.numWorkerThreads);
auto scheduler = new marl::Scheduler(cfg);
scheduler->bind();
scheduler->setWorkerThreadCount(params.numWorkerThreads);
}
void TearDown() override {
......
......@@ -19,7 +19,7 @@
#include <cstring>
#if defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__)
#if defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__)
#include <sys/mman.h>
#include <unistd.h>
namespace {
......
......@@ -22,14 +22,11 @@ class AllocatorTest : public testing::Test {
};
TEST_F(AllocatorTest, AlignedAllocate) {
std::vector<bool> guards = {false, true};
std::vector<size_t> sizes = {1, 2, 3, 4, 5, 7, 8, 14, 16, 17,
31, 34, 50, 63, 64, 65, 100, 127, 128, 129,
200, 255, 256, 257, 500, 511, 512, 513};
std::vector<size_t> alignments = {1, 2, 4, 8, 16, 32, 64, 128};
for (auto useGuards : guards) {
for (auto alignment : alignments) {
for (auto size : sizes) {
for (auto useGuards : {false, true}) {
for (auto alignment : {1, 2, 4, 8, 16, 32, 64, 128}) {
for (auto size : {1, 2, 3, 4, 5, 7, 8, 14, 16, 17,
31, 34, 50, 63, 64, 65, 100, 127, 128, 129,
200, 255, 256, 257, 500, 511, 512, 513}) {
marl::Allocation::Request request;
request.alignment = alignment;
request.size = size;
......
......@@ -59,21 +59,6 @@ inline uint64_t threadID() {
}
#endif
template <typename T>
inline T take(std::deque<T>& queue) {
auto out = std::move(queue.front());
queue.pop_front();
return out;
}
template <typename T>
inline T take(std::unordered_set<T>& set) {
auto it = set.begin();
auto out = std::move(*it);
set.erase(it);
return out;
}
inline void nop() {
#if defined(_WIN32)
__nop();
......@@ -127,7 +112,12 @@ void Scheduler::unbind() {
bound = nullptr;
}
Scheduler::Scheduler(const Config& config) : cfg(config), workerThreads{} {
Scheduler::Scheduler(const Config& config)
: cfg(config), workerThreads{}, singleThreadedWorkers(config.allocator) {
if (cfg.workerThread.count > 0 && !cfg.workerThread.affinityPolicy) {
cfg.workerThread.affinityPolicy = Thread::Affinity::Policy::anyOf(
Thread::Affinity::all(cfg.allocator), cfg.allocator);
}
for (size_t i = 0; i < spinningWorkers.size(); i++) {
spinningWorkers[i] = -1;
}
......@@ -162,7 +152,7 @@ Scheduler::~Scheduler() {
#if MARL_ENABLE_DEPRECATED_SCHEDULER_GETTERS_SETTERS
Scheduler::Scheduler(Allocator* allocator /* = Allocator::Default */)
: workerThreads{} {
: workerThreads{}, singleThreadedWorkers(allocator) {
cfg.allocator = allocator;
for (size_t i = 0; i < spinningWorkers.size(); i++) {
spinningWorkers[i] = -1;
......@@ -338,6 +328,9 @@ const char* Scheduler::Fiber::toString(State state) {
////////////////////////////////////////////////////////////////////////////////
// Scheduler::WaitingFibers
////////////////////////////////////////////////////////////////////////////////
Scheduler::WaitingFibers::WaitingFibers(Allocator* allocator)
: timeouts(allocator), fibers(allocator) {}
Scheduler::WaitingFibers::operator bool() const {
return !fibers.empty();
}
......@@ -399,12 +392,19 @@ bool Scheduler::WaitingFibers::Timeout::operator<(const Timeout& o) const {
thread_local Scheduler::Worker* Scheduler::Worker::current = nullptr;
Scheduler::Worker::Worker(Scheduler* scheduler, Mode mode, uint32_t id)
: id(id), mode(mode), scheduler(scheduler) {}
: id(id),
mode(mode),
scheduler(scheduler),
work(scheduler->cfg.allocator),
idleFibers(scheduler->cfg.allocator) {}
void Scheduler::Worker::start() {
switch (mode) {
case Mode::MultiThreaded:
thread = Thread(id, [=] {
case Mode::MultiThreaded: {
auto allocator = scheduler->cfg.allocator;
auto& affinityPolicy = scheduler->cfg.workerThread.affinityPolicy;
auto affinity = affinityPolicy->get(id, allocator);
thread = Thread(std::move(affinity), [=] {
Thread::setName("Thread<%.2d>", int(id));
if (auto const& initFunc = scheduler->cfg.workerThread.initializer) {
......@@ -423,13 +423,13 @@ void Scheduler::Worker::start() {
Worker::current = nullptr;
});
break;
case Mode::SingleThreaded:
}
case Mode::SingleThreaded: {
Worker::current = this;
mainFiber = Fiber::createFromCurrentThread(scheduler->cfg.allocator, 0);
currentFiber = mainFiber.get();
break;
}
default:
MARL_ASSERT(false, "Unknown mode: %d", int(mode));
}
......@@ -517,12 +517,12 @@ void Scheduler::Worker::suspend(
if (!work.fibers.empty()) {
// There's another fiber that has become unblocked, resume that.
work.num--;
auto to = take(work.fibers);
auto to = containers::take(work.fibers);
ASSERT_FIBER_STATE(to, Fiber::State::Queued);
switchToFiber(to);
} else if (!idleFibers.empty()) {
// There's an old fiber we can reuse, resume that.
auto to = take(idleFibers);
auto to = containers::take(idleFibers);
ASSERT_FIBER_STATE(to, Fiber::State::Idle);
switchToFiber(to);
} else {
......@@ -597,7 +597,7 @@ bool Scheduler::Worker::steal(Task& out) {
return false;
}
work.num--;
out = take(work.tasks);
out = containers::take(work.tasks);
work.mutex.unlock();
return true;
}
......@@ -714,7 +714,7 @@ void Scheduler::Worker::runUntilIdle() {
while (!work.fibers.empty()) {
work.num--;
auto fiber = take(work.fibers);
auto fiber = containers::take(work.fibers);
// Sanity checks,
MARL_ASSERT(idleFibers.count(fiber) == 0, "dequeued fiber is idle");
MARL_ASSERT(fiber != currentFiber, "dequeued fiber is currently running");
......@@ -731,7 +731,7 @@ void Scheduler::Worker::runUntilIdle() {
if (!work.tasks.empty()) {
work.num--;
auto task = take(work.tasks);
auto task = containers::take(work.tasks);
work.mutex.unlock();
// Run the task.
......@@ -752,7 +752,7 @@ Scheduler::Fiber* Scheduler::Worker::createWorkerFiber() {
auto fiber = Fiber::create(scheduler->cfg.allocator, fiberId, FiberStackSize,
[&]() REQUIRES(work.mutex) { run(); });
auto ptr = fiber.get();
workerFibers.push_back(std::move(fiber));
workerFibers.emplace_back(std::move(fiber));
return ptr;
}
......@@ -768,6 +768,9 @@ void Scheduler::Worker::switchToFiber(Fiber* to) {
////////////////////////////////////////////////////////////////////////////////
// Scheduler::Worker::Work
////////////////////////////////////////////////////////////////////////////////
Scheduler::Worker::Work::Work(Allocator* allocator)
: tasks(allocator), fibers(allocator), waiting(allocator) {}
template <typename F>
void Scheduler::Worker::Work::wait(F&& f) {
notifyAdded = true;
......@@ -779,4 +782,10 @@ void Scheduler::Worker::Work::wait(F&& f) {
notifyAdded = false;
}
////////////////////////////////////////////////////////////////////////////////
// Scheduler::Worker::Work
////////////////////////////////////////////////////////////////////////////////
Scheduler::SingleThreadedWorkers::SingleThreadedWorkers(Allocator* allocator)
: byTid(allocator) {}
} // namespace marl
......@@ -46,3 +46,25 @@ BENCHMARK_DEFINE_F(Schedule, SomeWork)
});
}
BENCHMARK_REGISTER_F(Schedule, SomeWork)->Apply(Schedule::args);
BENCHMARK_DEFINE_F(Schedule, SomeWorkWorkerAffinityOneOf)
(benchmark::State& state) {
marl::Scheduler::Config cfg;
cfg.setWorkerThreadAffinityPolicy(
marl::Thread::Affinity::Policy::oneOf(marl::Thread::Affinity::all()));
run(state, cfg, [&](int numTasks) {
for (auto _ : state) {
marl::WaitGroup wg;
wg.add(numTasks);
for (auto i = 0; i < numTasks; i++) {
marl::schedule([=] {
benchmark::DoNotOptimize(doSomeWork(i));
wg.done();
});
}
wg.wait();
}
});
}
BENCHMARK_REGISTER_F(Schedule, SomeWorkWorkerAffinityOneOf)
->Apply(Schedule::args);
......@@ -14,32 +14,38 @@
#include "marl_test.h"
#include "marl/containers.h"
#include "marl/defer.h"
#include "marl/event.h"
#include "marl/waitgroup.h"
#include <atomic>
#include <unordered_set>
TEST_F(WithoutBoundScheduler, SchedulerConstructAndDestruct) {
auto scheduler = new marl::Scheduler();
delete scheduler;
auto scheduler = std::unique_ptr<marl::Scheduler>(
new marl::Scheduler(marl::Scheduler::Config()));
}
TEST_F(WithoutBoundScheduler, SchedulerBindGetUnbind) {
auto scheduler = new marl::Scheduler();
auto scheduler = std::unique_ptr<marl::Scheduler>(
new marl::Scheduler(marl::Scheduler::Config()));
scheduler->bind();
auto got = marl::Scheduler::get();
ASSERT_EQ(scheduler, got);
ASSERT_EQ(scheduler.get(), got);
scheduler->unbind();
got = marl::Scheduler::get();
ASSERT_EQ(got, nullptr);
delete scheduler;
}
TEST_P(WithBoundScheduler, SetAndGetWorkerThreadCount) {
ASSERT_EQ(marl::Scheduler::get()->getWorkerThreadCount(),
GetParam().numWorkerThreads);
TEST_F(WithoutBoundScheduler, CheckConfig) {
marl::Scheduler::Config cfg;
cfg.setAllocator(allocator).setWorkerThreadCount(10);
auto scheduler = std::unique_ptr<marl::Scheduler>(new marl::Scheduler(cfg));
auto gotCfg = scheduler->config();
ASSERT_EQ(gotCfg.allocator, allocator);
ASSERT_EQ(gotCfg.workerThread.count, 10);
}
TEST_P(WithBoundScheduler, DestructWithPendingTasks) {
......@@ -56,7 +62,7 @@ TEST_P(WithBoundScheduler, DestructWithPendingTasks) {
ASSERT_EQ(counter.load(), 1000);
// Rebind a new scheduler so WithBoundScheduler::TearDown() is happy.
(new marl::Scheduler())->bind();
(new marl::Scheduler(marl::Scheduler::Config()))->bind();
}
TEST_P(WithBoundScheduler, DestructWithPendingFibers) {
......@@ -85,7 +91,7 @@ TEST_P(WithBoundScheduler, DestructWithPendingFibers) {
ASSERT_EQ(counter.load(), 1000);
// Rebind a new scheduler so WithBoundScheduler::TearDown() is happy.
(new marl::Scheduler())->bind();
(new marl::Scheduler(marl::Scheduler::Config()))->bind();
}
TEST_P(WithBoundScheduler, FibersResumeOnSameThread) {
......@@ -114,9 +120,9 @@ TEST_P(WithBoundScheduler, FibersResumeOnSameStdThread) {
marl::WaitGroup fence(1);
marl::WaitGroup wg(num_threads);
std::vector<std::thread> threads;
marl::containers::vector<std::thread, 32> threads;
for (int i = 0; i < num_threads; i++) {
threads.push_back(std::thread([=] {
threads.emplace_back(std::thread([=] {
scheduler->bind();
defer(scheduler->unbind());
......@@ -137,12 +143,15 @@ TEST_P(WithBoundScheduler, FibersResumeOnSameStdThread) {
}
TEST_F(WithoutBoundScheduler, TasksOnlyScheduledOnWorkerThreads) {
auto scheduler = std::unique_ptr<marl::Scheduler>(new marl::Scheduler());
marl::Scheduler::Config cfg;
cfg.setWorkerThreadCount(8);
auto scheduler = std::unique_ptr<marl::Scheduler>(new marl::Scheduler(cfg));
scheduler->bind();
defer(scheduler->unbind());
scheduler->setWorkerThreadCount(8);
std::mutex mutex;
std::unordered_set<std::thread::id> threads;
marl::containers::unordered_set<std::thread::id> threads(allocator);
marl::WaitGroup wg;
for (int i = 0; i < 10000; i++) {
wg.add(1);
......@@ -161,8 +170,9 @@ TEST_F(WithoutBoundScheduler, TasksOnlyScheduledOnWorkerThreads) {
// Test that a marl::Scheduler *with dedicated worker threads* can be used
// without first binding to the scheduling thread.
TEST_F(WithoutBoundScheduler, ScheduleMTWWithNoBind) {
auto scheduler = std::unique_ptr<marl::Scheduler>(new marl::Scheduler());
scheduler->setWorkerThreadCount(8);
marl::Scheduler::Config cfg;
cfg.setWorkerThreadCount(8);
auto scheduler = std::unique_ptr<marl::Scheduler>(new marl::Scheduler(cfg));
marl::WaitGroup wg;
for (int i = 0; i < 100; i++) {
......@@ -191,7 +201,8 @@ TEST_F(WithoutBoundScheduler, ScheduleMTWWithNoBind) {
// Test that a marl::Scheduler *without dedicated worker threads* cannot be used
// without first binding to the scheduling thread.
TEST_F(WithoutBoundScheduler, ScheduleSTWWithNoBind) {
auto scheduler = std::unique_ptr<marl::Scheduler>(new marl::Scheduler());
marl::Scheduler::Config cfg;
auto scheduler = std::unique_ptr<marl::Scheduler>(new marl::Scheduler(cfg));
#if MARL_DEBUG_ENABLED && GTEST_HAS_DEATH_TEST
EXPECT_DEATH(scheduler->enqueue(marl::Task([] {})),
......
// Copyright 2019 The Marl Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "marl_test.h"
#include "marl/thread.h"
namespace {
marl::Thread::Core core(int idx) {
marl::Thread::Core c;
c.pthread.index = static_cast<uint16_t>(idx);
return c;
}
} // anonymous namespace
TEST_F(WithoutBoundScheduler, ThreadAffinityCount) {
auto affinity = marl::Thread::Affinity(
{
core(10),
core(20),
core(30),
core(40),
},
allocator);
EXPECT_EQ(affinity.count(), 4U);
}
TEST_F(WithoutBoundScheduler, ThreadAdd) {
auto affinity = marl::Thread::Affinity(
{
core(10),
core(20),
core(30),
core(40),
},
allocator);
affinity
.add(marl::Thread::Affinity(
{
core(25),
core(15),
},
allocator))
.add(marl::Thread::Affinity({core(35)}, allocator));
EXPECT_EQ(affinity.count(), 7U);
EXPECT_EQ(affinity[0], core(10));
EXPECT_EQ(affinity[1], core(15));
EXPECT_EQ(affinity[2], core(20));
EXPECT_EQ(affinity[3], core(25));
EXPECT_EQ(affinity[4], core(30));
EXPECT_EQ(affinity[5], core(35));
EXPECT_EQ(affinity[6], core(40));
}
TEST_F(WithoutBoundScheduler, ThreadRemove) {
auto affinity = marl::Thread::Affinity(
{
core(10),
core(20),
core(30),
core(40),
},
allocator);
affinity
.remove(marl::Thread::Affinity(
{
core(25),
core(20),
},
allocator))
.remove(marl::Thread::Affinity({core(40)}, allocator));
EXPECT_EQ(affinity.count(), 2U);
EXPECT_EQ(affinity[0], core(10));
EXPECT_EQ(affinity[1], core(30));
}
TEST_F(WithoutBoundScheduler, ThreadAffinityAllCountNonzero) {
auto affinity = marl::Thread::Affinity::all(allocator);
if (marl::Thread::Affinity::supported) {
EXPECT_NE(affinity.count(), 0U);
} else {
EXPECT_EQ(affinity.count(), 0U);
}
}
TEST_F(WithoutBoundScheduler, ThreadAffinityPolicyOneOf) {
auto all = marl::Thread::Affinity(
{
core(10),
core(20),
core(30),
core(40),
},
allocator);
auto policy =
marl::Thread::Affinity::Policy::oneOf(std::move(all), allocator);
EXPECT_EQ(policy->get(0, allocator).count(), 1U);
EXPECT_EQ(policy->get(0, allocator)[0].pthread.index, 10);
EXPECT_EQ(policy->get(1, allocator).count(), 1U);
EXPECT_EQ(policy->get(1, allocator)[0].pthread.index, 20);
EXPECT_EQ(policy->get(2, allocator).count(), 1U);
EXPECT_EQ(policy->get(2, allocator)[0].pthread.index, 30);
EXPECT_EQ(policy->get(3, allocator).count(), 1U);
EXPECT_EQ(policy->get(3, allocator)[0].pthread.index, 40);
}
......@@ -28,7 +28,6 @@
#include <atomic>
#include <fstream>
#include <unordered_set>
namespace {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment