I do more and more Puppet, and I really like it. It’s a simple way to declare the desired state of the world. As I did more with Puppet, I needed to debug my manifests and check that they were doing the expected thing. I was looking for some kind of dry-run option, similar to apt-get’s, but puppet help apply didn’t provide any helpful hints. Then I stumbled upon Puppet dry run.

A direct run with --noop --test failed because --test was not recognized, but --noop was. A sample run with --noop looks like this:

# puppet apply puppet/hosts/db03.staging.internal.pp --noop
notice: /Stage[main]//Postgresql::Database[svreporting_staging]/Exec[createdb svreporting_staging]/returns: current_value notrun, should be 0 (noop)
notice: Postgresql::Database[svreporting_staging]: Would have triggered 'refresh' from 1 events
notice: /Stage[main]//Cron[replace svreporting_staging with svanalytics_staging]/ensure: current_value absent, should be present (noop)
notice: Class[Main]: Would have triggered 'refresh' from 2 events
notice: Stage[main]: Would have triggered 'refresh' from 1 events
notice: Finished catalog run in 10.86 seconds

Check the wording: Would have triggered, and should be. These are great ways to know what Puppet will do for you. There is only one caveat: when you need to spend more than a few minutes debugging your recipes, disable the automatic run. The scheduled run from puppet agent may go behind your back and apply your recipe, which you’re just debugging. In my case, this is very easy to do since I host my puppet manifests in a Git repository, which is pulled hourly and applied.

I’m only waiting for my pull request to be applied so you don’t have to go through the same process as I did.

At Seevibes we use the Twitter Streaming API to harvest what people say about television shows. The Twitter Streaming API docs state that:

Upon a change, reconnect immediately if no changes have occurred for some time. For example, reconnect no more than twice every four minutes, or three times per six minutes, or some similar metric.

From Updating Filter Predicates

The way my system is architected, whenever we change the list of keywords, a ShowDataErased message is sent, followed by a bunch of ShowKeywordsReplaced events, one per show. The system uses CQRS where each change that occurs is reflected in the system through one or more events, hence the two events above.

Here’s my state machine in all its gory details:

Seevibes Streaming Harvester State Machine

What the machine does is:

  • When streaming, and time passes, nothing happens;
  • When streaming and we change keywords (either reset or add keyword), we enter a state where we’re pending some changes;
  • When we’re pending, and we change keywords (reset or add again), we stay in the pending state;
  • When we’re pending, and insufficient time has passed, then we must stay in the same state, in case we receive more keyword change events;
  • Finally, if we’re pending and sufficient time has passed, we just go back to the streaming state.

The obvious first test is to start with the fact that when we’re streaming, we must still be streaming. Here’s an implementation of this test.

import org.junit.Test
import org.scalatest.Assertions
import org.joda.time.Instant

class StreamingStateMachineTest extends Assertions {
    val KEY2 = Set("b")

    @Test
    def whenStreaming_andReceiveTick_thenShouldStillBeStreaming() {
        expect(Streaming(KEY2)) {
            Streaming(KEY2).tick(new Instant())
        }
    }
}

For the Rubyists out there, case classes are a kind of class that has a few nice properties:

  • They act like methods, thus we can call the Streaming method (which the compiler turns into a call of Streaming.apply());
  • They have an unapply() method, which enables pattern-matching (which I won’t use in this case);
  • They are serializable, get a free copy() method, and have free equals() and hashCode() implementations that are correct and consistent with the Java equals() and hashCode() guidelines.

Case classes are an ideal vehicle to transport data with one or more methods. One thing that you must realize when using case classes is that you must use only immutable objects as parameters to your case classes. If you used a mutable object, your case class would suddenly be mutable, and the equals() and hashCode() methods would be useless for you. The Scala compiler has no way to enforce this.
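To make the caveat concrete, here is a minimal sketch. The two case classes are hypothetical and exist only for this illustration; they are not part of the harvester. It shows two values that compare equal until the mutable set inside one of them changes, and contrasts that with an immutable Set, where "adding" produces a new value.

```scala
import scala.collection.mutable

// Hypothetical classes, for illustration only.
case class MutableKeywords(keywords: mutable.Set[String])
case class ImmutableKeywords(keywords: Set[String])

object CaseClassEqualityDemo {
  // Two values built from equal content compare equal...
  val a = MutableKeywords(mutable.Set("a"))
  val b = MutableKeywords(mutable.Set("a"))
  val equalBefore: Boolean = a == b

  // ...until somebody mutates the set held by one of them.
  a.keywords += "b"
  val equalAfter: Boolean = a == b

  // With an immutable Set, "adding" yields a new value and the
  // original is left untouched.
  val c = ImmutableKeywords(Set("a"))
  val d = c.copy(keywords = c.keywords + "b")
  val originalUntouched: Boolean = c == ImmutableKeywords(Set("a"))

  def main(args: Array[String]): Unit = {
    println(s"equal before mutation: $equalBefore")
    println(s"equal after mutation: $equalAfter")
    println(s"immutable original untouched: $originalUntouched")
  }
}
```

Because hashCode() is derived from the same fields, a value stored in a hash-based collection before the mutation may no longer be found afterwards.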

Getting back to our business, the obvious implementation is a no-op:

import org.joda.time.ReadableInstant
case class Streaming(keywords: Set[String]) {
    def tick(now: ReadableInstant) = this
}

One event down, 6 to go! Let’s change state: go from the streaming state to the pending keywords change state. I’ll start with the simpler case: resetting the keywords list. Here’s a test:

@Test
def whenStreaming_andReceiveReset_thenShouldBePending() {
    expect(PendingKeywordsChange(Set.empty[String], now)) {
        Streaming(KEY1).resetKeywords(now)
    }
}

Notice that I’m passing explicit times to my methods. This is to make testing much easier. It doesn’t really bother me to send the current time along. It’s certainly easier than sending a factory which returns the current time, which would add a level of indirection, and be more complicated.
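For comparison, here is a rough sketch of the two approaches side by side. Everything in it is an assumption made for illustration: the names TimeSource, PendingA and PendingB are hypothetical, and java.time.Instant stands in for Joda Time so the block has no external dependencies.

```scala
import java.time.Instant

// Hypothetical names, for illustration only.
trait TimeSource { def now(): Instant }

// Variant A: the caller passes the time in. A test just picks an Instant.
final case class PendingA(lastChangedAt: Instant) {
  def isDue(now: Instant): Boolean =
    now.isAfter(lastChangedAt.plusSeconds(120))
}

// Variant B: the object asks a time source. A test has to build a fake first.
final case class PendingB(lastChangedAt: Instant, clock: TimeSource) {
  def isDue: Boolean =
    clock.now().isAfter(lastChangedAt.plusSeconds(120))
}

object ExplicitTimeDemo {
  val start: Instant = Instant.parse("2012-01-01T00:00:00Z")

  // Variant A needs nothing beyond the two instants.
  val directResult: Boolean = PendingA(start).isDue(start.plusSeconds(121))

  // Variant B needs an extra object whose only job is to return a fixed time.
  val fixedClock: TimeSource = new TimeSource {
    def now(): Instant = start.plusSeconds(121)
  }
  val viaClockResult: Boolean = PendingB(start, fixedClock).isDue
}
```

Both variants give the same answer; the first simply has one less moving part to set up in every test.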

The implementation is also pretty simple:

def resetKeywords(now: ReadableInstant) =
    PendingKeywordsChange(Set.empty[String], now)

And so on for the other implementations of Streaming. The guts of the state machine are really in the PendingKeywordsChange class. First, a test:

@Test
def whenStreaming_andReceiveAddKeyword_thenShouldBePending() {
    expect(PendingKeywordsChange(KEY1 + "b", now)) {
        Streaming(KEY1).addKeyword("b", now)
    }
}

case class PendingKeywordsChange(keywords: Set[String],
                                 lastChangedAt: ReadableInstant) {

    // Adding a keyword keeps us pending and restarts the waiting period
    def addKeyword(keyword: String, now: ReadableInstant) =
        PendingKeywordsChange(keywords + keyword, now)

}

When I wrote the code, I was really excited by how concisely and clearly my intent shone through the code. The final method I’d like to show is the tick() method on PendingKeywordsChange:

@Test
def whenPending_andReceiveTick_andInsufficientTimeHasPassed_thenShouldStayPending() {
    expect(PendingKeywordsChange(KEY1, oneMinuteAgo)) {
        PendingKeywordsChange(KEY1, oneMinuteAgo).tick(now)
    }

    expect(PendingKeywordsChange(KEY1, twoMinutesAgo)) {
        PendingKeywordsChange(KEY1, twoMinutesAgo).tick(now)
    }

    expect(PendingKeywordsChange(KEY1, twoMinutesAgo)) {
        PendingKeywordsChange(KEY1, twoMinutesAgo).tick(twoMinutesAgo)
    }
}

@Test
def whenPending_andReceiveTick_andSufficientTimeHasPassed_thenShouldBeStreaming() {
    expect(Streaming(KEY1, now)) {
        PendingKeywordsChange(KEY1, twoMinutesAgo.minus(TimeUnit.SECONDS.toMillis(1))).tick(now)
    }
}

// in class PendingKeywordsChange
def nextChangeAt = lastChangedAt.toInstant.plus(TimeUnit.MINUTES.toMillis(2))

// Go back to streaming only once more than two minutes have passed
// since the last keyword change
def tick(now: ReadableInstant = new Instant()) =
    if (now.isAfter(nextChangeAt))
        Streaming(keywords, now)
    else
        this

The final part is where and how I regularly call the tick event. Because the rest of my infrastructure is tied to Akka actors, I used the Akka scheduler API to send a message to an actor, which hid my state machine behind a nice and consistent facade:

import java.util.concurrent.TimeUnit

import akka.actor.{Actor, Scheduler}
import org.joda.time.{Instant, ReadableInstant}

case object Tick
case object ResetKeywords
case class AddKeyword(keyword: String)

trait State {
    def resetKeywords(now: ReadableInstant): State
    def addKeyword(keyword: String, now: ReadableInstant): State
    def tick(now: ReadableInstant): State
    def keywords: Set[String]
}

case class Streaming(keywords: Set[String],
                     lastChangedAt: ReadableInstant) extends State {
    def resetKeywords(now: ReadableInstant) =
        PendingKeywordsChange(Set.empty[String], now)

    // Keep the existing keywords, as the addKeyword test expects
    def addKeyword(keyword: String, now: ReadableInstant) =
        PendingKeywordsChange(keywords + keyword, now)

    def tick(now: ReadableInstant) = this
}

case class PendingKeywordsChange(keywords: Set[String],
                                 lastChangedAt: ReadableInstant) extends State {
    def resetKeywords(now: ReadableInstant) =
        PendingKeywordsChange(Set.empty[String], now)

    def addKeyword(keyword: String, now: ReadableInstant) =
        PendingKeywordsChange(keywords + keyword, now)

    def nextChangeAt = lastChangedAt.toInstant.plus(TimeUnit.MINUTES.toMillis(2))

    def tick(now: ReadableInstant = new Instant()) =
        if (now.isAfter(nextChangeAt))
            Streaming(keywords, now)
        else
            this
}

class KeywordsStateMachine extends Actor {
    private var state: State = Streaming(Set.empty[String], new Instant())
    private var currentKeywords = Set.empty[String]
    private val stream = ... // Twitter4J Streaming API

    def receive = {
        case Tick =>
            state = state.tick(new Instant)
            state match {
                // Only reconnect once the machine is back in the Streaming state
                case Streaming(keywords, _) if keywords != currentKeywords =>
                    // reset stream with new keywords
                    currentKeywords = keywords
                case _ => // still pending, or nothing changed
            }

        case AddKeyword(keyword) =>
            state = state.addKeyword(keyword, new Instant())

        case ResetKeywords =>
            state = state.resetKeywords(new Instant())
    }
}

object Main {
    def main(args : Array[String]) {
        val keywordsStateMachine =
            Actor.actorOf[KeywordsStateMachine].start()

        // Send the Tick message to the keywordsStateMachine now,
        // and every minute thereafter
        Scheduler.schedule(keywordsStateMachine, Tick, 0, 1, TimeUnit.MINUTES)
    }
}

A few things contributed to the clarity of the implementation:

  • The Joda Time library is really easy to use and understand. You don’t need to use java.util.Date or java.util.Calendar: ditch them as soon as you can;
  • Scala’s case classes reduced boilerplate: no new operator in sight;
  • Public by default enhances clarity by removing keywords where they don’t matter;
  • Libraries, yes, but not when something simple is required. Clarity of implementation before reuse.

Note that this state machine does not have to deal with dropped connections, or rate limiting or anything of the sort, because it’s all hidden behind Twitter4J’s interface. The resulting state machine is much easier to understand and reason about.
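To play with the transitions without Akka, Joda Time or Twitter4J, a stripped-down sketch along these lines may help. It is not the production code: time is assumed to be a plain Long of milliseconds, every name ending in "Sketch" is hypothetical, and a replay function over a list of timestamped events stands in for the actor and the scheduler.

```scala
// Assumptions: time is a plain Long of milliseconds instead of a Joda
// ReadableInstant, and every name ending in "Sketch" is hypothetical.
sealed trait StateSketch {
  def keywords: Set[String]
  def addKeyword(keyword: String, now: Long): StateSketch
  def resetKeywords(now: Long): StateSketch
  def tick(now: Long): StateSketch
}

final case class StreamingSketch(keywords: Set[String]) extends StateSketch {
  def addKeyword(keyword: String, now: Long): StateSketch =
    PendingSketch(keywords + keyword, now)
  def resetKeywords(now: Long): StateSketch =
    PendingSketch(Set.empty[String], now)
  // Time passing while streaming changes nothing.
  def tick(now: Long): StateSketch = this
}

final case class PendingSketch(keywords: Set[String], lastChangedAt: Long)
    extends StateSketch {
  // Every further change restarts the quiet period.
  def addKeyword(keyword: String, now: Long): StateSketch =
    PendingSketch(keywords + keyword, now)
  def resetKeywords(now: Long): StateSketch =
    PendingSketch(Set.empty[String], now)
  // Go back to streaming only once the quiet period has fully elapsed.
  def tick(now: Long): StateSketch =
    if (now > lastChangedAt + StateMachineSketch.QuietPeriodMillis)
      StreamingSketch(keywords)
    else this
}

object StateMachineSketch {
  val QuietPeriodMillis: Long = 2 * 60 * 1000L

  sealed trait EventSketch
  case object TickSketch extends EventSketch
  case object ResetSketch extends EventSketch
  final case class AddSketch(keyword: String) extends EventSketch

  // Replays timestamped events in order and returns the final state.
  def replay(initial: StateSketch, events: Seq[(Long, EventSketch)]): StateSketch =
    events.foldLeft(initial) { case (state, (now, event)) =>
      event match {
        case TickSketch         => state.tick(now)
        case ResetSketch        => state.resetKeywords(now)
        case AddSketch(keyword) => state.addKeyword(keyword, now)
      }
    }

  def main(args: Array[String]): Unit = {
    val minute = 60 * 1000L
    val events: Seq[(Long, EventSketch)] = Seq(
      0L -> AddSketch("b"),
      minute -> TickSketch,
      (2 * minute + 1) -> TickSketch
    )
    println(replay(StreamingSketch(Set("a")), events))
  }
}
```

Because each transition is a pure function of the current state, the event and the time, a whole scenario can be replayed in a test without sleeping or scheduling anything.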

In the interest of fun, profit and learning, I’ve made available a GitHub repository with similar code to what’s here at streaming-state-machine. The code is different because it doesn’t actually connect to Twitter, and thus only needs to demonstrate how changing the states works. The time scale was changed from minutes to seconds.

A great time was had by all!

Seriously, the Scala workshop was very cool. The actor model was new to almost all attendees and needed some explaining. Looking back, I should have explained a bit more about how it works. From what I saw, nobody stumbled over the Scala syntax that much. The people there were all experienced programmers, and so may not have needed much hand-holding regarding syntax.

Martin Provencher, organizer of Montreal.rb, and his teammate, Olivier Melcher, wrote an actor-based histogram producing word counter that counted which programming languages were most spoken of. In their tests, Ruby and PHP were the most frequent ones.

The SQL Workshop was more difficult for all. The first exercise was much harder than I anticipated. The first question was:

Calculate the top 5 shows, ordered by social impressions, for the period between 2011-10-17 and 2011-10-23. Return an answer under 3 seconds.

First, people had to get familiar with the data model. I asked people to get the data in under 3 seconds, so this was more of an optimization problem than just a matter of getting the correct query. I allowed about 40 minutes for this exercise, and most of the time was used in getting a correct query. Nobody had time to get to the optimization part. Looking back, I’d split this question into two parts: first write the query, then optimize it; or I’d provide the correct query and let people optimize it.

I hope everybody learned a lot by coming to the hackaton.

A big “Thank You” to the event’s sponsors for the beer and pizza:

Notes for Hackaton organizers

If you can, provide a starter application or package. All attendees were happy they could get up and running with little to no fuss. It saved a lot of time because they didn’t have to resolve dependency nightmares.

Move around the room, look over everybody’s shoulders. Without fail, when I walked up to a team, they’d have a question for me.

Speaking of teams, it’s useful for people to work in teams. I’d say this harkens back to Pair Programming with a Novice / Novice pair. Two novices working together can get further along than a single novice. One of the members will have an insight that the other member didn’t have, and both can move forward from there.

Quick Puppet Tip

2012-01-24

While testing my Puppet recipes, I usually boot a new instance then apply configuration manually. I have a base configuration, which is extended with other things. I just noticed I could apply a manifest using STDIN:

# ( echo "include sysbase" ; echo "include rabbitmq" ) | puppet apply -v

Viva the Unix tradition!

If you do any kind of configuration management, I highly recommend Puppet.

I just learned this morning that Needium will join Seevibes in sponsoring the beer at the Scala Hackaton and SQL Workshop. Both events are on February 2nd. If you haven’t already done so and wish to join us, please register at EventBrite:

Hope to see you there!

I would like to invite you to attend one or two events on February 2nd: Analyzing Twitter Social Data using Scala and Akka Actors and Social Media Metrics using SQL Engines.

Schedule

  • 2:00 PM Doors open
  • 2:30 PM Hackaton — Analyzing Twitter Social Data using Scala and Akka Actors
  • 5:00 PM Beer & Pizza, sponsored by Seevibes
  • 6:30 PM Workshop — Social Media Metrics using SQL Engines
  • 9:00 PM Socializing

The event will be at the Notman House, 51 Sherbrooke W. We have a limited number of places available, so be sure to reserve your seat now. There are two events, both are free to attend, and both are bilingual French and English.

Registration

Setup

Before you come in, please be sure to follow these instructions to get you started:

Scala Hackaton

The Scala Hackaton is an event where you’ll build whatever you wish: word counter, word frequency, hashtag frequency, etc. You get to choose. There will be people more familiar with Scala and Akka at the event who can help you. The first 30 minutes of the event will be reserved for a quick introduction to Scala.

  • Clone the scala-hackaton Git repository, or download a ZIP
  • In the repository / project, run mvn test and mvn exec:java -Dexec.mainClass=seevibes.HelloActor

The two Maven steps are to download all necessary dependencies. If you don’t, you’ll lose a lot of time at the event downloading your dependencies.

If you are unfamiliar with Java and Scala, I strongly recommend you use an IDE, which will help with code completion and syntax awareness. I happen to prefer JetBrains’ IDEA, but this is like Vim vs Emacs. You can use Eclipse if you prefer. If you use Eclipse, be sure to use the Scala IDE extension. In the case of IDEA, download and install the Scala plugin.

If you have any issues, please email me, François Beausoleil, and I’ll help you out. I’ll post updates to this page if common errors pop up.

SQL Workshop

The SQL workshop will be a series of directed examples:

1. I will present a problem, a report or a question we want answered, and some details on how you can accomplish the goal;
2. You will answer the question with the knowledge you have;
3. I’ll ask people to present their solutions;
4. I’ll present my solution and discuss specifics

I have 6 exercises planned out, from 15 to 45 minutes each. The topics range from indexing to joining to using intersections and unions, ending with windowing functions. The workshop is for people who wish to learn more about SQL and how to use the capabilities of their favorite SQL engines more effectively.

  • Install PostgreSQL 9.1 (latest is currently 9.1.2)
  • Load this PostgreSQL database dump svworkshop.sql.bz2 (315 MiB) in your cluster using the following command:
bzcat svworkshop.sql.bz2 | psql

The dump file expects to create a new database named svworkshop using your default user.

Hope to see you there!

I sometimes have to do sysadmin work, such as when I’m the sole technical person on a project. When I need to keep a service running, I usually turn to daemontools. Daemontools was written by D. J. Bernstein, a mathematician and author of many UNIX utilities.

From daemontools’ home page:

daemontools is a collection of tools for managing UNIX services.

daemontools home page

What this means is daemontools is designed to run, and keep running, a service. Daemontools also includes other utilities which I find useful, such as setuidgid, envdir and multilog. I searched for an article such as this, but couldn’t find one. If you find a factual error, please let me know immediately. If you have your own best practices, let me know so I can expand on the list.

Read the articles themselves here:

I had lots of difficulties running my tests under IDEA. The exact error message was:

Error running All Tests:
Not found suite class.

Where All Tests was the name of my Run configuration.

I finally ended up with the right incantations. In my POM, I have the following:

<dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest_2.9.0-1</artifactId>
      <version>1.6.1</version>
    </dependency>
    <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-core</artifactId>
      <version>1.8.1</version>
      <scope>test</scope>
    </dependency>
</dependencies>

Then, I had to extend org.scalatest.junit.JUnitSuite for my test classes, like this:

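import org.junit.Test
import org.scalatest.junit.JUnitSuite
import scala.collection.immutable.Queue

class GivenAnEmptyQueue extends JUnitSuite {
  // Assumes the standard library's immutable Queue
  @Test def thenItShouldNotHaveAnyElements() {
    assert(Queue.empty[Int].isEmpty)
  }
}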

Finally, I had to verify that both the JUnit and Scala plugins were enabled and at their latest versions in the IDE itself. After that, I was able to run my tests from within the IDE.

I’m starting in Scala, because Seevibes’ code is in Scala. Scala has a tool named simple-build-tool for managing your projects. sbt is similar to Ruby’s Bundler and Clojure’s Leiningen in that it manages dependencies and helps build a project for you.

Unfortunately, I had problems getting started. After following the Setup instructions, I was consistently getting this error:

Getting org.scala-tools.sbt sbt_2.8.1 0.10.1 ...

:: problems summary ::
:::: WARNINGS
    [NOT FOUND  ] commons-logging#commons-logging;1.0.4!commons-logging.jar (5ms)

  ==== Maven2 Local: tried

    file:///Users/francois/.m2/repository/commons-logging/commons-logging/1.0.4/commons-logging-1.0.4.jar

    ::::::::::::::::::::::::::::::::::::::::::::::

    ::              FAILED DOWNLOADS            ::

    :: ^ see resolution messages for details  ^ ::

    ::::::::::::::::::::::::::::::::::::::::::::::

    :: commons-logging#commons-logging;1.0.4!commons-logging.jar

    ::::::::::::::::::::::::::::::::::::::::::::::

:: USE VERBOSE OR DEBUG MESSAGE LEVEL FOR MORE DETAILS
download failed: commons-logging#commons-logging;1.0.4!commons-logging.jar
Error during sbt execution: Error retrieving required libraries
  (see /Users/francois/Projects/project/boot/update.log for complete log)
Error: Could not retrieve sbt 0.10.1

I hit #scala and RSchulz pointed me to ~/.ivy2. After I rm -rf’d ~/.ivy2 and ~/.m2, sbt ran to completion. I was unable to find any mentions of the errors above, so hopefully this may help someone else.

I ran into a little gotcha today, using Sequel. I’m writing an importer, you know the kind: read record from database A, apply some transformations, write to database B. No rocket science required. But, Sequel has a little gotcha that stumped me for a bit. My script looked like this:

DBa = Sequel.connect "..."
DBb = Sequel.connect "..."

class APerson < Sequel::Model(DBa[:people])
end

class BContact < Sequel::Model(DBb[:contacts])
end

contacts = Hash.new
APerson.all.each do |person|
  contact = BContact.create(
    :name        => person.last_name + ", " + person.first_name,
    :tenant_code => ENV["TENANT_CODE"],
    :updated_by  => "import",
    :updated_at  => Time.now)
  contacts[ person.id ] = contact.contact_id
end

# Now I can map A's IDs to the correct value in database B, such as
# for attaching email addresses, phone numbers, etc.

The Contact model in the B database is declared like this:

create_table :contacts do
  column :tenant_code, :integer,       :null => false
  column :contact_id,  :serial,        :null => false
  column :name,        "varchar(240)", :null => false

  primary_key [:tenant_code, :contact_id]
  foreign_key [:tenant_code], :tenants
end

Notice tenant_code and contact_id are part of the primary key. I don’t write to contact_id because I want the sequence’s value to be returned to me. But I must write my own value to the tenant_code column. I was receiving an exception on the #create call:

/Users/francois/.rvm/gems/ruby-1.9.2-p180/gems/sequel-3.25.0/lib/sequel/model/base.rb:1491:in `block in set_restricted': method tenant_code= doesn't exist or access is restricted to it (Sequel::Error)
  from /Users/francois/.rvm/gems/ruby-1.9.2-p180/gems/sequel-3.25.0/lib/sequel/model/base.rb:1486:in `each'
  from /Users/francois/.rvm/gems/ruby-1.9.2-p180/gems/sequel-3.25.0/lib/sequel/model/base.rb:1486:in `set_restricted'
  from /Users/francois/.rvm/gems/ruby-1.9.2-p180/gems/sequel-3.25.0/lib/sequel/model/base.rb:1077:in `set'
  from /Users/francois/.rvm/gems/ruby-1.9.2-p180/gems/sequel-3.25.0/lib/sequel/model/base.rb:1456:in `initialize_set'
  from /Users/francois/.rvm/gems/ruby-1.9.2-p180/gems/sequel-3.25.0/lib/sequel/model/base.rb:764:in `initialize'
  from /Users/francois/.rvm/gems/ruby-1.9.2-p180/gems/sequel-3.25.0/lib/sequel/model/base.rb:134:in `new'
  from /Users/francois/.rvm/gems/ruby-1.9.2-p180/gems/sequel-3.25.0/lib/sequel/model/base.rb:134:in `create'
  from /Users/francois/.rvm/gems/ruby-1.9.2-p180/gems/sequel-3.25.0/lib/sequel/model/base.rb:248:in `find_or_create'
  from script/import:65:in `block (2 levels) in <top (required)>'

I was very much stumped, and turned to the excellent documentation. I eventually found my way to #set_all, and changed my code accordingly:

APerson.all.each do |person|
  # set_all only assigns the values; save writes the row so that
  # contact_id is populated from the sequence
  contact = BContact.new.set_all(
    :name            => person.last_name + ", " + person.first_name,
    :tenant_code     => ENV["TENANT_CODE"],
    :last_updated_by => "import",
    :last_updated_at => Time.now).save
  contacts[ person.id ] = contact.contact_id
end

Even though the Sequel RDoc says #set_all ignores restricted columns, I was still receiving the same exception. I was now doubly stumped, until I found a reference to #unrestrict_primary_key. I added the declaration to BContact and was able to proceed:

class BContact < Sequel::Model(DBb[:contacts])
  unrestrict_primary_key
end

You know the drill though: where you import one model, you’ll have more to import shortly. Ruby to the rescue!

class Sequel::Model
  # Open classes win every time!
  unrestrict_primary_key
end

Problem solved!

Your Host

I am François Beausoleil, a Ruby on Rails and Scala developer. During the day, I work on Seevibes, a platform to measure social interactions related to TV shows. At night, I am interested in many things. Read my biography.
