Class Bus<V>

An EventStream that allows you to push values into the stream.

It also allows plugging other streams into the Bus, as inputs. The Bus practically merges all plugged-in streams and the values pushed using the push method.

Type parameters

  • V

Constructors

constructor

  • new Bus(): Bus

Properties

desc

desc: Desc

Contains a structured version of what toString returns. The structured description is an object that contains the fields context, method and args. For example, for Bacon.fromArray([1,2,3]).desc you'd get

{ context: "Bacon", method: "fromArray", args: [[1,2,3]] }

id

id: number = ++idCounter

Unique numeric id of this Observable. Implemented using a simple counter starting from 1.

Methods

awaiting

  • awaiting(other: Observable<any>): Property<boolean>
  • Creates a Property that indicates whether this observable is awaiting other, i.e. has produced a value after the latest value from other. This is handy for keeping track of whether we are currently awaiting an AJAX response:

    var showAjaxIndicator = ajaxRequest.awaiting(ajaxResponse)

    Parameters

    • other: Observable<any>

    Returns Property<boolean>

bufferWithCount

  • Buffers stream events with given count. The buffer is flushed when it contains the given number of elements or the source stream ends.

    So, if you buffer a stream of [1, 2, 3, 4, 5] with count 2, you'll get output events with values [1, 2], [3, 4] and [5].

    Parameters

    • count: number

    Returns EventStream<V[]>

bufferWithTime

  • Buffers stream events with given delay. The buffer is flushed at most once in the given interval. So, if your input contains [1,2,3,4,5,6,7], then you might get two events containing [1,2,3,4] and [5,6,7] respectively, given that the flush occurs between numbers 4 and 5.

    Also works with a given "defer-function" instead of a delay. Here's a simple example, which is equivalent to stream.bufferWithTime(10):

    stream.bufferWithTime(function(f) { setTimeout(f, 10) })

    Parameters

    Returns EventStream<V[]>

bufferWithTimeOrCount

  • Buffers stream events and flushes when either the buffer contains the given number of elements or the given number of milliseconds has passed since the last buffered event.

    Parameters

    • Optional delay: number | DelayFunction

      in milliseconds or as a function

    • Optional count: undefined | number

      maximum buffer size

    Returns EventStream<V[]>

bufferingThrottle

  • bufferingThrottle(minimumInterval: number): this
  • Throttles the observable using a buffer so that at most one value event in minimumInterval is issued. Unlike throttle, it doesn't discard the excessive events but buffers them instead, outputting them with a rate of at most one value per minimumInterval.

    Example:

    var throttled = source.bufferingThrottle(2)
    source:    asdf----asdf----
    throttled: a-s-d-f-a-s-d-f-

    Parameters

    • minimumInterval: number

    Returns this

changes

combine

  • Combines the latest values of the two streams or properties using a two-arg function. Similarly to scan, you can use a method name instead, so you could do a.combine(b, ".concat") for two properties with array values. The result is a Property.

    Type parameters

    • V2

    • R

    Parameters

    • right: Observable<V2>
    • f: Function2<V, V2, R>

    Returns Property<R>

concat

  • concat(other: Observable<V>, options?: EventStreamOptions): EventStream<V>
  • concat<V2>(other: Observable<V2>, options?: EventStreamOptions): EventStream<V | V2>
  • Concatenates two streams/properties into one stream/property so that it delivers events from this observable until it ends, and then delivers events from other. This also means that events from other that occur before the end of this observable are not included in the result stream/property.

    Parameters

    • other: Observable<V>
    • Optional options: EventStreamOptions

    Returns EventStream<V>

  • Type parameters

    • V2

    Parameters

    • other: Observable<V2>
    • Optional options: EventStreamOptions

    Returns EventStream<V | V2>

debounce

  • debounce(minimumInterval: number): this
  • Throttles the stream/property by the given number of milliseconds, but so that an event is only emitted after the given "quiet period". Does not affect emitting the initial value of a Property. The difference between throttle and debounce is the same as between the methods of the same name in jQuery.

    Example:

    source:             asdf----asdf----
    source.debounce(2): -----f-------f--

    Parameters

    • minimumInterval: number

    Returns this

debounceImmediate

  • debounceImmediate(minimumInterval: number): this
  • Passes the first event in the stream through, but after that, only passes events after a given number of milliseconds have passed since previous output.

    Example:

    source:                      asdf----asdf----
    source.debounceImmediate(2): a-d-----a-d-----

    Parameters

    • minimumInterval: number

    Returns this

