Create Your Own RxJS Operators

In today’s article we will dig deeper into RxJS operators and see how we can create our own.

What is an Operator?

An operator is a function that must return another function. We will call that function an Observable transformer function. It takes in as input an Observable and returns a new Observable. Observable.pipe() can chain a list of operator transformer functions.

Let’s see an example:

import { Observable, of } from 'rxjs'
import { map, count } from 'rxjs/operators'

const o = of(1, 2, 3, 4)
const transformer1 = map(n => 2 * n)
const transformer2 = count()

o.pipe(transformer1, transformer2)
  .subscribe(n => console.log(n)) //Prints 4

This shows how the map and count operators return a transformer function.

So, basically an operator is a wrapper over the transformer function. Most of the magic happens in the transformer function.

Custom Transformer Function

Let’s write our own transformer function that does the same job as count. It will count the number of emissions made by the source Observable. The new Observable emits the total count when the source Observable completes.

function my_count(srcObservable: Observable): Observable {
  return new Observable(subscriber => {
    let counter = 0

    srcObservable.subscribe(_ => ++counter,
      err => subscriber.error(err),
      () => {
        subscriber.next(counter)
        subscriber.complete()
      })

    return () => subscriber.unsubscribe()
  })
}

Now we can use the function from pipe.

const o = of(1, 2, 3, 4)

o.pipe(my_count).subscribe(n => console.log(n))

To make this a proper operator we just need to return the transformer function from a wrapper function.

function my_count_op() : (srcObservable: Observable) => Observable {
  return my_count
}

Use the operator from pipe.

const o = of(1, 2, 3, 4)

o.pipe(my_count_op()).subscribe(n => console.log(n))

Our Own map Operator

With what we know so far we can roll out our own map operator.

function my_map(projection_fn: (src: T_in) => T_out) {
  return (srcObservable: Observable) => {
    return new Observable(subscriber => {
      srcObservable.subscribe((data: T_in) => {
        subscriber.next(projection_fn(data))
      },
      err => subscriber.error(err),
      () => subscriber.complete())
    })
  }
}

Use the map:

const o = of(1, 2, 3, 4)

o.pipe(my_map(n => 2 * n)).subscribe(n => console.log(n))

Create a Composite Operator

If you see yourself piping the same set of operators all the time you can make your life easier by creating a composite operator.

In the example below we create an operator that counts the number of even numbers emitted.

function count_even() {
  return (src:Observable) => src.pipe(
    filter(n => n % 2 == 0), 
    count()
  ) 
}

//Use the operator
of(1, 2, 3, 4)
  .pipe(count_even())
  .subscribe(numEven => console.log(numEven))
Advertisements

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google+ photo

You are commenting using your Google+ account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

This site uses Akismet to reduce spam. Learn how your comment data is processed.