Intro to streams: control-flow and data-flow in Luau
This is (part 1 of) a conceptual intro to Stream.luau
, a (very new) library I wrote for managing the complexity of event-driven programming in Roblox (with types!).
It is directly inspired by Rx (Nevermore), which itself is a port of RxJS to lua (also see the standalone version by Anaminus). See the “IAQ” (immediately anticipated questions) section at the end for comparisons to observables and Rx. For now we will not assume prior familiarity with observables.
What is a Stream?
It’s a function (there are no classes in this library - just typed-functions), which takes a listener: (T...) -> ()
(a function that can be fed arguments of the appropriate type) and returns a CleanupTask. A CleanupTask is usually something like an RBXScriptConnection
, and cleaning it means to disconnect it.
export type CleanupTask = -- omitted
export type Stream<T...> = (listener: (T...) -> ()) -> CleanupTask
-- example: `Stream<number,string>` is the type `(listener: (number, string) -> ()) -> CleanupTask`
When a stream : Stream<number>
is called with a listener
, it can call listener(x)
with any number x
whenever it likes (synchronously or asynchronously), but it must cancel all future calls once it’s CleanupTask
is “cleaned”.
Here is a toy-example to illustrate (we use the terminology “emits x
” when the stream calls the listener with x
).
local function myStream(listener: (number) -> ()): CleanupTask
-- Emits synchronously
listener(1)
listener(2)
-- Emits asynchronously
local value = 3
local thread = task.spawn(function()
while true do
task.wait(1)
listener(value)
value += 1
end
end)
-- This is the cleanup - `clean(thread)` will stop this stream.
return thread
end
local cleanup = myStream(print) -- prints 1 and 2 immediately
-- will print 3,4,5,6... every second until cleanup is called
task.wait(4.5)
clean(cleanup)
Streams are about control-flow and data-flow. By listening to a stream, i.e. giving it a callback/behaviour, you yield control to the stream to decide when and with-what-data that behaviour is executed. In this sense, they can be thought of as a common generalisation of for-loops and events, which both provide data from some source, and execute a behaviour synchronously (for-loops) or asynchronously (events).
But do not be mistaken, they are strictly more powerful than either concept!
The reason is that setting up control-flow (connecting to events/streams), while managing the timely clean up of all connections, can explode in complexity. You will do one of the following:
- Grit your teeth and pollute your code with layers of housekeeping logic
- Half-grit your teeth and implement the correct control flow, but with memory leaks (not disconnecting connections)
- Fail to implement the control flow correctly (slack-jawed?)
The first cause of complexity, which we will explore in this post, is the need to connect to a dynamically-defined event (Spoiler: it is solved by switchMap
!).
Dynamically defined events
Suppose the local player’s character-model has a “Damage” attribute which we want to display on the screen. A first approximation might look like:
local function updateDamageGui(damage: number?)
-- do stuff
-- ...
-- ...
end
updateDamageGui(Players.LocalPlayer.Character:GetAttribute("Damage"))
local connection = Players.LocalPlayer.Character:GetAttributeChangedSignal("Damage"):Connect(function()
updateDamageGui(Players.LocalPlayer.Character:GetAttribute("Damage"))
end)
-- disconnect the connection when done displaying
If only it was that simple! Problems:
Players.LocalPlayer.Character
might be nil (and will be on startup), and therefore we do not have static access to the “PropertyChanged” event - it is dependent on the character instance, so it is a dynamically defined event.- The call to
updateDamageGui
has been duplicated, hence the abstraction into a function. Problem 2 is not so-bad here, but, as we’ll see, can worsen arbitrarily when behaviour is scattered amongst a mess of housekeeping logic.
Problem 1 is terrible. Let’s chuck a :Wait()
on it!
local character = Players.LocalPlayer.Character or Players.LocalPlayer.CharacterAdded:Wait()
updateDamageGui(character:GetAttribute("Damage"))
local connection = character:GetAttributeChangedSignal("Damage"):Connect(function()
updateDamageGui(character:GetAttribute("Damage"))
end)
Oh no, our damage stops updating after our character respawns, because Players.LocalPlayer.Character
is a different instance now!
Let’s jump ahead to a fully-correct implementation. Grit your teeth!
local attributeConnection = nil
local function handleCharacter(character: Model?)
if character == nil then
updateDamageGui(nil)
return
end
updateDamageGui(character:GetAttribute("Damage"))
if attributeConnection ~= nil then
attributeConnection:Disconnect()
end
attributeConnection = character:GetAttributeChangedSignal("Damage"):Connect(function()
updateDamageGui(character:GetAttribute("Damage"))
end)
end
handleCharacter(Players.LocalPlayer.Character)
local characterConnection = Players.LocalPlayer:GetPropertyChangedSignal("Character"):Connect(function()
handleCharacter(Players.LocalPlayer.Character)
end)
-- Disconnect attributeConnection (if not nil!) and characterConnection when done displaying
🤮 Problems:
- This is a mess: our important behaviour is getting harder to locate
- We are polluting our local variables with housekeeping variables, which are obtaining more complex names to distinguish themselves.
- When we are done with this behaviour, we have to disconnect our connections in a specific way (
attributeConnection
if it’s not nil and thencharacterConnection
). If the nature of the behaviour changes, we must maintain the cleanup logic accordingly, which likely exists in another location.
Let’s start by separating the behaviour from the housekeeping, and encapsulate the cleanup logic.
local function onDamageAttribute(callback: (number?) -> ()): CleanupTask
-- We have replaced `updateDamageGui` with `callback`.
local attributeConnection = nil
local function handleCharacter(character: Model?)
if character == nil then
callback(nil)
return
end
callback(character:GetAttribute("Damage"))
if attributeConnection ~= nil then
attributeConnection:Disconnect()
end
attributeConnection = character:GetAttributeChangedSignal("Damage"):Connect(function()
callback(character:GetAttribute("Damage"))
end)
end
handleCharacter(Players.LocalPlayer.Character)
local characterConnection = Players.LocalPlayer:GetPropertyChangedSignal("Character"):Connect(function()
handleCharacter(Players.LocalPlayer.Character)
end)
-- Wrapped cleanup logic (made idempotent)
local cleanup = function()
if attributeConnection ~= nil then
attributeConnection:Disconnect()
attributeConnection = nil
end
if characterConnection ~= nil then
characterConnection:Disconnect()
characterConnection = nil
end
end
return cleanup
end
local cleanup = onDamageAttribute(updateDamageGui)
-- call `clean(cleanup)` when done displaying
🤔
We have separated our housekeeping logic from our behaviour, and encapsulated the cleanup logic.
Hiding/moving complexity is not virtuous in itself, but we will find that we can
decompose onDamageAttribute
into reusable and composable parts, which clearly express the
intended behaviour.
From here on, we start using stream terminology. We see onDamageAttribute
has type Stream<number?> = ((number?) -> ()) -> CleanupTask
, so we’ll call it damageStream
, and instead of callback
, we opt for the term listener
.
Here is the full decomposition of onDamageAttribute
, with nothing omitted (except clean
).
local characterStream: Stream<Model?> = function(listener: (Model?) -> ())
listener(Players.LocalPlayer.Character)
return Players.LocalPlayer:GetPropertyChangedSignal("Character"):Connect(function()
listener(Players.LocalPlayer.Character)
end)
end
-- Make a stream which emits an Attribute of an instance (immediately and on-change)
local function attributeOf(instance: Instance, name: string): Stream<any?>
return function(listener: (any?) -> ())
listener(instance:GetAttribute(name))
return instance:GetAttributeChangedSignal(name):Connect(function()
listener(instance:GetAttribute(name))
end)
end
end
-- A Stream<T?> which just emits nil once (immediately)
local function nilOnce<T>(listener: (T?) -> ()): CleanupTask
listener(nil)
return nil
end
local damageStreamStream: Stream<Stream<number?>> = function(listener: (Stream<number?>) -> ()): CleanUpTask
-- The return here returns the CleanUpTask of the character stream
-- (in this case, a :GetPropertyChangedSignal("Character") connection)
return characterStream(function(character)
if character then
local innerStream = attributeOf(character, "Damage")
listener(innerStream)
else
listener(nilOnce)
end
end)
end
-- (this logic is just what switchAll does)
-- Note that `cleanupInner`, `cleanupStream` will be what we previously called `attributeConnection` and `characterConnection`
local damageStream: Stream<number?> = function(listener: (number?) -> ())
local cleanupInner = nil
local cleanupStream = damageStreamStream(function(innerStream: Stream<number?>): ()
clean(cleanupInner)
cleanupInner = nil
cleanupInner = innerStream(listener)
end)
return function()
clean(cleanupInner)
cleanupInner = nil
clean(cleanupStream)
cleanupStream = nil
end
end
local cleanup = damageStream(updateDamageGui)
-- call `clean(cleanup)`
Now we have some simple components here that we can extract into reusable library functions, such as attributeOf
and propertyOf
(a generalisation of characterStream
). What is the best way to decompose/understand the creation of damageStream
from characterStream
via damageStreamStream
?
The answer is switchMap(fn)(characterStream)
, where fn
maps characters to damage streams (or nilOnce
). Internally, switchMap
maps the emitted characters to damage streams, and uses switchAll
to emit from the latest damage stream.
local function switchAll<T...>(stream: Stream<Stream<T...>>): Stream<T...>
return function(listener: (T...) -> ()): CleanupTask
local cleanupInner = nil
local cleanupStream = stream(function(innerStream: Stream<T...>): ()
clean(cleanupInner)
cleanupInner = nil
cleanupInner = innerStream(listener)
end)
return function()
clean(cleanupInner)
cleanupInner = nil
clean(cleanupStream)
cleanupStream = nil
end
end
end
local function switchMap<T...,U...>(fn: (T...) -> Stream<U...>): (Stream<T...>) -> Stream<U...>
return function(source: Stream<T...>): Stream<U...>
return switchAll(function(streamListener: (Stream<U...>) -> ()): CleanupTask
-- Apply fn to every emitted value to get a stream, and give it to the streamListener
return source(function(...: T...): ()
streamListener(fn(...))
end)
end)
end
end
local characterStream: Stream<Model?> = propertyOf(Players.LocalPlayer, "Character")
local function characterToDamageStream(character: Model)
if character then
return attributeOf(character, "Damage")
else
return nilOnce
end
end
local damageStream: Stream<number?> = switchMap(characterToDamageStream)(characterStream)
This kind of combination of switchMap
with attributeOf
or propertyOf
is common enough that we provide a shorthand toAttribute(name): (Stream<Instance?>) -> Stream<any?>
(resp. toProperty
) for it, which allows a terse/idiomatic presentation.
local damageStream = pipe1(
propertyOf(Players.LocalPlayer, "Character"),
toAttribute("Damage")
)
local cleanup = listen(damageStream, updateDamageGui)
Some notes:
- This definition of
damageStream
evaluates totoAttribute("Damage")(propertyOf(Players.LocalPlayer, "Character"))
. Piping is just a way to reverse application syntax, so that stream-transformers are sequenced after the stream. listen
is again just syntax, it just doesdamageStream(updateDamageGui)
- don’t use it if you don’t like it- Since
updateDamageGui
is no longer duplicated, we can simply write it’s contents within the listen call, like this:local cleanup = listen(damageStream, function(damage: number?) -- do stuff -- ... -- ... end)
Okay, what if our requirements evolve, and instead, we have an ObjectValue
pointing at a Player?
, and we want to display their damage? Here’s how it might look using the functions in this library.
local selectedPlayer = Instance.new("ObjectValue")
local damageStream = pipe2(
fromValueBase(selectedPlayer),
toProperty("Character"),
toAttribute("Damage")
)
local cleanup = listen(damageStream, updateDamageGui)
Imagine writing all of the housekeeping logic for this in the first style!
Reflection
In reflection: is this good programming?
The real test is whether working with these abstractions in practice is easier in the long run than working with the original mess.
- Is it flexible and maintainable?
- Is it easy to understand and debug?
- How much overhead does it add - both cognitively and computationally?
I have written Stream.luau
with these things in mind. In particular, streams are just functions, and their internal state is entirely implemented through local variables in their closure. In many cases, it is straightforward to take an abstractly presented stream, such as our damageStream
example, and repeatedly beta-reduce (for every function f(x)
, and expression exp
, replace f(exp)
with the body of f
with x
replaced by exp
) until the code resembles the original barebones implementation (with attributeConnection
and characterConnection
). This is basically reversing the process of abstraction that we followed - and this thought process is useful for keeping your feet on the ground about what you are doing.
I deliberately avoid using abstractions like maids, signals, brios inside this library - because the cleanup logic is not that difficult to write manually! We are writing a library to do housekeeping for the user, we do not need housekeeping help ourselves. I have also done this to try make stepping-through-the-code-debugging a less miserable experience - it’s easier if we’re not venturing into the guts of a maid every other line.
This library of functions is not an API. I am not doing any complex optimisations under the hood to make up for abstraction overhead. Instead, I’ve tried to walk the road of non-pessimism and wrote the dead-simplest code I could to implement the behaviour. I recommend just reading all of the code and changing whatever design decisions you disagree with.
From my experience with Rx, I don’t think this goes without saying: don’t use streams as a universal hammer for every problem, and try not to get too big-brained about fancy ways of combining streams together. It is possible to do some very complicated stream-piping karate to construct the perfect stream to feed into a short listener function, where in-fact you could have just moved more code into your listener function for a more readable result, or just split your problem into two streams. Sometimes the most readable stream is one you define from scratch (it’s just a function!) - you don’t need to build every stream out of stream primitives and transformers. Conversely, you should use the library functions when they do some real, non-trivial work for you, like switchMap
and combineLatest
(or it’s typed interfaces: combine1
, compute2
etc).
We’ve only discussed dynamically defined events, and their solution switchMap
.
Next time we’ll talk about managing lifetimes with listenTidyEach
in the simplest possible way.
Future posts:
- Lifetimes (
listenTidyEach
,eachPlayer
,eachChildOf
, …) combineLatest
(state management)mount
/new
(the bones of a flexible, reactive UI framework in about 400 LOC)- a fruitful relationship with the luau typechecker
IAQ (immediately anticipated questions)
Is this battle-tested?
Nope. I’ve barely used it yet.
How do I install it?
Just copy it: Stream.luau
What’s the difference between streams and observables, and why did you write this if we already have Rx?
In short: streams+listeners are a simplified version of observables+subscribers, where stream(listener)
corresponds to observable:Subscribe(onFire)
. The :Subscribe(onFire, onFail, onComplete)
method constructs a Subscriber
object using the provided functions, and passes it internally to observable._onSuscribe(sub)
. So a stream is just an _onSubscribe
function, and a listener is just an onFire
function.
This library is a (typed) distillation of the core concepts in Rx, Brio and Blend that I personally have found useful for programming in Roblox, which are: connecting to dynamically defined events, binding creation+cleanup or behaviour to lifetimes, declarative instance creation, and reactive state management.
I concluded that I could achieve all of this without subscribers having an onFail
and onComplete
, which, in Rx, spend most of their time being passed around, while onFire
is where all the interesting stuff happens. Using onFail
and onComplete
makes observables more comparable to promises, which you may find beneficial.
The removal of onFail
and onComplete
causes many simplifications - there is no need for a subscriber to have state, so it’s essentially just a wrapper around the onFire
callback-function - therefore why not make it just a function? The choice to make the observable/stream object itself a function, rather than a class-object, is less essential but has the following motivations:
- I originally just wanted to write a typed version of Rx, because my most frequent and annoying bugs are all type errors (usually not handling nil). Typing for OOP classes/objects in Luau is not-so-well supported - there are multiple ways to achieve it, but there are tradeoffs and complications with each. On the other-hand, typing for function inputs/outputs is dead simple and quite reliable.
- In most contexts I don’t like to think of streams as “objects” or “data”. Listening to a stream is closer to using a compiler macro that inserts the listener logic between the housekeeping logic. Why introduce an object for this?
- I would like to claim that not allocating a table for every stream (like observables/maids/brios) is an advantage for streams over observables, but I lack the proper knowledge of luau to back this up (nor have I tested it - yet).
- Step-through-the-code debugging is important to me, not just for fixing bugs but understanding what my code is actually doing (how much non-essential busywork is it doing?). So reducing non-fundamental operations like entering an object constructor or entering a class method just to call the real-function stored in the object, makes a big difference to that experience.
A compromise, that you may not like to give up, is that streams are less inspectable at runtime - the best you can do is typeof(thing) == "function"
, and it’s not possible to distinguish a stream from any other function. You may have a programming style that necessitates object inspection like this, in which case you could go ahead and refactor everything to use a constructor like this.
export type Stream<T...> = { ClassName: "Stream", onListen: ((T...) -> ()) -> CleanupTask }
local function newStream<T...>(onListen: ((T...) -> ()) -> CleanupTask): Stream<T...>
return { ClassName = "Stream", onListen = onListen }
end
-- then replace every `stream(listener)` with `stream.onListen(listener)`
-- Alternatively (up to you to give a type for the output)
local function newStream<T...>(onListen: ((T...) -> ()) -> CleanupTask)
return setmetatable({ ClassName = "Stream" }, { __call = function(_, ...) return onListen(...) end})
end
For my use-cases (including mount
), I’ve found it sufficient to just assume functions are streams.
Should I use this?
If you want a third-party-maintainer to promise you a quality developer-experience and provide versioned-updates, you probably shouldn’t.
I will probably update the gist a few times in the coming weeks, and eventually integrate it into my own work, but Stream.luau
aims to be dead-simple in implementation, so that you can debug and fix problems yourself.
You can also chop it up if you don’t like that it’s all dumped in one file, and make whatever changes you think will make it a better fit for your specific project or programming style. If there are useful transformers in RxMarbles or ReactiveX you find useful, just add them (but remember there’s no fail/complete - unless you add them :P).
-Billy