Commit e90568ee by Ben Clayton

Update Marl to a047dd0bb

Fixes another hairy bug in condition-variable timeouts, adds docs Changes: a047dd0bb Include benchmarks on project's README.md 4c702da52 Scheduler: Fix lock state on Fiber::wait timeout. b4e305525 Docs: Add documentation for marl::Scheduler. 5f18ac0e0 ConditionVariable: Delete copy and move constructors f78eb441f Scheduler: document requirement to unbind() before destruction 9f9f6d32e Defer benchmark - avoid benchmark::DoNotOptimize() 3b610e902 Fix compiler warnings with MARL_FIBERS_USE_UCONTEXT 0dbab1184 Scheduler: Delete copy and move constructors / assignment ops. cbef55d58 Kokoro: Build benchmarks e923c3d96 Rework the 'hello task' example to be more idiomatic Commands: git subtree pull --prefix third_party/marl https://github.com/google/marl master --squash Bug: b/140546382 Change-Id: I3412287ec95bdf761d84fd9f0dffae8f08bcec3c
parents eceeb201 f1f6e688
......@@ -18,6 +18,7 @@ Example:
#include "marl/defer.h"
#include "marl/event.h"
#include "marl/scheduler.h"
#include "marl/waitgroup.h"
#include <cstdio>
......@@ -29,33 +30,38 @@ int main() {
scheduler.setWorkerThreadCount(4);
defer(scheduler.unbind()); // Automatically unbind before returning.
// Create an event that automatically resets itself.
marl::Event sayHellow(marl::Event::Mode::Auto);
marl::Event saidHellow(marl::Event::Mode::Auto);
constexpr int numTasks = 10;
// Create an event that is manually reset.
marl::Event sayHellow(marl::Event::Mode::Manual);
// Create a WaitGroup with an initial count of numTasks.
marl::WaitGroup saidHellow(numTasks);
// Schedule some tasks to run asynchronously.
for (int i = 0; i < 10; i++) {
for (int i = 0; i < numTasks; i++) {
// Each task will run on one of the 4 worker threads.
marl::schedule([=] { // All marl primitives are capture-by-value.
printf("Task %d waiting to say hello!\n", i);
// Decrement the WaitGroup counter when the task has finished.
defer(saidHellow.done());
printf("Task %d waiting to say hello...\n", i);
// Blocking in a task?
// The scheduler will find something else for this thread to do.
sayHellow.wait();
printf("Hello from task %d!\n", i);
saidHellow.signal();
});
}
// Unblock the tasks one by one.
for (int i = 0; i < 10; i++) {
sayHellow.signal();
saidHellow.wait();
}
sayHellow.signal(); // Unblock all the tasks.
// All tasks are guaranteed to completed before the scheduler is destructed.
saidHellow.wait(); // Wait for all tasks to complete.
printf("All tasks said hello.\n");
// All tasks are guaranteed to complete before the scheduler is destructed.
}
```
......@@ -111,6 +117,10 @@ set(MARL_GOOGLETEST_DIR <path-to-googletest>) # defaults to ${MARL_THIR
add_subdirectory(${MARL_DIR})
```
## Benchmarks
Graphs of several microbenchmarks can be found [here](https://google.github.io/marl/benchmarks).
---
Note: This is not an officially supported Google product
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
<svg xmlns="http://www.w3.org/2000/svg" xmlns:xl="http://www.w3.org/1999/xlink" xmlns:dc="http://purl.org/dc/elements/1.1/" version="1.1" viewBox="724 1659.5 441.5 408.5" width="441.5" height="408.5">
<defs>
<font-face font-family="Courier New" font-size="16" panose-1="2 7 3 9 2 2 5 2 4 4" units-per-em="1000" underline-position="-232.91016" underline-thickness="41.015625" slope="0" x-height="422.85156" cap-height="571.28906" ascent="832.5195" descent="-300.29297" font-weight="400">
<font-face-src>
<font-face-name name="CourierNewPSMT"/>
</font-face-src>
</font-face>
<filter id="Shadow" filterUnits="userSpaceOnUse" x="724" y="1659.5">
<feOffset in="SourceAlpha" result="offset" dx="0" dy="2"/>
<feFlood flood-color="#919191" flood-opacity=".25" result="flood"/>
<feComposite in="flood" in2="offset" operator="in" result="color"/>
<feMerge>
<feMergeNode in="color"/>
<feMergeNode in="SourceGraphic"/>
</feMerge>
</filter>
<font-face font-family="Roboto" font-size="13" panose-1="2 0 0 0 0 0 0 0 0 0" units-per-em="1000" underline-position="-73.24219" underline-thickness="48.828125" slope="0" x-height="528.3203" cap-height="710.9375" ascent="927.7344" descent="-244.14062" font-weight="700">
<font-face-src>
<font-face-name name="Roboto-Bold"/>
</font-face-src>
</font-face>
<font-face font-family="Courier New" font-size="13" panose-1="2 7 6 9 2 2 5 2 4 4" units-per-em="1000" underline-position="-232.91016" underline-thickness="100.09766" slope="0" x-height="443.3594" cap-height="591.7969" ascent="832.5195" descent="-300.29297" font-weight="700">
<font-face-src>
<font-face-name name="CourierNewPS-BoldMT"/>
</font-face-src>
</font-face>
<font-face font-family="Roboto" font-size="12" panose-1="2 0 0 0 0 0 0 0 0 0" units-per-em="1000" underline-position="-73.24219" underline-thickness="48.828125" slope="0" x-height="528.3203" cap-height="710.9375" ascent="927.7344" descent="-244.14062" font-weight="400">
<font-face-src>
<font-face-name name="Roboto-Regular"/>
</font-face-src>
</font-face>
<font-face font-family="Roboto" font-size="11" panose-1="2 0 0 0 0 0 0 0 0 0" units-per-em="1000" underline-position="-73.24219" underline-thickness="48.828125" slope="0" x-height="528.3203" cap-height="710.9375" ascent="927.7344" descent="-244.14062" font-weight="400">
<font-face-src>
<font-face-name name="Roboto-Regular"/>
</font-face-src>
</font-face>
<marker orient="auto" overflow="visible" markerUnits="strokeWidth" id="FilledArrow_Marker" stroke-linejoin="miter" stroke-miterlimit="10" viewBox="-1 -3 5 6" markerWidth="5" markerHeight="6" color="#00aeef">
<g>
<path d="M 2.88 0 L 0 -1.08 L 0 1.08 Z" fill="currentColor" stroke="currentColor" stroke-width="1"/>
</g>
</marker>
</defs>
<metadata> Produced by OmniGraffle 7.12.1
<dc:date>2020-02-12 20:52:25 +0000</dc:date>
</metadata>
<g id="Canvas_1" stroke="none" stroke-opacity="1" fill="none" fill-opacity="1" stroke-dasharray="none">
<title>Canvas 1</title>
<g id="Canvas_1: Layer 1">
<title>Layer 1</title>
<g id="Graphic_259">
<rect x="745" y="1660" width="420" height="385" fill="#4751d4" fill-opacity=".04274277"/>
<path d="M 745 1660 L 1165 1660 L 1165 2045 L 745 2045 Z" stroke="gray" stroke-linecap="round" stroke-linejoin="round" stroke-dasharray="1.0,4.0" stroke-width="1"/>
<clipPath id="clip_path">
<rect x="0" y="0" width="420" height="385" fill="#4751d4" fill-opacity=".04274277"/>
</clipPath>
<text clip-path="url(#clip_path)" transform="translate(750 1665)" fill="black">
<tspan font-family="Courier New" font-size="16" font-weight="400" fill="black" x="27.371094" y="13">Worker::run() (Multi-Threaded-Worker)</tspan>
</text>
</g>
<g id="Graphic_260" filter="url(#Shadow)">
<path d="M 825 2025 L 895 2025 C 903.28 2025 910 2033.96 910 2045 C 910 2056.04 903.28 2065 895 2065 L 825 2065 C 816.72 2065 810 2056.04 810 2045 C 810 2033.96 816.72 2025 825 2025 Z" fill="#ffc7b1"/>
<path d="M 825 2025 L 895 2025 C 903.28 2025 910 2033.96 910 2045 C 910 2056.04 903.28 2065 895 2065 L 825 2065 C 816.72 2065 810 2056.04 810 2045 C 810 2033.96 816.72 2025 825 2025 Z" stroke="#235e00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2"/>
<text transform="translate(816 2037.5)" fill="#515556">
<tspan font-family="Roboto" font-size="13" font-weight="700" fill="#515556" x="28.946533" y="12">Done</tspan>
</text>
</g>
<g id="Graphic_261" filter="url(#Shadow)">
<path d="M 765 1851.75 L 765 1918.25 C 765 1926.116 756.04 1932.5 745 1932.5 C 733.96 1932.5 725 1926.116 725 1918.25 L 725 1851.75 C 725 1843.884 733.96 1837.5 745 1837.5 C 756.04 1837.5 765 1843.884 765 1851.75 Z" fill="#a7fee5"/>
<path d="M 765 1851.75 L 765 1918.25 C 765 1926.116 756.04 1932.5 745 1932.5 C 733.96 1932.5 725 1926.116 725 1918.25 L 725 1851.75 C 725 1843.884 733.96 1837.5 745 1837.5 C 756.04 1837.5 765 1843.884 765 1851.75 Z" stroke="#235e00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2"/>
<text transform="translate(752.5 1843.5) rotate(90)" fill="#515556">
<tspan font-family="Roboto" font-size="13" font-weight="700" fill="#515556" x="27.097168" y="12">Start</tspan>
</text>
</g>
<g id="Graphic_262" filter="url(#Shadow)">
<title>join</title>
<rect x="1e3" y="1725" width="140" height="40" fill="#c0c0ff"/>
<rect x="1e3" y="1725" width="140" height="40" stroke="#00aeef" stroke-linecap="round" stroke-linejoin="round" stroke-width="2"/>
<text transform="translate(1006 1737.5)" fill="#515556">
<tspan font-family="Courier New" font-size="13" font-weight="700" fill="#515556" x="13.291748" y="11">waitForWork()</tspan>
</text>
</g>
<g id="Graphic_263" filter="url(#Shadow)">
<path d="M 860 1855 L 925 1885 L 860 1915 L 795 1885 Z" fill="white"/>
<path d="M 860 1855 L 925 1885 L 860 1915 L 795 1885 Z" stroke="#fcc04d" stroke-linecap="round" stroke-linejoin="round" stroke-width="2"/>
<text transform="translate(801 1878)" fill="#515556">
<tspan font-family="Roboto" font-size="12" font-weight="400" fill="#515556" x="29.410156" y="11">Shutdown?</tspan>
</text>
</g>
<g id="Graphic_264" filter="url(#Shadow)">
<circle cx="860" cy="1960" r="15.0000239685285" fill="white"/>
<circle cx="860" cy="1960" r="15.0000239685285" stroke="#235e00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2"/>
<text transform="translate(851 1953.5)" fill="#515556">
<tspan font-family="Roboto" font-size="11" font-weight="400" fill="#515556" x=".12158203" y="10">Yes</tspan>
</text>
</g>
<g id="Graphic_268" filter="url(#Shadow)">
<circle cx="860" cy="1810" r="15.0000239685285" fill="white"/>
<circle cx="860" cy="1810" r="15.0000239685285" stroke="#b1001c" stroke-linecap="round" stroke-linejoin="round" stroke-width="2"/>
<text transform="translate(851 1803.5)" fill="#515556">
<tspan font-family="Roboto" font-size="11" font-weight="400" fill="#515556" x="1.9423828" y="10">No</tspan>
</text>
</g>
<g id="Line_269">
<line x1="860" y1="1853.8986" x2="860" y2="1835.06" marker-end="url(#FilledArrow_Marker)" stroke="#00aeef" stroke-linecap="square" stroke-linejoin="bevel" stroke-width="2"/>
</g>
<g id="Line_270">
<line x1="860" y1="1916.1014" x2="860" y2="1934.94" marker-end="url(#FilledArrow_Marker)" stroke="#00aeef" stroke-linecap="square" stroke-linejoin="bevel" stroke-width="2"/>
</g>
<g id="Line_271">
<path d="M 860 1795 L 860 1745 L 990.94 1745" marker-end="url(#FilledArrow_Marker)" stroke="#00aeef" stroke-linecap="square" stroke-linejoin="bevel" stroke-width="2"/>
</g>
<g id="Line_272">
<line x1="765" y1="1885" x2="783.5537" y2="1885" marker-end="url(#FilledArrow_Marker)" stroke="#00aeef" stroke-linecap="square" stroke-linejoin="bevel" stroke-width="2"/>
</g>
<g id="Line_275">
<line x1="860" y1="1975" x2="860" y2="2014.94" marker-end="url(#FilledArrow_Marker)" stroke="#00aeef" stroke-linecap="square" stroke-linejoin="bevel" stroke-width="2"/>
</g>
<g id="Graphic_277" filter="url(#Shadow)">
<title>join</title>
<rect x="1e3" y="1800" width="140" height="40" fill="#c0c0ff"/>
<rect x="1e3" y="1800" width="140" height="40" stroke="#00aeef" stroke-linecap="round" stroke-linejoin="round" stroke-width="2"/>
<text transform="translate(1006 1812.5)" fill="#515556">
<tspan font-family="Courier New" font-size="13" font-weight="700" fill="#515556" x="9.391113" y="11">runUntilIdle()</tspan>
</text>
</g>
<g id="Line_278">
<line x1="1070" y1="1765" x2="1070" y2="1789.94" marker-end="url(#FilledArrow_Marker)" stroke="#00aeef" stroke-linecap="square" stroke-linejoin="bevel" stroke-width="2"/>
</g>
<g id="Line_279">
<path d="M 1070 1840 L 1070 1885 L 934.06 1885" marker-end="url(#FilledArrow_Marker)" stroke="#00aeef" stroke-linecap="square" stroke-linejoin="bevel" stroke-width="2"/>
</g>
</g>
</g>
</svg>
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
<svg xmlns="http://www.w3.org/2000/svg" xmlns:xl="http://www.w3.org/1999/xlink" xmlns:dc="http://purl.org/dc/elements/1.1/" version="1.1" viewBox="304 1234.5 377 343.5" width="377" height="343.5">
<defs>
<font-face font-family="Courier New" font-size="16" panose-1="2 7 3 9 2 2 5 2 4 4" units-per-em="1000" underline-position="-232.91016" underline-thickness="41.015625" slope="0" x-height="422.85156" cap-height="571.28906" ascent="832.5195" descent="-300.29297" font-weight="400">
<font-face-src>
<font-face-name name="CourierNewPSMT"/>
</font-face-src>
</font-face>
<filter id="Shadow" filterUnits="userSpaceOnUse" x="304" y="1234.5">
<feOffset in="SourceAlpha" result="offset" dx="0" dy="2"/>
<feFlood flood-color="#919191" flood-opacity=".25" result="flood"/>
<feComposite in="flood" in2="offset" operator="in" result="color"/>
<feMerge>
<feMergeNode in="color"/>
<feMergeNode in="SourceGraphic"/>
</feMerge>
</filter>
<font-face font-family="Roboto" font-size="13" panose-1="2 0 0 0 0 0 0 0 0 0" units-per-em="1000" underline-position="-73.24219" underline-thickness="48.828125" slope="0" x-height="528.3203" cap-height="710.9375" ascent="927.7344" descent="-244.14062" font-weight="400">
<font-face-src>
<font-face-name name="Roboto-Regular"/>
</font-face-src>
</font-face>
<font-face font-family="Courier New" font-size="13" panose-1="2 7 6 9 2 2 5 2 4 4" units-per-em="1000" underline-position="-232.91016" underline-thickness="100.09766" slope="0" x-height="443.3594" cap-height="591.7969" ascent="832.5195" descent="-300.29297" font-weight="700">
<font-face-src>
<font-face-name name="CourierNewPS-BoldMT"/>
</font-face-src>
</font-face>
<font-face font-family="Roboto" font-size="13" panose-1="2 0 0 0 0 0 0 0 0 0" units-per-em="1000" underline-position="-73.24219" underline-thickness="48.828125" slope="0" x-height="528.3203" cap-height="710.9375" ascent="927.7344" descent="-244.14062" font-weight="700">
<font-face-src>
<font-face-name name="Roboto-Bold"/>
</font-face-src>
</font-face>
<marker orient="auto" overflow="visible" markerUnits="strokeWidth" id="FilledArrow_Marker" stroke-linejoin="miter" stroke-miterlimit="10" viewBox="-1 -3 5 6" markerWidth="5" markerHeight="6" color="#00aeef">
<g>
<path d="M 2.88 0 L 0 -1.08 L 0 1.08 Z" fill="currentColor" stroke="currentColor" stroke-width="1"/>
</g>
</marker>
</defs>
<metadata> Produced by OmniGraffle 7.12.1
<dc:date>2020-02-12 20:49:36 +0000</dc:date>
</metadata>
<g id="Canvas_1" stroke="none" stroke-opacity="1" fill="none" fill-opacity="1" stroke-dasharray="none">
<title>Canvas 1</title>
<g id="Canvas_1: Layer 1">
<title>Layer 1</title>
<g id="Graphic_61">
<rect x="325" y="1235" width="335" height="342.5" fill="#4751d4" fill-opacity=".04274277"/>
<path d="M 325 1235 L 660 1235 L 660 1577.5 L 325 1577.5 Z" stroke="gray" stroke-linecap="round" stroke-linejoin="round" stroke-dasharray="1.0,4.0" stroke-width="1"/>
<clipPath id="clip_path">
<rect x="0" y="0" width="335" height="342.5" fill="#4751d4" fill-opacity=".04274277"/>
</clipPath>
<text clip-path="url(#clip_path)" transform="translate(330 1240)" fill="black">
<tspan font-family="Courier New" font-size="16" font-weight="400" fill="black" x="61.683594" y="13">Worker::waitForWork()</tspan>
</text>
</g>
<g id="Graphic_84" filter="url(#Shadow)">
<title>join</title>
<rect x="390" y="1377.5" width="195" height="45" fill="white"/>
<rect x="390" y="1377.5" width="195" height="45" stroke="#00aeef" stroke-linecap="round" stroke-linejoin="round" stroke-width="2"/>
<text transform="translate(396 1385)" fill="#515556">
<tspan font-family="Roboto" font-size="13" font-weight="400" fill="#515556" x="53.804443" y="12">Block Thread</tspan>
<tspan font-family="Roboto" font-size="13" font-weight="400" fill="#515556" x="34.256836" y="27">Until Work Available</tspan>
</text>
</g>
<g id="Graphic_85" filter="url(#Shadow)">
<title>join</title>
<rect x="390" y="1447.5" width="195" height="79.56931" fill="white"/>
<rect x="390" y="1447.5" width="195" height="79.56931" stroke="#00aeef" stroke-linecap="round" stroke-linejoin="round" stroke-width="2"/>
<text transform="translate(396 1457.2847)" fill="#515556">
<tspan font-family="Roboto" font-size="13" font-weight="400" fill="#515556" x="13.081055" y="12">Move timed-out fibers from</tspan>
<tspan font-family="Courier New" font-size="13" font-weight="700" fill="#515556" x="44.692383" y="26">work.waiting</tspan>
<tspan font-family="Roboto" font-size="13" font-weight="400" fill="#515556" x="85.76172" y="42">to</tspan>
<tspan font-family="Courier New" font-size="13" font-weight="700" fill="#515556" x="48.59302" y="56">work.fibers</tspan>
</text>
</g>
<g id="Graphic_203" filter="url(#Shadow)">
<title>join</title>
<rect x="390" y="1307.5" width="195" height="45" fill="#c0c0ff"/>
<rect x="390" y="1307.5" width="195" height="45" stroke="#00aeef" stroke-linecap="round" stroke-linejoin="round" stroke-width="2"/>
<text transform="translate(396 1322.5)" fill="#515556">
<tspan font-family="Courier New" font-size="13" font-weight="700" fill="#515556" x="40.79175" y="11">spinForWork()</tspan>
</text>
</g>
<g id="Graphic_204" filter="url(#Shadow)">
<path d="M 345 1371.75 L 345 1438.25 C 345 1446.116 336.04 1452.5 325 1452.5 C 313.96 1452.5 305 1446.116 305 1438.25 L 305 1371.75 C 305 1363.884 313.96 1357.5 325 1357.5 C 336.04 1357.5 345 1363.884 345 1371.75 Z" fill="#a7fee5"/>
<path d="M 345 1371.75 L 345 1438.25 C 345 1446.116 336.04 1452.5 325 1452.5 C 313.96 1452.5 305 1446.116 305 1438.25 L 305 1371.75 C 305 1363.884 313.96 1357.5 325 1357.5 C 336.04 1357.5 345 1363.884 345 1371.75 Z" stroke="#235e00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2"/>
<text transform="translate(332.5 1363.5) rotate(90)" fill="#515556">
<tspan font-family="Roboto" font-size="13" font-weight="700" fill="#515556" x="27.097168" y="12">Start</tspan>
</text>
</g>
<g id="Graphic_206" filter="url(#Shadow)">
<path d="M 680 1371.75 L 680 1438.25 C 680 1446.116 671.04 1452.5 660 1452.5 C 648.96 1452.5 640 1446.116 640 1438.25 L 640 1371.75 C 640 1363.884 648.96 1357.5 660 1357.5 C 671.04 1357.5 680 1363.884 680 1371.75 Z" fill="#ffc7b1"/>
<path d="M 680 1371.75 L 680 1438.25 C 680 1446.116 671.04 1452.5 660 1452.5 C 648.96 1452.5 640 1446.116 640 1438.25 L 640 1371.75 C 640 1363.884 648.96 1357.5 660 1357.5 C 671.04 1357.5 680 1363.884 680 1371.75 Z" stroke="#235e00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2"/>
<text transform="translate(667.5 1363.5) rotate(90)" fill="#515556">
<tspan font-family="Roboto" font-size="13" font-weight="700" fill="#515556" x="26.446533" y="12">Done</tspan>
</text>
</g>
<g id="Line_207">
<path d="M 345 1405 L 370 1405 L 370 1277.5 L 487.5 1277.5 L 487.5 1298.44" marker-end="url(#FilledArrow_Marker)" stroke="#00aeef" stroke-linecap="square" stroke-linejoin="bevel" stroke-width="2"/>
</g>
<g id="Line_208">
<line x1="487.5" y1="1352.5" x2="487.5" y2="1368.44" marker-end="url(#FilledArrow_Marker)" stroke="#00aeef" stroke-linecap="square" stroke-linejoin="bevel" stroke-width="2"/>
</g>
<g id="Line_209">
<line x1="487.5" y1="1422.5" x2="487.5" y2="1437.44" marker-end="url(#FilledArrow_Marker)" stroke="#00aeef" stroke-linecap="square" stroke-linejoin="bevel" stroke-width="2"/>
</g>
<g id="Line_210">
<path d="M 487.5 1527.0693 L 487.5 1552.5 L 610 1552.5 L 610 1405 L 630.94 1405" marker-end="url(#FilledArrow_Marker)" stroke="#00aeef" stroke-linecap="square" stroke-linejoin="bevel" stroke-width="2"/>
</g>
</g>
</g>
</svg>
......@@ -12,11 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// Simple "hello world" example that uses marl::Event.
// Simple "hello world" example that uses marl::Event and marl::WaitGroup.
#include "marl/defer.h"
#include "marl/event.h"
#include "marl/scheduler.h"
#include "marl/waitgroup.h"
#include <cstdio>
......@@ -28,31 +29,36 @@ int main() {
scheduler.setWorkerThreadCount(4);
defer(scheduler.unbind()); // Automatically unbind before returning.
// Create an event that automatically resets itself.
marl::Event sayHellow(marl::Event::Mode::Auto);
marl::Event saidHellow(marl::Event::Mode::Auto);
constexpr int numTasks = 10;
// Create an event that is manually reset.
marl::Event sayHellow(marl::Event::Mode::Manual);
// Create a WaitGroup with an initial count of numTasks.
marl::WaitGroup saidHellow(numTasks);
// Schedule some tasks to run asynchronously.
for (int i = 0; i < 10; i++) {
for (int i = 0; i < numTasks; i++) {
// Each task will run on one of the 4 worker threads.
marl::schedule([=] { // All marl primitives are capture-by-value.
printf("Task %d waiting to say hello!\n", i);
// Decrement the WaitGroup counter when the task has finished.
defer(saidHellow.done());
printf("Task %d waiting to say hello...\n", i);
// Blocking in a task?
// The scheduler will find something else for this thread to do.
sayHellow.wait();
printf("Hello from task %d!\n", i);
saidHellow.signal();
});
}
// Unblock the tasks one by one.
for (int i = 0; i < 10; i++) {
sayHellow.signal();
saidHellow.wait();
}
sayHellow.signal(); // Unblock all the tasks.
saidHellow.wait(); // Wait for all tasks to complete.
printf("All tasks said hello.\n");
// All tasks are guaranteed to completed before the scheduler is destructed.
// All tasks are guaranteed to complete before the scheduler is destructed.
}
......@@ -34,6 +34,8 @@ namespace marl {
// thread will work on other tasks until the ConditionVariable is unblocked.
class ConditionVariable {
public:
inline ConditionVariable();
// notify_one() notifies and potentially unblocks one waiting fiber or thread.
inline void notify_one();
......@@ -65,6 +67,11 @@ class ConditionVariable {
Predicate&& pred);
private:
ConditionVariable(const ConditionVariable&) = delete;
ConditionVariable(ConditionVariable&&) = delete;
ConditionVariable& operator=(const ConditionVariable&) = delete;
ConditionVariable& operator=(ConditionVariable&&) = delete;
std::mutex mutex;
std::unordered_set<Scheduler::Fiber*> waiting;
std::condition_variable condition;
......@@ -72,6 +79,8 @@ class ConditionVariable {
std::atomic<int> numWaitingOnCondition = {0};
};
ConditionVariable::ConditionVariable() {}
void ConditionVariable::notify_one() {
if (numWaiting == 0) {
return;
......
......@@ -43,6 +43,7 @@ using Task = std::function<void()>;
// A scheduler can be bound to one or more threads using the bind() method.
// Once bound to a thread, that thread can call marl::schedule() to enqueue
// work tasks to be executed asynchronously.
// All threads must be unbound with unbind() before the scheduler is destructed.
// Scheduler are initially constructed in single-threaded mode.
// Call setWorkerThreadCount() to spawn dedicated worker threads.
class Scheduler {
......@@ -53,6 +54,10 @@ class Scheduler {
using Predicate = std::function<bool()>;
Scheduler(Allocator* allocator = Allocator::Default);
// Destructor.
// Ensure that all threads are unbound before calling - failure to do so may
// result in leaked memory.
~Scheduler();
// get() returns the scheduler bound to the current thread.
......@@ -64,6 +69,8 @@ class Scheduler {
// unbind() unbinds the scheduler currently bound to the current thread.
// There must be a existing scheduler bound to the thread prior to calling.
// unbind() flushes any enqueued tasks on the single-threaded worker before
// returning.
static void unbind();
// enqueue() queues the task for asynchronous execution.
......@@ -117,6 +124,7 @@ class Scheduler {
// will be locked before wait() returns.
// pred will be always be called with the lock held.
// wait() must only be called on the currently executing fiber.
_Requires_lock_held_(lock)
void wait(Lock& lock, const Predicate& pred);
// wait() suspends execution of this Fiber until the Fiber is woken up with
......@@ -133,6 +141,7 @@ class Scheduler {
// will be locked before wait() returns.
// pred will be always be called with the lock held.
// wait() must only be called on the currently executing fiber.
_Requires_lock_held_(lock)
template <typename Clock, typename Duration>
inline bool wait(Lock& lock,
const std::chrono::time_point<Clock, Duration>& timeout,
......@@ -201,6 +210,11 @@ class Scheduler {
};
private:
Scheduler(const Scheduler&) = delete;
Scheduler(Scheduler&&) = delete;
Scheduler& operator=(const Scheduler&) = delete;
Scheduler& operator=(Scheduler&&) = delete;
// Stack size in bytes of a new fiber.
// TODO: Make configurable so the default size can be reduced.
static constexpr size_t FiberStackSize = 1024 * 1024;
......@@ -271,6 +285,7 @@ class Scheduler {
// wait() suspends execution of the current task until the predicate pred
// returns true.
// See Fiber::wait() for more information.
_Requires_lock_held_(lock)
bool wait(Fiber::Lock& lock,
const TimePoint* timeout,
const Predicate& pred);
......@@ -436,6 +451,7 @@ class Scheduler {
singleThreadedWorkers;
};
_Requires_lock_held_(lock)
template <typename Clock, typename Duration>
bool Scheduler::Fiber::wait(
Lock& lock,
......
......@@ -13,7 +13,7 @@ if [ "$BUILD_SYSTEM" == "cmake" ]; then
mkdir build
cd build
cmake .. -DMARL_BUILD_EXAMPLES=1 -DMARL_BUILD_TESTS=1 -DMARL_WARNINGS_AS_ERRORS=1
cmake .. -DMARL_BUILD_EXAMPLES=1 -DMARL_BUILD_TESTS=1 -DMARL_BUILD_BENCHMARKS=1 -DMARL_WARNINGS_AS_ERRORS=1
make -j$(sysctl -n hw.logicalcpu)
./marl-unittests
......
......@@ -14,7 +14,7 @@ if [ "$BUILD_SYSTEM" == "cmake" ]; then
cd build
build_and_run() {
cmake .. -DMARL_BUILD_EXAMPLES=1 -DMARL_BUILD_TESTS=1 -DMARL_WARNINGS_AS_ERRORS=1 $1
cmake .. -DMARL_BUILD_EXAMPLES=1 -DMARL_BUILD_TESTS=1 -DMARL_BUILD_BENCHMARKS=1 -DMARL_WARNINGS_AS_ERRORS=1 $1
make --jobs=$(nproc)
./marl-unittests
......
......@@ -20,7 +20,7 @@ cd %SRC%\build
if !ERRORLEVEL! neq 0 exit !ERRORLEVEL!
IF /I "%BUILD_SYSTEM%"=="cmake" (
cmake .. -G "%BUILD_GENERATOR%" "-DMARL_BUILD_TESTS=1" "-DMARL_BUILD_EXAMPLES=1" "-DMARL_WARNINGS_AS_ERRORS=1"
cmake .. -G "%BUILD_GENERATOR%" "-DMARL_BUILD_TESTS=1" "-DMARL_BUILD_EXAMPLES=1" "-DMARL_BUILD_BENCHMARKS=1" "-DMARL_WARNINGS_AS_ERRORS=1"
if !ERRORLEVEL! neq 0 exit !ERRORLEVEL!
%MSBUILD% /p:Configuration=%CONFIG% Marl.sln
if !ERRORLEVEL! neq 0 exit !ERRORLEVEL!
......
......@@ -13,9 +13,13 @@
// limitations under the License.
#include "marl/conditionvariable.h"
#include "marl/waitgroup.h"
#include "marl_test.h"
#include <condition_variable>
#include <mutex>
TEST_F(WithoutBoundScheduler, ConditionVariable) {
bool trigger[3] = {false, false, false};
bool signal[3] = {false, false, false};
......@@ -25,7 +29,11 @@ TEST_F(WithoutBoundScheduler, ConditionVariable) {
std::thread thread([&] {
for (int i = 0; i < 3; i++) {
std::unique_lock<std::mutex> lock(mutex);
cv.wait(lock, [&] { return trigger[i]; });
cv.wait(lock, [&] {
EXPECT_TRUE(lock.owns_lock());
return trigger[i];
});
EXPECT_TRUE(lock.owns_lock());
signal[i] = true;
cv.notify_one();
}
......@@ -40,7 +48,11 @@ TEST_F(WithoutBoundScheduler, ConditionVariable) {
std::unique_lock<std::mutex> lock(mutex);
trigger[i] = true;
cv.notify_one();
cv.wait(lock, [&] { return signal[i]; });
cv.wait(lock, [&] {
EXPECT_TRUE(lock.owns_lock());
return signal[i];
});
EXPECT_TRUE(lock.owns_lock());
}
ASSERT_EQ(signal[0], 0 <= i);
......@@ -60,7 +72,11 @@ TEST_P(WithBoundScheduler, ConditionVariable) {
std::thread thread([&] {
for (int i = 0; i < 3; i++) {
std::unique_lock<std::mutex> lock(mutex);
cv.wait(lock, [&] { return trigger[i]; });
cv.wait(lock, [&] {
EXPECT_TRUE(lock.owns_lock());
return trigger[i];
});
EXPECT_TRUE(lock.owns_lock());
signal[i] = true;
cv.notify_one();
}
......@@ -75,7 +91,11 @@ TEST_P(WithBoundScheduler, ConditionVariable) {
std::unique_lock<std::mutex> lock(mutex);
trigger[i] = true;
cv.notify_one();
cv.wait(lock, [&] { return signal[i]; });
cv.wait(lock, [&] {
EXPECT_TRUE(lock.owns_lock());
return signal[i];
});
EXPECT_TRUE(lock.owns_lock());
}
ASSERT_EQ(signal[0], 0 <= i);
......@@ -85,3 +105,39 @@ TEST_P(WithBoundScheduler, ConditionVariable) {
thread.join();
}
// ConditionVariableTimeouts spins up a whole lot of wait_fors(), unblocking
// some with timeouts and some with a notify, and then let's all the workers
// go to idle before repeating.
// This is testing to ensure that the scheduler handles timeouts correctly when
// they are early-unblocked, along with expected lock state.
TEST_P(WithBoundScheduler, ConditionVariableTimeouts) {
for (int i = 0; i < 10; i++) {
std::mutex mutex;
marl::ConditionVariable cv;
bool signaled = false; // guarded by mutex
auto wg = marl::WaitGroup(100);
for (int j = 0; j < 100; j++) {
marl::schedule([=, &mutex, &cv, &signaled] {
{
std::unique_lock<std::mutex> lock(mutex);
cv.wait_for(lock, std::chrono::milliseconds(j), [&] {
EXPECT_TRUE(lock.owns_lock());
return signaled;
});
EXPECT_TRUE(lock.owns_lock());
}
// Ensure the mutex unlock happens *before* the wg.done() call,
// otherwise the stack pointer may no longer be valid.
wg.done();
});
}
std::this_thread::sleep_for(std::chrono::milliseconds(50));
{
std::unique_lock<std::mutex> lock(mutex);
signaled = true;
cv.notify_all();
}
wg.wait();
}
}
......@@ -16,10 +16,12 @@
#include "benchmark/benchmark.h"
volatile int do_not_optimize_away_result = 0;
static void Defer(benchmark::State& state) {
int i = 0;
for (auto _ : state) {
defer(benchmark::DoNotOptimize(i++));
// Avoid benchmark::DoNotOptimize() as this is unfairly slower on Windows.
defer(do_not_optimize_away_result++);
}
}
BENCHMARK(Defer);
......@@ -115,6 +115,7 @@ Allocator::unique_ptr<OSFiber> OSFiber::createFiber(
out->target = func;
auto res = getcontext(&out->context);
(void)res;
MARL_ASSERT(res == 0, "getcontext() returned %d", int(res));
out->context.uc_stack.ss_sp = out->stack.ptr;
out->context.uc_stack.ss_size = stackSize;
......@@ -130,6 +131,7 @@ Allocator::unique_ptr<OSFiber> OSFiber::createFiber(
void OSFiber::switchTo(OSFiber* fiber) {
auto res = swapcontext(&context, &fiber->context);
(void)res;
MARL_ASSERT(res == 0, "swapcontext() returned %d", int(res));
}
......
......@@ -133,12 +133,17 @@ Scheduler::Scheduler(Allocator* allocator /* = Allocator::Default */)
}
Scheduler::~Scheduler() {
#if MARL_DEBUG_ENABLED
{
std::unique_lock<std::mutex> lock(singleThreadedWorkerMutex);
MARL_ASSERT(singleThreadedWorkers.size() == 0,
"Scheduler still bound on %d threads",
int(singleThreadedWorkers.size()));
}
#endif // MARL_DEBUG_ENABLED
// Release all worker threads.
// This will wait for all in-flight tasks to complete before returning.
setWorkerThreadCount(0);
}
......@@ -401,6 +406,7 @@ void Scheduler::Worker::stop() {
}
}
_Requires_lock_held_(waitLock)
bool Scheduler::Worker::wait(Fiber::Lock& waitLock,
const TimePoint* timeout,
const Predicate& pred) {
......@@ -422,17 +428,20 @@ bool Scheduler::Worker::wait(Fiber::Lock& waitLock,
// Fiber resumed. We don't need the work mutex locked any more.
work.mutex.unlock();
// Re-lock to either return due to timeout, or call pred().
waitLock.lock();
// Check timeout.
if (timeout != nullptr && std::chrono::system_clock::now() >= *timeout) {
return false;
}
// Spurious wake up. Re-lock, spin again.
waitLock.lock();
// Spurious wake up. Spin again.
}
return true;
}
_Requires_lock_held_(work.mutex)
void Scheduler::Worker::suspend(
const std::chrono::system_clock::time_point* timeout) {
// Current fiber is yielding as it is blocked.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment