I also need to give tremendous credit to the Typesafe team, who gave me the inspiration for coming up with a solution to what I feel is a slight programming model problem. You can read the original posts here.
Problem Statements
Let's talk a bit about some of the difficult areas of developing with the Akka actor framework:
- Your receive block is statically defined for all the messages an actor wants to receive. Yes, you can implement state changes using become() and unbecome(), but even then the actor can only receive one set of messages at a time. If you create an actor to aggregate data from multiple sources and there are dependencies between the sources, some messages may only be expected if certain preconditions hold or if the logic follows a certain path in the dependency graph. Declaring the message patterns statically means each pattern needs a qualifier (when is it OK to receive this message?) to accept or reject the message under certain conditions.
- Whenever it is decided in the logic flow that a certain message is expected, a receive pattern needs to be added in the receive block. This is not DRY. The developer and any reader of this code will see the logic fragmented between the sender of the request and the receive block.
- Risk of unintentional closing over. Yes, good coders can avoid it, but the risk of running into this issue is very real. It commonly happens in several places in actor code:
  - When using ask
  - When using inner actors
  - When using futures
  - Even when using the scheduler to schedule a timeout to yourself
- The use of ask to expect a certain message when a request is sent seems like a good way out of problem #1 (the statically defined receive block), but ask has deficiencies of its own:
  - It creates a temporary actor for each ask, which results in higher overhead.
  - Processing the results of the ask via a future can easily close over the asking actor.
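To make the closing-over risk with ask a bit more tangible, here is a minimal sketch. It assumes classic Akka 2.4 or later on the classpath (where sender() is written with parentheses), and the names Lookup, Found, Delivery, Backend, and Frontend are all made up for illustration. The risky variant is left as a comment; the code that runs captures the sender while the message is still being processed and pipes the result back to the actor itself.

```scala
import akka.actor._
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import scala.concurrent.duration._

// Hypothetical messages, for illustration only.
final case class Lookup(key: String)
final case class Found(key: String, value: Int)
final case class Delivery(found: Found, replyTo: ActorRef)

class Backend extends Actor {
  def receive = {
    case Lookup(key) => sender() ! Found(key, key.length)
  }
}

class Frontend(backend: ActorRef) extends Actor {
  import context.dispatcher // ExecutionContext for the future callbacks
  implicit val timeout: Timeout = Timeout(1.second)

  private var served = 0 // actor state: only safe to touch inside receive

  def receive = {
    case query: Lookup =>
      // Risky: this callback runs later on another thread, so it would mutate
      // `served` concurrently and call sender() after the message has changed:
      //   (backend ? query).foreach { r => served += 1; sender() ! r }

      // Safer: capture the sender now and turn the result into a message.
      val replyTo = sender()
      (backend ? query).mapTo[Found].map(Delivery(_, replyTo)).pipeTo(self)

    case Delivery(found, replyTo) =>
      served += 1 // back on the actor's own message-processing path
      replyTo ! found
  }
}
```

Even the safer variant still pays for the temporary actor that every ask creates, which is the overhead mentioned above.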
Some Rules to Follow or Break
The close-over problem in particular led me to set a couple of concrete rules for writing actors:
- No inner actors. If you need to transfer the state from a stateless (listening) actor to a stateful (executing) actor, create the actor instance and forward the message to the actor instance instead.
- Scheduler calls ONLY send this actor a message. No direct calls to the actor implementation.
Promises and Futures are also to be used with extreme care. Future callbacks can run on different threads and close over their actors.
Again, rules are rules and are subject to being broken, but only if you really know what you're doing. For the rest of us mere mortals, please stick to them. Otherwise you can end up in no man's land, where the behavior becomes undefined.
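Here is a rough sketch of what following both rules could look like. It assumes classic Akka 2.4 or later, and the names StartJob, JobTimedOut, JobExpired, JobListener, and JobWorker are invented for this example. The listener keeps no state and forwards each request to a freshly created top-level worker actor, and the worker's scheduler call does nothing but send the worker a message.

```scala
import akka.actor._
import scala.concurrent.duration._

// Hypothetical messages, for illustration only.
final case class StartJob(id: Long)
case object JobTimedOut
final case class JobExpired(id: Long)

// Stateless listener: no inner actor class, just a new worker per request.
class JobListener extends Actor {
  def receive = {
    case job: StartJob =>
      // forward keeps the original sender, so the worker can reply directly
      context.actorOf(Props(classOf[JobWorker])) forward job
  }
}

// Stateful worker: all per-request state lives here, touched only in receive.
class JobWorker extends Actor {
  import context.dispatcher // ExecutionContext required by the scheduler

  def receive = {
    case StartJob(id) =>
      val replyTo = sender()
      // Rule: the scheduler only sends this actor a message, nothing else.
      context.system.scheduler.scheduleOnce(250.milliseconds, self, JobTimedOut)
      context.become(waiting(id, replyTo))
  }

  def waiting(id: Long, replyTo: ActorRef): Receive = {
    case JobTimedOut =>
      replyTo ! JobExpired(id)
      context.stop(self)
  }
}
```

Because the worker is a top-level class created through Props(classOf[...]), there is no enclosing actor instance for it to capture by accident.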
The Aggregator Pattern
To facilitate an almost 'ask' like behavior where we could dynamically add what message we're expecting but without the drawbacks of ask, I put together the Aggregator trait to be mixed into actors. This trait takes over the receive block (just like many other traits such as FSM). The trait provides expect, expectOnce, and unexpect calls that can be used as in the samples below...
// Sending a message expecting one response
context.actorOf(Props[FooBarActor]) ! FooBarRequest(reqId)
expectOnce {
  case FooBarResponse(reqId, content) => println(content)
}
// Always expecting a certain message like an exception from other actors
expect {
  case FooBarException(message, cause) => println(message)
}
// Expecting a message
val expectHandle = expect {
  case FooBarException(message, cause) => println(message)
}
// At some point, we want to no longer expect - this also works with expectOnce
unexpect(expectHandle)
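If you are wondering what kind of bookkeeping sits behind these three calls, the following plain-Scala sketch may help. It is not the Aggregator trait itself, only a simplified stand-in with no Akka dependency: ExpectRegistry and its handle method are hypothetical names, and the "first matching handler wins" dispatch order is an assumption of this sketch.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for the trait's bookkeeping (no Akka needed).
final class ExpectRegistry {
  type Handler = PartialFunction[Any, Unit]

  private final class Entry(val handler: Handler, val once: Boolean)
  private val entries = mutable.ListBuffer.empty[Entry]

  private def add(handler: Handler, once: Boolean): Entry = {
    val entry = new Entry(handler, once)
    entries += entry
    entry
  }

  // Stays registered until unexpect is called with the returned handle.
  def expect(handler: Handler): AnyRef = add(handler, once = false)

  // Removed automatically after it has handled its first message.
  def expectOnce(handler: Handler): AnyRef = add(handler, once = true)

  // Returns true if the handle was still registered.
  def unexpect(handle: AnyRef): Boolean = {
    val index = entries.indexWhere(_ eq handle)
    if (index >= 0) { entries.remove(index); true } else false
  }

  // What a receive block would delegate to: the first matching handler wins.
  def handle(message: Any): Boolean =
    entries.find(_.handler.isDefinedAt(message)) match {
      case Some(entry) =>
        if (entry.once) entries -= entry // drop before running, so it fires once
        entry.handler(message)
        true
      case None => false // nobody expects this message right now
    }
}
```

A message that no registered handler is defined for simply falls through (handle returns false), which mirrors the idea that an unexpected message is just not handled.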
We'll base the code on the AccountBalanceRetriever sample, adding just one twist: the caller can select which account types to display, so the actor responds with a subset of all account types. The first part of the code is a shameless copy of what is already out there, with case objects added for identifying the account types and a few other case objects used for timing out, and so forth...
import scala.collection._
import scala.concurrent.duration._
import scala.math.BigDecimal.int2bigDecimal
import akka.actor._

sealed trait AccountType
case object Checking extends AccountType
case object Savings extends AccountType
case object MoneyMarket extends AccountType

case class GetCustomerAccountBalances(id: Long, accountTypes: Set[AccountType])
case class GetAccountBalances(id: Long)
case class AccountBalances(accountType: AccountType,
                           balance: Option[List[(Long, BigDecimal)]])
case class CheckingAccountBalances(balances: Option[List[(Long, BigDecimal)]])
case class SavingsAccountBalances(balances: Option[List[(Long, BigDecimal)]])
case class MoneyMarketAccountBalances(balances: Option[List[(Long, BigDecimal)]])
case object TimedOut
case object CantUnderstand

class SavingsAccountProxy extends Actor {
  def receive = {
    case GetAccountBalances(id: Long) =>
      sender ! SavingsAccountBalances(Some(List((1, 150000), (2, 29000))))
  }
}

class CheckingAccountProxy extends Actor {
  def receive = {
    case GetAccountBalances(id: Long) =>
      sender ! CheckingAccountBalances(Some(List((3, 15000))))
  }
}

class MoneyMarketAccountProxy extends Actor {
  def receive = {
    case GetAccountBalances(id: Long) =>
      sender ! MoneyMarketAccountBalances(None)
  }
}
Next is the actual AccountBalanceRetriever using the aggregator pattern.
class AccountBalanceRetriever extends Actor with Aggregator {
  import context._

  expectOnce {
    case GetCustomerAccountBalances(id, types) =>
      new AccountAggregator(sender, id, types)
    case _ =>
      sender ! CantUnderstand
      context.stop(self)
  }

  class AccountAggregator(originalSender: ActorRef,
                          id: Long, types: Set[AccountType]) {
    val results =
      mutable.ArrayBuffer.empty[(AccountType, Option[List[(Long, BigDecimal)]])]

    if (types.size > 0)
      types foreach {
        case Checking => fetchCheckingAccountsBalance()
        case Savings => fetchSavingsAccountsBalance()
        case MoneyMarket => fetchMoneyMarketAccountsBalance()
      }
    else collectBalances() // Empty type list yields empty response

    context.system.scheduler.scheduleOnce(250 milliseconds) {
      self ! TimedOut
    }
    expect {
      case TimedOut => collectBalances(force = true)
    }

    def fetchCheckingAccountsBalance() {
      context.actorOf(Props[CheckingAccountProxy]) ! GetAccountBalances(id)
      expectOnce {
        case CheckingAccountBalances(balances) =>
          results += (Checking -> balances)
          collectBalances()
      }
    }

    def fetchSavingsAccountsBalance() {
      context.actorOf(Props[SavingsAccountProxy]) ! GetAccountBalances(id)
      expectOnce {
        case SavingsAccountBalances(balances) =>
          results += (Savings -> balances)
          collectBalances()
      }
    }

    def fetchMoneyMarketAccountsBalance() {
      context.actorOf(Props[MoneyMarketAccountProxy]) ! GetAccountBalances(id)
      expectOnce {
        case MoneyMarketAccountBalances(balances) =>
          results += (MoneyMarket -> balances)
          collectBalances()
      }
    }

    def collectBalances(force: Boolean = false) {
      if (results.size == types.size || force) {
        originalSender ! results.toList // Make sure it becomes immutable
        context.stop(self)
      }
    }
  }
}
As you can see, the aggregator pattern allows specifying custom expect or expectOnce blocks dynamically, based on which message we are expecting under which conditions. For instance, if the types initially requested do not include the money-market account, we never expect a MoneyMarketAccountBalances message, and such a rogue message, should it arrive, will simply not be handled by the actor. The code for this AccountBalanceRetriever can be found here.
Conclusion
Orchestration flows commonly make use of the aggregator pattern, which obtains data from multiple sources. Even these data sources are dynamic in nature: the same aggregator may touch some data sources under some conditions and others under other conditions, thus demanding a more dynamic actor receive configuration. In addition, the use of inner actors and techniques for timing out an actor carry a high risk of closing over, which can be hard for the casual developer's eye to spot. With the Aggregator trait and the simple set of rules applied to prevent closing over, we should have a solid and safe-to-use programming pattern.
Happy coding, and follow the rules. Don't engage in risky behavior!!!