decode

  • Decodes input using the given mapping. Is a bit like a switch-case or the decode function in Oracle SQL. For example, the following would map the value 1 into the string "mike" and the value 2 into the value of the who property.

    property.decode({1 : "mike", 2 : who})

    This is actually based on combineTemplate so you can compose static and dynamic data quite freely, as in

    property.decode({1 : { type: "mike" }, 2 : { type: "other", whoThen : who }})

    The return value of decode is always a Property.

    Type parameters

    • T: Record<any, any>

    Parameters

    • cases: T

    Returns Property<DecodedValueOf<T>>

delay

  • delay(delayMs: number): this
  • Delays the stream/property by given amount of milliseconds. Does not delay the initial value of a Property.

    var delayed = source.delay(2)
    source:    asdf----asdf----
    delayed:   --asdf----asdf--

    Parameters

    • delayMs: number

    Returns this

deps

  • deps(): Observable<any>[]
  • Returns an array of the dependencies that the Observable has. For instance, a.map(function() {}).deps() would return [a]. This method returns the "visible" dependencies only, skipping internal details, and is thus suitable for visualization tools. Internally, many combinator functions depend on other combinators to create intermediate Observables that the result will actually depend on. The deps method will skip these internal dependencies. See also: internalDeps

    Returns Observable<any>[]

diff

  • Returns a Property that represents the result of a comparison between the previous and current value of the Observable. For the initial value of the Observable, the previous value will be the given start.

    Example:

    var distance = function (a,b) { return a - b }
    Bacon.sequentially(1, [1,2,3]).diff(0, distance)

    This would result in the following elements in the result stream:

    0 - 1 = -1
    1 - 2 = -1
    2 - 3 = -1

    Type parameters

    • V2

    Parameters

    Returns Property<V2>

doAction

  • Returns a stream/property where the function f is executed for each value, before dispatching to subscribers. This is useful for debugging, but also for stuff like calling the preventDefault() method for events. In fact, you can also use a property-extractor string instead of a function, as in ".preventDefault".

    Please note that for Properties, it's not guaranteed that the function will be called exactly once per event; when a Property loses all of its subscribers it will re-emit its current value when a new subscriber is added.

    Parameters

    Returns this

doEnd

doError

  • Returns a stream/property where the function f is executed for each error, before dispatching to subscribers. That is, same as doAction but for errors.

    Parameters

    Returns this

doLog

  • doLog(...args: any[]): this
  • Logs each value of the Observable to the console. doLog() behaves like log but does not subscribe to the event stream. You can think of doLog() as a logger function that – unlike log() – is safe to use in production. doLog() is safe, because it does not cause the same surprising side-effects as log() does.

    Parameters

    • Rest ...args: any[]

    Returns this

end

  • Ends the stream. Sends an End event to all subscribers. After this call, there'll be no more events to the subscribers. Also, the push, error and plug methods have no effect.

    Returns Reply

endAsValue

  • endAsValue(): Observable<{}>

endOnError

  • endOnError(predicate?: Predicate<any>): this
  • Returns a stream/property that ends on the first Error event. The error is included in the output of the returned Observable.

    Parameters

    • Default value predicate: Predicate<any> = x => true

      optional predicate function to determine whether to end on a given error

    Returns this

error

  • error(error: any): Reply
  • Pushes an error to this stream.

    Parameters

    • error: any

    Returns Reply

errors

  • errors(): this

filter

  • Filters values using given predicate function. Instead of a function, you can use a constant value (true to include all, false to exclude all).

    You can also filter values based on the value of a property. Event will be included in output if and only if the property holds true at the time of the event.

    Parameters

    Returns this

first

  • first(): this

firstToPromise

  • firstToPromise(PromiseCtr?: Function): Promise<V>
  • Returns a Promise which will be resolved with the first event coming from an Observable. Like toPromise, the global ES6 promise implementation will be used unless a promise constructor is given.

    Parameters

    • Optional PromiseCtr: Function

    Returns Promise<V>

flatMap

  • For each element in the source stream, spawn a new stream/property using the function f. Collect events from each of the spawned streams into the result stream/property. Note that instead of a function, you can provide a stream/property too. Also, the return value of function f can be either an Observable (stream/property) or a constant value.

    stream.flatMap() can be used conveniently with Bacon.once() and Bacon.never() for converting and filtering at the same time, including only some of the results.

    Example - converting strings to integers, skipping empty values:

    stream.flatMap(function(text) {
      return (text != "") ? parseInt(text) : Bacon.never()
    })

    Type parameters

    • V2

    Parameters

    Returns EventStream<V2>

flatMapConcat

flatMapError

flatMapEvent

flatMapFirst

flatMapLatest

