FsShelter


Defining reliable spouts

Processing guarantees are the biggest selling point of Storm; see the official docs for the details. A reliable spout implementation for a source such as a persistent queue (RabbitMQ, Kafka, etc.) needs to obtain the event ID from the source and forward Storm's acks and nacks back to the source. The obtained ID has to be passed along with the tuple from the spout function:

// data schema for the topology, every case is a unique stream
type Schema = 
    | Original of int
    | Even of int
    | Odd of int

// numbers spout - produces messages
let numbers source =
    let (tupleId,number) = source()
    Some(tupleId, Original (number)) 

// add 1 bolt - consumes and emits messages to either Even or Odd stream
let addOne (input,emit) =
    match input with
    | Original x -> 
        match x % 2 with
        | 1 -> Even (x+1)
        | _ -> Odd (x+1)
    | _ -> failwithf "unexpected input: %A" input
    |> emit

// terminating bolt - consumes messages
let logResult (info,input) =
    match input with
    | Even x
    | Odd x -> info (sprintf "Got: %A" input)
    | _ -> failwithf "unexpected input: %A" input
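
Because the spout and bolt are plain functions, they can be exercised without a running Storm cluster. The following is a minimal sketch of that idea, not part of FsShelter itself: `fakeSource` and `emitted` are hypothetical stand-ins for the external source and for Storm's emit, and the schema and functions are repeated only to keep the snippet self-contained.

```fsharp
// Same schema and component functions as above, repeated for self-containment.
type Schema =
    | Original of int
    | Even of int
    | Odd of int

let numbers source =
    let (tupleId, number) = source()
    Some(tupleId, Original number)

let addOne (input, emit) =
    match input with
    | Original x ->
        match x % 2 with
        | 1 -> Even (x+1)
        | _ -> Odd (x+1)
    | _ -> failwithf "unexpected input: %A" input
    |> emit

// Hypothetical stand-in for the external source: always yields ID "42" and number 7.
let fakeSource () = ("42", 7)

// Collect whatever the bolt emits instead of handing it to Storm.
let emitted = ResizeArray<Schema>()

// The spout returns the source's ID next to the tuple.
let spoutResult = numbers fakeSource

// Feed the spout's tuple to the bolt, capturing its emits.
match spoutResult with
| Some (_, tuple) -> addOne (tuple, (fun t -> emitted.Add t))
| None -> ()

printfn "spout produced: %A" spoutResult
printfn "bolt emitted: %A" (List.ofSeq emitted)
```

The ID never reaches the bolt: it stays with the spout side, which is what later lets acks and nacks be routed back to the source.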

Here we mimic an external source and implement all three possible cases: produce a new message, retry a failed one (indefinitely), and ack a successfully processed one.

open System
open System.Collections.Generic
open FsShelter.Topology

type QueueCmd =
    | Get of AsyncReplyChannel<TupleId*int>
    | Ack of TupleId
    | Nack of TupleId

// faking an external source here
let source = 
    let rnd = Random()
    let count = ref 0L
    let pending = Dictionary()
    let nextId() = Threading.Interlocked.Increment &count.contents

    MailboxProcessor.Start (fun inbox -> 
        let rec loop nacked = 
            async { 
                let! cmd = inbox.Receive()
                return! loop <|
                       match cmd, nacked with
                       | Get rc, [] ->
                            let tupleId,number = string(nextId()), rnd.Next(0, 100)
                            pending.Add(tupleId,number)
                            rc.Reply(tupleId,number)
                            []
                       | Get rc,(tupleId,number)::xs ->
                            // retry: the entry is still in pending, so calling Dictionary.Add again would throw
                            rc.Reply (tupleId,number)
                            xs
                       | Ack id, _ -> 
                            pending.Remove id |> ignore
                            nacked
                       | Nack id, _ -> 
                            (id,pending.[id])::nacked
            }
        loop [])
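
To see the three cases in sequence, the mailbox can be driven by hand. The sketch below uses a simplified copy of the source rather than the one above, under several assumptions: `TupleId` is treated as a plain string alias, numbers come from a counter instead of `Random` so the result is predictable, and an immutable `Map` replaces the `Dictionary`. The name `demoSource` is hypothetical.

```fsharp
type TupleId = string // assumption: a plain string stands in for FsShelter's TupleId

type QueueCmd =
    | Get of AsyncReplyChannel<TupleId * int>
    | Ack of TupleId
    | Nack of TupleId

// Simplified copy of the fake source: deterministic numbers, immutable state.
let demoSource =
    MailboxProcessor.Start (fun inbox ->
        let rec loop (count: int64) (pending: Map<TupleId, int>) nacked =
            async {
                let! cmd = inbox.Receive()
                match cmd, nacked with
                | Get rc, [] ->
                    // nothing to retry: produce a new message
                    let next = count + 1L
                    let tupleId, number = string next, int next * 10
                    rc.Reply (tupleId, number)
                    return! loop next (Map.add tupleId number pending) []
                | Get rc, (tupleId, number) :: rest ->
                    // replay a failed message before producing new ones
                    rc.Reply (tupleId, number)
                    return! loop count pending rest
                | Ack id, _ ->
                    // processed successfully: forget the message
                    return! loop count (Map.remove id pending) nacked
                | Nack id, _ ->
                    // failed: queue the still-pending message for a retry
                    match Map.tryFind id pending with
                    | Some number -> return! loop count pending ((id, number) :: nacked)
                    | None -> return! loop count pending nacked
            }
        loop 0L Map.empty [])

let first = demoSource.PostAndReply Get      // new message
demoSource.Post (Nack (fst first))           // Storm reports a failure
let retried = demoSource.PostAndReply Get    // the same message is served again
demoSource.Post (Ack (fst retried))          // Storm reports success
let second = demoSource.PostAndReply Get     // a fresh message

printfn "first=%A retried=%A second=%A" first retried second
```

The mailbox handles commands in the order they were posted, so the nack is always seen before the following `Get`, and the retried message carries the same ID as the original.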

Anchoring

In order to provide processing guarantees, Storm needs to construct and track the state of the entire "tuple tree", which is built by emitting "anchored" tuples. FsShelter implements anchoring statically: rather than being decided ad hoc by each component, it is a property of the stream leading to an emit. Consequently, the implementation of emit (anchored or unanchored) is determined by the topology graph and is completely transparent to the bolt that processes a tuple that will be used as an anchor.

//define the storm topology
open FsShelter.DSL

#nowarn "25" // for stream matching expressions
let sampleTopology = topology "Guaranteed" {
    let s1 = numbers
             |> Spout.runReliable (fun log cfg () -> source.PostAndReply Get)  // ignoring logging and cfg available
                                  (fun _ -> Ack >> source.Post, Nack >> source.Post)
                                  ignore                                       // no deactivation
    let b1 = addOne
             |> Bolt.run (fun log cfg tuple emit -> (tuple,emit)) // pass incoming tuple and emit function
             |> withParallelism 2
    
    let b2 = logResult
             |> Bolt.run (fun log cfg ->
                            let mylog = Common.Logging.asyncLog ("odd.log")
                            fun tuple emit -> (mylog,tuple))
             |> withParallelism 1

    let b3 = logResult
             |> Bolt.run (fun log cfg -> 
                            let mylog = Common.Logging.asyncLog ("even.log") 
                            fun tuple emit -> (mylog,tuple))
             |> withParallelism 1

    yield s1 ==> b1 |> Shuffle.on Original  // emit from s1 to b1 on Original stream and anchor immediately following emits to this tuple
    yield b1 --> b2 |> Shuffle.on Odd       // anchored emit from b1 to b2 on Odd stream 
    yield b1 --> b3 |> Shuffle.on Even      // anchored emit from b1 to b3 on Even stream 
}

Resulting topology graph:

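[Figure: topology graph. A solid line runs from s1 to b1; dotted lines run from b1 to b2 and from b1 to b3.]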

The solid lines represent "anchoring" streams and the dotted lines indicate the outer limits of the processing guarantees: a tuple emitted along a dotted line is only anchored if the line leading to it is solid.
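
The rule can be modelled in a few lines. This is a toy model under stated assumptions, not FsShelter's implementation: `Edge`, `EmitKind`, `emitKindFor` and `describeEmits` are hypothetical names, and the `b2` to `b4` edge does not exist in the sample topology; it is added only to show where the guarantees stop.

```fsharp
// Hypothetical model of a stream between two components; not an FsShelter type.
type Edge = { Src: string; Dst: string; Anchoring: bool }

type EmitKind = Anchored | Unanchored

// The kind of emit is decided by the stream the incoming tuple arrived on,
// not by the bolt that does the emitting.
let emitKindFor (incoming: Edge) =
    if incoming.Anchoring then Anchored else Unanchored

// For every (incoming, outgoing) pair that meets at a bolt,
// report the bolt, the emit's destination and the kind of emit.
let describeEmits (edges: Edge list) =
    [ for incoming in edges do
        for outgoing in edges do
            if outgoing.Src = incoming.Dst then
                yield incoming.Dst, outgoing.Dst, emitKindFor incoming ]

// Sample topology: ==> is an anchoring (solid) stream, --> is a dotted one.
let edges =
    [ { Src = "s1"; Dst = "b1"; Anchoring = true  }   // s1 ==> b1
      { Src = "b1"; Dst = "b2"; Anchoring = false }   // b1 --> b2
      { Src = "b1"; Dst = "b3"; Anchoring = false }   // b1 --> b3
      { Src = "b2"; Dst = "b4"; Anchoring = false } ] // hypothetical extra edge

let emits = describeEmits edges

for (bolt, target, kind) in emits do
    printfn "%s emits to %s: %A" bolt target kind
```

In this model the emits from `b1` are anchored because its input arrives on a solid line, while anything `b2` would emit further downstream is unanchored because its input arrives on a dotted one.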
