Commit 3f46e493 by Nicolas Capens

Update Marl to 12872a0df

12872a0df Comments: Improve & make style consistent 5bb61cf6a Scheduler: Ensure all scheduled tasks are finished when the scheduler is destructed. b5e3e1e00 src\scheduler_test.cpp: Reduce max number of in-flight tasks. d13286050 src\osfiber_windows.h: Assert that the Win32 fiber calls succeeded. 08da5139c .gitignore - Add Visual Studio generated files. 8007ea5de Temporarily disable DestructWithPendingFibers test. 5390897be Fix alignment of Pool items. 89425dcbc CMakeLists: Bump MSVC warnings to level 4 6df4597c3 Kokoro: Shuffle presubmit file layouts. e3b3c7df4 Add include/marl/memory.h for aligned memory allocations. 60598ef45 Fix TSAN issue with BlockingCall test. be3628456 CMakeLists: Add TSAN build flag Commands: git subtree pull --prefix third_party/marl https://github.com/google/marl master --squash Bug: b/141380274 Change-Id: Ic33110f917737d0d84d882f63a4e1c0a87a16c33
parents e3a59837 07ed7cf1
/.vs/
/.vscode/
/build/
/out/
bazel-*
CMakeSettings.json
\ No newline at end of file
......@@ -25,13 +25,10 @@ option(MARL_WARNINGS_AS_ERRORS "Treat warnings as errors" OFF)
option(MARL_BUILD_EXAMPLES "Build example applications" OFF)
option(MARL_BUILD_TESTS "Build tests" OFF)
option(MARL_ASAN "Build marl with address sanitizer" OFF)
option(MARL_MSAN "Build marl with memory sanitizer" OFF)
option(MARL_TSAN "Build marl with thread sanitizer" OFF)
option(MARL_INSTALL "Create marl install target" OFF)
if(MARL_ASAN AND MARL_TSAN)
message(FATAL_ERROR "MARL_ASAN and MARL_TSAN are mutually exclusive")
endif(MARL_ASAN AND MARL_TSAN)
###########################################################
# Directories
###########################################################
......@@ -59,19 +56,19 @@ set(MARL_LIST
${MARL_SRC_DIR}/scheduler.cpp
${MARL_SRC_DIR}/thread.cpp
${MARL_SRC_DIR}/trace.cpp
${MARL_SRC_DIR}/osfiber_aarch64.c
${MARL_SRC_DIR}/osfiber_arm.c
${MARL_SRC_DIR}/osfiber_ppc64.c
${MARL_SRC_DIR}/osfiber_x64.c
${MARL_SRC_DIR}/osfiber_x86.c
)
if(NOT MSVC)
list(APPEND MARL_LIST
${MARL_SRC_DIR}/osfiber_aarch64.c
${MARL_SRC_DIR}/osfiber_arm.c
${MARL_SRC_DIR}/osfiber_asm_aarch64.S
${MARL_SRC_DIR}/osfiber_asm_arm.S
${MARL_SRC_DIR}/osfiber_asm_ppc64.S
${MARL_SRC_DIR}/osfiber_asm_x64.S
${MARL_SRC_DIR}/osfiber_asm_x86.S
${MARL_SRC_DIR}/osfiber_ppc64.c
${MARL_SRC_DIR}/osfiber_x64.c
${MARL_SRC_DIR}/osfiber_x86.c
)
endif(NOT MSVC)
......@@ -92,7 +89,12 @@ endif()
function(marl_set_target_options target)
# Enable all warnings
if(NOT MSVC)
if(MSVC)
target_compile_options(${target} PRIVATE
"-W4"
"/wd4127" # conditional expression is constant
)
else()
target_compile_options(${target} PRIVATE "-Wall")
endif()
......@@ -116,6 +118,9 @@ function(marl_set_target_options target)
elseif(MARL_MSAN)
target_compile_options(${target} PUBLIC "-fsanitize=memory")
target_link_libraries(${target} "-fsanitize=memory")
elseif(MARL_TSAN)
target_compile_options(${target} PUBLIC "-fsanitize=thread")
target_link_libraries(${target} "-fsanitize=thread")
endif()
target_include_directories(${target} PRIVATE ${MARL_INCLUDE_DIR})
......@@ -168,6 +173,7 @@ if(MARL_BUILD_TESTS)
${MARL_SRC_DIR}/defer_test.cpp
${MARL_SRC_DIR}/marl_test.cpp
${MARL_SRC_DIR}/marl_test.h
${MARL_SRC_DIR}/memory_test.cpp
${MARL_SRC_DIR}/osfiber_test.cpp
${MARL_SRC_DIR}/pool_test.cpp
${MARL_SRC_DIR}/scheduler_test.cpp
......
......@@ -147,6 +147,9 @@ constexpr float cx = -0.8f;
constexpr float cy = 0.156f;
int main(int argc, const char** argv) {
(void)argc; // unused parameter
(void)argv; // unused parameter
// Create a marl scheduler using the full number of logical cpus.
// Bind this scheduler to the main thread so we can call marl::schedule()
marl::Scheduler scheduler;
......
......@@ -40,6 +40,9 @@ bool isPrime(int i) {
}
int main(int argc, const char** argv) {
(void)argc; // unused parameter
(void)argv; // unused parameter
// Create a marl scheduler using the full number of logical cpus.
// Bind this scheduler to the main thread so we can call marl::schedule()
marl::Scheduler scheduler;
......
......@@ -33,13 +33,14 @@ namespace marl {
// thread will work on other tasks until the ConditionVariable is unblocked.
class ConditionVariable {
public:
// Notifies and potentially unblocks one waiting fiber or thread.
// notify_one() notifies and potentially unblocks one waiting fiber or thread.
inline void notify_one();
// Notifies and potentially unblocks all waiting fibers and/or threads.
// notify_all() notifies and potentially unblocks all waiting fibers and/or
// threads.
inline void notify_all();
// Blocks the current fiber or thread until the predicate is satisfied
// wait() blocks the current fiber or thread until the predicate is satisfied
// and the ConditionVariable is notified.
template <typename Predicate>
inline void wait(std::unique_lock<std::mutex>& lock, Predicate pred);
......
......@@ -16,10 +16,10 @@
#define marl_containers_h
#include "debug.h"
#include "memory.h" // aligned_storage
#include <algorithm> // std::max
#include <type_traits> // std::aligned_storage
#include <utility> // std::move
#include <algorithm> // std::max
#include <utility> // std::move
#include <cstddef> // size_t
......@@ -69,7 +69,7 @@ class vector {
inline void reserve(size_t n);
private:
using TStorage = typename std::aligned_storage<sizeof(T), alignof(T)>::type;
using TStorage = typename marl::aligned_storage<sizeof(T), alignof(T)>::type;
inline void free();
......
// Copyright 2019 The Marl Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef marl_memory_h
#define marl_memory_h
#include "debug.h"
#include <stdint.h>
#include <cstdlib>
#include <memory>
#include <utility> // std::forward
namespace marl {
template <typename T>
inline T alignUp(T val, T alignment) {
return alignment * ((val + alignment - 1) / alignment);
}
// aligned_malloc() allocates size bytes of uninitialized storage with the
// specified minimum byte alignment. The pointer returned must be freed with
// aligned_free().
inline void* aligned_malloc(size_t alignment, size_t size) {
MARL_ASSERT(alignment < 256, "alignment must less than 256");
auto allocation = new uint8_t[size + sizeof(uint8_t) + alignment];
auto aligned = allocation;
aligned += sizeof(uint8_t); // Make space for the base-address offset.
aligned = reinterpret_cast<uint8_t*>(
alignUp(reinterpret_cast<uintptr_t>(aligned), alignment)); // align
auto offset = static_cast<uint8_t>(aligned - allocation);
aligned[-1] = offset;
return aligned;
}
// aligned_free() frees memory allocated by aligned_malloc.
inline void aligned_free(void* ptr) {
auto aligned = reinterpret_cast<uint8_t*>(ptr);
auto offset = aligned[-1];
auto allocation = aligned - offset;
delete[] allocation;
}
// aligned_new() allocates and constructs an object of type T, respecting the
// alignment of the type.
// The pointer returned by aligned_new() must be deleted with aligned_delete().
template <typename T, typename... ARGS>
T* aligned_new(ARGS&&... args) {
auto ptr = aligned_malloc(alignof(T), sizeof(T));
new (ptr) T(std::forward<ARGS>(args)...);
return reinterpret_cast<T*>(ptr);
}
// aligned_delete() destructs and frees the object allocated with aligned_new().
template <typename T>
void aligned_delete(T* object) {
object->~T();
aligned_free(object);
}
// make_aligned_shared() returns a new object wrapped in a std::shared_ptr that
// respects the alignemnt of the type.
template <typename T, typename... ARGS>
inline std::shared_ptr<T> make_aligned_shared(ARGS&&... args) {
auto ptr = aligned_new<T>(std::forward<ARGS>(args)...);
return std::shared_ptr<T>(ptr, aligned_delete<T>);
}
// aligned_storage() is a replacement for std::aligned_storage that isn't busted
// on older versions of MSVC.
template <size_t SIZE, size_t ALIGNMENT>
struct aligned_storage {
struct alignas(ALIGNMENT) type {
unsigned char data[SIZE];
};
};
} // namespace marl
#endif // marl_memory_h
......@@ -16,6 +16,7 @@
#define marl_pool_h
#include "conditionvariable.h"
#include "memory.h"
#include <atomic>
#include <mutex>
......@@ -90,7 +91,7 @@ class Pool {
// destruct() calls the destructor on the item's data.
inline void destruct();
using Data = typename std::aligned_storage<sizeof(T), alignof(T)>::type;
using Data = typename aligned_storage<sizeof(T), alignof(T)>::type;
Data data;
std::atomic<int> refcount = {0};
Item* next = nullptr; // pointer to the next free item in the pool.
......@@ -233,12 +234,12 @@ class BoundedPool : public Pool<T> {
inline ~Storage();
inline void return_(Item*) override;
Item items[N];
std::mutex mutex;
ConditionVariable returned;
Item items[N];
Item* free = nullptr;
};
std::shared_ptr<Storage> storage = std::make_shared<Storage>();
std::shared_ptr<Storage> storage = make_aligned_shared<Storage>();
};
template <typename T, int N, PoolPolicy POLICY>
......@@ -359,7 +360,7 @@ UnboundedPool<T, POLICY>::Storage::~Storage() {
if (POLICY == PoolPolicy::Preserve) {
item->destruct();
}
delete item;
aligned_delete(item);
}
}
......@@ -377,8 +378,8 @@ inline void UnboundedPool<T, POLICY>::borrow(size_t n, const F& f) const {
for (size_t i = 0; i < n; i++) {
if (storage->free == nullptr) {
auto count = std::max<size_t>(storage->items.size(), 32);
for (size_t i = 0; i < count; i++) {
auto item = new Item();
for (size_t j = 0; j < count; j++) {
auto item = aligned_new<Item>();
if (POLICY == PoolPolicy::Preserve) {
item->construct();
}
......
......@@ -232,6 +232,12 @@ class Scheduler {
// frequently putting the thread to sleep and re-waking.
void spinForWork();
// numBlockedFibers() returns the number of fibers currently blocked and
// held externally.
_Requires_lock_held_(lock) inline size_t numBlockedFibers() const {
return workerFibers.size() - idleFibers.size();
}
// Work holds tasks and fibers that are enqueued on the Worker.
struct Work {
std::atomic<uint64_t> num = {0}; // tasks.size() + fibers.size()
......
......@@ -30,15 +30,15 @@ namespace marl {
// order in which they are called.
//
// The first ticket to be taken from a queue will be in the 'called' state,
// others will be in the 'waiting' state until the previous ticket has finished.
// subsequent tickets will be in the 'waiting' state.
//
// Ticket::wait() will block until the ticket is called.
// Ticket::done() sets the ticket into the 'finished' state and calls the next
// taken ticket from the queue.
//
// If a ticket is taken from a queue and does not have done() called before
// its last reference is dropped, it will implicitly call done(), calling the
// next ticket.
// Ticket::done() moves the ticket into the 'finished' state. If all preceeding
// tickets are finished, done() will call the next unfinished ticket.
//
// If the last remaining reference to an unfinished ticket is dropped then
// done() will be automatically called on that ticket.
//
// Example:
//
......
......@@ -7,3 +7,8 @@ env_vars {
key: "BUILD_SYSTEM"
value: "bazel"
}
env_vars {
key: "BUILD_TARGET_ARCH"
value: "x64"
}
......@@ -7,3 +7,8 @@ env_vars {
key: "BUILD_SYSTEM"
value: "cmake"
}
env_vars {
key: "BUILD_TARGET_ARCH"
value: "x64"
}
# Format: //devtools/kokoro/config/proto/build.proto
# Location of the continuous bash script in Git.
build_file: "marl/kokoro/linux/presubmit.sh"
build_file: "marl/kokoro/ubuntu/presubmit.sh"
env_vars {
key: "BUILD_SYSTEM"
value: "bazel"
}
env_vars {
key: "BUILD_TARGET_ARCH"
value: "x64"
}
\ No newline at end of file
# Format: //devtools/kokoro/config/proto/build.proto
# Location of the continuous bash script in Git.
build_file: "marl/kokoro/ubuntu/presubmit.sh"
env_vars {
key: "BUILD_SYSTEM"
value: "cmake"
}
env_vars {
key: "BUILD_TARGET_ARCH"
value: "x64"
}
env_vars {
key: "BUILD_SANITIZER"
value: "asan"
}
# Format: //devtools/kokoro/config/proto/build.proto
# Location of the continuous bash script in Git.
build_file: "marl/kokoro/linux/presubmit.sh"
build_file: "marl/kokoro/ubuntu/presubmit.sh"
env_vars {
key: "BUILD_SYSTEM"
value: "cmake"
}
env_vars {
key: "BUILD_TARGET_ARCH"
value: "x64"
}
# Format: //devtools/kokoro/config/proto/build.proto
# Location of the continuous bash script in Git.
build_file: "marl/kokoro/ubuntu/presubmit.sh"
env_vars {
key: "BUILD_SYSTEM"
value: "cmake"
}
env_vars {
key: "BUILD_TARGET_ARCH"
value: "x64"
}
env_vars {
key: "BUILD_SANITIZER"
value: "tsan"
}
......@@ -22,9 +22,15 @@ if [ "$BUILD_SYSTEM" == "cmake" ]; then
./primes > /dev/null
}
build_and_run ""
build_and_run "-DMARL_ASAN=1"
build_and_run "-DMARL_MSAN=1"
if [ "$BUILD_SANITIZER" == "asan" ]; then
build_and_run "-DMARL_ASAN=1"
elif [ "$BUILD_SANITIZER" == "msan" ]; then
build_and_run "-DMARL_MSAN=1"
elif [ "$BUILD_SANITIZER" == "tsan" ]; then
build_and_run "-DMARL_TSAN=1"
else
build_and_run
fi
elif [ "$BUILD_SYSTEM" == "bazel" ]; then
# Get bazel
curl -L -k -O -s https://github.com/bazelbuild/bazel/releases/download/0.29.1/bazel-0.29.1-installer-linux-x86_64.sh
......
......@@ -7,3 +7,8 @@ env_vars {
key: "BUILD_SYSTEM"
value: "bazel"
}
env_vars {
key: "BUILD_TARGET_ARCH"
value: "x64"
}
......@@ -7,3 +7,13 @@ env_vars {
key: "BUILD_SYSTEM"
value: "cmake"
}
env_vars {
key: "BUILD_GENERATOR"
value: "Visual Studio 15 2017 Win64"
}
env_vars {
key: "BUILD_TARGET_ARCH"
value: "x64"
}
# Format: //devtools/kokoro/config/proto/build.proto
# Location of the continuous bash script in Git.
build_file: "marl/kokoro/windows/presubmit.bat"
env_vars {
key: "BUILD_SYSTEM"
value: "cmake"
}
env_vars {
key: "BUILD_GENERATOR"
value: "Visual Studio 15 2017"
}
env_vars {
key: "BUILD_TARGET_ARCH"
value: "x86"
}
......@@ -20,7 +20,7 @@ cd %SRC%\build
if !ERRORLEVEL! neq 0 exit !ERRORLEVEL!
IF /I "%BUILD_SYSTEM%"=="cmake" (
cmake .. -G "Visual Studio 15 2017 Win64" -Thost=x64 "-DMARL_BUILD_TESTS=1" "-DMARL_BUILD_EXAMPLES=1" "-DMARL_WARNINGS_AS_ERRORS=1"
cmake .. -G "%BUILD_GENERATOR%" "-DMARL_BUILD_TESTS=1" "-DMARL_BUILD_EXAMPLES=1" "-DMARL_WARNINGS_AS_ERRORS=1"
if !ERRORLEVEL! neq 0 exit !ERRORLEVEL!
%MSBUILD% /p:Configuration=%CONFIG% Marl.sln
if !ERRORLEVEL! neq 0 exit !ERRORLEVEL!
......
......@@ -35,6 +35,6 @@ TEST_P(WithBoundScheduler, BlockingCall) {
});
}
marl::schedule([=] { mutex->unlock(); });
mutex->unlock();
wg.wait();
}
......@@ -40,6 +40,7 @@ void warn(const char* msg, ...) {
}
void assert_has_bound_scheduler(const char* feature) {
(void)feature; // unreferenced parameter
MARL_ASSERT(Scheduler::get() != nullptr,
"%s requires a marl::Scheduler to be bound", feature);
}
......
// Copyright 2019 The Marl Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "marl/memory.h"
#include "marl_test.h"
class MemoryTest : public testing::Test {};
TEST(MemoryTest, AlignedMalloc) {
std::vector<size_t> sizes = {1, 2, 3, 4, 5, 7, 8, 14, 16, 17,
31, 34, 50, 63, 64, 65, 100, 127, 128, 129,
200, 255, 256, 257, 500, 511, 512, 513};
std::vector<size_t> alignments = {1, 2, 4, 8, 16, 32, 64, 128};
for (auto alignment : alignments) {
for (auto size : sizes) {
auto ptr = marl::aligned_malloc(alignment, size);
ASSERT_EQ(reinterpret_cast<uintptr_t>(ptr) & (alignment - 1), 0U);
memset(ptr, 0, size); // Check the memory was actually allocated.
marl::aligned_free(ptr);
}
}
}
struct alignas(16) StructWith16ByteAlignment {
uint8_t i;
uint8_t padding[15];
};
struct alignas(32) StructWith32ByteAlignment {
uint8_t i;
uint8_t padding[31];
};
struct alignas(64) StructWith64ByteAlignment {
uint8_t i;
uint8_t padding[63];
};
TEST(MemoryTest, AlignedNew) {
auto s16 = marl::aligned_new<StructWith16ByteAlignment>();
auto s32 = marl::aligned_new<StructWith32ByteAlignment>();
auto s64 = marl::aligned_new<StructWith64ByteAlignment>();
ASSERT_EQ(alignof(StructWith16ByteAlignment), 16U);
ASSERT_EQ(alignof(StructWith32ByteAlignment), 32U);
ASSERT_EQ(alignof(StructWith64ByteAlignment), 64U);
ASSERT_EQ(reinterpret_cast<uintptr_t>(s16) & 15U, 0U);
ASSERT_EQ(reinterpret_cast<uintptr_t>(s32) & 31U, 0U);
ASSERT_EQ(reinterpret_cast<uintptr_t>(s64) & 63U, 0U);
marl::aligned_delete(s64);
marl::aligned_delete(s32);
marl::aligned_delete(s16);
}
\ No newline at end of file
......@@ -19,8 +19,8 @@
// assembly implementations *do not* save or restore signal masks,
// floating-point control or status registers, FS and GS segment registers,
// thread-local storage state nor any SIMD registers. This should not be a
// problem as the marl scheduler requires fibers to be executed on a single
// thread.
// problem as the marl scheduler requires fibers to be executed on the same
// thread throughout their lifetime.
#if defined(__x86_64__)
#include "osfiber_asm_x64.h"
......
......@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include "marl/debug.h"
#include <functional>
#include <memory>
......@@ -58,6 +60,9 @@ OSFiber* OSFiber::createFiberFromCurrentThread() {
auto out = new OSFiber();
out->fiber = ConvertThreadToFiber(nullptr);
out->isFiberFromThread = true;
MARL_ASSERT(out->fiber != nullptr,
"ConvertThreadToFiber() failed with error 0x%x",
int(GetLastError()));
return out;
}
......@@ -66,11 +71,13 @@ OSFiber* OSFiber::createFiber(size_t stackSize,
auto out = new OSFiber();
out->fiber = CreateFiber(stackSize, &OSFiber::run, out);
out->target = func;
MARL_ASSERT(out->fiber != nullptr, "CreateFiber() failed with error 0x%x",
int(GetLastError()));
return out;
}
void OSFiber::switchTo(OSFiber* fiber) {
SwitchToFiber(fiber->fiber);
void OSFiber::switchTo(OSFiber* to) {
SwitchToFiber(to->fiber);
}
void WINAPI OSFiber::run(void* self) {
......
......@@ -14,6 +14,7 @@
#include "marl_test.h"
#include "marl/memory.h"
#include "marl/pool.h"
#include "marl/waitgroup.h"
......@@ -168,3 +169,28 @@ TEST_P(WithBoundScheduler, BoundedPool_PolicyPreserve) {
}
ASSERT_EQ(CtorDtorCounter::ctor_count, CtorDtorCounter::dtor_count);
}
struct alignas(64) StructWithAlignment {
uint8_t i;
uint8_t padding[63];
};
TEST_P(WithBoundScheduler, BoundedPool_AlignedTypes) {
marl::BoundedPool<StructWithAlignment, 100> pool;
for (int i = 0; i < 100; i++) {
auto loan = pool.borrow();
ASSERT_EQ(reinterpret_cast<uintptr_t>(&loan->i) &
(alignof(StructWithAlignment) - 1),
0U);
}
}
TEST_P(WithBoundScheduler, UnboundedPool_AlignedTypes) {
marl::UnboundedPool<StructWithAlignment> pool;
for (int i = 0; i < 100; i++) {
auto loan = pool.borrow();
ASSERT_EQ(reinterpret_cast<uintptr_t>(&loan->i) &
(alignof(StructWithAlignment) - 1),
0U);
}
}
......@@ -294,6 +294,7 @@ void Scheduler::Worker::stop() {
}
void Scheduler::Worker::yield(Fiber* from) {
(void)from; // unreferenced parameter
MARL_ASSERT(currentFiber == from,
"Attempting to call yield from a non-current fiber");
......@@ -382,7 +383,7 @@ void Scheduler::Worker::run() {
{
std::unique_lock<std::mutex> lock(work.mutex);
work.added.wait(lock, [this] { return work.num > 0 || shutdown; });
while (!shutdown) {
while (!shutdown || work.num > 0 || numBlockedFibers() > 0U) {
waitForWork(lock);
runUntilIdle(lock);
}
......@@ -414,7 +415,9 @@ _Requires_lock_held_(lock) void Scheduler::Worker::waitForWork(
spinForWork();
lock.lock();
}
work.added.wait(lock, [this] { return work.num > 0 || shutdown; });
work.added.wait(lock, [this] {
return work.num > 0 || (shutdown && numBlockedFibers() == 0U);
});
}
void Scheduler::Worker::spinForWork() {
......@@ -484,8 +487,8 @@ _Requires_lock_held_(lock) void Scheduler::Worker::runUntilIdle(
}
Scheduler::Fiber* Scheduler::Worker::createWorkerFiber() {
auto id = static_cast<uint32_t>(workerFibers.size() + 1);
auto fiber = Fiber::create(id, FiberStackSize, [&] { run(); });
auto fiberId = static_cast<uint32_t>(workerFibers.size() + 1);
auto fiber = Fiber::create(fiberId, FiberStackSize, [&] { run(); });
workerFibers.push_back(std::unique_ptr<Fiber>(fiber));
return fiber;
}
......
......@@ -17,6 +17,7 @@
#include "marl/defer.h"
#include "marl/waitgroup.h"
#include <atomic>
#include <unordered_set>
TEST(WithoutBoundScheduler, SchedulerConstructAndDestruct) {
......@@ -41,22 +42,47 @@ TEST_P(WithBoundScheduler, SetAndGetWorkerThreadCount) {
}
TEST_P(WithBoundScheduler, DestructWithPendingTasks) {
for (int i = 0; i < 10000; i++) {
marl::schedule([] {});
std::atomic<int> counter = {0};
for (int i = 0; i < 1000; i++) {
marl::schedule([&] { counter++; });
}
auto scheduler = marl::Scheduler::get();
scheduler->unbind();
delete scheduler;
// All scheduled tasks should be completed before the scheduler is destructed.
ASSERT_EQ(counter.load(), 1000);
// Rebind a new scheduler so WithBoundScheduler::TearDown() is happy.
(new marl::Scheduler())->bind();
}
TEST_P(WithBoundScheduler, DestructWithPendingFibers) {
std::atomic<int> counter = {0};
marl::WaitGroup wg(1);
for (int i = 0; i < 10000; i++) {
marl::schedule([=] { wg.wait(); });
for (int i = 0; i < 1000; i++) {
marl::schedule([&] {
wg.wait();
counter++;
});
}
wg.done();
// Schedule a task to unblock all the tasks scheduled above.
// We assume that some of these tasks will not finish before the scheduler
// destruction logic kicks in.
marl::schedule([=] {
wg.done(); // Ready, steady, go...
});
auto scheduler = marl::Scheduler::get();
scheduler->unbind();
delete scheduler;
// All scheduled tasks should be completed before the scheduler is destructed.
ASSERT_EQ(counter.load(), 1000);
// Rebind a new scheduler so WithBoundScheduler::TearDown() is happy.
(new marl::Scheduler())->bind();
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment