Commit ce7bdd0b by Jamie Madill Committed by Commit Bot

Vulkan: Merge CommandQueue and TaskProcessor.

Bug: b/172704839 Change-Id: I43a40e6a3e1eb00a7ddebfba6e915437aa69aeb6 Reviewed-on: https://chromium-review.googlesource.com/c/angle/angle/+/2525141 Commit-Queue: Jamie Madill <jmadill@chromium.org> Reviewed-by: 's avatarTim Van Patten <timvp@google.com> Reviewed-by: 's avatarCourtney Goeltzenleuchter <courtneygo@google.com>
parent 0c3d0b21
...@@ -337,12 +337,11 @@ struct FeaturesVk : FeatureSetBase ...@@ -337,12 +337,11 @@ struct FeaturesVk : FeatureSetBase
// Tell Vulkan back-end to use CommandProcessor class to dispatch work to the GPU. The work will // Tell Vulkan back-end to use CommandProcessor class to dispatch work to the GPU. The work will
// happen asynchronously in a different thread if asynchronousCommandProcessing is true. // happen asynchronously in a different thread if asynchronousCommandProcessing is true.
// Otherwise use Renderer::CommandQueue to dispatch work. // Otherwise use Renderer::CommandQueue to dispatch work.
// TODO(jmadill): Merge these two features. b/172704839
Feature commandProcessor = {"commandProcessor", FeatureCategory::VulkanFeatures, Feature commandProcessor = {"commandProcessor", FeatureCategory::VulkanFeatures,
"Use CommandProcessor class to dispatch work to GPU.", &members, "Use CommandProcessor class to dispatch work to GPU.", &members,
"http://anglebug.com/4324"}; "http://anglebug.com/4324"};
// Enable parallel thread execution when commandProcessor is enabled.
// Currently off by default.
Feature asynchronousCommandProcessing = {"asynchronousCommandProcessing", Feature asynchronousCommandProcessing = {"asynchronousCommandProcessing",
FeatureCategory::VulkanFeatures, FeatureCategory::VulkanFeatures,
"Enable/Disable parallel processing of worker thread", "Enable/Disable parallel processing of worker thread",
......
...@@ -249,312 +249,7 @@ void CommandBatch::destroy(VkDevice device) ...@@ -249,312 +249,7 @@ void CommandBatch::destroy(VkDevice device)
fence.reset(device); fence.reset(device);
} }
// TaskProcessor implementation. // CommandProcessor implementation.
TaskProcessor::TaskProcessor() : mCurrentQueueSerial(mQueueSerialFactory.generate()) {}
TaskProcessor::~TaskProcessor() = default;
void TaskProcessor::destroy(VkDevice device)
{
mPrimaryCommandBuffer.destroy(device);
mPrimaryCommandPool.destroy(device);
ASSERT(mInFlightCommands.empty() && mGarbageQueue.empty());
}
angle::Result TaskProcessor::init(Context *context, std::thread::id threadId)
{
mThreadId = threadId;
// Initialize the command pool now that we know the queue family index.
ANGLE_TRY(mPrimaryCommandPool.init(context, context->getRenderer()->getQueueFamilyIndex()));
return angle::Result::Continue;
}
angle::Result TaskProcessor::checkCompletedCommands(Context *context)
{
ANGLE_TRACE_EVENT0("gpu.angle", "TaskProcessor::checkCompletedCommands");
VkDevice device = context->getDevice();
RendererVk *rendererVk = context->getRenderer();
int finishedCount = 0;
for (CommandBatch &batch : mInFlightCommands)
{
VkResult result = batch.fence.get().getStatus(device);
if (result == VK_NOT_READY)
{
break;
}
ANGLE_VK_TRY(context, result);
mLastCompletedQueueSerial = batch.serial;
rendererVk->resetSharedFence(&batch.fence);
ANGLE_TRACE_EVENT0("gpu.angle", "command buffer recycling");
batch.commandPool.destroy(device);
ANGLE_TRY(mPrimaryCommandPool.collect(context, std::move(batch.primaryCommands)));
++finishedCount;
}
if (finishedCount > 0)
{
auto beginIter = mInFlightCommands.begin();
mInFlightCommands.erase(beginIter, beginIter + finishedCount);
}
size_t freeIndex = 0;
for (; freeIndex < mGarbageQueue.size(); ++freeIndex)
{
GarbageAndSerial &garbageList = mGarbageQueue[freeIndex];
if (garbageList.getSerial() <= mLastCompletedQueueSerial)
{
for (GarbageObject &garbage : garbageList.get())
{
garbage.destroy(rendererVk);
}
}
else
{
break;
}
}
// Remove the entries from the garbage list - they should be ready to go.
if (freeIndex > 0)
{
mGarbageQueue.erase(mGarbageQueue.begin(), mGarbageQueue.begin() + freeIndex);
}
return angle::Result::Continue;
}
angle::Result TaskProcessor::releaseToCommandBatch(Context *context,
PrimaryCommandBuffer &&commandBuffer,
CommandPool *commandPool,
CommandBatch *batch)
{
ASSERT(isValidWorkerThread(context));
ANGLE_TRACE_EVENT0("gpu.angle", "TaskProcessor::releaseToCommandBatch");
batch->primaryCommands = std::move(commandBuffer);
if (commandPool->valid())
{
batch->commandPool = std::move(*commandPool);
// Recreate CommandPool
VkCommandPoolCreateInfo poolInfo = {};
poolInfo.sType = VK_STRUCTURE_TYPE_COMMAND_POOL_CREATE_INFO;
poolInfo.flags = VK_COMMAND_POOL_CREATE_TRANSIENT_BIT;
poolInfo.queueFamilyIndex = context->getRenderer()->getQueueFamilyIndex();
ANGLE_VK_TRY(context, commandPool->init(context->getDevice(), poolInfo));
}
return angle::Result::Continue;
}
angle::Result TaskProcessor::ensurePrimaryCommandBufferValid(Context *context)
{
if (mPrimaryCommandBuffer.valid())
{
return angle::Result::Continue;
}
ASSERT(isValidWorkerThread(context));
ANGLE_TRACE_EVENT0("gpu.angle", "TaskProcessor::ensurePrimaryCommandBufferValid");
ANGLE_TRY(mPrimaryCommandPool.allocate(context, &mPrimaryCommandBuffer));
VkCommandBufferBeginInfo beginInfo = {};
beginInfo.sType = VK_STRUCTURE_TYPE_COMMAND_BUFFER_BEGIN_INFO;
beginInfo.flags = VK_COMMAND_BUFFER_USAGE_ONE_TIME_SUBMIT_BIT;
beginInfo.pInheritanceInfo = nullptr;
ANGLE_VK_TRY(context, mPrimaryCommandBuffer.begin(beginInfo));
return angle::Result::Continue;
}
void TaskProcessor::handleDeviceLost(Context *context)
{
ASSERT(isValidWorkerThread(context));
ANGLE_TRACE_EVENT0("gpu.angle", "TaskProcessor::handleDeviceLost");
VkDevice device = context->getDevice();
for (CommandBatch &batch : mInFlightCommands)
{
// On device loss we need to wait for fence to be signaled before destroying it
VkResult status =
batch.fence.get().wait(device, context->getRenderer()->getMaxFenceWaitTimeNs());
// If the wait times out, it is probably not possible to recover from lost device
ASSERT(status == VK_SUCCESS || status == VK_ERROR_DEVICE_LOST);
// On device lost, here simply destroy the CommandBuffer, it will be fully cleared later by
// CommandPool::destroy
batch.primaryCommands.destroy(device);
batch.commandPool.destroy(device);
batch.fence.reset(device);
}
mInFlightCommands.clear();
}
// If there are any inflight commands worker will look for fence that corresponds to the request
// serial or the last available fence and wait on that fence. Will then do necessary cleanup work.
// This can cause the worker thread to block.
// TODO: https://issuetracker.google.com/issues/170312581 - A more optimal solution might be to do
// the wait in CommandProcessor rather than the worker thread. That would require protecting access
// to mInFlightCommands
angle::Result TaskProcessor::finishToSerial(Context *context, Serial serial)
{
ASSERT(isValidWorkerThread(context));
ANGLE_TRACE_EVENT0("gpu.angle", "TaskProcessor::finishToSerial");
RendererVk *rendererVk = context->getRenderer();
uint64_t timeout = rendererVk->getMaxFenceWaitTimeNs();
if (mInFlightCommands.empty())
{
// No outstanding work, nothing to wait for.
return angle::Result::Continue;
}
// Find the first batch with serial equal to or bigger than given serial (note that
// the batch serials are unique, otherwise upper-bound would have been necessary).
size_t batchIndex = mInFlightCommands.size() - 1;
for (size_t i = 0; i < mInFlightCommands.size(); ++i)
{
if (mInFlightCommands[i].serial >= serial)
{
batchIndex = i;
break;
}
}
const CommandBatch &batch = mInFlightCommands[batchIndex];
// Wait for it finish
VkDevice device = context->getDevice();
ANGLE_VK_TRY(context, batch.fence.get().wait(device, timeout));
// Clean up finished batches.
return checkCompletedCommands(context);
}
angle::Result TaskProcessor::submitFrame(
Context *context,
egl::ContextPriority priority,
const std::vector<VkSemaphore> &waitSemaphores,
const std::vector<VkPipelineStageFlags> &waitSemaphoreStageMasks,
const Semaphore *signalSemaphore,
Shared<Fence> &&sharedFence,
GarbageList *currentGarbage,
CommandPool *commandPool,
Serial submitQueueSerial)
{
ASSERT(isValidWorkerThread(context));
ANGLE_TRACE_EVENT0("gpu.angle", "TaskProcessor::submitFrame");
// Start an empty primary buffer if we have an empty submit.
ANGLE_TRY(ensurePrimaryCommandBufferValid(context));
ANGLE_VK_TRY(context, mPrimaryCommandBuffer.end());
VkSubmitInfo submitInfo = {};
InitializeSubmitInfo(&submitInfo, mPrimaryCommandBuffer, waitSemaphores,
waitSemaphoreStageMasks, signalSemaphore);
VkDevice device = context->getDevice();
DeviceScoped<CommandBatch> scopedBatch(device);
CommandBatch &batch = scopedBatch.get();
batch.fence = std::move(sharedFence);
batch.serial = submitQueueSerial;
ANGLE_TRY(queueSubmit(context, priority, submitInfo, &batch.fence.get(), batch.serial));
if (!currentGarbage->empty())
{
mGarbageQueue.emplace_back(std::move(*currentGarbage), submitQueueSerial);
}
// Store the primary CommandBuffer and command pool used for secondary CommandBuffers
// in the in-flight list.
ANGLE_TRY(
releaseToCommandBatch(context, std::move(mPrimaryCommandBuffer), commandPool, &batch));
mInFlightCommands.emplace_back(scopedBatch.release());
ANGLE_TRY(checkCompletedCommands(context));
// CPU should be throttled to avoid mInFlightCommands from growing too fast. Important for
// off-screen scenarios.
if (mInFlightCommands.size() > kInFlightCommandsLimit)
{
size_t numCommandsToFinish = mInFlightCommands.size() - kInFlightCommandsLimit;
Serial finishSerial = mInFlightCommands[numCommandsToFinish].serial;
return finishToSerial(context, finishSerial);
}
return angle::Result::Continue;
}
angle::Result TaskProcessor::queueSubmit(Context *context,
egl::ContextPriority priority,
const VkSubmitInfo &submitInfo,
const Fence *fence,
Serial submitQueueSerial)
{
RendererVk *renderer = context->getRenderer();
ASSERT(isValidWorkerThread(context));
ANGLE_TRACE_EVENT0("gpu.angle", "TaskProcessor::queueSubmit");
ASSERT((renderer->getFeatures().asynchronousCommandProcessing.enabled == false) ||
std::this_thread::get_id() == mThreadId);
if (kOutputVmaStatsString)
{
renderer->outputVmaStatString();
}
// Don't need a QueueMutex since all queue accesses are serialized through the worker.
VkQueue queue = renderer->getVkQueue(priority);
VkFence handle = fence ? fence->getHandle() : VK_NULL_HANDLE;
ANGLE_VK_TRY(context, vkQueueSubmit(queue, 1, &submitInfo, handle));
mLastSubmittedQueueSerial = submitQueueSerial;
// Now that we've submitted work, clean up RendererVk garbage
return renderer->cleanupGarbage(mLastCompletedQueueSerial);
}
bool TaskProcessor::isValidWorkerThread(Context *context) const
{
return (context->getRenderer()->getFeatures().asynchronousCommandProcessing.enabled == false) ||
std::this_thread::get_id() == mThreadId;
}
Serial TaskProcessor::reserveSubmitSerial()
{
Serial returnSerial = mCurrentQueueSerial;
mCurrentQueueSerial = mQueueSerialFactory.generate();
return returnSerial;
}
angle::Result TaskProcessor::flushOutsideRPCommands(Context *context,
CommandBufferHelper *outsideRPCommands)
{
ANGLE_TRY(ensurePrimaryCommandBufferValid(context));
return outsideRPCommands->flushToPrimary(context->getRenderer()->getFeatures(),
&mPrimaryCommandBuffer, nullptr);
}
angle::Result TaskProcessor::flushRenderPassCommands(Context *context,
const RenderPass &renderPass,
CommandBufferHelper *renderPassCommands)
{
ANGLE_TRY(ensurePrimaryCommandBufferValid(context));
return renderPassCommands->flushToPrimary(context->getRenderer()->getFeatures(),
&mPrimaryCommandBuffer, &renderPass);
}
void CommandProcessor::handleError(VkResult errorCode, void CommandProcessor::handleError(VkResult errorCode,
const char *file, const char *file,
const char *function, const char *function,
...@@ -626,15 +321,6 @@ void CommandProcessor::queueCommand(Context *context, CommandProcessorTask *task ...@@ -626,15 +321,6 @@ void CommandProcessor::queueCommand(Context *context, CommandProcessorTask *task
} }
} }
angle::Result CommandProcessor::initTaskProcessor(Context *context)
{
// Initialization prior to work thread loop
ANGLE_TRY(mTaskProcessor.init(context, std::this_thread::get_id()));
// Allocate and begin primary command buffer
return angle::Result::Continue;
}
void CommandProcessor::processTasks() void CommandProcessor::processTasks()
{ {
...@@ -660,7 +346,7 @@ void CommandProcessor::processTasks() ...@@ -660,7 +346,7 @@ void CommandProcessor::processTasks()
angle::Result CommandProcessor::processTasksImpl(bool *exitThread) angle::Result CommandProcessor::processTasksImpl(bool *exitThread)
{ {
ANGLE_TRY(initTaskProcessor(this)); ANGLE_TRY(mCommandQueue.init(this));
while (true) while (true)
{ {
...@@ -699,9 +385,10 @@ angle::Result CommandProcessor::processTask(CommandProcessorTask *task) ...@@ -699,9 +385,10 @@ angle::Result CommandProcessor::processTask(CommandProcessorTask *task)
{ {
case CustomTask::Exit: case CustomTask::Exit:
{ {
ANGLE_TRY(mTaskProcessor.finishToSerial(this, Serial::Infinite())); ANGLE_TRY(mCommandQueue.finishToSerial(this, Serial::Infinite(),
mRenderer->getMaxFenceWaitTimeNs()));
// Shutting down so cleanup // Shutting down so cleanup
mTaskProcessor.destroy(mRenderer->getDevice()); mCommandQueue.destroy(mRenderer);
mCommandPool.destroy(mRenderer->getDevice()); mCommandPool.destroy(mRenderer->getDevice());
break; break;
} }
...@@ -717,10 +404,10 @@ angle::Result CommandProcessor::processTask(CommandProcessorTask *task) ...@@ -717,10 +404,10 @@ angle::Result CommandProcessor::processTask(CommandProcessorTask *task)
ANGLE_TRY(mRenderer->newSharedFence(this, &fence)); ANGLE_TRY(mRenderer->newSharedFence(this, &fence));
// Call submitFrame() // Call submitFrame()
ANGLE_TRY(mTaskProcessor.submitFrame( ANGLE_TRY(mCommandQueue.submitFrame(
this, task->getPriority(), task->getWaitSemaphores(), this, task->getPriority(), task->getWaitSemaphores(),
task->getWaitSemaphoreStageMasks(), task->getSemaphore(), std::move(fence), task->getWaitSemaphoreStageMasks(), task->getSemaphore(), std::move(fence),
&task->getGarbage(), &mCommandPool, task->getQueueSerial())); std::move(task->getGarbage()), &mCommandPool, task->getQueueSerial()));
ASSERT(task->getGarbage().empty()); ASSERT(task->getGarbage().empty());
break; break;
...@@ -738,20 +425,21 @@ angle::Result CommandProcessor::processTask(CommandProcessorTask *task) ...@@ -738,20 +425,21 @@ angle::Result CommandProcessor::processTask(CommandProcessorTask *task)
// TODO: https://issuetracker.google.com/issues/170328907 - vkQueueSubmit should be // TODO: https://issuetracker.google.com/issues/170328907 - vkQueueSubmit should be
// owned by TaskProcessor to ensure proper synchronization // owned by TaskProcessor to ensure proper synchronization
ANGLE_TRY(mTaskProcessor.queueSubmit(this, task->getPriority(), submitInfo, ANGLE_TRY(mCommandQueue.queueSubmit(this, task->getPriority(), submitInfo,
task->getOneOffFence(), task->getQueueSerial())); task->getOneOffFence(), task->getQueueSerial()));
ANGLE_TRY(mTaskProcessor.checkCompletedCommands(this)); ANGLE_TRY(mCommandQueue.checkCompletedCommands(this));
break; break;
} }
case CustomTask::FinishToSerial: case CustomTask::FinishToSerial:
{ {
ANGLE_TRY(mTaskProcessor.finishToSerial(this, task->getQueueSerial())); ANGLE_TRY(mCommandQueue.finishToSerial(this, task->getQueueSerial(),
mRenderer->getMaxFenceWaitTimeNs()));
break; break;
} }
case CustomTask::Present: case CustomTask::Present:
{ {
VkResult result = VkResult result =
present(getRenderer()->getVkQueue(task->getPriority()), task->getPresentInfo()); present(mRenderer->getVkQueue(task->getPriority()), task->getPresentInfo());
if (ANGLE_UNLIKELY(result == VK_ERROR_OUT_OF_DATE_KHR || result == VK_SUBOPTIMAL_KHR)) if (ANGLE_UNLIKELY(result == VK_ERROR_OUT_OF_DATE_KHR || result == VK_SUBOPTIMAL_KHR))
{ {
// We get to ignore these as they are not fatal // We get to ignore these as they are not fatal
...@@ -771,12 +459,12 @@ angle::Result CommandProcessor::processTask(CommandProcessorTask *task) ...@@ -771,12 +459,12 @@ angle::Result CommandProcessor::processTask(CommandProcessorTask *task)
ASSERT(!task->getCommandBuffer()->empty()); ASSERT(!task->getCommandBuffer()->empty());
if (task->getRenderPass()) if (task->getRenderPass())
{ {
ANGLE_TRY(mTaskProcessor.flushRenderPassCommands(this, *task->getRenderPass(), ANGLE_TRY(mCommandQueue.flushRenderPassCommands(this, *task->getRenderPass(),
task->getCommandBuffer())); task->getCommandBuffer()));
} }
else else
{ {
ANGLE_TRY(mTaskProcessor.flushOutsideRPCommands(this, task->getCommandBuffer())); ANGLE_TRY(mCommandQueue.flushOutsideRPCommands(this, task->getCommandBuffer()));
} }
ASSERT(task->getCommandBuffer()->empty()); ASSERT(task->getCommandBuffer()->empty());
task->getCommandBuffer()->releaseToContextQueue(task->getContextVk()); task->getCommandBuffer()->releaseToContextQueue(task->getContextVk());
...@@ -784,7 +472,7 @@ angle::Result CommandProcessor::processTask(CommandProcessorTask *task) ...@@ -784,7 +472,7 @@ angle::Result CommandProcessor::processTask(CommandProcessorTask *task)
} }
case CustomTask::CheckCompletedCommands: case CustomTask::CheckCompletedCommands:
{ {
ANGLE_TRY(mTaskProcessor.checkCompletedCommands(this)); ANGLE_TRY(mCommandQueue.checkCompletedCommands(this));
break; break;
} }
default: default:
...@@ -847,25 +535,25 @@ void CommandProcessor::shutdown(std::thread *commandProcessorThread) ...@@ -847,25 +535,25 @@ void CommandProcessor::shutdown(std::thread *commandProcessorThread)
Serial CommandProcessor::getLastCompletedQueueSerial() Serial CommandProcessor::getLastCompletedQueueSerial()
{ {
std::lock_guard<std::mutex> lock(mQueueSerialMutex); std::lock_guard<std::mutex> lock(mQueueSerialMutex);
return mTaskProcessor.getLastCompletedQueueSerial(); return mCommandQueue.getLastCompletedQueueSerial();
} }
Serial CommandProcessor::getLastSubmittedQueueSerial() Serial CommandProcessor::getLastSubmittedQueueSerial()
{ {
std::lock_guard<std::mutex> lock(mQueueSerialMutex); std::lock_guard<std::mutex> lock(mQueueSerialMutex);
return mTaskProcessor.getLastSubmittedQueueSerial(); return mCommandQueue.getLastSubmittedQueueSerial();
} }
Serial CommandProcessor::getCurrentQueueSerial() Serial CommandProcessor::getCurrentQueueSerial()
{ {
std::lock_guard<std::mutex> lock(mQueueSerialMutex); std::lock_guard<std::mutex> lock(mQueueSerialMutex);
return mTaskProcessor.getCurrentQueueSerial(); return mCommandQueue.getCurrentQueueSerial();
} }
Serial CommandProcessor::reserveSubmitSerial() Serial CommandProcessor::reserveSubmitSerial()
{ {
std::lock_guard<std::mutex> lock(mQueueSerialMutex); std::lock_guard<std::mutex> lock(mQueueSerialMutex);
return mTaskProcessor.reserveSubmitSerial(); return mCommandQueue.reserveSubmitSerial();
} }
// Wait until all commands up to and including serial have been processed // Wait until all commands up to and including serial have been processed
...@@ -894,7 +582,7 @@ void CommandProcessor::handleDeviceLost() ...@@ -894,7 +582,7 @@ void CommandProcessor::handleDeviceLost()
} }
// Worker thread is idle and command queue is empty so good to continue // Worker thread is idle and command queue is empty so good to continue
mTaskProcessor.handleDeviceLost(this); mCommandQueue.handleDeviceLost(mRenderer);
} }
void CommandProcessor::finishAllWork(Context *context) void CommandProcessor::finishAllWork(Context *context)
...@@ -965,7 +653,6 @@ angle::Result CommandQueue::init(Context *context) ...@@ -965,7 +653,6 @@ angle::Result CommandQueue::init(Context *context)
angle::Result CommandQueue::checkCompletedCommands(Context *context) angle::Result CommandQueue::checkCompletedCommands(Context *context)
{ {
ANGLE_TRACE_EVENT0("gpu.angle", "CommandQueue::checkCompletedCommandsNoLock"); ANGLE_TRACE_EVENT0("gpu.angle", "CommandQueue::checkCompletedCommandsNoLock");
ASSERT(!context->getRenderer()->getFeatures().commandProcessor.enabled);
RendererVk *renderer = context->getRenderer(); RendererVk *renderer = context->getRenderer();
VkDevice device = renderer->getDevice(); VkDevice device = renderer->getDevice();
...@@ -1045,6 +732,8 @@ angle::Result CommandQueue::releaseToCommandBatch(Context *context, ...@@ -1045,6 +732,8 @@ angle::Result CommandQueue::releaseToCommandBatch(Context *context,
CommandPool *commandPool, CommandPool *commandPool,
CommandBatch *batch) CommandBatch *batch)
{ {
ANGLE_TRACE_EVENT0("gpu.angle", "CommandQueue::releaseToCommandBatch");
RendererVk *renderer = context->getRenderer(); RendererVk *renderer = context->getRenderer();
VkDevice device = renderer->getDevice(); VkDevice device = renderer->getDevice();
...@@ -1079,6 +768,8 @@ void CommandQueue::clearAllGarbage(RendererVk *renderer) ...@@ -1079,6 +768,8 @@ void CommandQueue::clearAllGarbage(RendererVk *renderer)
void CommandQueue::handleDeviceLost(RendererVk *renderer) void CommandQueue::handleDeviceLost(RendererVk *renderer)
{ {
ANGLE_TRACE_EVENT0("gpu.angle", "CommandQueue::handleDeviceLost");
VkDevice device = renderer->getDevice(); VkDevice device = renderer->getDevice();
for (CommandBatch &batch : mInFlightCommands) for (CommandBatch &batch : mInFlightCommands)
...@@ -1105,8 +796,6 @@ bool CommandQueue::allInFlightCommandsAreAfterSerial(Serial serial) const ...@@ -1105,8 +796,6 @@ bool CommandQueue::allInFlightCommandsAreAfterSerial(Serial serial) const
angle::Result CommandQueue::finishToSerial(Context *context, Serial finishSerial, uint64_t timeout) angle::Result CommandQueue::finishToSerial(Context *context, Serial finishSerial, uint64_t timeout)
{ {
ASSERT(!context->getRenderer()->getFeatures().commandProcessor.enabled);
if (mInFlightCommands.empty()) if (mInFlightCommands.empty())
{ {
return angle::Result::Continue; return angle::Result::Continue;
...@@ -1171,7 +860,6 @@ angle::Result CommandQueue::submitFrame( ...@@ -1171,7 +860,6 @@ angle::Result CommandQueue::submitFrame(
signalSemaphore); signalSemaphore);
ANGLE_TRACE_EVENT0("gpu.angle", "CommandQueue::submitFrame"); ANGLE_TRACE_EVENT0("gpu.angle", "CommandQueue::submitFrame");
ASSERT(!context->getRenderer()->getFeatures().commandProcessor.enabled);
RendererVk *renderer = context->getRenderer(); RendererVk *renderer = context->getRenderer();
VkDevice device = renderer->getDevice(); VkDevice device = renderer->getDevice();
...@@ -1198,10 +886,11 @@ angle::Result CommandQueue::submitFrame( ...@@ -1198,10 +886,11 @@ angle::Result CommandQueue::submitFrame(
// CPU should be throttled to avoid mInFlightCommands from growing too fast. Important for // CPU should be throttled to avoid mInFlightCommands from growing too fast. Important for
// off-screen scenarios. // off-screen scenarios.
while (mInFlightCommands.size() > kInFlightCommandsLimit) if (mInFlightCommands.size() > kInFlightCommandsLimit)
{ {
ANGLE_TRY(finishToSerial(context, mInFlightCommands[0].serial, size_t numCommandsToFinish = mInFlightCommands.size() - kInFlightCommandsLimit;
renderer->getMaxFenceWaitTimeNs())); Serial finishSerial = mInFlightCommands[numCommandsToFinish].serial;
ANGLE_TRY(finishToSerial(context, finishSerial, renderer->getMaxFenceWaitTimeNs()));
} }
return angle::Result::Continue; return angle::Result::Continue;
...@@ -1296,6 +985,8 @@ angle::Result CommandQueue::queueSubmit(Context *context, ...@@ -1296,6 +985,8 @@ angle::Result CommandQueue::queueSubmit(Context *context,
const Fence *fence, const Fence *fence,
Serial submitQueueSerial) Serial submitQueueSerial)
{ {
ANGLE_TRACE_EVENT0("gpu.angle", "CommandQueue::queueSubmit");
RendererVk *renderer = context->getRenderer(); RendererVk *renderer = context->getRenderer();
if (kOutputVmaStatsString) if (kOutputVmaStatsString)
......
...@@ -236,71 +236,6 @@ class CommandQueue final : angle::NonCopyable ...@@ -236,71 +236,6 @@ class CommandQueue final : angle::NonCopyable
Serial mCurrentQueueSerial; Serial mCurrentQueueSerial;
}; };
class TaskProcessor : angle::NonCopyable
{
public:
TaskProcessor();
~TaskProcessor();
angle::Result init(Context *context, std::thread::id threadId);
void destroy(VkDevice device);
angle::Result finishToSerial(Context *context, Serial serial);
angle::Result submitFrame(Context *context,
egl::ContextPriority priority,
const std::vector<VkSemaphore> &waitSemaphores,
const std::vector<VkPipelineStageFlags> &waitSemaphoreStageMasks,
const Semaphore *signalSemaphore,
Shared<Fence> &&sharedFence,
GarbageList *currentGarbage,
CommandPool *commandPool,
Serial submitQueueSerial);
angle::Result queueSubmit(Context *context,
egl::ContextPriority priority,
const VkSubmitInfo &submitInfo,
const Fence *fence,
Serial submitQueueSerial);
void handleDeviceLost(Context *context);
angle::Result checkCompletedCommands(Context *context);
angle::Result flushOutsideRPCommands(Context *context, CommandBufferHelper *outsideRPCommands);
angle::Result flushRenderPassCommands(Context *context,
const RenderPass &renderPass,
CommandBufferHelper *renderPassCommands);
Serial reserveSubmitSerial();
ANGLE_INLINE Serial getLastSubmittedQueueSerial() const { return mLastSubmittedQueueSerial; }
ANGLE_INLINE Serial getLastCompletedQueueSerial() const { return mLastCompletedQueueSerial; }
ANGLE_INLINE Serial getCurrentQueueSerial() const { return mCurrentQueueSerial; }
private:
bool isValidWorkerThread(Context *context) const;
angle::Result releaseToCommandBatch(Context *context,
PrimaryCommandBuffer &&commandBuffer,
CommandPool *commandPool,
CommandBatch *batch);
angle::Result ensurePrimaryCommandBufferValid(Context *context);
GarbageQueue mGarbageQueue;
std::vector<CommandBatch> mInFlightCommands;
// Keeps a free list of reusable primary command buffers.
PrimaryCommandBuffer mPrimaryCommandBuffer;
PersistentCommandPool mPrimaryCommandPool;
std::thread::id mThreadId;
// Queue serial management.
AtomicSerialFactory mQueueSerialFactory;
Serial mLastCompletedQueueSerial;
Serial mLastSubmittedQueueSerial;
Serial mCurrentQueueSerial;
};
// TODO(jmadill): Give this the same API as CommandQueue. b/172704839 // TODO(jmadill): Give this the same API as CommandQueue. b/172704839
class CommandProcessor : public Context class CommandProcessor : public Context
{ {
...@@ -308,8 +243,6 @@ class CommandProcessor : public Context ...@@ -308,8 +243,6 @@ class CommandProcessor : public Context
CommandProcessor(RendererVk *renderer); CommandProcessor(RendererVk *renderer);
~CommandProcessor() override; ~CommandProcessor() override;
angle::Result initTaskProcessor(Context *context);
void handleError(VkResult result, void handleError(VkResult result,
const char *file, const char *file,
const char *function, const char *function,
...@@ -376,7 +309,7 @@ class CommandProcessor : public Context ...@@ -376,7 +309,7 @@ class CommandProcessor : public Context
bool mWorkerThreadIdle; bool mWorkerThreadIdle;
// Command pool to allocate processor thread primary command buffers from // Command pool to allocate processor thread primary command buffers from
CommandPool mCommandPool; CommandPool mCommandPool;
TaskProcessor mTaskProcessor; CommandQueue mCommandQueue;
std::mutex mQueueSerialMutex; std::mutex mQueueSerialMutex;
......
...@@ -911,17 +911,11 @@ angle::Result RendererVk::initialize(DisplayVk *displayVk, ...@@ -911,17 +911,11 @@ angle::Result RendererVk::initialize(DisplayVk *displayVk,
if (getFeatures().commandProcessor.enabled) if (getFeatures().commandProcessor.enabled)
{ {
if (getFeatures().asynchronousCommandProcessing.enabled) // TODO(jmadill): Clean up. b/172704839
{ ASSERT(mFeatures.asynchronousCommandProcessing.enabled);
ASSERT(getFeatures().commandProcessor.enabled); mCommandProcessorThread =
mCommandProcessorThread = std::thread(&vk::CommandProcessor::processTasks, &mCommandProcessor);
std::thread(&vk::CommandProcessor::processTasks, &mCommandProcessor); waitForCommandProcessorIdle(displayVk);
waitForCommandProcessorIdle(displayVk);
}
else
{
ANGLE_TRY(mCommandProcessor.initTaskProcessor(displayVk));
}
} }
else else
{ {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment