성장, 그리고 노력

부족하더라도 어제보다 더 잘해지자. 노력은 절대 배신하지 않는다.

RxJS

다시 보는 RxJS - 기본편 + Observable 만들기

제이콥(JACOB) 2020. 8. 10. 04:55

 이전 회사에서 RxJS를 직접 사용하여 프로젝트를 진행했었다. 하지만, 그때는 경험이 적어 솔직히 많은 이해를 하지 못했던 거 같다. 

다음 회사에서는 또 RxJS를 사용하지 않아 그나마 알던 사실도 까먹었다.

 

 몇일 전 우연히 사내 발표를 하게 되었다. 주제를 고민하던 중 RxJS가 눈에 들어왔다. 다시 RxJS를 보니 새로웠다. 이해가 예전보다 많이 된다고 해야하나... 처음 봤을 때는 어렵다고만 느꼈고, 외우기 급급했지만, 지금 보니 정말 멋진 컨셉과 매력이 넘치는 기술이었다. 

출처: npmjs

 그리고 npm에서 다운로드 횟수를 보니, 작년에 내가 사용할 때보다 더 많은 사람이 다운로드 받고 사용하고 있었다. 

 

 발표 주제는 바로 이거다. 다시 한번 RxJS를 공부해 보자.

 


먼저 RxJS에 대한 이야기를 하기전에 promise와 함수형 프로그래밍(FP)을 다시 짚어보자.

Promise

 비동기 작업을 할때 흔히 promise를 사용한다. promise는 비동기와 장기 실행 연산  그리고 결과나 에러를 구독할 수 있는 미래 값을 래핑 한 데이터 유형으로 구독자가 결괏값을 받게 될 때 프로세스가 완료된 것으로 간주된다. promise가 실행된 후에는 값을 변경할 수 없으므로 불변 타입이다.

 하지만 프라미스의 경우 마우스 움직임이나 파일 스트림의 바이트 시퀀스처럼 둘 이상의 값을 생성하는 데이터 소스를 처리할 수 없다. 또한 에러 발생시 재실행하는 기능 또한 가지고 있지 않으며, 불변이기 때문에 취소할 수도 없다.

 

함수형 프로그래밍(FP)

 FP는 함수로 선언적이고 불변하며 부가 작용(side effect)이 없는 프로그램 만들기를 강조하는 소프트웨어 패러다임이다.

 함수형 프로그램의 특징으로는 선언적(declarative), 불변성(immutable), 부가작용이 없음(side effect free), 지연 평가(lazy evaluation) 등이 있다. 


 

RxJS (Reactive Extensions for JavaScript)

 RxJS는 파일 읽기, HTTP 호출, 키 입력, 마우스 움직임 등의 이벤트 소스를 비동기적으로 처리하는 멋진 라이브러리(Library)이다. 콜백이나 promise 기반 라이브러리를 같은 방식으로 대체할 수 있다. 또한 다른 자바스크립트 라이브러리 및 프레임 워크와 함께 사용할 수 있으며, angular의 경우 기본적으로 지원된다.

 

Observable

옵저버블은 옵저버를 인자로 받고, 함수를 반환하는 함수이다.

 스트림(Stream)이라고 들어본 적이 있을 것이다. 스트림은 시간이 지남에 따라 발생하는 일련의 이벤트를 말한다. 그런 스트림에는 항상 데이터 생산자가 있어야 하며, 생성자(Producer)는 데이터 소스이다. 이러한 생산자를 옵저버 패턴에서는 서브젝트(Subject)라고 정의하고, RxJS에서는 관찰할 수 있는 무언가라는 의미로 옵저버블(Observable)이라 부른다.

 생산자는 이벤트 방출에만 관여하고 이벤트 처리에는 관여하지 않는다.

이전 글에서 발췌

Observer

 소비자가 소비할 이벤트에 대해 생 산자를 듣기 시작하면 스트림이 생기게 되고, 이 시점에 스트림은 이벤트 푸시하기 시작한다. 여기서 소비자를 옵저버(Observer)로 부른다. 스트림은 옵저버블에서 옵저버로만 이동하며, 다른 방향으로는 이동하지 않는다.

 기본적으로 RxJS에서 옵저버는 next(), error() 및 complete() 메서드가 있는 인터페이스로 정의된다.

const observer = {
 next: name => console.log(`Hello, ${name}`),
 error: err => console.log(err),
 complete: () => console.log("complete!"),
}

 

Piping

