4. Scheduler?

What is Scheduler?

공식 문서 : Scheduler는 작업 수행을 위한 메커니즘(Context)을 추상화한다

무슨말인지 감이 안온다 그림을 살펴보자

scheduler

한마디로 Cocoa에서의 MainQueueRxSwift에서는 Main Scheduler란 말이다

그래도 아직 감이 안오니 관련 연산자를 살펴보자

SubscribeOn / ObserveOn

subscribe(on:)

  • sequence가 시작할 scheduler를 지정한다
  • 특정 scheduler에서 dispose시키고 싶을때 사용함
  • observable 체인에서 위아래로 영향을 끼친다
  • 여러번 사용 가능하지만 제일 처음에 지정한 연산자만 실행됨

example code

exampleObservable
			.map({ str in
				print("map - \(str) -> \(str.lowercased()) Thread : \(Thread.current.threadName)")
				return String(stringLiteral: str.lowercased())
			})
			.subscribe(on: concurrentScheduler)
			.map({ str in
				print("map - \(str) -> \(str.uppercased()) Thread : \(Thread.current.threadName)")
				return String(stringLiteral: str.uppercased())
			})
			.subscribe(onNext: {

				print("onNext - String : \($0) Thread : \(Thread.current.threadName)")
			}, onError: nil ,
			   
			   onCompleted: {
				print("onCompleted - Thread : \(Thread.current.threadName)")

			}).disposed(by: disposeBag)

/*
Observable - rxswift.queue.DispatchQoS(qosClass: Dispatch.DispatchQoS.QoSClass.background, relativePriority: 0)
map - Element 1 -> element 1 Thread : rxswift.queue.DispatchQoS(qosClass: Dispatch.DispatchQoS.QoSClass.background, relativePriority: 0)
map - element 1 -> ELEMENT 1 Thread : rxswift.queue.DispatchQoS(qosClass: Dispatch.DispatchQoS.QoSClass.background, relativePriority: 0)
onNext - String : ELEMENT 1 Thread : rxswift.queue.DispatchQoS(qosClass: Dispatch.DispatchQoS.QoSClass.background, relativePriority: 0)
map - Element 2 -> element 2 Thread : rxswift.queue.DispatchQoS(qosClass: Dispatch.DispatchQoS.QoSClass.background, relativePriority: 0)
map - element 2 -> ELEMENT 2 Thread : rxswift.queue.DispatchQoS(qosClass: Dispatch.DispatchQoS.QoSClass.background, relativePriority: 0)
onNext - String : ELEMENT 2 Thread : rxswift.queue.DispatchQoS(qosClass: Dispatch.DispatchQoS.QoSClass.background, relativePriority: 0)
onCompleted - Thread : rxswift.queue.DispatchQoS(qosClass: Dispatch.DispatchQoS.QoSClass.background, relativePriority: 0)
*/

onbserve(on:)

  • sequence를 어느 scheduler에서 관찰할 것인지를 결정한다
  • 각각의 operator들을 다른 scheduler에 지정하고싶을때 사용한다
  • subscribeOn과 다르게 여러번 사용 가능함
  • observable 체인 아래로만 영향을 끼친다

example code

exampleObservable
			.subscribe(on: concurrentScheduler)
			.observe(on: mainScheduler)
			.map({ str in
				print("map - \(str) -> \(str.uppercased()) Thread : \(Thread.current.threadName)")
				return String(stringLiteral: $0.uppercased())
			})
			.subscribe( onNext: {

				print("onNext - String : \($0) Thread : \(Thread.current.threadName)")

			}, onError: nil ,
			   
			   onCompleted: {

				print("onCompleted - Thread : \(Thread.current.threadName)")

			}).disposed(by: disposeBag)

