FsShelter


Overview

FsShelter is a library for implementing Apache Storm components and topologies in F#. It is based on FsStorm, of which it is a major rewrite. It departs from FsStorm in significant ways and has therefore been split into its own project.

Overall, the library provides a "batteries included" experience, with wrappers for the Nimbus API as well as support for packaging and exporting:

  • bundle and submit a topology for execution without needing a JDK or the Storm CLI
  • include the Storm-side serializer in the bundle
  • kill a running topology
  • generate a topology graph as part of your build

The topology and its components can be implemented in a single EXE project; Storm executes them via its multilang protocol as separate processes, one for each task/instance. The corresponding Storm-side library, ProtoShell, provides Protobuf serialization, which improves the throughput of FsShelter topologies compared to the standard JSON serialization. See the samples to learn how to bundle the assemblies and a serializer for upload to Storm.

Bring your own, if you need it:

  • command line parser
  • logging
  • custom serializer

FsShelter topology schema

While Storm tuples are dynamically typed, and to a large extent their types are transparent to Storm itself, they are not typeless. Mistakes and inconsistencies between declared outputs and tuple consumers can easily lead to errors that are detectable only at run time and can be frustrating to test, detect and fix. FsShelter introduces the concept of a topology schema, defined as an F# discriminated union:

type BasicSchema = 
    | Original of int
    | Incremented of int

where every DU case becomes a distinct stream in the topology, and the fields of each DU case become the tuple fields of that Storm stream.
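The case-to-stream mapping can be inspected with standard F# reflection. The sketch below uses a hypothetical helper, `describeStreams`, which is not part of FsShelter and not how FsShelter builds its streams; it simply lists each DU case name next to its field names. Unnamed case fields show up under the compiler-generated name `Item`.

```fsharp
open Microsoft.FSharp.Reflection

// Same schema as above, repeated so this block is self-contained.
type BasicSchema =
    | Original of int
    | Incremented of int

// Hypothetical helper (not part of FsShelter): one (stream, fields) entry per DU case,
// in declaration order.
let describeStreams (schema: System.Type) =
    FSharpType.GetUnionCases schema
    |> Array.map (fun unionCase ->
        let fields = unionCase.GetFields() |> Array.map (fun p -> p.Name) |> List.ofArray
        unionCase.Name, fields)
    |> List.ofArray

for (stream, fields) in describeStreams typeof<BasicSchema> do
    printfn "%s: %A" stream fields
```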

It is often handy to define a type that is shared across streams, so FsShelter supports defining cases with records:

type Number = { X:int; Desc:string }

type RecordSchema = 
    | Original of int
    | Described of Number
    | Translated of Number

It is also common to join or zip tuples from multiple streams, so FsShelter supports defining cases with several records adjoined:

type RecordsSchema = 
    | Original of Number
    | Doubled of Number*Number

Besides the safety of working with a statically verified schema, we care about the structure of the tuple because we reference its fields in Storm grouping definitions. FsShelter "flattens" the first immediate "layer" of the DU case so that all the fields, whether they come from an embedded record or from the DU case itself, are available for grouping expressions.
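To make "flattening the first layer" concrete, here is a minimal sketch, assuming only that record-typed case fields are expanded one level into their record fields. The helper `flattenFields` is hypothetical and is not FsShelter's code; in particular, the `Item1.X` naming used to keep repeated records distinct is an assumption made for illustration, not FsShelter's actual field naming.

```fsharp
open Microsoft.FSharp.Reflection

type Number = { X: int; Desc: string }

type RecordsSchema =
    | Original of Number
    | Doubled of Number * Number

// Hypothetical sketch: expand record-typed case fields one level deep so their
// record fields become addressable; non-record fields are kept as they are.
let flattenFields (unionCase: UnionCaseInfo) =
    let fields = unionCase.GetFields()
    [ for p in fields do
        if FSharpType.IsRecord p.PropertyType then
            for r in FSharpType.GetRecordFields p.PropertyType do
                // prefix with the case field name only when the case has several fields
                if fields.Length > 1 then yield sprintf "%s.%s" p.Name r.Name
                else yield r.Name
        else
            yield p.Name ]

for unionCase in FSharpType.GetUnionCases typeof<RecordsSchema> do
    printfn "%s -> %A" unionCase.Name (flattenFields unionCase)
```

With this scheme, `Original` exposes `X` and `Desc` directly, while `Doubled` exposes both records' fields, which is what lets a grouping refer to, say, `X` instead of to the record as a whole.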

FsShelter components

Some of Storm's flexibility has been hidden to provide a simple developer experience for authoring event-driven solutions. For example, FsShelter components are implemented as simple functions:

// numbers spout - produces messages on the Original stream
let numbers source = async { return Some(BasicSchema.Original(source())) }

The async body of a spout is expected to return Some tuple if there is a tuple to emit, or None if there is nothing to emit at this time.
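Because a spout is just a function returning `Async<_ option>`, it can be exercised without Storm. The sketch below is a hypothetical stand-in for the host loop, not FsShelter's runtime: `pull` runs the spout a fixed number of times and keeps only what was emitted. `fromQueue` is likewise a hypothetical spout that returns None once its queue is empty.

```fsharp
open System.Collections.Generic

type BasicSchema =
    | Original of int
    | Incremented of int

// The spout from the text above, with the case qualified to BasicSchema.
let numbers source = async { return Some(BasicSchema.Original(source())) }

// Hypothetical spout that has nothing to emit once its queue is empty.
let fromQueue (queue: Queue<int>) =
    async {
        if queue.Count = 0 then return None
        else return Some(BasicSchema.Original(queue.Dequeue()))
    }

// Hypothetical driver: run the spout `times` times and keep only the emitted tuples.
// Async values are cold, so every run re-executes the spout body.
let pull times (spout: Async<BasicSchema option>) =
    List.init times (fun _ -> Async.RunSynchronously spout) |> List.choose id
```

For example, `pull 3 (fromQueue (Queue<int>([1; 2])))` yields two tuples; the third call finds the queue empty and emits nothing.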

Bolts can receive tuples on any number of streams, so we pattern match:

// add 1 bolt - consumes and emits messages to Incremented stream
let addOne (input, emit) = 
    async { 
        match input with
        | BasicSchema.Original(x) -> Incremented(x + 1)
        | _ -> failwithf "unexpected input: %A" input
        |> emit
    }

A bolt can also emit at any time, and we can hold on to the passed emit function (with caveats). Component functions can also take as many arguments as needed; the specifics are determined when the components are put together in a topology.
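Since bolts receive `emit` as an ordinary function, they can be unit-tested by passing one that collects output. In this sketch `addOne` is restated with `emit` applied inside the matching branch, which behaves the same as the piped version above. `addN` is a hypothetical variant showing an extra argument; how that value is supplied in a real topology (for instance by partially applying `addN 10` before `runBolt`) is an assumption, not something confirmed here.

```fsharp
type BasicSchema =
    | Original of int
    | Incremented of int

// addOne from the text above, with emit applied inside the matching branch.
let addOne (input, emit) =
    async {
        match input with
        | BasicSchema.Original(x) -> emit (Incremented(x + 1))
        | _ -> failwithf "unexpected input: %A" input
    }

// Hypothetical bolt with an extra leading argument.
let addN step (input, emit) =
    async {
        match input with
        | BasicSchema.Original(x) -> emit (Incremented(x + step))
        | _ -> failwithf "unexpected input: %A" input
    }

// Test harness: collect everything the bolts emit.
let emitted = ResizeArray<BasicSchema>()
addOne (Original 1, (fun t -> emitted.Add t)) |> Async.RunSynchronously
addN 10 (Original 1, (fun t -> emitted.Add t)) |> Async.RunSynchronously
printfn "%A" (List.ofSeq emitted)
```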

// terminating bolt - consumes messages
let logResult (info, input) = 
    async { 
        match input with
        | BasicSchema.Incremented(x) -> info (sprintf "%A" x)
        | _ -> failwithf "unexpected input: %A" input
    }

Using F# DSL to define the topology

A Storm topology is a graph of spouts and bolts connected via streams. FsShelter provides an embedded DSL for defining topologies, which allows mixing and matching native Java, external shell and FsShelter components:

open System

// define our source dependency
let source = 
    let rnd = Random()
    fun () -> rnd.Next(0, 100)

open FsShelter.DSL
open FsShelter.Multilang

//define the Storm topology
let sampleTopology = 
    topology "Sample" { 
        let s1 = numbers |> runUnreliably (fun log cfg -> source) // ignoring available Storm logging and cfg and passing our source function
        
        let b1 = 
            addOne
            |> runBolt (fun log cfg tuple emit -> (tuple, emit)) // pass incoming tuple and emit function
            |> withParallelism 2 // override default parallelism of 1
        
        let b2 = 
            logResult
            |> runBolt (fun log cfg tuple emit -> ((log LogLevel.Info), tuple)) // example of passing Info-level Storm logger into the bolt
            |> withParallelism 2
        
        yield s1 --> b1 |> shuffle.on BasicSchema.Original // emit from s1 to b1 on Original stream
        yield b1 --> b2 |> shuffle.on Incremented // emit from b1 to b2 on Incremented stream
    }

Storm will start a copy of the same EXE for every component instance in the topology and will assign each instance the task it is supposed to execute.
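With `withParallelism 2`, each bolt above runs as two instances, and the grouping decides which instance receives a given tuple. Storm documents shuffle grouping as distributing tuples randomly so that each task gets an equal number. The sketch below is a hypothetical in-memory model of that idea, not Storm's or FsShelter's implementation. A randomly permuted round-robin stands in for Storm's shuffle, and a field-based route is included only for comparison, since grouping definitions can reference tuple fields.

```fsharp
type BasicSchema =
    | Original of int
    | Incremented of int

// Shuffle-style routing: a randomly permuted round-robin, so every task gets an
// (almost) equal share no matter what the tuples contain.
let shuffleRoute (rnd: System.Random) parallelism =
    let order = [| 0 .. parallelism - 1 |] |> Array.sortBy (fun _ -> rnd.Next())
    fun (seqNo: int) (_: BasicSchema) -> order.[seqNo % parallelism]

// Field-based routing, for comparison: the same value always lands on the same task.
let fieldsRoute parallelism (_: int) (tuple: BasicSchema) =
    let key = match tuple with Original x | Incremented x -> x
    ((key % parallelism) + parallelism) % parallelism

// Count how many of the given tuples each task receives under a routing function.
let distribute (route: int -> BasicSchema -> int) parallelism (tuples: BasicSchema list) =
    let counts = Array.zeroCreate<int> parallelism
    tuples
    |> List.iteri (fun i t ->
        let task = route i t
        counts.[task] <- counts.[task] + 1)
    counts

let tuples = [ for i in 1 .. 10 -> Original i ]
printfn "shuffle: %A" (distribute (shuffleRoute (System.Random 7) 2) 2 tuples)
printfn "fields:  %A" (distribute (fieldsRoute 2) 2 tuples)
```

With two tasks and ten tuples, the shuffle-style route always splits them five and five, whatever permutation the random generator picks.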

The topology can be packaged with all its dependencies and submitted using the embedded Nimbus client; see the samples for details.

Exporting the topology graph in DOT format (GraphViz) using F# scripts

Once the number of components grows beyond trivial, it is often handy to be able to visualize them, and FsShelter includes a simple way to export the topology as a graph:

sampleTopology |> DotGraph.writeToConsole

See the samples included for further details.
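For a sense of what a DOT export contains, here is a minimal sketch that renders a list of (from, to, stream) edges as a GraphViz digraph. The `toDot` function and the edge list are hypothetical and are not FsShelter's `DotGraph` module, whose exact output may differ. Only the edges of the sample topology above are reproduced.

```fsharp
// Hypothetical sketch: one labelled edge per stream between two components.
let toDot (name: string) (edges: (string * string * string) list) =
    let lines =
        edges
        |> List.map (fun (src, dst, stream) -> sprintf "  %s -> %s [label=\"%s\"];" src dst stream)
    String.concat "\n" ([ sprintf "digraph %s {" name ] @ lines @ [ "}" ])

// Wiring of the sample topology: (from, to, stream).
let sampleEdges = [ ("s1", "b1", "Original"); ("b1", "b2", "Incremented") ]

printfn "%s" (toDot "Sample" sampleEdges)
```

The printed text can be fed to GraphViz, for example `dot -Tsvg`, to produce an image.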

Samples & documentation

  • WordCount contains an "unreliable" spout example: emitted tuples do not require an ack and could be lost in case of failure.

  • Guaranteed contains a "reliable" spout example: emitted tuples have a unique ID and require an ack.

  • API Reference contains automatically generated documentation for public types, modules and functions in the library.
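The bookkeeping behind a "reliable" spout can be sketched independently of Storm. The hypothetical `ReliableSource` type below is not the code of the Guaranteed sample. It shows one way to give each emitted tuple a unique ID, keep it pending until it is acked, and queue it for re-emission when it fails. In a real topology the ack and fail notifications come from Storm; here they are plain method calls.

```fsharp
open System.Collections.Generic

type BasicSchema =
    | Original of int
    | Incremented of int

// Hypothetical reliable source: tracks emitted tuples by ID until acked or failed.
type ReliableSource(items: seq<int>) =
    let queue = Queue<int>(items)
    let pending = Dictionary<string, int>()

    /// Next (id, tuple) to emit, or None when there is nothing to emit right now.
    member this.Next() =
        if queue.Count = 0 then None
        else
            let x = queue.Dequeue()
            let id = System.Guid.NewGuid().ToString()
            pending.[id] <- x
            Some(id, Original x)

    /// An acked tuple was fully processed and can be forgotten.
    member this.Ack(id: string) = pending.Remove id |> ignore

    /// A failed tuple is put back so it will be emitted again.
    member this.Fail(id: string) =
        match pending.TryGetValue id with
        | true, x ->
            pending.Remove id |> ignore
            queue.Enqueue x
        | _ -> ()

    member this.PendingCount = pending.Count
```

An unreliable spout, by contrast, would skip the `pending` dictionary entirely, which is why its tuples can be lost on failure.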

Getting FsShelter

The FsShelter library can be installed from NuGet or MyGet:

PM> Install-Package FsShelter

The library can also be tried out quickly as a Docker container downloaded from Docker Hub:

$ docker run --name fsshelter-samples -d -p 8080:8080 prolucid/fsshelter-samples

Contributing and copyright

The project is hosted on GitHub, where you can report issues, fork the project and submit pull requests. If you are adding a new public API, please also consider adding samples that can be turned into documentation. You might also want to read the library design notes to understand how it works.

The library is available under Apache 2.0 license, which allows modification and redistribution for both commercial and non-commercial purposes. For more information see the License file in the GitHub repository.

Commercial support

Commercial training and support are available from the project sponsor: Prolucid