Pipeable 연산자는 Observable을 입력으로 사용하고 다른 Observable을 반환하는 함수입니다. 이것은 순수한 작업이다. 이전 Observable은 수정되지 않은 상태로 유지된다.

 파이프는 RxJS의 연산(데이터 조작이나 편집)이 이루어 지는 곳이다. 실제로 pipe라는 오퍼레이터가 제공되며, pipe 안에서 데이터 조작이 이루어지기 때문에 관심사 분리도 된다. 

  참고로 RxJS 6 이전에서는 점-체이닝(dot-chaining) 방식으로 파이프가 구성되었지만, RxJS 6에서부터는 pipe라는 오퍼레이터가 제공되며, 아래와 같이 사용된다.

import { range } from 'rxjs';
import { map, filter, scan } from 'rxjs/operators';
 
const source$ = range(0, 10);
 
source$.pipe( // Observable에 내장되어 있음
  filter(x => x % 2 === 0),
  map(x => x + x),
  scan((acc, x) => acc + x, 0)
)
.subscribe(x => console.log(x))
 
// Logs:
// 0
// 4
// 12
// 24
// 40

 

스트림을 중지하는 방법

 스트림을 중지하지 않으면 메모리 누수 문제가 발생하기 때문에 이 부분을 고려해야 한다.

  • of(), from() 같은 완성 연산자를 사용한다.
  • complete() 메서드를 호출한다.
  • 옵저버에게 오류를 던저서 스트림을 자동으로 중지시킨다.
  • teardown 함수를 실행한다(unsubscribe)

타입 스크립트로 Observable 혼자 구현해보기

정말 동일하게 구현하는 건 무리지만, 비슷한 느낌으로 구현해볼 수는 있다.

 

간단하게 interface를 선언했다.

export interface Observer {
    next: (value: any) => void,
    error: (err: any) => void,
    complete: () => void;
}

RxJS 기저 중에는 이터레이터 패턴을 따르기 때문에 공통의 인터페이스를 제공하는 것은 꼭 지켜야 한다고 생각이 들었기 때문에 공통 인터페이스부터 정의하였다. (실제 RxJS에서는 옵저버가 항상 저 인터페이스를 제공하는 건 아니고 옵셔널 한 값이다. 일부 콜백이 없더라도 정상적으로 실행된다.)

import { Observer } from './interfaces';

export function Observable(events: any[]) {
    const INTERVAL = 1 * 1000;
    let timerId: number | undefined;

    return {
        subscribe: (observer: Observer) => {
            timerId = setInterval(() => {
                if(events.length === 0) {
                    observer.complete();
                    clearInterval(timerId)
                    timerId = undefined;
                } else {
                    observer.next(events.shift())
                }
            }, INTERVAL);
            return {
                unsubscribe: () => {
                    if(timerId !== undefined ) {
                        clearInterval(timerId)
                    }
                }
            }
        }
    }
}

마지막으로 간단하게 테스트!

import { Observable } from "./core/observable";

console.log("Jacob Playgroud");

Observable([1, 2, 3]).subscribe({
    next: console.log,
    error: (err) => console.log(err),
    complete: () => console.log('Complete!'),
});

여기서 데이터 소스는 [1, 2, 3]이다. 

const observer = {
 next: ...,
 error: ...,
 complete: ...,
}

이런식으로 옵저버 객체를 만들어서 넘겨도 되지만, 위와 같이 그냥 쓰는 경우가 더 많다. 또한 이건 처음에 쫌 헷갈릴 수 있는 부분인데, 실제 RxJS에서는 옵저버의 인터페이스는 옵셔널이기 때문에 옵저버블을 구독할 때 아래와 같이 쓰기도 한다.

Observable([1, 2, 3]).subscribe( (x) => console.log(x) );

그래서 위와 같이 쓴다면 next 함수에 내부적으로 옵저버블의 subscribe()에서 첫 번째 콜백 인자로 next 핸들러를 사용하여 옵저버를 생성한다. 

 

 

simple-test-observable - CodeSandbox

simple-test-observable by JungKyuHyun

codesandbox.io


발표 자료를 만들기 전에 기본적인 내용만 간단하게 다시 정리해 보았다.

작년보다 확실히 이해가 좀 더 많이 되었다.

계속 공부하다 보면 더 많이 이해되겠지...!

 

반응형

'RxJS' 카테고리의 다른 글

RxJS 기초 - 스트림(Stream)  (0) 2020.02.01
왜 RxJS를 사용해야 하는가?  (0) 2020.02.01
[RxJS] Redux-observable - Epic (에픽) + 배경지식  (2) 2019.12.09
[RxJS] Subjects  (0) 2019.12.08