/*
Observable - rxswift.queue.DispatchQoS(qosClass: Dispatch.DispatchQoS.QoSClass.background, relativePriority: 0)
map - Element 1 -> ELEMENT 1 Thread : OperationQueue: NSOperationQueue Main Queue
onNext - String : ELEMENT 1 Thread : OperationQueue: NSOperationQueue Main Queue
map - Element 2 -> ELEMENT 2 Thread : OperationQueue: NSOperationQueue Main Queue
onNext - String : ELEMENT 2 Thread : OperationQueue: NSOperationQueue Main Queue
onCompleted - Thread : OperationQueue: NSOperationQueue Main Queue
*/

Tip : 일반적으로 subscribe(on:)은 Background Thread에서, observe(on:)은 Main Thread에서 사용

보통 scheduler를 사용할때 데이터 작업은 백그라운드 스레드에서 실행시키고 UI작업은 메인스레드에서 실행 하기 때문에, 다음과같은 확장 메서드를 정의하여 사용하면 편리하다

extension ObservableType {
	func addSchedulers() -> Self{
		return self
			.subscribe(on: ConcurrentDispatchQueueScheduler.init(qos: .background))
			.observe(on: MainScheduler.instance) as! Self
	}
}

exampleObservable
		.addSchedulers()
		.subscribe(onNext: { text in
				print("onNext: \(text)")
		}, onError: nil,
		   
			onCompleted: {
				print("onCompleted: "
		}).disposed(by: disposeBag)
								

Scheduler의 종류

kind_of_scheduler

1. MainScheduler (Serial scheduler)

  • MainThread에서 실행되어야 할 작업에서 사용 (주로 UI)
  • MainSchedule.instance vs MainSchedule.asyncInstance
  • 보이는 그대로 MainSchedule.instance는 synchronous하게, MainSchedule.asyncInstance는 asynchronous하게 main thread에 이벤트를 전달한다.
  • MainSchedule.asyncInstance는 자주 쓰이지 않고, 가능하면 쓰는걸 회피하는 것이 좋다.하지만 하나의 event가 같은 pipeline안에서 다른 event를 트리거 할 때 first event가 끝나기 전에 second event가 불린다고 warning이 뜨므로 이런 경우에 사용하면 된다.

2. CurrentThreadScheduler (Serial scheduler)

  • 현재 있는 쓰레드에서 작업이 실행된다.
  • 만약 CurrentThreadScheduler.instance.schedule(state) { }를 어떤 스레드에서 처음으로 호출했다면, 그 예정된 행동은 즉시 실행될 것이고 모든 재귀적 예정된 액션들이 임시로 저장되는 숨겨진 큐가 생성된다
  • 만약 콜 스택의 몇몇 부모 프레임이 이미 CurrentThreadScheduler.instance.schedule(state) { }를 실행중이라면, 예정된 액션은 저장되고 현재 실행중인 액션과 모든 전에 저장되었던 액션이 실행 종료되고 나서 실행된다

3. SerialDispatchQueueScheduler (Serial scheduler)

  • 특정한 dispatch_queue_t에서 실행되어야 할 작업을 처리한다.
  • 메인 스케줄러는 SerialDispatchQueueScheduler의 인스턴스 중 하나이다

4. ConcurrentDispatchQueueScheduler (Concurrent scheduler)

  • 특정한 dispatch_queue_t에서 실행되어야 할 작업을 처리한다.
  • Serial Queue에 작업을 보내도 문제가 생기지 않는다.
  • 보통 백그라운드에서 처리해야 할 때 적합하다.

5. OperationQueueScheduler (Concurrent scheduler)

  • NSOperationQueue에서 실행되어야 할 작업을 처리한다.
  • 백그라운드에서 작업을 조정하고 싶을 때 사용하며 maxConcurrentOperationCount로 작업 조정 가능

6. TestScheduler

  • 테스트용 scheduler
  • RxTest에 포함되어 있으며 production code에는 사용하지 않음

7. Custom Scheduler

  • 직접 커스텀해 생성하는 scheduler

References

[ReactiveX] Schedulers

[RxSwift] Scheduler

[순한맛 RxSwift] 순한맛 RxSwift [4] - Schedulers