6. Subject?

What is Subject?

런타임중(앱 구동중)에 Observable에 값을 추가하여 emit이 발생하게끔 해주는 대리인

공식문서 : Subject는 Observer 및 Observable 역할을 하는 ReactiveX의 일부 구현에서 사용할 수 있는 일종의 브리지 또는 프록시입니다.

공식문서를 봐도 역시나 무슨말인지 모르겠다 그림을 통해 개념을 살펴보자

subject_multicats

Observable과 Subject의 차이점은 두 객체 모두 subscribe이 가능하지만 Subject는 여러 Observer가 subscribe을 할수있는 multicast방식이다

예제코드를 살펴봐야 이해가 될 것 같다

let randomNumGenerator1 = Observable<Int>.create{ observer in
    observer.onNext(Int.random(in: 0 ..< 100))
    return Disposables.create()
}
    
randomNumGenerator1.subscribe(onNext: { (element) in
    print("observer 1 : \(element)")
})
randomNumGenerator1.subscribe(onNext: { (element) in
    print("observer 2 : \(element)")
})

/*  
observer 1 : 54
observer 2 : 69
*/

Observer가 Observable에 대해 독자적인 실행을 갖기 때문에 subscribe을 하는 순간 각각의 Observable이 실행되어 서로 다른값을 방출하게 되는 것이다

이번엔 subject의 multicast 방식을 예제코드로 살펴보자

let randomNumGenerator2 = BehaviorSubject(value: 0)
    randomNumGenerator2.onNext(Int.random(in: 0..<100))
    
randomNumGenerator2.subscribe(onNext: { (element) in
    print("observer subject 1 : \(element)")
})
randomNumGenerator2.subscribe(onNext: { (element) in
    print("observer subject 2 : \(element)")
})

/*
observer subject 1 : 92
observer subject 2 : 92
*/

Subject는 하나의 Observable실행이 여러 Observser에게 공유되는 방식이기 때문에 같은 값이 출력되는걸 확인할 수 있다

observable과 subject의 차이를 정리하자면 이렇다

observable_vs_subject

Subject의 종류

1. PublishSubject

  • empty상태로 시작
  • 새로운 이벤트만 subscriber에게 emit
  • subscribe된 시점 이후부터 발생한 이벤트를 방출한다
  • 구독된 순간 새로운 이벤트 수신을 알리고 싶을때 사용

publishsubject

에러가 발생하면 에러도 전달함

publishsubject_error

example code

let subject = PublishSubject<String>()
subject.onNext("Is anyone listening?")

let subscriptionOne = subject.subscribe(onNext: { string in
     print(string)
})

subject.on(.next("1"))
subject.onNext("2") //이벤트를 추가하는 두가지 메소드

let subscriptionTwo = subject.subscribe({ (event) in
  print("2)", event.element ?? event)
})

subject.onNext("3")

subscriptionOne.dispose()

subject.onNext("4")

subject.onCompleted()  
subject.onNext("5")   // completed 됐기 떄문에 출력되지 않음
subscriptionTwo.dispose()   
let disposeBag = DisposeBag()

subject.subscribe {   // 종료된 subject를 구독했기 때문에 completed만 방출됨
  print("3)", $0.element ?? $0)
}
.disposed(by: disposeBag)

subject.onNext("?") // dispose 됐기 떄문에 출력되지 않음

/*
1
2
3
2) 3
2) 4
2) completed
3) completed
*/

When we use PublishSubject?

  • 시간에 민감한 데이터를 모델링 할 경우
    • 10시에 알림을 보내야 하는데 10:01에 접속했을때 알림이 보내질 필요가 없는 경우

2. BehaviorSubject

  • 하나의 초기값으로 시작
  • publishSubject와 유사하지만 최신값만 새로운 subscriber에게 emit
  • subscribe가 발생하면 즉시 현재 저장된 값을 emit
  • 마지막 이벤트값을 저장하고 싶을때 이용한다

behaviorsubject

