Commit 707868ae by Etienne Bergeron Committed by Commit Bot

Implement a WorkerPool delegate to execute background task (1/3)

This CL is adding a WorkerPool delegate to allow an embedder to post task on a custom thread pool. The target for this code is Chromium. The plan is to post tasks into the Chromium ThreadPool. Related CLs: 1) [this] 2) https://chromium-review.googlesource.com/c/chromium/src/+/2231864 3) https://chromium-review.googlesource.com/c/angle/angle/+/2231710 Bug: chromium:1091259 Change-Id: Ib990b06d4672b6f859d04b97ac4311a7a80ef7a9 Reviewed-on: https://chromium-review.googlesource.com/c/angle/angle/+/2231708 Commit-Queue: Jamie Madill <jmadill@chromium.org> Reviewed-by: 's avatarShahbaz Youssefi <syoussefi@chromium.org> Reviewed-by: 's avatarJamie Madill <jmadill@chromium.org>
parent 37e6ede6
......@@ -247,6 +247,17 @@ inline void DefaultCacheProgram(PlatformMethods *platform,
const uint8_t *programBytes)
{}
using PostWorkerTaskCallback = void (*)(void *userData);
using PostWorkerTaskFunc = void (*)(PlatformMethods *platform,
PostWorkerTaskCallback callback,
void *userData);
inline void DefaultPostWorkerTask(PlatformMethods *platform,
PostWorkerTaskCallback callback,
void *userData)
{
callback(userData);
}
// Platform methods are enumerated here once.
#define ANGLE_PLATFORM_OP(OP) \
OP(currentTime, CurrentTime) \
......@@ -264,7 +275,8 @@ inline void DefaultCacheProgram(PlatformMethods *platform,
OP(overrideWorkaroundsD3D, OverrideWorkaroundsD3D) \
OP(overrideFeaturesVk, OverrideFeaturesVk) \
OP(cacheProgram, CacheProgram) \
OP(overrideFeaturesMtl, OverrideFeaturesMtl)
OP(overrideFeaturesMtl, OverrideFeaturesMtl) \
OP(postWorkerTask, PostWorkerTask)
#define ANGLE_PLATFORM_METHOD_DEF(Name, CapsName) CapsName##Func Name = Default##CapsName;
......
......@@ -213,21 +213,127 @@ void AsyncWorkerPool::checkToRunPendingTasks()
}
#endif // (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
#if (ANGLE_DELEGATE_WORKERS == ANGLE_ENABLED)
class DelegateWaitableEvent final : public WaitableEvent
{
public:
DelegateWaitableEvent() = default;
~DelegateWaitableEvent() override = default;
void wait() override;
bool isReady() override;
void markAsReady();
private:
// To protect the concurrent accesses from both main thread and background
// threads to the member fields.
std::mutex mMutex;
bool mIsReady = false;
std::condition_variable mCondition;
};
void DelegateWaitableEvent::markAsReady()
{
std::lock_guard<std::mutex> lock(mMutex);
mIsReady = true;
mCondition.notify_all();
}
void DelegateWaitableEvent::wait()
{
std::unique_lock<std::mutex> lock(mMutex);
mCondition.wait(lock, [this] { return mIsReady; });
}
bool DelegateWaitableEvent::isReady()
{
std::lock_guard<std::mutex> lock(mMutex);
return mIsReady;
}
class DelegateWorkerPool final : public WorkerThreadPool
{
public:
DelegateWorkerPool() = default;
~DelegateWorkerPool() override = default;
std::shared_ptr<WaitableEvent> postWorkerTask(std::shared_ptr<Closure> task) override;
void setMaxThreads(size_t maxThreads) override;
bool isAsync() override;
};
// A function wrapper to execute the closure and to notify the waitable
// event after the execution.
class DelegateWorkerTask
{
public:
DelegateWorkerTask(std::shared_ptr<Closure> task,
std::shared_ptr<DelegateWaitableEvent> waitable)
: mTask(task), mWaitable(waitable)
{}
DelegateWorkerTask() = delete;
DelegateWorkerTask(DelegateWorkerTask &) = delete;
static void RunTask(void *userData)
{
DelegateWorkerTask *workerTask = static_cast<DelegateWorkerTask *>(userData);
(*workerTask->mTask)();
workerTask->mWaitable->markAsReady();
// Delete the task after its execution.
delete workerTask;
}
private:
~DelegateWorkerTask() = default;
std::shared_ptr<Closure> mTask;
std::shared_ptr<DelegateWaitableEvent> mWaitable;
};
std::shared_ptr<WaitableEvent> DelegateWorkerPool::postWorkerTask(std::shared_ptr<Closure> task)
{
auto waitable = std::make_shared<DelegateWaitableEvent>();
// The task will be deleted bu RunTask(...) its execution.
DelegateWorkerTask *workerTask = new DelegateWorkerTask(task, waitable);
auto *platform = ANGLEPlatformCurrent();
platform->postWorkerTask(platform, DelegateWorkerTask::RunTask, workerTask);
return waitable;
}
void DelegateWorkerPool::setMaxThreads(size_t maxThreads) {}
bool DelegateWorkerPool::isAsync()
{
return true;
}
#endif
// static
std::shared_ptr<WorkerThreadPool> WorkerThreadPool::Create(bool multithreaded)
{
std::shared_ptr<WorkerThreadPool> pool(nullptr);
#if (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
#if (ANGLE_DELEGATE_WORKERS == ANGLE_ENABLED)
if (multithreaded)
{
pool = std::shared_ptr<WorkerThreadPool>(new DelegateWorkerPool());
}
#elif (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
if (multithreaded)
{
pool = std::shared_ptr<WorkerThreadPool>(static_cast<WorkerThreadPool *>(
new AsyncWorkerPool(std::thread::hardware_concurrency())));
pool = std::shared_ptr<WorkerThreadPool>(
new AsyncWorkerPool(std::thread::hardware_concurrency()));
}
#endif
if (!pool)
{
return std::shared_ptr<WorkerThreadPool>(
static_cast<WorkerThreadPool *>(new SingleThreadedWorkerPool()));
return std::shared_ptr<WorkerThreadPool>(new SingleThreadedWorkerPool());
}
return pool;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment