Friday, August 23, 2013

An Aggregator Pattern for the Akka Actor Framework

Even before I start, I need to give full credit for the prior work to Jamie Allen and his tutorial at http://jaxenter.com/tutorial-asynchronous-programming-with-akka-actors-46220.html. This post is merely an attempt to improve some of the code that I still feel is hard to understand and risky to use, no pun intended.

I also need to give tremendous credit to the Typesafe team, who gave me the inspiration for a solution to what I feel is a slight programming-model problem. You can read the original posts here.

Problem Statements


Let's talk a bit about some of the difficult areas of developing with the Akka actor framework:

  1. Your receive block is statically defined for all the messages an actor wants to receive. Yes, you can implement state changes using become() and unbecome(), but even then the actor can only receive one set of messages at a time. If you create an actor that aggregates data from multiple sources with dependencies between them, some messages may only be expected if certain preconditions hold, or if the logic follows a certain path in the dependency graph. Declaring the message patterns statically means guarding them with qualifiers (when is it OK to receive this message?) to decide whether to accept each message under the current conditions.
  2. Whenever the logic flow decides that a certain message is expected, a matching pattern needs to be added to the receive block. This is not DRY: the developer, and any reader of this code, sees the logic fragmented between the code that sends the request and the receive block.
  3. Risk of unintentionally closing over actor state. Yes, good coders can avoid it, but the risk of running into this issue is very real. It commonly happens in many places in actor code:
    • When using ask
    • When using inner actors
    • When using futures
    • Even when using the scheduler to schedule a timeout to yourself.
  4. Using ask to expect a certain message when a request is sent seems like a good way out of problem #1 (statically defining the receive block), but ask has deficiencies of its own:
    • It creates a temporary actor for each ask, which results in higher overhead.
    • Processing the results of the ask via a future can easily close over the asking actor.
    For these reasons, and probably more: tell, don't ask!
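To make problem #3 concrete, here is a minimal, Akka-free sketch. In Akka, sender is a method that returns the sender of the message currently being processed, so a Future callback that calls it later may see whoever sent a later message. The hypothetical ToyActor below models that with a mutable currentSender field and uses a queue of closures as a stand-in for Future callbacks; it illustrates the hazard under those assumptions and is not Akka code.

```scala
import scala.collection.mutable

// Hypothetical toy "actor" (not Akka). Like Akka's sender, currentSender is
// overwritten every time a new message is processed.
class ToyActor {
  var currentSender: String = ""
  val replies = mutable.ListBuffer.empty[String]  // records who got a reply
  val deferred = mutable.Queue.empty[() => Unit]  // stands in for Future callbacks

  private def reply(to: String): Unit = replies += to

  def process(from: String): Unit = {
    currentSender = from
    // UNSAFE: reads the mutable field only when the callback eventually runs
    deferred.enqueue(() => reply(s"unsafe->$currentSender"))
    // SAFE: snapshots the value into an immutable local before deferring
    val replyTo = currentSender
    deferred.enqueue(() => reply(s"safe->$replyTo"))
  }

  // Runs the deferred callbacks, e.g. after more messages have arrived
  def runDeferred(): Unit = while (deferred.nonEmpty) deferred.dequeue()()
}

val actor = new ToyActor
actor.process("alice")
actor.process("bob") // a second message arrives before any callback has run
actor.runDeferred()
println(actor.replies.mkString(", "))
// prints: unsafe->bob, safe->alice, unsafe->bob, safe->bob
```

Only the callback that snapshots the value into an immutable local (replyTo) answers the right sender. In real Akka code the usual equivalents are capturing val replyTo = sender before registering a callback, or piping the result back to the actor as a message (pipeTo) so the reply is handled inside the actor itself.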

Some Rules to Follow or Break


The closing-over problem in particular led me to set a few concrete rules for writing actors:
  • No inner actors. If you need to transfer state from a stateless (listening) actor to a stateful (executing) actor, create a new actor instance and forward the message to it instead.
  • Scheduler calls ONLY send this actor a message. No direct calls into the actor implementation.
  • Use Promises and Futures with extreme care. Future callbacks can run on different threads and close over their actors.

Again, rules are rules and can be broken, but only if you really know what you're doing. For the rest of us mere mortals, please stick to them, or you can end up in no man's land where behavior becomes undefined.

The Aggregator Pattern


To get almost ask-like behavior, where we can dynamically add the messages we expect but without the drawbacks of ask, I put together the Aggregator trait to be mixed into actors. This trait takes over the receive block (just like many other traits, such as FSM). It provides expect, expectOnce, and unexpect calls, which can be used as in the samples below:

  // Sending a message expecting one response  
  context.actorOf(Props[FooBarActor]) ! FooBarRequest(reqId) 
  expectOnce {
    case FooBarResponse(reqId, content) => println(content) 
  }  

  // Always expecting a certain message like an exception from other actors   
  expect {
    case FooBarException(message, cause) => println(message) 
  }  

  // Expecting a message and keeping a handle to it
  val expectHandle = expect {
    case FooBarException(message, cause) => println(message) 
  }

  // At some point, we want to no longer expect - this also works with expectOnce
  unexpect(expectHandle)  

The ready-to-use code for the aggregator trait is available here.
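For readers curious about what expect, expectOnce, and unexpect have to keep track of, here is a minimal, Akka-free sketch. It is not the trait's actual code; the names MiniAggregator, Entry, and deliver are hypothetical, and deliver simply stands in for the actor's receive. Registrations are kept in order with a flag marking expectOnce entries, the first matching entry handles a message, and a once-entry is removed before its handler runs so the handler can register follow-up expectations.

```scala
import scala.collection.mutable

// Hypothetical, Akka-free sketch of the bookkeeping behind expect, expectOnce
// and unexpect. It mirrors the semantics described above, not the real trait.
trait MiniAggregator {
  type Handler = PartialFunction[Any, Unit]

  // A registered handler; once marks entries created by expectOnce.
  final class Entry(val fn: Handler, val once: Boolean)

  private val entries = mutable.ListBuffer.empty[Entry]

  def expect(fn: Handler): Entry = register(fn, once = false)
  def expectOnce(fn: Handler): Entry = register(fn, once = true)
  def unexpect(e: Entry): Unit = entries -= e

  private def register(fn: Handler, once: Boolean): Entry = {
    val e = new Entry(fn, once)
    entries += e
    e
  }

  // Plays the role of receive: the first matching entry wins.
  // Returns false when nothing expects the message (Akka would call it unhandled).
  def deliver(msg: Any): Boolean =
    entries.find(_.fn.isDefinedAt(msg)) match {
      case Some(e) =>
        if (e.once) entries -= e // drop first, so the handler may re-register
        e.fn(msg)
        true
      case None => false
    }
}
```

Usage mirrors the samples above: val handle = agg.expect { case i: Int => ... } and later agg.unexpect(handle). Because an actor processes one message at a time, registering an expectOnce right after sending a request (as in the first sample) cannot miss the reply: the reply is only processed after the current message handling returns.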

The Real Sample


We'll base the code on the AccountBalanceRetriever sample, adding a twist that lets the caller select which account types to display, so the actor may respond with a subset of all account types. The first part of the code is just a shameless copy of what is already out there, with case objects added to identify the account types, plus a few other case objects used for timing out and so forth:

import scala.collection._
import scala.concurrent.duration._
import scala.math.BigDecimal.int2bigDecimal

import akka.actor._

sealed trait AccountType
case object Checking extends AccountType
case object Savings extends AccountType
case object MoneyMarket extends AccountType

case class GetCustomerAccountBalances(id: Long, accountTypes: Set[AccountType])
case class GetAccountBalances(id: Long)

case class AccountBalances(accountType: AccountType, 
                           balance: Option[List[(Long, BigDecimal)]])

case class CheckingAccountBalances(balances: Option[List[(Long, BigDecimal)]])
case class SavingsAccountBalances(balances: Option[List[(Long, BigDecimal)]])
case class MoneyMarketAccountBalances(balances: Option[List[(Long, BigDecimal)]])

case object TimedOut
case object CantUnderstand

class SavingsAccountProxy extends Actor {
  def receive = {
    case GetAccountBalances(id: Long) =>
      sender ! SavingsAccountBalances(Some(List((1, 150000), (2, 29000))))
  }
}
class CheckingAccountProxy extends Actor {
  def receive = {
    case GetAccountBalances(id: Long) =>
      sender ! CheckingAccountBalances(Some(List((3, 15000))))
  }
}
class MoneyMarketAccountProxy extends Actor {
  def receive = {
    case GetAccountBalances(id: Long) =>
      sender ! MoneyMarketAccountBalances(None)
  }
}

Next is the actual AccountBalanceRetriever using the aggregator pattern.

class AccountBalanceRetriever extends Actor with Aggregator {

  import context._

  expectOnce {
    case GetCustomerAccountBalances(id, types) =>
      new AccountAggregator(sender, id, types)
    case _ =>
      sender ! CantUnderstand
      context.stop(self)
  }

  class AccountAggregator(originalSender: ActorRef,
                          id: Long, types: Set[AccountType]) {

    val results =
      mutable.ArrayBuffer.empty[(AccountType, Option[List[(Long, BigDecimal)]])]

    if (types.size > 0)
      types foreach {
        case Checking => fetchCheckingAccountsBalance()
        case Savings => fetchSavingsAccountsBalance()
        case MoneyMarket => fetchMoneyMarketAccountsBalance()
      }
    else collectBalances() // Empty type list yields empty response

    context.system.scheduler.scheduleOnce(250.milliseconds) {
      self ! TimedOut
    }
    expect {
      case TimedOut => collectBalances(force = true)
    }

    def fetchCheckingAccountsBalance() {
      context.actorOf(Props[CheckingAccountProxy]) ! GetAccountBalances(id)
      expectOnce {
        case CheckingAccountBalances(balances) =>
          results += (Checking -> balances)
          collectBalances()
      }
    }

    def fetchSavingsAccountsBalance() {
      context.actorOf(Props[SavingsAccountProxy]) ! GetAccountBalances(id)
      expectOnce {
        case SavingsAccountBalances(balances) =>
          results += (Savings -> balances)
          collectBalances()
      }
    }

    def fetchMoneyMarketAccountsBalance() {
      context.actorOf(Props[MoneyMarketAccountProxy]) ! GetAccountBalances(id)
      expectOnce {
        case MoneyMarketAccountBalances(balances) =>
          results += (MoneyMarket -> balances)
          collectBalances()
      }
    }

    def collectBalances(force: Boolean = false) {
      if (results.size == types.size || force) {
        originalSender ! results.toList // Make sure it becomes immutable
        context.stop(self)
      }
    }
  }
}

As you can see, the aggregator pattern allows specifying expect or expectOnce blocks dynamically, based on which messages we expect under which conditions. For instance, if the requested types do not include the money-market account, we never expect a MoneyMarketAccountBalances message, and if such a rogue message arrives anyway, the actor simply does not handle it. The code for this AccountBalanceRetriever can be found here.
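The completion logic can also be checked in isolation. Below is a hypothetical, Akka-free model of the AccountAggregator bookkeeping (BalanceCollector, onBalance, onTimeout, and reply are illustrative names, not part of the sample): each requested type behaves like an outstanding expectOnce, a reply goes out once every requested type has reported or when the timeout forces it, and balances for types that were never requested are simply not handled.

```scala
import scala.collection.mutable

sealed trait AccountType
case object Checking extends AccountType
case object Savings extends AccountType
case object MoneyMarket extends AccountType

// Hypothetical model of AccountAggregator's bookkeeping, without actors.
final class BalanceCollector(requested: Set[AccountType]) {
  // Types still expected, like one outstanding expectOnce per requested type
  private val pending = mutable.Set.empty[AccountType] ++= requested
  private val results = mutable.ArrayBuffer.empty[AccountType]
  // Set at most once, like the single reply sent to originalSender
  var reply: Option[List[AccountType]] = None

  collect(force = false) // an empty request is answered right away

  // Returns false for balances nobody expects (not requested, duplicate, or too late)
  def onBalance(t: AccountType): Boolean =
    if (reply.isEmpty && pending.remove(t)) {
      results += t
      collect(force = false)
      true
    } else false

  // The TimedOut message: answer with whatever has arrived so far
  def onTimeout(): Unit = collect(force = true)

  private def collect(force: Boolean): Unit =
    if (reply.isEmpty && (results.size == requested.size || force))
      reply = Some(results.toList)
}
```

For example, a collector for Set(Checking, Savings) rejects a MoneyMarket balance, holds on to the Checking balance, and answers with List(Checking) once onTimeout() is called.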

Conclusion


Orchestration flows commonly use the aggregator pattern to obtain data from multiple sources, and these data sources are themselves dynamic: the same aggregator may touch some data sources under some conditions and others under other conditions, which demands a more dynamic receive configuration. In addition, inner actors and common techniques for timing out an actor carry a high risk of closing over, which can be hard to spot for a casual developer's eye. With the Aggregator trait and the simple set of rules above to prevent closing over, we should have a solid, safe-to-use programming pattern.

Happy coding, and follow the rules. Don't engage in risky behavior!!!

Sunday, April 7, 2013

Economics of Good Code

While investigating some of the more obscure features of the Scala language, I ran into a nice new feature in Scala 2.10 that makes writing extensions both more efficient and more natural than the usual pimp-my-library approach. In a pet project I did for my son, just for fun, we came up with some interesting code that deals with distances in the solar system. Different reference materials give distances in many different units: light-minutes, light-hours, au (astronomical units), and kilometers. But the data needs to be normalized into a standard unit; we chose kilometers. So let's represent a couple of simple data points in a map. First, the facts:

Distances from the Sun
Earth: 8.3 light-minutes
Saturn: 1.3 light-hours

Now let's create a simple map that represents this in Scala:

 val c = 299792.458 // Speed of light in km/s  
 val sunDistances = Map(  
  "Earth"-> 8.3 * c * 60 ,  
  "Saturn"-> 1.3 * c * 60 * 60  
 )  

Well, this works, but it is neither elegant nor very readable. Furthermore, it does not scale well and becomes error-prone with a large data set. Let's see how we can solve this in a better way using value classes and extension mechanisms:

 object Distance {  
    
  val c = 299792.458d  
    
  implicit class AstroDistance(val d: Double) extends AnyVal{    
   def lightMinutes = d * c * 60  
   def lightHours = d * c * 60 * 60  
  }  
 }  

With this, our sun distances map becomes:

  import Distance._  
  val sunDistances = Map(  
    "Earth"->(8.3 lightMinutes),   
    "Saturn"->(1.3 lightHours)  
  )  

As you can see, the code is far more concise. First, the implicit class converts doubles into AstroDistance, allowing us to call lightMinutes or lightHours directly on a Double. Then, because AstroDistance is a value class (it extends AnyVal), no wrapper object needs to be created: the calls to lightMinutes and lightHours compile to static method calls with virtually no performance overhead, while we keep the desired expressiveness.
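The same implicit value class extends naturally to the other units mentioned at the start, au and kilometers. This is a sketch: kmPerAu uses the IAU 2012 definition of the astronomical unit (149,597,870.7 km), and the au and km methods are additions for illustration, not part of the original code. As a sanity check, 8.3 light-minutes should come out close to 1 au.

```scala
object Distance {
  val c = 299792.458d        // speed of light in km/s
  val kmPerAu = 149597870.7d // 1 au in km (IAU 2012 definition)

  // Implicit value class: in the common case no AstroDistance is allocated,
  // and the calls compile down to static extension methods.
  implicit class AstroDistance(val d: Double) extends AnyVal {
    def lightMinutes: Double = d * c * 60
    def lightHours: Double = d * c * 60 * 60
    def au: Double = d * kmPerAu // hypothetical addition
    def km: Double = d           // already normalized; hypothetical addition
  }
}

import Distance._

val sunDistances = Map(
  "Earth"  -> 8.3.lightMinutes,
  "Saturn" -> 1.3.lightHours
)

// Sanity check: 8.3 light-minutes should be close to 1 au
val relativeError = math.abs(sunDistances("Earth") - 1.0.au) / 1.0.au
println(s"Earth: ${sunDistances("Earth")} km, relative difference from 1 au: $relativeError")
```

Per-unit methods keep the normalization to kilometers in one place, so the data map reads like the reference material it came from. The calls here are written as 8.3.lightMinutes rather than in postfix notation, which since Scala 2.10 triggers a feature warning unless scala.language.postfixOps is imported.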

But how many developers will make use of such features to improve their code? At our workplaces, developers are under pressure to crank out code and features as fast as possible. There is little incentive to write concise, easy-to-maintain code. In many cases, people don't even look at their code again unless they have to fix a bug. Given these constraints, which version would developers choose to write? If you guessed the first example, we're on the same page. Writing the second version takes a bit more learning, so most developers, with deadlines looming over their heads, will not even think about the "better" way to write it. Yet the second version, with its units spelled out, will be easier to read and maintain for years to come. While we may not see the difference at this scale, imagine millions of lines of code written like the first example, and it becomes very apparent that spending time on these minor coding tricks can result in far greater savings over time.

Let's now add an economic flair to our discussion. The first example is definitely cheaper to write if it is throw-away code. The second version requires more skill, but it makes the code reusable and reduces the cost of long-term maintenance. What kind of code we should write depends on the circumstances. Almost always, code is written to stay for a relatively long time (years), so it seems we should generally opt for the better code. Yet time pressure almost always results in short-term thinking, and in unknowingly opting for a bigger cost to pay in the long run.