에러가 발생하면 에러도 전달함

behaviosubject_error

example code


enum MyError: Error {
    case anError
}

let subject = BehaviorSubject(value: "Initial value")
// BehaviorSubject는 초기값이 필수 이므로 초기화값을 삽입
let disposeBag = DisposeBag()

subject.onNext("X")
subject
  .subscribe {
    print(label: "1) ", event: $0)
  }
  .disposed(by: disposeBag)

subject.onError(MyError.anError)

subject
  .subscribe {
    print(label: "2) ", event: $0)
  }
  .disposed(by: disposeBag)

/*
1) X
1) error(anError)
2) error(anError)
*/

When we use BehaviorSubject

  • 뷰를 가장 최신의 데이터로 미리 채우기에 용이
    • 유저 프로필 화면을 BehaviorSubject에 바인딩

3. ReplaySubject

  • 버퍼사이즈를 지정하여 버퍼 사이즈만큼 새로운 subscriber에게 emit
  • 버퍼 사이즈는 메모리에 할당 되므로 사이즈가 커질수록 메모리에 큰 부하를 주는것을 주의해야함
  • 구독 이후 이벤트는 버퍼에 상관없이 모두 받음

replaysubject

example code

let subject = ReplaySubject<String>.create(bufferSize: 2)

let disposeBag = DisposeBag()

subject.onNext("1")
subject.onNext("2")
subject.onNext("3") //buffer size를 2로 가지는 subject를 만든 후 1, 2, 3 세 개의 요소를 subject에 추가한다

subject
  .subscribe {
    print(label: "1) ", event: $0)
  }
  .disposed(by: disposeBag)

subject
  .subscribe {
    print(label: "2) ", event: $0)
  }
  .disposed(by: disposeBag) //이 subject에 대해 2개의 구독자를 추가한다.

  /*
  1) 2
  2) 3
  3) 2
  4) 3
  */

subject.onNext("4") // 버퍼 사이즈가 2이므로 subscribe하면 항상 2개씩 emit

subject
  .subscribe {
    print(label: "3) ", event: $0)
  }
  .disposed(by: disposeBag)

subject.onError(MyError.anError) 

/*
1) 4
2) 4
1) error(anError)
2) error(anError)
3) 3
3) 4
3) error(anError)

4. PublishRelay & BehavioRelay

  • PublishRelay
    • PublishSubject를 단순히 wrap한 것이며 .next만 가능하고 기능은 동일하다
  • BehavioRelay
    • BehavioSubject를 단순히 wrap한 것이며 .next만 가능하고 기능은 동일하다
  • relay의 추가는 .accept()로 접근
  • error, onCompleted() 사용 불가

Relay의 핵심은 끝나지 않음을 보장하는것

example code

//PublishRelay
let relay = PublishRelay<String>()
    
let disposeBag = DisposeBag()
    
relay.accept("Knock knock, anyone home?")
    
relay
    .subscribe(onNext: {
        print($0)
    })
  .disposed(by: disposeBag)

relay.accept("1")
// 1

//BehavioRelay

let relay = BehaviorRelay(value: "Initial value")
let disposeBag = DisposeBag()
  
relay.accept("New initial value")
  
relay
    .subscribe {
        print("1)", $0)
    }.disposed(by: disposeBag)
    //1) next(New initial value)

 relay.accept("1")
//1) next(1)

relay
    .subscribe {
        print("2)", $0)
    }.disposed(by: disposeBag)
    // 2) next(1)

relay.accept("2")
//2) next(2)

5. AsyncSubject

  • completed 이벤트가 발생하기 전까지는 어떤 구독자에게도 이벤트를 전달하지 않는다
  • completed 이벤트가 발생하면 마지막 이벤트를 방출하고 종료된다
  • 만약 error 이벤트로 종료되어도 오류를 방출하고 종료된다

asyncsubject

에러가 발생하면 에러도 전달함

asyncsubject


References

[RxSwift] 3. Subjects

[RxSwift] Subject