flatMapWithConcurrencyLimit

flatScan

  • Scans the stream with the given seed value and accumulator function, resulting in a Property. The difference to scan is that the function f can return an EventStream or a Property instead of a pure value, meaning that you can use flatScan for asynchronous updates of state. It serializes updates so that the next update will be queued until the previous one has completed.

    Type parameters

    • V2

      state and result type

    Parameters

    • seed: V2

      initial value to start with

    • f: Function2<V2, V, Observable<V2>>

      transition function from previous state and new value to next state

    Returns Property<V2>

fold

forEach

  • An alias for onValue.

    Subscribes a given handler function to the observable. Function will be called for each new value (not for errors or stream end).

    Parameters

    • Default value f: Sink<V> = nullSink

    Returns Unsub

groupBy

  • Groups stream events into new streams by keyF. An optional limitF can be provided to limit the lifetime of each grouped stream; when it is provided, the stream returned by limitF is the one that is passed on. limitF receives the grouped stream and the original event that caused the group to start as parameters.

    Calculator for grouped consecutive values until group is cancelled:

    var events = [
      {id: 1, type: "add", val: 3 },
      {id: 2, type: "add", val: -1 },
      {id: 1, type: "add", val: 2 },
      {id: 2, type: "cancel"},
      {id: 3, type: "add", val: 2 },
      {id: 3, type: "cancel"},
      {id: 1, type: "add", val: 1 },
      {id: 1, type: "add", val: 2 },
      {id: 1, type: "cancel"}
    ]

    function keyF(event) {
      return event.id
    }

    function limitF(groupedStream, groupStartingEvent) {
      var cancel = groupedStream.filter(function(x) { return x.type === "cancel"}).take(1)
      var adds = groupedStream.filter(function(x) { return x.type === "add" })
      return adds.takeUntil(cancel).map(".val")
    }

    Bacon.sequentially(2, events)
      .groupBy(keyF, limitF)
      .flatMap(function(groupedStream) {
        return groupedStream.fold(0, function(acc, x) { return acc + x })
      })
      .onValue(function(sum) {
        console.log(sum)
        // logs -1, 2 and 8, in that order
      })

    Type parameters

    • V2

    Parameters

    Returns EventStream<EventStream<V2>>

holdWhen

inspect

  • inspect(): string

internalDeps

  • internalDeps(): any[]
  • Returns the true dependencies of the observable, including the intermediate "hidden" Observables. This method is for Bacon.js internal purposes but could be useful for debugging/analysis tools as well. See also: deps

    Returns any[]

last

  • last(): this
  • Takes the last element from the stream. None, if stream is empty.

    Note: neverEndingStream.last() creates a stream that doesn't produce any events and never ends.

    Returns this

log

  • log(...args: any[]): this
  • Logs each value of the Observable to the console. It optionally takes arguments to pass to console.log() alongside each value. To assist with chaining, it returns the original Observable. Note that as a side-effect, the observable will have a constant listener and will not be garbage-collected. So, use this for debugging only and remove from production code. For example:

    myStream.log("New event in myStream")

    or just

    myStream.log()

    Parameters

    • Rest ...args: any[]

    Returns this

map

  • Maps values using given function, returning a new stream/property. Instead of a function, you can also provide a Property, in which case each element in the source stream will be mapped to the current value of the given property.

    Type parameters

    • V2

    Parameters

    Returns EventStream<V2>

mapEnd

  • Adds an extra Next event just before End. The value is created by calling the given function when the source stream ends. Instead of a function, a static value can be used.

    Parameters

    Returns this

mapError

  • Maps errors using given function. More specifically, feeds the "error" field of the error event to the function and produces a Next event based on the return value.

    Parameters

    Returns this

merge

name

  • name(name: string): this
  • Sets the name of the observable. Overrides the default implementation of toString and inspect. Returns the same observable, with mutated name.

    Parameters

    • name: string

    Returns this

not

onEnd

  • Subscribes a callback to stream end. The function will be called when the stream ends. Just like subscribe, this method returns a function for unsubscribing.

    Parameters

    • Default value f: VoidSink = nullVoidSink

    Returns Unsub

onError

  • Subscribes a handler to error events. The function will be called for each error in the stream. Just like subscribe, this method returns a function for unsubscribing.

    Parameters

    • Default value f: Sink<any> = nullSink

    Returns Unsub

onValue

  • Subscribes a given handler function to the observable. Function will be called for each new value. This is the simplest way to assign a side-effect to an observable. The difference to the subscribe method is that the actual stream values are received, instead of Event objects. Just like subscribe, this method returns a function for unsubscribing. stream.onValue and property.onValue behave similarly, except that the latter also pushes the initial value of the property, in case there is one.

    Parameters

    • Default value f: Sink<V> = nullSink

    Returns Unsub

