Easy Worker Communication With MessageComm
ActionScript Workers are great, but they can be tricky to set up and especially debug. Today I’m introducing a couple of helper classes to take some of the pain out of communicating between threads. Read on for the helper class source code (MIT licensed) and an example app that uses it.
The following pair of helper classes represents the two sides of communication between workers/threads: sending and receiving. Let’s start with the simpler of the two, WorkerCommOut, which sends messages. Here is how you would normally set up a MessageChannel for sending:
// Get the shared message channel
var sharedProp:* = worker.getSharedProperty("mainToWorker");
if (sharedProp is MessageChannel)
{
    mainToWorker = sharedProp as MessageChannel;
}
// If the shared message channel hasn't been created yet, create it
else
{
    mainToWorker = Worker.current.createMessageChannel(worker);
    worker.setSharedProperty("mainToWorker", mainToWorker);
}
And here’s how you do it with WorkerCommOut:
mainToWorker = new WorkerCommOut(worker, "mainToWorker");
WorkerCommOut is mostly hiding that work for you. Here’s its source code:
package
{
    import flash.system.MessageChannel;
    import flash.system.Worker;

    /**
     * Outgoing communication channel to a worker
     * @author Jackson Dunstan, http://JacksonDunstan.com
     * @license MIT
     */
    public class WorkerCommOut
    {
        /** Outgoing message channel */
        private var channel:MessageChannel;

        /**
         * Create the outgoing communication channel
         * @param worker Worker to send communications to. Defaults to the
         *               current worker.
         * @param channelName Name of the message channel to send on
         */
        public function WorkerCommOut(worker:Worker, channelName:String)
        {
            // Default worker to current worker
            if (!worker)
            {
                worker = Worker.current;
            }

            // Get the shared message channel
            var sharedProp:* = worker.getSharedProperty(channelName);
            if (sharedProp is MessageChannel)
            {
                channel = sharedProp as MessageChannel;
            }
            // If the shared message channel hasn't been created yet, create it
            else
            {
                channel = Worker.current.createMessageChannel(worker);
                worker.setSharedProperty(channelName, channel);
            }
        }

        /**
         * Send a message
         * @param arg Message to send
         */
        public function send(arg:*): void
        {
            channel.send(arg);
        }
    }
}
As you can see, sending messages is identical:
// Normal (MessageChannel)
mainToWorker.send("hi");

// WorkerCommOut
mainToWorker.send("hi");
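The WorkerCommOut constructor falls back to Worker.current when it is given a null worker, so worker-side code can skip naming the worker. The following is a minimal sketch of that usage, not part of the original example app: the class name NullWorkerExample and the message text are hypothetical. It assumes the main thread has already registered a "workerToMain" channel as a shared property on this worker before starting it. Without that, WorkerCommOut would create a channel from the current worker to itself.

```actionscript
package
{
    import flash.display.Sprite;

    /**
     * Hypothetical worker-side document class (illustration only).
     * Assumes the main thread set up the "workerToMain" channel on this
     * worker before calling worker.start().
     */
    public class NullWorkerExample extends Sprite
    {
        /** Outgoing channel from this worker to the main thread */
        private var workerToMain:WorkerCommOut;

        public function NullWorkerExample()
        {
            // Passing null makes WorkerCommOut use Worker.current
            workerToMain = new WorkerCommOut(null, "workerToMain");

            // Sending works the same as with a raw MessageChannel
            workerToMain.send("hello from the worker");
        }
    }
}
```

Passing Worker.current explicitly, as the example app does, behaves the same way and makes the intent more visible.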
Now for incoming message channels, which are a little more complex. Here’s how you’d set one up normally with MessageChannel:
// Get the shared message channel
var sharedProp:* = worker.getSharedProperty("workerToMain");
if (sharedProp is MessageChannel)
{
    workerToMain = sharedProp as MessageChannel;
}
// If the shared message channel hasn't been created yet, create it
else
{
    workerToMain = worker.createMessageChannel(Worker.current);
    worker.setSharedProperty("workerToMain", workerToMain);
}

// Listen for messages on the channel
workerToMain.addEventListener(Event.CHANNEL_MESSAGE, onChannelMsg);

private function onChannelMsg(ev:Event): void
{
    // Receive the message from the channel set up above
    var arg:* = workerToMain.receive();

    // ... do something with arg
}
And here’s how you do it with WorkerCommIn:
workerToMain = new WorkerCommIn(worker, "workerToMain", onChannelMsg);

private function onChannelMsg(arg:*): void
{
    // ... do something with arg
}
Much less code! You can change the callback after the fact, or ignore all messages by setting it to null. Even better, you can set up a simple callback system between threads:
//////////////
// Main thread
//////////////

workerToMain = new WorkerCommIn(worker, "workerToMain", onChannelMsg, "backToWorker");

private function onChannelMsg(arg:int): int
{
    // respond with double the value
    return arg * 2;
}

////////////////
// Worker thread
////////////////

backToWorker = new WorkerCommIn(Worker.current, "backToWorker", onBTW);

function onBTW(doubled:int): void
{
    // doubled is 44
}

workerToMain = new WorkerCommOut(Worker.current, "workerToMain");
workerToMain.send(22);
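Changing the callback after construction and muting the channel with null can be sketched as follows. This is an illustration only: the class name CallbackSwapExample and the handler names onFirstMsg and onLaterMsg are hypothetical, and the worker passed in is assumed to have been created elsewhere.

```actionscript
package
{
    import flash.system.Worker;

    /** Hypothetical main-thread helper (illustration only) */
    public class CallbackSwapExample
    {
        /** Incoming channel from the worker */
        private var workerToMain:WorkerCommIn;

        /**
         * @param worker Background worker, assumed to be created by the caller
         */
        public function CallbackSwapExample(worker:Worker)
        {
            workerToMain = new WorkerCommIn(worker, "workerToMain", onFirstMsg);
        }

        private function onFirstMsg(arg:*): void
        {
            // Route every later message to a different handler
            workerToMain.callback = onLaterMsg;
        }

        private function onLaterMsg(arg:*): void
        {
            // Stop handling messages. WorkerCommIn still receives them from
            // the channel but discards them while the callback is null.
            workerToMain.callback = null;
        }
    }
}
```

The example app uses the same swap when it replaces onWorkerToMainDirect with onWorkerToMainBytes between test phases.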
Here’s the WorkerCommIn source code:
package
{
    import flash.events.Event;
    import flash.system.MessageChannel;
    import flash.system.Worker;

    /**
     * Incoming communication channel from a worker
     * @author Jackson Dunstan, http://JacksonDunstan.com
     * @license MIT
     */
    public class WorkerCommIn
    {
        /** Incoming message channel */
        private var channel:MessageChannel;

        /** Outgoing message channel */
        private var out:WorkerCommOut;

        /** Callback to call when a message is received. Passed any type (*) and
        *** may return any type. If a value other than undefined is returned and
        *** the callback channel is defined, the returned value is sent to the
        *** callback channel. The callback may be set to null to ignore
        *** messages. */
        public var callback:Function;

        /**
         * Create the incoming communication channel
         * @param worker Worker to receive communications from. Defaults to the
         *               current worker.
         * @param channelName Name of the message channel to receive on
         * @param callback Initial callback to call when messages are received.
         * @see callback for more information.
         * @param callbackChannelName If non-null and non-empty, an outgoing
         *                            message channel is set up and messages are
         *                            sent automatically when the callback
         *                            returns any value other than undefined.
         */
        public function WorkerCommIn(
            worker:Worker,
            channelName:String,
            callback:Function,
            callbackChannelName:String=null
        )
        {
            // Default worker to current worker
            if (!worker)
            {
                worker = Worker.current;
            }

            // Get the shared message channel
            var sharedProp:* = worker.getSharedProperty(channelName);
            if (sharedProp is MessageChannel)
            {
                channel = sharedProp as MessageChannel;
            }
            // If the shared message channel hasn't been created yet, create it
            else
            {
                channel = worker.createMessageChannel(Worker.current);
                worker.setSharedProperty(channelName, channel);
            }

            // If a callback channel was specified, create an outgoing
            // WorkerComm to send with
            if (callbackChannelName)
            {
                out = new WorkerCommOut(worker, callbackChannelName);
            }

            // Save the initial callback
            this.callback = callback;

            // Listen for messages on the channel
            channel.addEventListener(Event.CHANNEL_MESSAGE, onChannelMsg);
        }

        /**
         * Callback for when messages are received on the message channel
         * @param ev CHANNEL_MESSAGE event
         */
        private function onChannelMsg(ev:Event): void
        {
            // Receive the message
            var arg:* = channel.receive();

            // Callback can be null to ignore messages
            if (callback != null)
            {
                // Pass the message to the callback
                var ret:* = callback(arg);

                // If there is an outgoing channel and the callback returned a
                // value other than undefined, send the returned value to the
                // outgoing channel
                if (out && ret !== undefined)
                {
                    out.send(ret);
                }
            }
        }
    }
}
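Because onChannelMsg only skips the reply when the callback returns undefined, a callback can decide per message whether to answer. The sketch below illustrates that under stated assumptions: the class name SelectiveReplyExample and the reply-only-to-integers rule are hypothetical, and the main thread is assumed to have set up both channels before starting the worker.

```actionscript
package
{
    import flash.system.Worker;

    /** Hypothetical worker-side helper (illustration only) */
    public class SelectiveReplyExample
    {
        /** Incoming channel whose replies go out on "workerToMain" */
        private var mainToWorker:WorkerCommIn;

        public function SelectiveReplyExample()
        {
            mainToWorker = new WorkerCommIn(
                Worker.current,
                "mainToWorker",
                onMsg,
                "workerToMain"
            );
        }

        private function onMsg(arg:*): *
        {
            // Reply only to integer messages
            if (arg is int)
            {
                return int(arg) * 2;
            }

            // Returning undefined means nothing is sent back
            return undefined;
        }
    }
}
```

Note that the check is a strict comparison against undefined, so return values such as null, 0, and false are still sent on the callback channel. A callback declared with a void return type yields undefined and therefore never triggers a reply.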
Now for an example app that uses them. It is mostly last week’s app for testing the speed of message passing. Performance remains the same, but the raw MessageChannel usage has been swapped out for the WorkerComm classes.
WorkerCommTestMain — the main thread
package
{
    import flash.display.Sprite;
    import flash.utils.getTimer;
    import flash.utils.ByteArray;
    import flash.text.TextField;
    import flash.text.TextFieldAutoSize;
    import flash.system.Worker;
    import flash.system.WorkerDomain;

    public class WorkerCommTestMain extends Sprite
    {
        [Embed(source="WorkerCommTestWorker.swf", mimeType="application/octet-stream")]
        private static var WORKER_SWF:Class;

        private var logger:TextField = new TextField();
        private function row(...cols): void
        {
            logger.appendText(cols.join(",")+"\n");
        }

        private var workerToMainStartup:WorkerCommIn;
        private var mainToWorker:WorkerCommOut;
        private var mainToWorkerBytes:WorkerCommOut;
        private var mainToWorkerClear:WorkerCommOut;
        private var workerToMain:WorkerCommIn;

        private var worker:Worker;

        private var REPS:int = 100000;
        private var beforeTime:int;
        private var sum:int;
        private var correctSum:int;

        public function WorkerCommTestMain()
        {
            logger.autoSize = TextFieldAutoSize.LEFT;
            addChild(logger);

            for (var i:int = 0; i < REPS; ++i)
            {
                correctSum += i;
            }

            row("Type", "Time", "Correct?");

            var workerBytes:ByteArray = new WORKER_SWF() as ByteArray;
            worker = WorkerDomain.current.createWorker(workerBytes);

            mainToWorker = new WorkerCommOut(
                worker,
                "mainToWorker"
            );
            mainToWorkerBytes = new WorkerCommOut(
                worker,
                "mainToWorkerBytes"
            );
            mainToWorkerClear = new WorkerCommOut(
                worker,
                "mainToWorkerClear"
            );
            workerToMain = new WorkerCommIn(
                worker,
                "workerToMain",
                onWorkerToMainDirect
            );
            workerToMainStartup = new WorkerCommIn(
                worker,
                "workerToMainStartup",
                onWorkerToMainStartup
            );

            worker.start();
        }

        private function onWorkerToMainStartup(arg:*): void
        {
            sum = 0;
            beforeTime = getTimer();
            for (var i:int = 0; i < REPS; ++i)
            {
                mainToWorker.send(i);
            }
            mainToWorkerClear.send(true);
        }

        private function onWorkerToMainDirect(arg:*): void
        {
            sum = arg;
            var afterTime:int = getTimer();
            row("Direct", (afterTime-beforeTime), (sum==correctSum));

            workerToMain.callback = onWorkerToMainBytes;

            var bytes:ByteArray = new ByteArray();
            bytes.shareable = true;
            bytes.length = REPS*4;
            bytes.position = 0;
            sum = 0;
            beforeTime = getTimer();
            for (var i:int; i < REPS; ++i)
            {
                bytes.writeInt(i);
            }
            mainToWorkerBytes.send(bytes);
        }

        private function onWorkerToMainBytes(arg:*): void
        {
            sum = arg;
            var afterTime:int = getTimer();
            row("ByteArray", (afterTime-beforeTime), (sum==correctSum));
        }
    }
}
WorkerCommTestWorker — the worker thread
package
{
    import flash.display.Sprite;
    import flash.utils.ByteArray;
    import flash.system.Worker;
    import flash.system.WorkerDomain;

    public class WorkerCommTestWorker extends Sprite
    {
        private var workerToMainStartup:WorkerCommOut;
        private var mainToWorker:WorkerCommIn;
        private var mainToWorkerBytes:WorkerCommIn;
        private var mainToWorkerClear:WorkerCommIn;

        private var sum:int;

        public function WorkerCommTestWorker()
        {
            mainToWorker = new WorkerCommIn(
                Worker.current,
                "mainToWorker",
                onMainToWorker
            );
            mainToWorkerBytes = new WorkerCommIn(
                Worker.current,
                "mainToWorkerBytes",
                onMainToWorkerBytes,
                "workerToMain"
            );
            mainToWorkerClear = new WorkerCommIn(
                Worker.current,
                "mainToWorkerClear",
                onMainToWorkerClear,
                "workerToMain"
            );
            workerToMainStartup = new WorkerCommOut(
                Worker.current,
                "workerToMainStartup"
            );

            workerToMainStartup.send(true);
        }

        private function onMainToWorker(arg:*): void
        {
            sum += arg;
        }

        private function onMainToWorkerBytes(arg:*): int
        {
            var bytes:ByteArray = arg;
            bytes.position = 0;
            var numInts:int = bytes.length / 4;
            var sum:int;
            for (var i:int; i < numInts; ++i)
            {
                sum += bytes.readInt();
            }
            return sum;
        }

        private function onMainToWorkerClear(arg:*): int
        {
            var ret:int = sum;
            sum = 0;
            return ret;
        }
    }
}
Hopefully WorkerCommIn and WorkerCommOut will save you from typing a lot of boilerplate code and, even better, the bugs that inevitably come with it.
Spot a bug in either class or the example? Have a question or suggestion? Post a comment!
#1 by Mateo on October 7th, 2013
Awesome Jackson, Thanks very much!
#2 by Zwick on November 5th, 2013
Thx for sharing!
#3 by max troost on November 28th, 2013
Hi Jackson, very nice this :)
i have only 1 question regarding using these helper classes. how would i implement Mutex in all of this?
thanx for all the great stuff.
Max
#4 by jackson on November 29th, 2013
While it doesn’t directly have a Mutex-based version of WorkerComm, there is a followup article on Mutex that should give you a primer if you want to adapt it yourself. Or perhaps I’ll adapt it in a future article. :)