At Seevibes we use the Twitter Streaming API to harvest what people say about television shows. The Twitter Streaming API docs state that:

Upon a change, reconnect immediately if no changes have occurred for some time. For example, reconnect no more than twice every four minutes, or three times per six minutes, or some similar metric.

From Updating Filter Predicates

The way my system is architected, whenever we change the list of keywords, a ShowDataErased message is sent, followed by a bunch of ShowKeywordsReplaced events, one per show. The system uses CQRS where each change that occurs is reflected in the system through one or more events, hence the two events above.

Here’s my state machine in all it’s gory details:

Seevibes Streaming Harvester State Machine

What the machine does is:

  • When streaming, and time passes, nothing happens;
  • When streaming and we change keywords (either reset or add keyword), we enter a state where we’re pending some changes;
  • When we’re pending, and we change keywords (reset or add again), we stay in the pending state;
  • When we’re pending, and insufficient time has passed, then we must stay in the same state, in case we receive more keyword change events;
  • Finally, if we’re pending and sufficient time has passed, we just go back to the streaming state.

The obvious first test is to start with the fact that when we’re streaming, we must still be streaming. Here’s an implementation of this test.

 1 import org.junit.Test
 2 import org.scalatest.Assertions
 3 import org.joda.time.Instant
 4 
 5 class StreamingStateMachineTest extends Assertions {
 6     val KEY2 = Set("b")
 7 
 8     @Test
 9     def whenStreaming_andReceiveTick_thenShouldStillBeStreaming() {
10         expect(Streaming(KEY2)) {
11             Streaming(KEY2).tick(new Instant())
12         }
13     }
14 }

For the Rubyists out there, case classes are a kind of class that has a few nice properties:

  • They act like methods, thus we can call the Streaming method (which the compiler turns into a call of Streaming.apply());
  • They have an unapply() method, which enables pattern-matching (which I won’t use in this case);
  • They are serializable, externalizable, clonable, have a free equals() and hashCode() implementation that is correct and consistent with the Java equals() and hashCode() guidelines.

Case classes are an ideal vehicle to transport data with one or more methods. One thing that you must realize when using case classes is that you must use only immutable objects as parameters to your case classes. If you used a mutable object, your case class would suddenly be mutable, and the equals() and hashCode() methods would be useless for you. The Scala compiler has no way to enfore this.

Getting back to our business, the obvious implementation is a no-op:

1 import org.joda.time.ReadableInstant
2 case class Streaming(keywords: Set[String]) {
3     def tick(now: ReadableInstant) = this
4 }

One event down, 6 to go! Let’s change state : go from the streaming state to the pending keywords change state. I’ll start with the simpler case: resetting the keywords list. Here’s an implementation:

1 @Test
2 def whenStreaming_andReceiveReset_thenShouldBePending() {
3     expect(PendingKeywordsChange(Set.empty[String], now)) {
4         Streaming(KEY1).resetKeywords(now)
5     }
6 }

Notice that I’m passing explicit times to my methods. This is to make testing much easier. It doesn’t really bother me to send the current time along. It’s certainly easier than sending a factory which returns the current time, which would add a level of indirection, and be more complicated.

The implementation is also pretty simple:

1 def resetKeywords(now: ReadableInstant) =
2     PendingKeywordsChange(Set.empty[String], now)

And so on for the other implementations of Streaming. The guts of the state machine are really in the PendingKeywordsChange class. First, a test:

 1 @Test
 2 def whenStreaming_andReceiveAddKeyword_thenShouldBePending() {
 3     expect(PendingKeywordsChange(KEY1 + "b", now)) {
 4         Streaming(KEY1).addKeyword("b", now)
 5     }
 6 }
 7 
 8 case class PendingKeywordsChange(keywords: Set[String],
 9                                  lastChangedAt: ReadableInstant) {
10 
11     def addKeyword(keyword: String, now: ReadableInstant) =
12         PendingKeywordsChange(newKeywords + keyword, now)
13 
14 }