onValues

  • onValues(f: Function): Unsub
  • Like onValue, but splits the value (assuming it's an array) into function arguments for f. Only applicable for observables with arrays as values.

    Parameters

    • f: Function

    Returns Unsub

plug

  • plug<V2>(input: Observable<V2>): undefined | (Anonymous function)
  • Plugs the given stream as an input to the Bus. All events from the given stream will be delivered to the subscribers of the Bus. Returns a function that can be used to unplug the same stream.

    The plug method practically allows you to merge in other streams after the creation of the Bus.

    Type parameters

    • V2: V

    Parameters

    • input: Observable<V2>

    Returns undefined | (Anonymous function)

    a function that can be called to "unplug" the source from Bus.

push

  • Pushes a new value to the stream.

    Parameters

    • value: V

    Returns Reply

reduce

sampledBy

scan

  • Scans the stream/property with the given seed value and accumulator function, resulting in a Property. For example, you might use zero as seed and a "plus" function as the accumulator to create an "integral" property. Instead of a function, you can also supply a method name such as ".concat", in which case this method is called on the accumulator value and the new stream value is used as argument.

    Example:

    var plus = function (a,b) { return a + b }
    Bacon.sequentially(1, [1,2,3]).scan(0, plus)

    This would result in the following elements in the result stream:

    seed value = 0
    0 + 1 = 1
    1 + 2 = 3
    3 + 3 = 6

    When applied to a Property as in r = p.scan(seed, f), there's a (hopefully insignificant) catch: the starting value for r depends on whether p has an initial value when scan is applied. If there's no initial value, this works identically to EventStream.scan: the seed will be the initial value of r. However, if p already has a current/initial value x, the seed won't be output as is. Instead, the initial value of r will be f(seed, x). This makes sense, because there can only be one initial value for a Property at a time.

    Type parameters

    • V2

    Parameters

    Returns Property<V2>

skip

  • skip(count: number): this

skipDuplicates

  • skipDuplicates(isEqual?: Equals<V>): this
  • Drops consecutive equal elements. So, from [1, 2, 2, 1] you'd get [1, 2, 1]. Uses the === operator for equality checking by default. If the isEqual argument is supplied, checks by calling isEqual(oldValue, newValue). For instance, to do a deep comparison, you can use the isEqual function from underscore.js like stream.skipDuplicates(_.isEqual).

    Parameters

    Returns this

skipErrors

  • skipErrors(): this

skipUntil

  • skipUntil(starter: Observable<any>): this
  • Skips elements from the source, until a value event appears in the given starter stream/property. In other words, starts delivering values from the source after first value appears in starter.

    Parameters

    • starter: Observable<any>

    Returns this

skipWhile

  • Skips elements until the given predicate function returns falsy once, and then lets all events pass through. Instead of a predicate you can also pass in a Property<boolean> to skip elements while the Property holds a truthy value.

    Parameters

    Returns this

slidingWindow

  • slidingWindow(maxValues: number, minValues?: number): Property<V[]>
  • Returns a Property that represents a "sliding window" into the history of the values of the Observable. The result Property will have a value that is an array containing the last n values of the original observable, where n is at most the value of the maxValues argument, and at least the value of the minValues argument. If the minValues argument is omitted, there's no lower limit on the number of values.

    For example, if you have a stream s that produces the sequence 1 - 2 - 3 - 4 - 5, the respective values in s.slidingWindow(2) would be [] - [1] - [1,2] - [2,3] - [3,4] - [4,5]. The values of s.slidingWindow(2,2) would be [1,2] - [2,3] - [3,4] - [4,5].

    Parameters

    • maxValues: number
    • Default value minValues: number = 0

    Returns Property<V[]>

startWith

subscribe

  • Subscribes the given handler function to the event stream. The function will receive Event objects for all new value, end and error events in the stream. The subscribe() call returns an unsubscribe function that you can call to unsubscribe. You can also unsubscribe by returning Bacon.noMore from the handler function as a reply to an Event. stream.subscribe and property.subscribe behave similarly, except that the latter also pushes the initial value of the property, in case there is one.

    Parameters

    • Default value sink: EventSink<V> = nullSink

      the handler function

    Returns Unsub

take

  • take(count: number): this
  • Takes at most n values from the stream and then ends the stream. If the stream has fewer than n values then it is unaffected. Equal to Bacon.never() if n <= 0.

    Parameters

    • count: number

    Returns this

takeUntil

  • takeUntil(stopper: Observable<any>): this
  • Takes elements from the source until a value event appears in the stopper stream. If the stopper stream ends without a value, it is ignored.

    Parameters

    • stopper: Observable<any>

    Returns this

takeWhile

  • Takes while given predicate function holds true, and then ends. Alternatively, you can supply a boolean Property to take elements while the Property holds true.

    Parameters

    Returns this

throttle

  • throttle(minimumInterval: number): this
  • Throttles stream/property by given amount of milliseconds. Events are emitted with the minimum interval of delay. The implementation is based on stream.bufferWithTime. Does not affect emitting the initial value of a Property.

    Example:

    var throttled = source.throttle(2)
    source:    asdf----asdf----
    throttled: --s--f----s--f--

    Parameters

    • minimumInterval: number

    Returns this

toEventStream

  • toEventStream(): this

toPromise

  • toPromise(PromiseCtr?: Function): Promise<V>
  • Returns a Promise which will be resolved with the last event coming from an Observable. The global ES6 promise implementation will be used unless a promise constructor is given. Use a shim if you need to support legacy browsers or platforms (see caniuse for Promise support).

    See also firstToPromise.

    Parameters

    • Optional PromiseCtr: Function

    Returns Promise<V>

toProperty

  • Creates a Property based on the EventStream.

    Without arguments, you'll get a Property without an initial value. The Property will get its first actual value from the stream, and after that it'll always have a current value.

    You can also give an initial value that will be used as the current value until the first value comes from the stream.

    Parameters

    • Optional initValue: V

    Returns Property<V>

toString

  • toString(): string
  • Returns a textual description of the Observable. For instance, Bacon.once(1).map(function() {}).toString() would return "Bacon.once(1).map(function)".

    Returns string

transform

withDesc

  • withDesc(desc?: Desc): this

withDescription

  • withDescription(context: any, method: string, ...args: any[]): this
  • Sets the structured description of the observable. The toString and inspect methods use this data recursively to create a string representation for the observable. This method is probably useful for Bacon core / library / plugin development only.

    For example:

    var src = Bacon.once(1)
    var obs = src.map(function(x) { return -x })
    console.log(obs.toString())
    // --> Bacon.once(1).map(function)
    obs.withDescription(src, "times", -1)
    console.log(obs.toString())
    // --> Bacon.once(1).times(-1)

    The method returns the same observable with mutated description.

    Parameters

    • context: any
    • method: string
    • Rest ...args: any[]

    Returns this

withLatestFrom

  • Creates an EventStream/Property by sampling the value of the given samplee stream/property at each event from this stream/property.

    Type parameters

    • V2

      type of values in the samplee

    • R

      type of values in the result

    Parameters

    • samplee: Observable<V2>
    • f: Function2<V, V2, R>

      function to select/calculate the result value based on the value in the source stream and the samplee

    Returns EventStream<R>

withStateMachine

  • withStateMachine<State, Out>(initState: State, f: StateF<V, State, Out>): EventStream<Out>
  • Lets you run a state machine on an observable. Give it an initial state object and a state transformation function that processes each incoming event and returns an array containing the next state and an array of output events. Here's an example where we calculate the total sum of all numbers in the stream and output the value on stream end:

    Bacon.fromArray([1,2,3])
      .withStateMachine(0, function(sum, event) {
        if (event.hasValue)
          return [sum + event.value, []]
        else if (event.isEnd)
          return [undefined, [new Bacon.Next(sum), event]]
        else
          return [sum, [event]]
      })

    Type parameters

    • State

      type of machine state

    • Out

      type of values to be emitted

    Parameters

    • initState: State

      initial state for the state machine

    • f: StateF<V, State, Out>

      the function that defines the state machine

    Returns EventStream<Out>

zip

  • Returns an EventStream with elements pair-wise lined up with events from this and the other EventStream or Property. A zipped stream will publish only when it has a value from each source and will only produce values up to when any single source ends.

    The given function f is used to create the result value from value in the two sources. If no function is given, the values are zipped into an array.

    Be careful not to have too much "drift" between streams. If one stream produces many more values than another, excessive buffering will occur inside the zipped observable.

    Example 1:

    var x = Bacon.fromArray([1, 2])
    var y = Bacon.fromArray([3, 4])
    x.zip(y, function(x, y) { return x + y })
    
    // produces values 4, 6

    See also zipWith and zipAsArray for zipping more than 2 sources.

    Type parameters

    • V2

    • R

    Parameters

    • other: Observable<V2>
    • f: Function2<V, V2, R>

    Returns EventStream<R>
