ActionScript Workers are great, but they can be tricky to set up and even trickier to debug. Today I’m introducing a couple of helper classes that take some of the pain out of communicating between threads. Read on for the helper classes’ source code (MIT licensed) and an example app that uses them.

The following pair of helper classes represents the two sides of communication between workers/threads: sending and receiving. Let’s start with the simpler of the two, WorkerCommOut, which sends messages. Here is how you would normally set up a MessageChannel for sending:

// Get the shared message channel
var sharedProp:* = worker.getSharedProperty("mainToWorker");
if (sharedProp is MessageChannel)
{
	mainToWorker = sharedProp as MessageChannel;
}
// If the shared message channel hasn't been created yet, create it
else
{
	mainToWorker = Worker.current.createMessageChannel(worker);
	worker.setSharedProperty("mainToWorker", mainToWorker);
}

And here’s how you do it with WorkerCommOut:

mainToWorker = new WorkerCommOut(worker, "mainToWorker");

WorkerCommOut mostly just hides that work for you. Here’s its source code:

package
{
	import flash.system.MessageChannel;
	import flash.system.Worker;
 
	/**
	* Outgoing communication channel to a worker
	* @author Jackson Dunstan, http://JacksonDunstan.com
	* @license MIT
	*/
	public class WorkerCommOut
	{
		/** Outgoing message channel */
		private var channel:MessageChannel;
 
		/**
		* Create the outgoing communication channel
		* @param worker Worker to send communications to. Defaults to the
		*               current worker.
		* @param channelName Name of the message channel to send on
		*/
		public function WorkerCommOut(worker:Worker, channelName:String)
		{
			// Default worker to current worker
			if (!worker)
			{
				worker = Worker.current;
			}
 
			// Get the shared message channel
			var sharedProp:* = worker.getSharedProperty(channelName);
			if (sharedProp is MessageChannel)
			{
				channel = sharedProp as MessageChannel;
			}
			// If the shared message channel hasn't been created yet, create it
			else
			{
				channel = Worker.current.createMessageChannel(worker);
				worker.setSharedProperty(channelName, channel);
			}
		}
 
		/**
		* Send a message
		* @param arg Message to send
		*/
		public function send(arg:*): void
		{
			channel.send(arg);
		}
	}
}

As you can see, sending messages is identical:

// Normal (MessageChannel)
mainToWorker.send("hi");
 
// WorkerCommOut
mainToWorker.send("hi");
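
Because send() takes any type (*), the same call works for more than strings. Here’s a rough sketch, assuming MessageChannel’s usual behavior of copying ordinary values and sharing only a ByteArray whose shareable flag is set. The function name sendExamples and the message contents are made up for illustration:

```actionscript
package
{
	import flash.utils.ByteArray;

	// Hypothetical helper, not part of WorkerCommOut
	public function sendExamples(out:WorkerCommOut): void
	{
		// Primitives and strings arrive as copies
		out.send(42);
		out.send("hi");

		// Plain objects are serialized, so the receiver gets its own copy
		out.send({cmd: "resize", width: 640});

		// A shareable ByteArray is the exception: both threads end up
		// reading and writing the same underlying memory
		var bytes:ByteArray = new ByteArray();
		bytes.shareable = true;
		bytes.writeInt(42);
		out.send(bytes);
	}
}
```

The example app later in this article sends both plain ints and a shareable ByteArray this way.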

Now for incoming message channels, which are a little more complex. Here’s how you’d set one up normally with MessageChannel:

// Get the shared message channel
var sharedProp:* = worker.getSharedProperty("workerToMain");
if (sharedProp is MessageChannel)
{
	workerToMain = sharedProp as MessageChannel;
}
// If the shared message channel hasn't been created yet, create it
else
{
	workerToMain = worker.createMessageChannel(Worker.current);
	worker.setSharedProperty("workerToMain", workerToMain);
}
 
// Listen for messages on the channel
workerToMain.addEventListener(Event.CHANNEL_MESSAGE, onChannelMsg);
 
private function onChannelMsg(ev:Event): void
{
	// Receive the message
	var arg:* = workerToMain.receive();
 
	// ... do something with arg
}

And here’s how you do it with WorkerCommIn:

workerToMain = new WorkerCommIn(worker, "workerToMain", onChannelMsg);
 
private function onChannelMsg(arg:*): void
{
	// ... do something with arg
}

Much less code! The callback is a public property, so you can swap it for another function after the fact, or set it to null to ignore all messages. Even better, you can set up a simple callback system between threads:

//////////////
// Main thread
//////////////
workerToMain = new WorkerCommIn(worker, "workerToMain", onChannelMsg, "backToWorker");
 
private function onChannelMsg(arg:int): int
{
	// respond with double the value
	return arg * 2;
}
 
////////////////
// Worker thread
////////////////
backToWorker = new WorkerCommIn(Worker.current, "backToWorker", onBTW);
function onBTW(doubled:int): void
{
	// doubled is 44
}
 
workerToMain = new WorkerCommOut(Worker.current, "workerToMain");
workerToMain.send(22);
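
Swapping the callback at runtime can be sketched like this. It’s a minimal sketch: the class name CallbackSwapExample and the handler names are made up for illustration, and only WorkerCommIn and its public callback property come from this article.

```actionscript
package
{
	import flash.system.Worker;

	// Hypothetical example class for illustration only
	public class CallbackSwapExample
	{
		private var workerToMain:WorkerCommIn;

		public function CallbackSwapExample(worker:Worker)
		{
			workerToMain = new WorkerCommIn(worker, "workerToMain", onFirstMsg);
		}

		private function onFirstMsg(arg:*): void
		{
			// Handle the first message, then route later ones elsewhere
			workerToMain.callback = onLaterMsg;
		}

		private function onLaterMsg(arg:*): void
		{
			// Stop reacting to messages. They are still pulled off the
			// channel, but nothing is called with them.
			workerToMain.callback = null;
		}
	}
}
```

Neither handler returns a value, so nothing would be sent back even if a callback channel name had been passed to the constructor.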

Here’s the WorkerCommIn source code:

package
{
	import flash.events.Event;
	import flash.system.MessageChannel;
	import flash.system.Worker;
 
	/**
	* Incoming communication channel from a worker
	* @author Jackson Dunstan, http://JacksonDunstan.com
	* @license MIT
	*/
	public class WorkerCommIn
	{
		/** Incoming message channel */
		private var channel:MessageChannel;
 
		/** Outgoing message channel */
		private var out:WorkerCommOut;
 
		/** Callback to call when a message is received. Passed any type (*) and
		*** may return any type. If a value other than undefined is returned and
		*** the callback channel is defined, the returned value is sent to the
		*** callback channel. The callback may be set to null to ignore
		*** messages. */
		public var callback:Function;
 
		/**
		* Create the incoming communication channel
		* @param worker Worker to receive communications from. Defaults to the
		*               current worker.
		* @param channelName Name of the message channel to receive on
		* @param callback Initial callback to call when messages are received.
		*                 @see callback for more information.
		* @param callbackChannelName If non-null and non-empty, an outgoing
		*                            message channel is set up and messages are
		*                            sent automatically when the callback
		*                            returns any value other than undefined.
		*/
		public function WorkerCommIn(
			worker:Worker,
			channelName:String,
			callback:Function,
			callbackChannelName:String=null
		)
		{
			// Default worker to current worker
			if (!worker)
			{
				worker = Worker.current;
			}
 
			// Get the shared message channel
			var sharedProp:* = worker.getSharedProperty(channelName);
			if (sharedProp is MessageChannel)
			{
				channel = sharedProp as MessageChannel;
			}
			// If the shared message channel hasn't been created yet, create it
			else
			{
				channel = worker.createMessageChannel(Worker.current);
				worker.setSharedProperty(channelName, channel);
			}
 
			// If a callback channel was specified, create an outgoing
			// WorkerComm to send with
			if (callbackChannelName)
			{
				out = new WorkerCommOut(worker, callbackChannelName);
			}
 
			// Save the initial callback
			this.callback = callback;
 
			// Listen for messages on the channel
			channel.addEventListener(Event.CHANNEL_MESSAGE, onChannelMsg);
		}
 
		/**
		* Callback for when messages are received on the message channel
		* @param ev CHANNEL_MESSAGE event
		*/
		private function onChannelMsg(ev:Event): void
		{
			// Receive the message
			var arg:* = channel.receive();
 
			// Callback can be null to ignore messages
			if (callback != null)
			{
				// Pass the message to the callback
				var ret:* = callback(arg);
 
				// If there is an outgoing channel and the callback returned a
				// value other than undefined, send the returned value to the
				// outgoing channel
				if (out && ret !== undefined)
				{
					out.send(ret);
				}
			}
		}
	}
}

Now for an example app that uses them. It’s mostly last week’s app for testing the speed of message passing. Performance remains the same, but the raw MessageChannel usage has been swapped out for the WorkerComm classes.

WorkerCommTestMain, the main thread:

package
{
	import flash.display.Sprite;
	import flash.utils.getTimer;
	import flash.utils.ByteArray;
	import flash.text.TextField;
	import flash.text.TextFieldAutoSize;
	import flash.system.Worker;
	import flash.system.WorkerDomain;
 
	public class WorkerCommTestMain extends Sprite
	{
		[Embed(source="WorkerCommTestWorker.swf", mimeType="application/octet-stream")]
		private static var WORKER_SWF:Class;
 
		private var logger:TextField = new TextField();
		private function row(...cols): void
		{
			logger.appendText(cols.join(",")+"\n");
		}
 
		private var workerToMainStartup:WorkerCommIn;
		private var mainToWorker:WorkerCommOut;
		private var mainToWorkerBytes:WorkerCommOut;
		private var mainToWorkerClear:WorkerCommOut;
		private var workerToMain:WorkerCommIn;
 
		private var worker:Worker;
 
		private var REPS:int = 100000;
		private var beforeTime:int;
		private var sum:int;
		private var correctSum:int;
 
		public function WorkerCommTestMain()
		{
			logger.autoSize = TextFieldAutoSize.LEFT;
			addChild(logger);
 
			for (var i:int = 0; i < REPS; ++i)
			{
				correctSum += i;
			}
 
			row("Type", "Time", "Correct?");
 
			var workerBytes:ByteArray = new WORKER_SWF() as ByteArray;
			worker = WorkerDomain.current.createWorker(workerBytes);
 
			mainToWorker = new WorkerCommOut(
				worker,
				"mainToWorker"
			);
			mainToWorkerBytes = new WorkerCommOut(
				worker,
				"mainToWorkerBytes"
			);
			mainToWorkerClear = new WorkerCommOut(
				worker,
				"mainToWorkerClear"
			);
			workerToMain = new WorkerCommIn(
				worker,
				"workerToMain",
				onWorkerToMainDirect
			);
			workerToMainStartup = new WorkerCommIn(
				worker,
				"workerToMainStartup",
				onWorkerToMainStartup
			);
 
			worker.start();
		}
 
		private function onWorkerToMainStartup(arg:*): void
		{
			sum = 0;
			beforeTime = getTimer();
			for (var i:int = 0; i < REPS; ++i)
			{
				mainToWorker.send(i);
			}
			mainToWorkerClear.send(true);
		}
 
		private function onWorkerToMainDirect(arg:*): void
		{
			sum = arg;
 
			var afterTime:int = getTimer();
			row("Direct", (afterTime-beforeTime), (sum==correctSum));
 
			workerToMain.callback = onWorkerToMainBytes;
 
			var bytes:ByteArray = new ByteArray();
			bytes.shareable = true;
			bytes.length = REPS*4;
			bytes.position = 0;
 
			sum = 0;
			beforeTime = getTimer();
			for (var i:int; i < REPS; ++i)
			{
				bytes.writeInt(i);
			}
			mainToWorkerBytes.send(bytes);
		}
 
		private function onWorkerToMainBytes(arg:*): void
		{
			sum = arg;
 
			var afterTime:int = getTimer();
			row("ByteArray", (afterTime-beforeTime), (sum==correctSum));
		}
	}
}

WorkerCommTestWorker, the worker thread:

package
{
	import flash.display.Sprite;
	import flash.utils.ByteArray;
	import flash.system.Worker;
	import flash.system.WorkerDomain;
 
	public class WorkerCommTestWorker extends Sprite
	{
		private var workerToMainStartup:WorkerCommOut;
		private var mainToWorker:WorkerCommIn;
		private var mainToWorkerBytes:WorkerCommIn;
		private var mainToWorkerClear:WorkerCommIn;
 
		private var sum:int;
 
		public function WorkerCommTestWorker()
		{
			mainToWorker = new WorkerCommIn(
				Worker.current,
				"mainToWorker",
				onMainToWorker
			);
			mainToWorkerBytes = new WorkerCommIn(
				Worker.current,
				"mainToWorkerBytes",
				onMainToWorkerBytes,
				"workerToMain"
			);
			mainToWorkerClear = new WorkerCommIn(
				Worker.current,
				"mainToWorkerClear",
				onMainToWorkerClear,
				"workerToMain"
			);
			workerToMainStartup = new WorkerCommOut(
				Worker.current,
				"workerToMainStartup"
			);
 
			workerToMainStartup.send(true);
		}
 
		private function onMainToWorker(arg:*): void
		{
			sum += arg;
		}
 
		private function onMainToWorkerBytes(arg:*): int
		{
			var bytes:ByteArray = arg;
			bytes.position = 0;
			var numInts:int = bytes.length / 4;
 
			var sum:int;
			for (var i:int; i < numInts; ++i)
			{
				sum += bytes.readInt();
			}
 
			return sum;
		}
 
		private function onMainToWorkerClear(arg:*): int
		{
			var ret:int = sum;
			sum = 0;
			return ret;
		}
	}
}

Hopefully WorkerCommIn and WorkerCommOut will save you from typing a lot of boilerplate code and, even better, the bugs that inevitably come with it.

Spot a bug in either class or the example? Have a question or suggestion? Post a comment!