Send ActionScript Worker Messages 6x Faster With Mutex
While ActionScript Workers made their debut in Flash Player 11.4, the `Mutex` class didn’t arrive until the next version: 11.5. This class is specifically designed to solve a subtle problem that cropped up in the last article. As you’ll see in this article, it does the job quite well! The result is even faster message passing between workers/threads, which is often key to using multi-core CPUs efficiently.
The `Mutex` class is special in a couple of ways. First, when you call `Worker.setSharedProperty` and pass in a `Mutex`, the `Mutex` object is not copied to the other worker. Instead, the other worker gets a reference to the same `Mutex` object. This behavior is key to how `Mutex` works, as you’ll soon see.
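As a rough sketch of that sharing step, the helper below publishes a `Mutex` to a child worker and reads it back. The `SharedState` class and the `"mutex"` and `"bytes"` key names are hypothetical, chosen only for illustration. It also marks a `ByteArray` as `shareable` (a property added in Flash Player 11.5), since a regular `ByteArray` would be copied rather than shared.

```actionscript
package
{
    import flash.concurrent.Mutex;
    import flash.system.Worker;
    import flash.utils.ByteArray;

    /** Hypothetical helper; the class and key names are illustrative only */
    public class SharedState
    {
        /**
         * Publish a Mutex and a shareable ByteArray on a worker
         * @param owner Worker the shared properties are set on
         */
        public static function publish(owner:Worker): void
        {
            var bytes:ByteArray = new ByteArray();
            // Share the underlying memory instead of copying it
            bytes.shareable = true;
            bytes.writeByte(1);

            // Mutex objects are passed by reference, not copied
            owner.setSharedProperty("mutex", new Mutex());
            owner.setSharedProperty("bytes", bytes);
        }

        /** Get the shared Mutex (null if it hasn't been published yet) */
        public static function fetchMutex(owner:Worker): Mutex
        {
            return owner.getSharedProperty("mutex") as Mutex;
        }

        /** Get the shared ByteArray (null if it hasn't been published yet) */
        public static function fetchBytes(owner:Worker): ByteArray
        {
            return owner.getSharedProperty("bytes") as ByteArray;
        }
    }
}
```

The main worker would pass the child `Worker` it created as `owner`, while code running inside the child would pass `Worker.current`. Both ends then hold the same `Mutex`.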
The second special functionality is the sole reason for `Mutex` to exist. It provides a mechanism for you to “lock” and “unlock” it, but only one worker can lock the `Mutex` at a time. Let’s see a quick example:
```actionscript
function doubleFirstByte(bytes:ByteArray, mutex:Mutex): void
{
    // Lock the Mutex. If the Mutex is already locked by another worker,
    // this worker will be suspended until the Mutex is unlocked. At that
    // point the lock() function will return.
    mutex.lock();

    // The Mutex has now been locked by our thread. In this example we
    // take that to mean that the ByteArray is free for us to manipulate.
    // Here we simply double the first byte's value.
    bytes[0] *= 2;

    // Now that we're done with the Mutex we need to tell other workers
    // so they can resume and their calls to lock() will return. Calling
    // unlock does exactly this.
    mutex.unlock();
}
```
Imagine two workers running the `doubleFirstByte` function. If it weren’t for the `Mutex`, they might execute in an arbitrary order. Here is one scenario where it would all go wrong:
1. Initial value is set to 1
2. Worker A executes and reads 1
3. Worker B executes and reads 1
4. Worker A executes and writes 2
5. Worker B executes and writes 2

Final result after two runs: 2. Expected result: 4.
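The interleaving is possible because doubling a byte is really three steps: a read, a multiply, and a write. Here is a minimal sketch that spells those steps out inside the locked region. The `SafeDouble` class name is hypothetical, and the `try`/`finally` is an addition of this sketch so that `unlock()` still runs if the body throws.

```actionscript
package
{
    import flash.concurrent.Mutex;
    import flash.utils.ByteArray;

    /** Hypothetical variant of the example function, for illustration */
    public class SafeDouble
    {
        public static function doubleFirstByte(bytes:ByteArray, mutex:Mutex): void
        {
            // Only one worker at a time gets past this line
            mutex.lock();
            try
            {
                var value:int = bytes[0]; // read
                value *= 2;               // modify
                bytes[0] = value;         // write
            }
            finally
            {
                // Always release the Mutex, even if the body throws,
                // so other workers blocked in lock() are not left waiting
                mutex.unlock();
            }
        }
    }
}
```

With the lock held across all three steps, Worker B cannot read the value until Worker A has written its result.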
Any number of scenarios could play out because you don’t know the order in which the threads will execute. Hoping for the best simply won’t do, so you need a way to communicate between the threads and avoid contention over the shared `ByteArray`. The first article used a `MessageChannel` to communicate, but it was quite slow. The last article passed messages directly via `Worker.setSharedProperty` and achieved a 2.5x speedup, but it employed a tight loop to continuously call `Worker.getSharedProperty` until a response from the other worker was received:
```actionscript
// Check for the incoming message over and over until it's received
while (worker.getSharedProperty("message") != inMsg)
{
}
```
Essentially, the receiving worker would spend 100% of the CPU time it was allotted simply checking for incoming messages. With a `Mutex`, it can simply suspend execution until one is received.
Let’s see how that strategy works out. The following is an amended version of the last article’s small test application that adds a `Mutex`-based test using the approach described above.
```actionscript
package
{
    import flash.display.Sprite;
    import flash.events.Event;
    import flash.utils.getTimer;
    import flash.utils.ByteArray;
    import flash.text.TextField;
    import flash.text.TextFieldAutoSize;
    import flash.system.Worker;
    import flash.system.WorkerDomain;
    import flash.system.WorkerState;
    import flash.system.MessageChannel;
    import flash.concurrent.Mutex;

    /**
     * Test to show the speed of passing messages between workers via three
     * methods: MessageChannel, setSharedProperty with a tight polling loop, and
     * setSharedProperty with a Mutex.
     * @author Jackson Dunstan (JacksonDunstan.com)
     */
    public class MutexTest extends Sprite
    {
        /** Output logger */
        private var logger:TextField = new TextField();

        /**
         * Log a CSV row
         * @param cols Columns of the row
         */
        private function row(...cols): void
        {
            logger.appendText(cols.join(",")+"\n");
        }

        /** Message channel from the main thread to the worker thread */
        private var mainToWorker:MessageChannel;

        /** Message channel from the worker thread to the main thread */
        private var workerToMain:MessageChannel;

        /** The worker thread (main thread only) */
        private var worker:Worker;

        /** Number of messages to send back and forth in the test */
        private var REPS:int = 1000;

        /** Time before the message passing test started */
        private var beforeTime:int;

        /** Current message index */
        private var cur:int;

        /**
         * Start the app in main thread or worker thread mode
         */
        public function MutexTest()
        {
            // Setup the logger
            logger.autoSize = TextFieldAutoSize.LEFT;
            addChild(logger);

            // If this is the main SWF, start the main thread
            if (Worker.current.isPrimordial)
            {
                startMainThread();
            }
            // If this is the worker thread SWF, start the worker thread
            else
            {
                startWorkerThread();
            }
        }

        /**
         * Start the main thread
         */
        private function startMainThread(): void
        {
            // Try to get a very good framerate
            stage.frameRate = 60;

            // Create the worker from our own SWF bytes
            worker = WorkerDomain.current.createWorker(this.loaderInfo.bytes);

            // Create a message channel to send to the worker thread
            mainToWorker = Worker.current.createMessageChannel(worker);
            worker.setSharedProperty("mainToWorker", mainToWorker);

            // Create a message channel to receive from the worker thread
            workerToMain = worker.createMessageChannel(Worker.current);
            workerToMain.addEventListener(Event.CHANNEL_MESSAGE, onWorkerToMainDirect);
            worker.setSharedProperty("workerToMain", workerToMain);

            // Start the worker
            worker.start();

            // Begin the test where we use MessageChannel to send messages
            // between the threads by sending the first message
            beforeTime = getTimer();
            mainToWorker.send("1");
        }

        /**
         * Start the worker thread
         */
        private function startWorkerThread(): void
        {
            // Get the message channels the main thread set up for communication
            // between the threads
            mainToWorker = Worker.current.getSharedProperty("mainToWorker");
            workerToMain = Worker.current.getSharedProperty("workerToMain");
            mainToWorker.addEventListener(Event.CHANNEL_MESSAGE, onMainToWorker);
        }

        /**
         * Callback for when a message has been received from the main thread to
         * the worker thread on a MessageChannel
         * @param ev CHANNEL_MESSAGE event
         */
        private function onMainToWorker(ev:Event): void
        {
            // Record the message and send a response
            cur++;
            workerToMain.send("1");

            // If this was the last message, prepare for the next test where the
            // two threads communicate with shared properties
            if (cur == REPS)
            {
                // We receive "1" on our own Thread (the worker thread) and send
                // "2" in response for the setSharedProperty and mutex tests
                setSharedPropertyTest(Worker.current, "1", "2", false);
                mutexTest(Worker.current, "1", "2", false);
            }
        }

        /**
         * Callback for when the worker thread sends a message to the main thread
         * via a MessageChannel
         * @param ev CHANNEL_MESSAGE event
         */
        private function onWorkerToMainDirect(ev:Event): void
        {
            // Record the message and show progress (this version is slow)
            cur++;
            logger.text = "MessageChannel: " + cur + " / " + REPS;

            // If this wasn't the last message, send another message to the
            // worker thread
            if (cur < REPS)
            {
                mainToWorker.send("1");
                return;
            }

            // The MessageChannel test is done. Record the time it took.
            var afterTime:int = getTimer();
            var messageChannelTime:int = afterTime - beforeTime;

            // Run the setSharedProperty test where the two threads communicate
            // by directly setting shared properties on the worker thread. The
            // main thread receives "2", sends "1", and is responsible for
            // starting the process with an initial "1" message.
            beforeTime = getTimer();
            setSharedPropertyTest(worker, "2", "1", true);
            afterTime = getTimer();
            var setSharedPropertyTime:int = afterTime - beforeTime;

            // Run the mutex test where the two threads communicate by directly
            // setting shared properties on the worker thread and pause
            // execution via a mutex instead of constantly polling via a tight
            // while loop. The main thread receives "2", sends "1", and is
            // responsible for starting the process with an initial "1" message.
            beforeTime = getTimer();
            mutexTest(worker, "2", "1", true);
            afterTime = getTimer();
            var mutexTime:int = afterTime - beforeTime;

            // Clear the logger and show the results instead. getTimer()
            // reports milliseconds, so convert each time to seconds before
            // dividing to get messages per second.
            logger.text = "";
            row("Type", "Time", "Messages/sec");
            var messagesPerSecond:Number = REPS / (messageChannelTime / 1000.0);
            row("MessageChannel", messageChannelTime, messagesPerSecond);
            messagesPerSecond = REPS / (setSharedPropertyTime / 1000.0);
            row("setSharedProperty", setSharedPropertyTime, messagesPerSecond);
            messagesPerSecond = REPS / (mutexTime / 1000.0);
            row("mutex", mutexTime, messagesPerSecond);
        }

        /**
         * Perform the setSharedProperty test where the two threads communicate
         * by directly setting shared properties on the worker thread. This
         * version uses a tight polling loop to repeatedly check until the
         * incoming message is ready.
         * @param worker Worker the shared properties are set on
         * @param inMsg Expected message this thread receives from the other
         * @param outMsg Message to send to the other thread
         * @param sendInitial If an initial message should be sent
         */
        private function setSharedPropertyTest(
            worker:Worker,
            inMsg:String,
            outMsg:String,
            sendInitial:Boolean
        ): void
        {
            // Reset the count from the first test
            cur = 0;

            // Optionally send an initial outgoing message to start the process
            if (sendInitial)
            {
                worker.setSharedProperty("message", outMsg);
            }

            // Send messages until we've hit the limit
            while (cur < REPS)
            {
                // Check to see if the shared property is the incoming message
                if (worker.getSharedProperty("message") == inMsg)
                {
                    // Record the message and send a response by setting the
                    // shared property to the outgoing message
                    cur++;
                    worker.setSharedProperty("message", outMsg);
                }
            }
        }

        /**
         * Perform the setSharedProperty test where the two threads communicate
         * by directly setting shared properties on the worker thread. This
         * version uses a Mutex to pause execution of the worker/thread until the
         * incoming message is ready.
         * @param worker Worker the shared properties are set on
         * @param inMsg Expected message this thread receives from the other
         * @param outMsg Message to send to the other thread
         * @param sendInitial If an initial message should be sent
         */
        private function mutexTest(
            worker:Worker,
            inMsg:String,
            outMsg:String,
            sendInitial:Boolean
        ): void
        {
            // Reset the count from the first test
            cur = 0;

            // Optionally send an initial outgoing message to start the process
            var mutex:Mutex;
            if (sendInitial)
            {
                mutex = new Mutex();
                worker.setSharedProperty("mutex", mutex);
                worker.setSharedProperty("message", outMsg);
            }
            else
            {
                // Wait for the main thread to send the mutex
                do
                {
                    mutex = worker.getSharedProperty("mutex") as Mutex;
                }
                while (mutex == null);
            }

            // Send messages until we've hit the limit
            while (cur < REPS)
            {
                // Wait for the other thread to unlock the mutex. When they do,
                // the incoming message is ready.
                mutex.lock();

                // Record the message and send a response by setting the
                // shared property to the outgoing message
                cur++;
                worker.setSharedProperty("message", outMsg);

                // Notify the other thread that the outgoing message is ready
                // by unlocking the mutex
                mutex.unlock();
            }
        }
    }
}
```
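One caveat about the `mutexTest` loop above: `lock()` and `unlock()` only guarantee mutual exclusion, not turn-taking, and the loop never reads the `message` property. Nothing stops one worker from re-acquiring the mutex several times in a row before the other gets a turn. If strict back-and-forth delivery matters, one option is the `Condition` class from the same `flash.concurrent` package (also new in Flash Player 11.5). The following is a minimal, untimed sketch, not the test that produced the numbers below. The `ConditionPingPong` class and the idea of sharing the `Condition` under a `"cond"` key are assumptions made for illustration.

```actionscript
package
{
    import flash.concurrent.Condition;
    import flash.concurrent.Mutex;
    import flash.system.Worker;

    /** Hypothetical helper, not part of the original test application */
    public class ConditionPingPong
    {
        /**
         * Exchange messages in strict alternation with the other worker
         * @param worker Worker the "message" shared property is set on
         * @param cond Condition shared by both workers
         * @param inMsg Message this side waits for
         * @param outMsg Message this side sends in response
         * @param reps Number of messages to receive before returning
         */
        public static function run(
            worker:Worker,
            cond:Condition,
            inMsg:String,
            outMsg:String,
            reps:int
        ): void
        {
            var mutex:Mutex = cond.mutex;
            var received:int = 0;

            // wait() and notify() require that we own the mutex
            mutex.lock();
            try
            {
                while (received < reps)
                {
                    // Sleep until the other side has actually written inMsg.
                    // wait() releases the mutex while suspended and
                    // re-acquires it before returning.
                    while (worker.getSharedProperty("message") != inMsg)
                    {
                        cond.wait();
                    }

                    // Record the message, respond, and wake the other side
                    received++;
                    worker.setSharedProperty("message", outMsg);
                    cond.notify();
                }
            }
            finally
            {
                mutex.unlock();
            }
        }
    }
}
```

To use it, the main worker would set the initial `"message"` value first, then create `new Condition(new Mutex())` and publish it with `worker.setSharedProperty("cond", cond)`. Each side then calls `ConditionPingPong.run()` with its own `inMsg` and `outMsg`. Because the message is checked while the mutex is held, a worker only proceeds once the other side's message is actually there.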
I ran this test in the following environment:
- Release version of Flash Player 11.9.900.117
- 2.3 GHz Intel Core i7
- Mac OS X 10.9.0
- Google Chrome 30.0.1599.101
- ASC 2.0.0 build 353981 (`-debug=false -verbose-stacktraces=false -inline -optimize=true`)
And here are the results I got:
Type | Time (ms) | Messages/sec |
---|---|---|
MessageChannel | 153 | 6535.9 |
setSharedProperty | 62 | 16129 |
mutex | 25 | 40000 |
The `Mutex` code is definitely doing its job. Each worker/thread no longer needs to spend 100% of its CPU time continually checking for messages, but instead can simply suspend until a message is ready. This pays off with a speedup of over 2x compared to the `setSharedProperty` method (62 ms vs. 25 ms) and over 6x compared to the convenient-yet-slow `MessageChannel` method (153 ms vs. 25 ms). Is the speedup worth requiring Flash Player 11.5? That decision is up to you.
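If you would rather decide at runtime, one possible approach is to look the class up by name before touching it and fall back to the polling loop when it is missing. This is only a sketch: the `MutexSupport` class is hypothetical, and it assumes the `Mutex`-using code lives in a separate class that is only referenced after the check passes.

```actionscript
package
{
    import flash.system.ApplicationDomain;

    /** Hypothetical helper; the name and fallback policy are illustrative */
    public class MutexSupport
    {
        /**
         * Check for Mutex without referencing the class directly
         * @return If flash.concurrent.Mutex exists and reports support
         */
        public static function isAvailable(): Boolean
        {
            var domain:ApplicationDomain = ApplicationDomain.currentDomain;
            var name:String = "flash.concurrent.Mutex";

            // Players older than 11.5 don't define the class at all
            if (!domain.hasDefinition(name))
            {
                return false;
            }

            // The class exists, so ask it whether this platform supports it
            var mutexClass:Object = domain.getDefinition(name);
            return Boolean(mutexClass["isSupported"]);
        }
    }
}
```

A caller could then pick the `Mutex`-based path when `MutexSupport.isAvailable()` returns `true` and the `setSharedProperty` polling path otherwise.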
Spot a bug? Have a question or suggestion? Post a comment!
#1 by Mateo on November 4th, 2013 ·
Keep it up coming Jackson– these Worker tips are really helping me out!
THANKS!
#2 by Simon Gladman on November 5th, 2013 ·
Cheers! I’ve been hammering workers for ages, but haven’t used Mutex – I’ll give this a go tonight. Thanks a million!
simon
#3 by Simon Gladman on November 6th, 2013 ·
A quick tip – you may need to add…
-swf-version=18
…to your compiler arguments to get mutex to work. My SWF silently failed until I did.
simon
#4 by jackson on November 6th, 2013 ·
Good point. I didn’t explicitly include the version-specific parameters. You probably also want to add
--target-player=11.5.0
to tell the compiler to use the 11.5 version of playerglobal.swc, the first version to include the `Mutex` class.
#5 by Glidias on February 11th, 2014 ·
Mutex is pretty cool, preventing the need to wait for callback notifications via MessageChannel listeners. One of the features is also a “tryLock()” method, which allows you to determine if it’s currently busy at the moment, and if so, maybe do something else while waiting for the other threads to finish their tasks. I assume the current flash player pauses when Mutex is waiting for it to be released, if one calls lock() but it’s currently busy? Does it literally pause the current flash player instance at the particular line when lock() is called? Can this potentially hang your app if you forgot to release the Mutex in another thread? To simply check (ie. peek) into whether Mutex is busy or not without locking it, i guess I’d have to do a “if (mutex.tryLock()) mutex.unlock();” statement?? A bit convoluted, but i guess it makes sense since i’m accessing the mutex’s lock state at the moment, so others cannot change it while those lines of code are running on my end (the assertion must persistently hold true within the “if(true)” block, obviously…).
#6 by loover on August 8th, 2014 ·
Hi! Its possible accelerate send/receive byteArray with Mutex in this example http://esdot.ca/site/2012/intro-to-as3-workers-part-2-image-processing ? Example use timer for sharpen image, messageChannel.send() to Worker if slider update and worker send updated byteArray to main tread. But even if change timer delay to 10 ms, image updated with lag.
#7 by Tadeu on March 21st, 2018 ·
It is incredible that no one noticed that… on the mutex test you simply unlock() the mutex… this don’t “Notify the other thread that the outgoing message is ready by unlocking the mutex”, instead you are not allowing the worker to work because you lock() the mutex again in the next instruction of your thread… so simple add this to see that the worker didn’t woked at all (you will see only lots of 1 as outMsg):
#8 by Tadeu on March 21st, 2018 ·
Ok the code above proof nothing because logger isn’t shared between workers, so I’ve made this changes to proof the point:
#9 by Tadeu on March 21st, 2018 ·
Assuming we want communication between threads where only one thread speaks one thing at a time and the other listens one thing at a time and vice versa. Synchronous communication shows horrible results for the use of mutex.lock (). Instead, on my end, that checking if new message arrived with messageChannel.receive () is the best option.