What is reactive programming and why should I use it?

A good reactive programming library takes a huge maintenance burden off some of the most commonly written, bug-prone code in applications.

In the previous article, I presented my own library for reactive programming, CwlSignal, but simple syntax examples don’t really demonstrate how to use reactive programming to solve problems.

In this article, I’ll explain why I consider reactive programming to be one of the most important design patterns for application programming. I’ll look at three scenarios that are common in application development yet drain development time, lead to frequent bugs and make design and refactoring hard. For each, I’ll show how reactive programming reduces the verbosity, eliminates the unsafe behavior and restructures the code to aid maintainability.

The specific scenarios I’ll examine will be:

  • A button whose isEnabled status is dependent on two different state values
  • A thread-safe storage type
  • An asynchronous task with a timeout

A description of reactive programming

Reactive programming is defined by the following principle:

Any “getter” for mutable state causes problems. Instead of using getters, any calculated, generated, loaded or received state values should be immediately sent into a channel and any part of the program that wants access to these values must subscribe to the channel.

The idea is that we remove state from the exposed parts of our program and encapsulate it inside channels instead. Channels make data dependencies and effects explicit, make changes easier to reason about and maintain, and generally simplify how we modify application state.

Reactive programming supports this underlying principle with an approach that centers on serial and parallel compositions of channels to transform streams of data as they are emitted and merge changes that may occur concurrently or in otherwise intersecting patterns.

Simply put: reactive programming manages asynchronous data flows between sources of data and components that need to react to that data.
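To make the principle concrete, here is a minimal sketch of such a channel, written in plain Swift with Dispatch rather than CwlSignal. ValueChannel, send, subscribe and drain are hypothetical names chosen for illustration. The latest value is private, and the only way to read it is to subscribe, which delivers the current value followed by every later one. Because sends, subscriptions and deliveries all run on one serial queue, a subscriber can’t miss an update that arrives while it is being set up.

```swift
import Dispatch

// Hypothetical minimal channel for illustration (not CwlSignal's API).
// The latest value is private: there is deliberately no getter, so the only
// way to read it is to subscribe.
final class ValueChannel<Value> {
   private var latest: Value?
   private var subscribers: [(Value) -> Void] = []

   // Every send, subscription and delivery runs on this one serial queue, so
   // no update can slip in between a subscriber's initial value and the
   // updates that follow it.
   private let queue = DispatchQueue(label: "ValueChannel")

   init(initial: Value? = nil) {
      latest = initial
   }

   // Producers push every new value into the channel.
   func send(_ value: Value) {
      queue.async {
         self.latest = value
         for subscriber in self.subscribers {
            subscriber(value)
         }
      }
   }

   // Consumers receive the current value (if there is one), then all later values.
   func subscribe(_ handler: @escaping (Value) -> Void) {
      queue.async {
         self.subscribers.append(handler)
         if let value = self.latest {
            handler(value)
         }
      }
   }

   // Testing convenience: blocks until all previously queued work has run.
   func drain() {
      queue.sync {}
   }
}
```

Handlers run on the channel’s private queue, and a handler that calls send simply enqueues more work, so it can’t deadlock. A real library also needs unsubscription, error and close handling, and a choice of delivery context; this sketch omits all of those.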

A button dependent on two state values

Let’s start with a fundamental task in application programming: setting the isEnabled status of a button.

Imagine you had an “Add to favorites” button in your user-interface that you wanted to enable if (and only if) both of the following are true:

  1. the user is logged in
  2. the file selection is non-empty

Let’s include a complicating factor: the login status is typically updated on a background thread.

Assuming updates to the relevant values are sent using Key-Value-Observing, our view controller might contain the following code:

override func viewDidLoad() {
   super.viewDidLoad()
   
   addToFavoritesButton.isEnabled = loginStatus.isLoggedIn && !folderView.selection.isEmpty
   
   loginStatus.addObserver(self, forKeyPath: #keyPath(LoginStatus.isLoggedIn),
      options: NSKeyValueObservingOptions.new, context: nil)
   folderView.addObserver(self, forKeyPath: #keyPath(FolderView.selection),
      options: NSKeyValueObservingOptions.new, context: nil)
}

override func observeValue(forKeyPath keyPath: String?, of object: Any?,
   change: [NSKeyValueChangeKey : Any]?, context: UnsafeMutableRawPointer?) {
   switch (keyPath, change?[NSKeyValueChangeKey.newKey]) {
   case (.some(#keyPath(LoginStatus.isLoggedIn)), .some(let isLoggedIn as Bool)):
      addToFavoritesButton.isEnabled = isLoggedIn && !folderView.selection.isEmpty
   case (.some(#keyPath(FolderView.selection)), .some(let selection as Array<FileView>)):
      addToFavoritesButton.isEnabled = loginStatus.isLoggedIn && !selection.isEmpty
   default:
      super.observeValue(forKeyPath: keyPath, of: object, change: change, context: context)
   }
}

deinit {
   loginStatus.removeObserver(self, forKeyPath: #keyPath(LoginStatus.isLoggedIn))
   folderView.removeObserver(self, forKeyPath: #keyPath(FolderView.selection))
}

It’s exhausting just to look at: updating the isEnabled status of a single button takes 20 lines of densely written code, and even that isn’t enough to do things properly.

This code contains a few subtle but common mistakes. They occur not because we’re bad programmers but because we follow the path of least resistance. We fix problems that are immediately apparent, but if the code mostly works (as this code does), it’s likely to get through testing.

What problems are in this code that could cause headaches later?

  1. Thread unsafe: with the login status updating on a background thread, the observeValue for #keyPath(LoginStatus.isLoggedIn) is memory unsafe when it accesses the folderView.selection.isEmpty value on the wrong thread. This case also updates the addToFavoritesButton.isEnabled illegally on a background thread.
  2. Not possible to make getters and observeValues agree: It’s possible that a login change could occur between the call to loginStatus.isLoggedIn and the respective addObserver calls in the viewDidLoad function. We could change the order of these but then we could get updates before we’re initialized. Similarly, since loginStatus.isLoggedIn is updated on different threads, this getter could end up returning a value from a different point in time compared to the last observeValue update for the value – leading to redundant updates and possibly unexpected state changes (including state that may appear to skip transitions or progress backwards). The only solution involves careful use of NSKeyValueObservingOptions.initial and our own caching of last known values – avoiding the getters entirely.
  3. Using KVO wrong: The most obvious way to use the observeValue method (switching on the keyPath and/or object) is unreliable, since a superclass or subclass may observe the same key path on the same object. Each observation should be uniquely identifiable regardless of collisions on key path or object, so a context pointer would be better, but allocating and storing a context per observation (and releasing it when done) is more work, so we avoid it.
  4. It’s difficult to refactor: We update the addToFavoritesButton.isEnabled value in three different places. If another dependency were added to this value, we would need to remember to update all three locations.
  5. No lifecycle notifications: if the loginStatus or folderView objects are unexpectedly released, we won’t get a notification and the button could remain inappropriately enabled.

These problems are fixable but proper fixes involve more work on top of what we’ve already written.

With reactive programming, assuming all state values send their changes through reactive programming channels rather than Key-Value-Observing, you’d need only the following:

override func viewDidLoad() {
   super.viewDidLoad()
   
   self.endpoints += loginStatus.loggedInSignal
      .combineLatest(second: folderView.selectionSignal) { $0 && !$1.isEmpty }
      .subscribe(context: .main) { result in
         self.addToFavoritesButton.isEnabled = result.value ?? false
      }
}
"App scenario - dynamic view properties" appears in the CwlSignal.playground.

The above problems are fixed: this code is completely threadsafe, never issues redundant notifications, doesn’t rely on @objc for key-value-observing and is vastly easier to refactor and maintain.
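As a rough illustration of what combineLatest contributes here, the following sketch (plain Swift and Dispatch, not CwlSignal’s implementation; CombineLatest, sendFirst, sendSecond and drain are hypothetical names) computes the derived value in exactly one place from the latest value of each input. It emits nothing until both inputs have produced a value, and it funnels both inputs through one serial queue so that a background login update can’t interleave with a selection change.

```swift
import Dispatch

// Hypothetical sketch of combine-latest semantics (not CwlSignal's code).
final class CombineLatest<A, B, Output> {
   private var latestA: A?
   private var latestB: B?
   private let combine: (A, B) -> Output
   private let output: (Output) -> Void

   // Both inputs pass through this serial queue, whichever thread they come from.
   private let queue = DispatchQueue(label: "CombineLatest")

   init(combine: @escaping (A, B) -> Output, output: @escaping (Output) -> Void) {
      self.combine = combine
      self.output = output
   }

   func sendFirst(_ a: A) {
      queue.async {
         self.latestA = a
         self.emitIfReady()
      }
   }

   func sendSecond(_ b: B) {
      queue.async {
         self.latestB = b
         self.emitIfReady()
      }
   }

   // The single place the derived value is calculated; silent until both inputs exist.
   private func emitIfReady() {
      guard let a = latestA, let b = latestB else { return }
      output(combine(a, b))
   }

   // Testing convenience: blocks until all previously queued work has run.
   func drain() {
      queue.sync {}
   }
}
```

For the button, combine would be { loggedIn, selection in loggedIn && !selection.isEmpty }. In an app, the output closure would still need to hop to the main thread before touching the button, which is what subscribe(context: .main) handles in the CwlSignal version.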

Maintaining a threadsafe dictionary of values

That example covered the “subscriber” end of a dependency. Now let’s look at the “sender” end, using a threadsafe dictionary of values.

You might use a dictionary as the “model” in a trivial app. Even if you need more sophisticated storage than a dictionary, the pattern of updating and notifying shown here should be the same in any well-written app.

Solving the problem with the standard DispatchQueue and NotificationCenter classes might look like the following:

class DocumentValues {
   typealias Dict = Dictionary<AnyHashable, Any>
   typealias Tuple = (AnyHashable, Any?)

   static let changed = Notification.Name("com.mycompany.mymodule.documentvalues.changed")
   
   // Underlying storage protected by a `DispatchQueue` mutex
   private var storage = Dict()
   private let mutex = DispatchQueue(label: "")

   init() {}
   
   // Access to the storage involves copying out of the mutex
   var values: Dict {
      return mutex.sync {
         return storage
      }
   }
   
   // Remove a value and send a change notification
   func removeValue(forKey key: AnyHashable) {
      let latest = mutex.sync { () -> Dict in
         storage.removeValue(forKey: key)
         return storage
      }
      NotificationCenter.default.post(name: DocumentValues.changed, object: self,
         userInfo: latest)
   }

   // Create/change a value and send a change notification
   func setValue(_ value: Any, forKey key: AnyHashable) {
      let latest = mutex.sync { () -> Dict in
         storage[key] = value
         return storage
      }
      NotificationCenter.default.post(name: DocumentValues.changed, object: self,
         userInfo: latest)
   }
}

I’ve been careful to copy values in and out of the mutex, so the class is memory safe in all cases. It posts notifications so that other interfaces can receive updates.

What’s the problem?

Once again, let’s look at the key failings:

  1. Bad behavior is encouraged: It is easy for another interface to access the current values property, but it is additional work to properly observe the DocumentValues.changed notification, so you’re encouraging dependent interfaces to forget to observe changes and fall out of sync.
  2. There is no safe way to initialize and subscribe: If you get the values and then start observing notifications, a change could occur between these two actions (causing you to lose a notification). If you observe notifications first and then get the values, you might receive a notification before you’ve properly initialized. NSKeyValueObservingOptions.initial could fix the problem for KVO, but with notifications you’d need some clever coding to work around it.
  3. Prone to deadlocks: The removeValue function releases the removed value inside the mutex (and setValue does the same when it replaces an existing value). If that value has a deinit that tries to change the DocumentValues (re-entering the mutex), you’ve created a deadlock.
  4. It’s difficult to refactor: There’s no single point that all changes to the storage go through. If you need to add functionality in future, like writing DocumentValues to a file on each change, you’d have to carefully integrate this change into multiple places.
  5. No lifecycle notifications: If the DocumentValues object is deallocated, it sends no notification by default.

Many of these problems are the same as, or similar to, those in the previous example. As before, they can be solved through careful coding, but each solution requires additional code and complexity. More than that, you’d first need to realize that each problem exists and, since all of these problems are subtle, you might not notice any of them during testing.
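To give a sense of that extra work, here is a sketch of one such fix, for the deadlock in problem 3, assuming the same DispatchQueue-as-mutex approach. SafeDocumentValues is a hypothetical name, and notifications are left out for brevity. Each value that leaves the dictionary is returned out of the mutex, so its final release (and any deinit it triggers) happens after the mutex has been exited.

```swift
import Dispatch

// Hypothetical variant of the storage above (notifications omitted).
final class SafeDocumentValues {
   typealias Dict = [AnyHashable: Any]

   private var storage = Dict()
   private let mutex = DispatchQueue(label: "SafeDocumentValues")

   var values: Dict {
      return mutex.sync { storage }
   }

   func removeValue(forKey key: AnyHashable) {
      // removeValue(forKey:) returns the removed value, so the last reference
      // is carried out of the mutex instead of being dropped inside it.
      let removed = mutex.sync { storage.removeValue(forKey: key) }

      // Any deinit triggered by this release runs outside the mutex.
      withExtendedLifetime(removed) {}
   }

   func setValue(_ value: Any, forKey key: AnyHashable) {
      // Replacing a value releases the old one too, so it gets the same treatment.
      let replaced = mutex.sync { storage.updateValue(value, forKey: key) }
      withExtendedLifetime(replaced) {}
   }
}
```

This addresses only one of the five problems, and nothing in the code reminds a future maintainer that any new mutating method needs the same treatment.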

An implementation using reactive programming would replace the values property with a channel that emits the current value, followed by future updates, as a stream.

class DocumentValues {
   typealias Dict = Dictionary<AnyHashable, Any>
   typealias Tuple = (AnyHashable, Any?)

   private let input: SignalInput<Tuple>
   
   // Access to the data is via the signal.
   public let signal: SignalMulti<Dict>

   init() {
      // Actual values storage is encapsulated within the signal
      (self.input, self.signal) = Signal<Tuple>.create {
         $0.map(withState: [:]) { (state: inout Dict, update: Tuple) in
            
            // All updates pass through this single, common function.
            switch update {
            case (let key, .some(let value)): state[key] = value
            case (let key, .none): state.removeValue(forKey: key)
            }
            return state
         
         // Convert single `Signal` into multi-subscribable `SignalMulti` with `continuous`
         }.continuous(initial: [:])
      }
   }
   
   func removeValue(forKey key: AnyHashable) {
      input.send(value: (key, nil))
   }
   
   func setValue(_ value: Any, forKey key: AnyHashable) {
      input.send(value: (key, value))
   }
}
"App scenario - threadsafe key-value storage" appears in the CwlSignal.playground.

The code size is not significantly different (27 non-blank, non-comment lines before versus 23 after) but in this case, every problem mentioned above is solved implicitly.

  1. The same work is involved in accessing a value once or subscribing properly so good behavior is encouraged.
  2. If separate handling of the initial value and subsequent values is required (e.g. using a capture and subscribe sequence as described in the previous article), the stream is correctly paused so you can’t miss a notification.
  3. Everything is threadsafe (the map closure will never be invoked concurrently and re-entrancy is not possible).
  4. All changes go through the map function and can be coordinated there.
  5. A SignalError.cancelled message is automatically sent to subscribers if input is released.

Related to thread safety, notice that the class no longer contains any mutable variables; all state is encapsulated inside the Signal.
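The map(withState:) call behaves like a “scan”: one reducer folds each update into private state and emits the whole new state. As a hedged, library-free illustration of that idea (apply, scan, Dict and Update are hypothetical names, and an array of updates stands in for the signal), the reducer can be run over a list of (key, value?) updates, producing one snapshot per update:

```swift
// Hypothetical illustration of the "scan" pattern behind map(withState:).
typealias Dict = [String: Int]
typealias Update = (key: String, value: Int?)

// The single place where every change to the state is applied:
// a value sets the key, nil removes it.
func apply(_ update: Update, to state: inout Dict) {
   if let value = update.value {
      state[update.key] = value
   } else {
      state.removeValue(forKey: update.key)
   }
}

// Emits one full snapshot of the state after each update, in order.
func scan(_ updates: [Update], initial: Dict) -> [Dict] {
   var state = initial
   return updates.map { update -> Dict in
      apply(update, to: &state)
      return state
   }
}
```

For example, scanning the updates ("a", 1), ("b", 2), ("a", nil) from an empty dictionary yields the snapshots ["a": 1], then ["a": 1, "b": 2], then ["b": 2].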

Not only is this less code than the previous class but there are far fewer implementation mistakes you could make; code is more declarative and contained and there’s no mutex to carefully administer and work around.

An asynchronous task with a timeout

I originally gave the following code in a previous article, Testing Actions over Time. The code contains a class with a connect function that does two things:

  1. Invokes an underlyingConnect function, which takes a callback and invokes it on completion
  2. Starts a timer, which, if it fires before the underlyingConnect function invokes its completion handler, cancels the underlyingConnect function.

I’ve made the class a little more complicated by allowing the user to call connect multiple times on the Service class, possibly while a previous call to connect still has asynchronous tasks outstanding.

public protocol Cancellable: class { func cancel() }
enum ServiceError: Error { case timeout }

class Service {
   typealias ConnectionFunction =
      (DispatchQueue, @escaping (Result<String>) -> ()) -> Cancellable

   let underlyingConnect: ConnectionFunction
   var currentAction: Cancellable? = nil
   let queue = DispatchQueue(label: "\(Service.self)")
   let handler: (Result<String>) -> ()
   
   // Construction of the Service lets us specify the underlying "connect" service
   init(connect: @escaping ConnectionFunction = NetworkService.init,
      handler: @escaping (Result<String>) -> ()) {
      self.underlyingConnect = connect
      self.handler = handler
   }

   // The connect function that we want to test
   func connect(timeout seconds: Double) {
      // Capture the handler locally so the escaping closures below don't need `self`
      let handler = self.handler
      var previousAction: Cancellable? = nil
      queue.sync {
         previousAction = self.currentAction
         
         // Tie the timer and underlying action together with a single lifetime object for
         // this `connect` action
         let timerAndAction = CancellableTimerAndAction()
         
         // Run the underlying connection
         let underlyingAction = self.underlyingConnect(queue) {
            [weak timerAndAction] result in
            // Cancel the action so no further callbacks are invoked
            timerAndAction?.cancel()
            
            // Send the result to the handler
            handler(result)
         }
         
         // Run the timeout timer
         let timer = DispatchSource.singleTimer(interval:
            DispatchTimeInterval.fromSeconds(seconds), queue: queue) {
            [weak timerAndAction] in
            // Cancel the action so no further callbacks are invoked
            timerAndAction?.cancel()
            
            // Send the timeout to the handler
            handler(.failure(ServiceError.timeout))
         } as? DispatchSource
         
         // Store everything in the lifetime object for this action and then store that
         // in the parent
         timerAndAction.timer = timer
         timerAndAction.action = underlyingAction
         self.currentAction = timerAndAction
      }
      
      // A good rule of thumb: never release lifetime objects inside a mutex – you might
      // trigger a re-entrancy deadlock
      withExtendedLifetime(previousAction) {}
   }
}

class CancellableTimerAndAction: Cancellable {
   var timer: Cancellable? = nil
   var action: Cancellable? = nil

   func cancel() {
      timer?.cancel()
      action?.cancel()
   }
}

As far as I’m aware, this code works and has no bugs. But it’s gigantic, given that all it does is apply a timeout to an underlying function.

Most of the size comes from careful coding to avoid problems. After the connect function starts the underlyingAction and the timer, it needs to store both in a CancellableTimerAndAction (to tie their lifetimes together). The previousAction is carefully released outside the queue.sync (to prevent re-entrancy deadlocks). And since the underlyingAction’s completion closure and the timer’s handler closure each need to cancel the other, there’s some weak reference juggling too.

We shouldn’t need to carefully code around so many issues. We want the code smaller. We want it simpler.

Let’s try the whole thing with reactive programming.

class Service {
   private let input: SignalInput<DispatchTimeInterval>
   
   // Instead of "handler" callbacks, output is now via this signal
   let signal: SignalMulti<Result<String>>
   
   init(connect: @escaping () -> Signal<String>) {
      (self.input, self.signal) = Signal<DispatchTimeInterval>.create { s in
         // Return results only from the latest connection attempt
         Signal<Result<String>>.switchLatest(
           // Each time we receive a timeout duration, convert it into a connection attempt
           s.map { i in connect().timeout(interval: i, resetOnValue: false).materialize() }
         ).multicast()
      }
   }

   func connect(seconds: Double) {
      input.send(value: .fromSeconds(seconds))
   }
}
"Parallel composition - operators" appears in the CwlSignal.playground.

The difference is astounding; it doesn’t even resemble the original class. Yet this class does the same thing, just over reactive programming channels rather than over callbacks with state and mutexes. With reactive programming, all of the threading and lifetime management that was such a burden in the previous implementation is implicit: it’s still there, but it’s managed automatically.

Getting comfortable with common reactive programming operators like switchLatest and materialize can take a little time. They should all be documented in CwlSignal via Xcode Quick Help, but they’re also largely the same as the ReactiveX operators, so you can consult that documentation for additional insight.
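As an informal picture of what switchLatest does in the Service above, here is a hand-rolled sketch using a generation counter (LatestOnly and begin are hypothetical names; this is not how CwlSignal implements the operator). Each call to begin starts a new attempt and invalidates the earlier ones, so a late result from a superseded connection is dropped instead of reaching the output.

```swift
import Foundation

// Hypothetical "latest attempt only" helper (a stand-in, not CwlSignal's switchLatest).
final class LatestOnly<Value> {
   // Recursive, so `output` may call begin() on the same thread without deadlocking.
   private let lock = NSRecursiveLock()
   private var generation = 0
   private let output: (Value) -> Void

   init(output: @escaping (Value) -> Void) {
      self.output = output
   }

   // Starts a new attempt and returns the callback that attempt must deliver through.
   func begin() -> (Value) -> Void {
      lock.lock()
      generation += 1
      let mine = generation
      lock.unlock()

      return { [weak self] value in
         guard let self = self else { return }
         self.lock.lock()
         defer { self.lock.unlock() }
         // Only the most recent attempt may deliver; stale attempts are ignored.
         if mine == self.generation {
            self.output(value)
         }
      }
   }
}
```

Delivering inside the lock also means output is never called concurrently. Real switchLatest implementations typically also unsubscribe from the superseded inner signal; this sketch only ignores its results.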

In case you think using a built-in function for the timeout is cheating in this comparison, it would be simple to replace the connect().timeout(interval: i, resetOnValue: false).materialize() expression inside the map closure with:

let timer = Signal<()>.timer(interval: i)
return connect().combine(second: timer) { eitherSignal, next in
   switch eitherSignal {
   case .result1(let r): next.send(result: r)
   case .result2: next.send(error: ServiceError.timeout)
   }
}.materialize()

It’s a little more verbose but still not complicated.
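For comparison with the original callback code, the same “first of result or timeout wins” rule can be sketched with plain Dispatch. This is a minimal sketch with hypothetical names (withTimeout, TimeoutError), and it uses Swift 5’s standard two-parameter Result rather than the single-parameter Result used earlier in this article. Unlike the signal version, it doesn’t cancel the losing side; it only guarantees that completion is called exactly once.

```swift
import Foundation

// Hypothetical error for this sketch.
enum TimeoutError: Error { case timedOut }

// Races `operation` against a timer; whichever finishes first is reported.
func withTimeout<T>(
   seconds: Double,
   queue: DispatchQueue,
   operation: (@escaping (Result<T, Error>) -> Void) -> Void,
   completion: @escaping (Result<T, Error>) -> Void
) {
   let lock = NSLock()
   var finished = false

   // Calls `completion` only for the first result; later arrivals are ignored.
   func finish(_ result: Result<T, Error>) {
      lock.lock()
      let isFirst = !finished
      finished = true
      lock.unlock()
      if isFirst { completion(result) }
   }

   queue.asyncAfter(deadline: .now() + seconds) {
      finish(.failure(TimeoutError.timedOut))
   }
   operation { result in finish(result) }
}
```

Even this small version needs a lock, a flag and a closure shared by both sides, which is the bookkeeping the combine operator keeps out of sight.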

Conclusion

Reactive programming changes how data is stored, how it flows through your program and how the elements of your program are connected. The result is significant improvement across the following categories:

  • Thread safety
  • Coordinating concurrent asynchronous tasks
  • Loose coupling of components
  • Data dependencies

The biggest advantage comes when you realize that in applying a solution to just one of these problems, you’ve gained a solution to the other three for free.

For situations where the code already addresses these issues properly, reactive programming can deliver significant lines-of-code savings.

The end result is code that is easier to write and easier to maintain.

Looking forward…

There’s a lot of carefully written code in the CwlSignal library. Even if, despite all the discussion in this article, you’re not interested in reactive programming, there’s a lot to learn from how CwlSignal solves problems internally, such as avoiding re-entrancy and integrating with Key-Value-Observing.

Appendix: A little bit of history

Even though the terms “reactive” and “reactive systems” have been used in programming at least since Alan Kay’s 1969 dissertation The Reactive Engine, modern use of the term “reactive programming” really refers to ideas that began with Conal Elliott and Paul Hudak’s 1997 paper Functional Reactive Animation.

Functional reactive animation, later called functional reactive programming (FRP), is about “behaviors” (continuous, time-dependent equations like those describing the position of a moving object) and how they change and interact with “events” (one-off occurrences at an instant in time).

All of this might have been irrelevant to imperative programming languages, since pull-driven, lazily evaluated “behaviors” (the primary output of functional reactive programming) are uninteresting for most imperative programming, which can use push-driven, stateful calculations to produce the same results more simply.

It turns out, however, that the patterns functional reactive programming provides for handling, transforming and responding to “events” (side effects that functional reactive programming tries to minimize) carry across easily to imperative languages and remain similarly useful for managing and responding to streams of discrete events, even when those streams are predominantly push-driven, as in imperative programming, rather than lazily evaluated and pull-driven, as is typical in functional languages.

Thus we get “reactive programming” (note the absence of “functional”) which focuses on emitting, transforming and subscribing to streams of events.

The biggest difference between functional reactive and imperative reactive programming is ultimately “time”. In functional reactive programming, time is a parameter passed into each calculation. In imperative reactive programming, time may be pulled from the context, but the actual time is usually less important than the order in which relevant mutexes are acquired. This means that functional reactive programming operates a bit like a simulation (“if two objects should happen to interact in this way at this time…”), whereas reactive programming is more like a simple ordering of blocks into a sequence by the order in which they happen to be processed. One effect of this is that functional reactive programming can handle two events that are literally simultaneous, whereas imperative reactive programming has no notion of simultaneity.

While there may have been earlier implementations of reactive programming in imperative languages, the first popular implementation was the Reactive Extensions (Rx) for .NET, first released in late 2009.

The Reactive Extensions are built around the .NET framework’s IObservable interface as their primary metaphor, along with IDisposable and ideas from IEnumerable and LINQ. Many implementations of reactive programming on imperative platforms still use terms like “Observable” and “Disposable” for their core communication and lifetime protocols, respectively, revealing their lineage from the Reactive Extensions.