When I wrote the code, I was really excited by how concise and clear my intents shined through the code. The final method I’d like to show is the tick() method on PendingKeywordsChange:

 1 @Test
 2 def whenPending_andReceiveTick_andInsufficientTimeHasPassed_thenShouldStayPending() {
 3     expect(PendingKeywordsChange(KEY1, oneMinuteAgo)) {
 4         PendingKeywordsChange(KEY1, oneMinuteAgo).tick(now)
 5     }
 6 
 7     expect(PendingKeywordsChange(KEY1, twoMinutesAgo)) {
 8         PendingKeywordsChange(KEY1, twoMinutesAgo).tick(now)
 9     }
10 
11     expect(PendingKeywordsChange(KEY1, twoMinutesAgo)) {
12         PendingKeywordsChange(KEY1, twoMinutesAgo).tick(twoMinutesAgo)
13     }
14 }
15 
16 @Test
17 def whenPending_andReceiveTick_andSufficientTimeHasPassed_thenShouldBeStreaming() {
18     expect(Streaming(KEY1, now)) {
19         PendingKeywordsChange(KEY1, twoMinutesAgo.minus(TimeUnit.SECONDS.toMillis(1))).tick(now)
20     }
21 }
22 
23 // in class PendingKeywordsChange
24 def nextChangeAt = lastChangedAt.toInstant.plus(TimeUnit.MINUTES.toMillis(2))
25 
26 def tick(now: ReadableInstant = new Instant()) =
27     if (now.isAfter(nextChangeAt))
28         Streaming(newKeywords)
29     else
30         this

The final part is where and how I regularly call the tick event. Because the rest of my infrastructure is tied to Akka actors, I used the Akka scheduler API to send a message to an actor, which hid my state machine behind a nice and consistent facade:

 1 import akka.actor.Actor
 2 
 3 case object Tick
 4 case object ResetKeywords
 5 case class AddKeyword(keyword: String)
 6 
 7 trait State {
 8     def resetKeywords(now: ReadableInstant): State
 9     def addKeyword(keyword: String, now: ReadableInstant): State
10     def tick(now: ReadableInstant): State
11     def keywords: Set[String]
12 }
13 
14 case class Streaming(keywords: Set[String],
15                      lastChangedAt: ReadableInstant) extends State {
16     def resetKeywords(now: ReadableInstant) =
17         PendingKeywordsChange(Set.empty[String], now)
18 
19     def addKeyword(keyword: String, now: ReadableInstant) =
20         PendingKeywordsChange(Set(keyword), now)
21 
22     def tick(now: ReadableInstant) = this
23 }
24 
25 case class PendingKeywordsChange(keywords: Set[String],
26                                  lastChangedAt: ReadableInstant) extends State {
27     def resetKeywords(now: ReadableInstant) =
28         PendingKeywordsChange(Set.empty[String], now)
29 
30     def addKeyword(keyword: String, now: ReadableInstant) =
31         PendingKeywordsChange(keywords + keyword, now)
32 
33     def nextChangeAt = lastChangedAt.toInstant.plus(TimeUnit.MINUTES.toMillis(2))
34 
35     def tick(now: ReadableInstant = new Instant()) =
36         if (now.isAfter(nextChangeAt))
37             Streaming(keywords)
38         else
39             this
40 }
41 
42 class KeywordsStateMachine extends Actor {
43     private var state: State = Streaming(Set.empty[String], new Instant())
44     private var currentKeywords = Set.empty[String]
45     private val stream = ... // Twitter4J Streaming API
46 
47     def receive = {
48         case Tick =>
49             state = state.tick(new Instant)
50             if (state.keywords != currentKeywords)
51                 // reset stream with new keywords
52 
53         case AddKeyword(keyword) =>
54             state = state.addKeyword(keyword, new Instant())
55 
56         case ResetKeywords =>
57             state = state.resetKeywords(new Instant())
58     }
59 }
60 
61 object Main {
62     def main(args : Array[String]) {
63         val keywordsStateMachine =
64             Actor.actorOf[KeywordsStateMachine].start()
65 
66         // Send the Tick message to the keywordsStateMachine now,
67         // and every minute thereafter
68         Scheduler.schedule(keywordsStateMachine, Tick, 0, 1, TimeUnit.MINUTES)
69     }
70 }

A few things contributed to the clarity of the implementation:

  • The Joda Time library is really easy to use and understand. You don’t need to use java.util.Date or java.util.Calendar : ditch them as soon as you can;
  • Scala’s case classes reduced boilerplate: no new operator in sight;
  • Public by default enhances clarity by removing keywords where they don’t matter;
  • Libraries, yes, but not when something simple is required. Clarity of implementation before reuse.

Note that this state machine does not have to deal with dropped connections, or rate limiting or anything of the sort, because it’s all hidden behind Twitter4J’s interface. The resulting state machine is much easier to understand and reason about.

In the interest of fun, profit and learning, I’ve made available a GitHub repository with similar code to what’s here at streaming-state-machine. The code is different because it doesn’t actually connect to Twitter, and thus only needs to demonstrate how changing the states works. The time scale was changed from minutes to seconds.

Search

Your Host

A picture of me

I am François Beausoleil, a Ruby on Rails and Scala developer. During the day, I work on Seevibes, a platform to measure social interactions related to TV shows. At night, I am interested many things. Read my biography.

Top Tags

Books I read and recommend

Links

Projects I work on

Projects I worked on