336x280(권장), 300x250(권장), 250x250, 200x200 크기의 광고 코드만 넣을 수 있습니다.

[목차]==================================================

1. Connectable Observable 정의 

2. Connectable Observable 사용 예제

======================================================

 

 

1. Connectable Observable 정의

 Connectable Observable은 구독을 하더라도 이템 방출을 시작하지 않는다는 점을 제외하면 일반적인 Observable과 비슷합니다. connect()을 호출했을 때에만 방출합니다. 이 방법으로 Subscriber들에게 Observable가 방출을 시작하기전에 Observable구독하도록 기다릴 수 있습니다.

  • ConnectableObservable.connect( ) — Connectable Observable에게 아이템 방출을 시작하라고 지시한다.
  • Observable.publish( ) — Observable을 Connectable Observable으로 변형시킨다.
  • Observable.replay( ) — 모든 Observer들에게 방출이 시작된 후에 구독을 했을 경우라도 같은 순서의 방출된 아이템을 볼 수 있도록 보장합니다.
  • ConnectableObservable.refCount( ) — Connectable Observable을 일반적인 Observable처럼 작동하도록 만듭니다.


 

2. Connectable Observable 사용 예제

아래의 예제코드는 같은 Observable을 구독하는 두개의 subscriber를 보여주는 코드입니다. 첫번째 케이스에서는 일반적인 Observable이고 두번째 케이스에서는 Connectable Observable으로 subscriber가 모두 구독한 이후 연결하였습니다.

 Observable firstMillion  = Observable.range( 1, 1000000 ).sample(7, java.util.concurrent.TimeUnit.MILLISECONDS);

 

firstMillion.subscribe(
   { println("Subscriber #1:" + it); },       // onNext
   { println("Error: " + it.getMessage()); }, // onError
   { println("Sequence #1 complete"); }       // onCompleted
);

 

firstMillion.subscribe(
    { println("Subscriber #2:" + it); },       // onNext
    { println("Error: " + it.getMessage()); }, // onError
    { println("Sequence #2 complete"); }       // onCompleted
);

Subscriber #1:211128
Subscriber #1:411633
Subscriber #1:629605
Subscriber #1:841903
Sequence #1 complete
Subscriber #2:244776
Subscriber #2:431416
Subscriber #2:621647
Subscriber #2:826996
Sequence #2 complete

 

 Observable firstMillion  = Observable.range( 1, 1000000 ).sample(7, java.util.concurrent.TimeUnit.MILLISECONDS).publish();

 

firstMillion.subscribe(
   { println("Subscriber #1:" + it); },       // onNext
   { println("Error: " + it.getMessage()); }, // onError
   { println("Sequence #1 complete"); }       // onCompleted
);

firstMillion.subscribe(
   { println("Subscriber #2:" + it); },       // onNext
   { println("Error: " + it.getMessage()); }, // onError
   { println("Sequence #2 complete"); }       // onCompleted
);

firstMillion.connect();

Subscriber #2:208683
Subscriber #1:208683
Subscriber #2:432509
Subscriber #1:432509
Subscriber #2:644270
Subscriber #1:644270
Subscriber #2:887885
Subscriber #1:887885
Sequence #2 complete
Sequence #1 complete

 

 

 

출처 : 인터넷에서 RxAndroid 검색하여 필요한 정보를 다양한 사이트에서 종합하여 작성된 것입니다. 많은 사이트 내용을 종합하여 공부하여 작성하다보니 일일이 나열하지 못하였습니다. ㅈㅅ(_ _) 이글은 자유롭게 퍼 가셔서 도움이 되었으면 좋겠습니다. 감사합니다.
블로그 이미지

차봉규

IT개발 공부하려고 만든 블로그 입니다^^ 부족한점 많더라고...도움 많이 부탁해요

,
336x280(권장), 300x250(권장), 250x250, 200x200 크기의 광고 코드만 넣을 수 있습니다.

 

[목차]==================================================

1. onError 해주지 않으면 Crash가 발생 주의
2. subscribe 에서 에러 처리
3. Error 캐치 - onErrorReturn
4. Error 캐치 - OnErrorResumeNext
5. Retry
6. Retry 횟수 제한
7. RetryWhen

======================================================

 

 

RxJava & RxAndroid 사용 시 항상 에러 핸들러를 구독하고 제공하는지 항상 확인해야 합니다. 그렇지 않으면 특히 Scheduler를 적용할 때 스택 트레이스에 아무것도 없을 수 있습니다. 물론 RxJava & RxAndroid 에서 뭔가 잘못됐다고 알려주긴 하지만 어디서 발생했는지 찾을 방법이 없습니다. 항상 에러 콜백을 사용하고, 에러가 발생한다면 에러를 로그로 남겨서 예상치 못한 오류를 기록해야 합니다.

 

1. onError 해주지 않으면 Crash가 발생 주의

 Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> subscriber) throws Exception {
                log("subscribe");
                subscriber.onNext("emit 1");
                subscriber.onNext("emit 2");
                subscriber.onError(new Throwable());
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                log("on next: " + s);
            }
        });

 

2. subscribe 에서 에러 처리

 

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> subscriber) throws Exception {
                log("subscribe");
                subscriber.onNext("emit 1");
                subscriber.onNext("emit 2");
                subscriber.onError(new Throwable());
            }
        }).subscribe(new DefaultObserver<String>() {
            @Override
            public void onNext(String value) {
                log("on next: " + value);
            }

            @Override
            public void onError(Throwable e) {

                // 에러시 처리를 여기로 받음
                log("error:" + e);
            }

            @Override
            public void onComplete() {
                log("completed");
            }
        });

[출력결과]
subscribe
on next: emit 1
on next: emit 2
error:java.lang.Throwable

 

3. Error 캐치 - onErrorReturn
Observable 체인 안에서 발생한 Error 를 캐치해서, 대체할 Object로 변환하는 것으로 subscriber에 Error가 전달되는 것을 막을 수 있다.

 Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> subscriber) throws Exception {
                log("subscribe");
                subscriber.onNext("emit 1");
                subscriber.onNext("emit 2");
                subscriber.onError(new Throwable());
            }
        }).onErrorReturn(new Function<Throwable, String>() {
            @Override
            public String apply(Throwable throwable) throws Exception {
                return "return";
            }
        }).subscribe(new DefaultObserver<String>() {
            @Override
            public void onNext(String value) {
                log("on next: " + value);
            }

            @Override
            public void onError(Throwable e) {
                // 에러시 처리를 여기로 받음
                log("error:" + e);
            }

            @Override
            public void onComplete() {
                log("completed");
            }
        });

[출력결과]
subscribe
on next: emit 1
on next: emit 2
on next: return
completed


4. Error 캐치 - OnErrorResumeNext
Observable 체인에서 발생한 Error를 캐치해서, 그 안에서 다시 한 번 Observable를 호출하면 에러시 대체 Stream을 반환할 수 있다.

 Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> subscriber) throws Exception {
                log("subscribe");
                subscriber.onNext("emit 1");
                subscriber.onNext("emit 2");
                subscriber.onError(new Throwable());
            }
        }).onErrorResumeNext(new Function<Throwable, ObservableSource<? extends String>>() {
            @Override
            public ObservableSource<? extends String> apply(Throwable throwable) throws Exception {
                return Observable.fromArray(new String[]{"resume 1", "resume 2"});
            }
        }).subscribe(new DefaultObserver<String>() {
            @Override
            public void onNext(String value) {
                log("on next: " + value);
            }

            @Override
            public void onError(Throwable e) {
                // 에러시 처리를 여기로 받음
                log("error:" + e);
            }

            @Override
            public void onComplete() {
                log("completed");
            }
        });

[출력결과]
subscribe
on next: emit 1
on next: emit 2
on next: resume 1
on next: resume 2
completed

 

5. Retry
Error가 일어났을 때, 자동으로 subscribe를 다시 해준다.

성공할때까지 계속... 무한루프 될 가능성이 있으므로 유의해야 한다

 Observable.create(subscriber -> {
            log("subscribe");
            subscriber.onNext("emit 1");
            subscriber.onNext("emit 2");
            subscriber.onError(new Throwable());
        })
        .retry()
        .subscribe()

[출력결과]
subscribe
on next: emit 1
on next: emit 2
subscribe
on next: emit 1
on next: emit 2
subscribe
on next: emit 1
on next: emit 2
.
.
.

반복


6. Retry 횟수 제한

 Observable.create(subscriber -> {
            log("subscribe");
            subscriber.onNext("emit 1");
            subscriber.onNext("emit 2");
            subscriber.onError(new Throwable());
        })
        .retry(3)
        .subscribe()

[출력결과]
subscribe
on next: emit 1
on next: emit 2
subscribe
on next: emit 1
on next: emit 2
subscribe
on next: emit 1
on next: emit 2
subscribe
on next: emit 1
on next: emit 2
error:java.lang.Throwable

 

Retry 좀 더 구체적인 설정

 Observable.create(subscriber -> {
            log("subscribe");
            subscriber.onNext("emit 1");
            subscriber.onNext("emit 2");
            subscriber.onError(new Throwable());
        })
        .retry(new BiPredicate<Integer, Throwable>() {
            @Override
            public boolean test(Integer integer, Throwable throwable) throws Exception {
                if (integer < 3) {
                    return true;
                }
                return throwable instanceof IllegalStateException;
            }
        })

        .subscribe(s -> log("on next: " + s)
            , e -> log("error:" + e)
            , () -> log("completed"));

[출력결과]
subscribe
on next: emit 1
on next: emit 2
subscribe
on next: emit 1
on next: emit 2
subscribe
on next: emit 1
on next: emit 2
error:java.lang.Throwable

 

7. RetryWhen
보다 세밀하게 retry 처리를 제어하기 위한 함수.

 Observable.create(subscriber -> {

            log("subscribe");
            subscriber.onNext("emit 1");
            subscriber.onNext("emit 2");
            subscriber.onError(new Throwable());
        }).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
                return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(Throwable throwable) throws Exception {
                        return Observable.timer(3, TimeUnit.SECONDS);
                    }
                });
            }
        }).subscribe(s -> log("on next: " + s)
                        , e -> log("error:" + e)
                        , () -> log("completed"));

[출력결과]
subscribe
on next: emit 1
on next: emit 2
// 3초 후
subscribe
on next: emit 1
on next: emit 2
// 3초 후
subscribe
on next: emit 1
on next: emit 2
.
.
.
반복

 

그냥 Error인채로 종료

 Observable.create(subscriber -> {
            log("subscribe");
            subscriber.onNext("emit 1");
            subscriber.onNext("emit 2");
            subscriber.onError(new Throwable());
        }).subscribeOn(AndroidSchedulers.mainThread())
                .retryWhen(throwableObservable -> throwableObservable.flatMap(
                        throwable -> Observable.error(throwable)
                ))
.subscribe(s -> log("on next: " + s)
                , e -> log("error:" + e)
                , () -> log("completed"));

 

Error에 대한 처리를 하지 않고 Complete하기

 Observable.create(subscriber -> {
            log("subscribe");
            subscriber.onNext("emit 1");
            subscriber.onNext("emit 2");
            subscriber.onError(new Throwable());
        }).onErrorResumeNext(throwable -> {Observable.empty();})
                .subscribe(s -> log("on next: " + s)
                , e -> log("error:" + e)
                , () -> log("completed"));

[출력결과]
subscribe
on next: emit 1
on next: emit 2
completed

 

3번 retry하고 종료
이 경우, 앞의 retry(count) 함수와의 차이는 retry(count)에서는 retry 횟수가 제한에 도달한 후에 error로 종료합니다만, 이 케이스는 completed 에서 종료한다는 점이다.

 Observable.create(subscriber -> {
            log("subscribe");
            subscriber.onNext("emit 1");
            subscriber.onNext("emit 2");
            subscriber.onError(new Throwable());
        }).retryWhen(throwableObservable -> throwableObservable.take(3))
                .subscribe(s -> log("on next: " + s)
                , e -> log("error:" + e)
                , () -> log("completed"));

[출력결과]
subscribe
on next: emit 1
on next: emit 2
subscribe
on next: emit 1
on next: emit 2
subscribe
on next: emit 1
on next: emit 2
completed


3초 retry를 3번 하고 종료하기

 Observable.create(subscriber -> {
            log("subscribe");
            subscriber.onNext("emit 1");
            subscriber.onNext("emit 2");
            subscriber.onError(new Throwable());
        }).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(@NonNull Observable<Throwable> throwableObservable) throws Exception {
                return throwableObservable.take(3).flatMap(new Function<Throwable, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(@NonNull Throwable throwable) throws Exception {
                        return Observable.timer(3, TimeUnit.SECONDS);
                    }
                });
        
    }
        }).subscribe(s -> log("on next: " + s)
                        , e -> log("error:" + e)
                        , () -> log("completed"));

[출력결과]
subscribe
on next: emit 1
on next: emit 2
// 3초 후
subscribe
on next: emit 1
on next: emit 2
// 3초 후
subscribe
on next: emit 1
on next: emit 2
completed


 

 


 출처 : 인터넷에서 RxAndroid 검색하여 필요한 정보를 다양한 사이트에서 종합하여 작성된 것입니다. 많은 사이트 내용을 종합하여 공부하여 작성하다보니 일일이 나열하지 못하였습니다. ㅈㅅ(_ _) 이글은 자유롭게 퍼 가셔서 도움이 되었으면 좋겠습니다. 감사합니다. 

 

블로그 이미지

차봉규

IT개발 공부하려고 만든 블로그 입니다^^ 부족한점 많더라고...도움 많이 부탁해요

,
336x280(권장), 300x250(권장), 250x250, 200x200 크기의 광고 코드만 넣을 수 있습니다.

 

[목차]==================================================

1. RxJava와 RxAndroid에서 제공하는 Scheduler 종류

1) RxJava가 제공하는 Scheduler

2) RxAndroid는 제공하는 Scheduler

2. 독립적으로 사용 가능한 Scheduler

3. 사용법 예제

1) Scheduler 안한 경우 기본 동작
2) subscribeOn() API만 사용한 경우 동작
3) observerOn() API만 사용한 경우 동작
4) subscribeOn() & observerOn() 1번씩 API 사용한 경우 동작
5) subscribeOn() & observerOn() 다수 API 사용한 경우 동작

======================================================

 

 

RxAndroid에서는 Scheduler를 통해 어느 쓰레드에서 실행이 될지 결정 할 수 있습니다.
Scheduler는 subsctibeOn(), observeOn() 에서 각각 지정할 수 있는데

subsctibeOn()observable의 작업을 시작하는 쓰레드를 선택 할 수 있습니다.( 중복해서 적을 경우 가장 마지막에 적힌 스레드에서 시작합니다.)

observeOn()은 이후에 나오는 Operator, subscribe의 Scheduler를 변경 할 수 있습니다.

 

1. RxJava와 RxAndroid에서 제공하는 Scheduler 종류

1) RxJava가 제공하는 Scheduler

  • Schedulers.computation()

이벤트 룹에서 간단한 연산이나 콜백 처리를 위해서 쓰는 것입니다. I/O 처리를 여기에서 해서는 안됩니다.

RxComputationThreadPool라는 별도의 스레드 풀에서 돌아갑니다. 최대 cpu갯수 ?개의 스레드 풀이 순환하면서 실행됩니다.

  • Schedulers.from(executor)

특정 executor를 스케쥴러로 사용합니다

  • Schedulers.immediate

현재 스레드에서 즉시 수행합니다.
observeOn()이 여러번 쓰였을 경우 immediate()를 선언한 바로 윗쪽의 스레드를 따라갑니다.

  • Schedulers.io()

동기 I/O를 별도로 처리시켜 비동기 효율을 얻기 위한 스케줄러입니다. 자체적인 스레드 풀에 의존합니다.
자체적인 스레드 풀 CachedThreadPool을 사용합니다. API 호출 등 네트워크를 사용한 호출 시 사용됩니다.

  • Schedulers.newThread()

항상 새로운 스레드를 만드는 스케쥴러입니다.

  • Schedulers.trampoline()

큐에 있는 일이 끝나면 이어서 현재 스레드에서 수행하는 스케쥴러

 

※ 일부 오퍼레이터들은 자체적으로 어떤 스케쥴러를 사용할지 지정합니다. 예를 들어 buffer 오퍼레이터는 Schedulers.computation()에 의존하며 repeat은 Schedulers.trampoline()를 사용합니다.

 

 

2) RxAndroid는 제공하는 Scheduler

  • AndroidSchedulers.mainThread()

안드로이드의 UI 스레드에서 동작

  • HandlerScheduler.from(handler)

특정 핸들러 handler에 의존하여 동작

 

※ 안드로이드에 특화된 스케쥴러입니다. 보통은 RxAndroid가 제공하는 AndroidSchedulers.mainThread()와 RxJava가 제공하는 Schedulers.io()를 조합해서 Schedulers.io()에서 수행한 결과를 AndroidSchedulers.mainThread()에서 받아 UI에 반영하는 패턴등이 일반적으로 쓰입니다.

 

 

2. 독립적으로 사용 가능한 Scheduler

Scheduler는 Observable, Operator, Subscriber 모델 밖에서 별도로 사용할 수 있습니다.

 worker = Schedulers.newThread().createWorker();


worker.schedule(new Action0() {

    @Override
    public void call() {
        realmJob();
    }

});

 

 Scheduler.Worker worker = Schedulers.newThread().createWorker();
        worker.schedule(new Runnable() {
            @Override
            public void run() {
                log("worker: " + Thread.currentThread().getName());
            }
        });

[출력결과]
worker: RxNewThreadScheduler-1

 


3. 사용법 예제

1) Scheduler 안한 경우 기본 동작

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                log("subscribe:" + Thread.currentThread().getName());
                e.onNext("next");
            }
        }).map(s1 -> {
            log("map: " + Thread.currentThread().getName());
            return s1;
        }).subscribe(s -> log("on next: " + Thread.currentThread().getName()));

[출력결과]
subscribe:main
map: main
on next: main


2) subscribeOn() API만 사용한 경우 동작

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                log("subscribe:" + Thread.currentThread().getName());
                e.onNext("next");
            }
        }).subscribeOn(Schedulers.computation()).map(s1 -> {
            log("map: " + Thread.currentThread().getName());
            return s1;
        }).subscribe(s -> log("on next: " + Thread.currentThread().getName()));

[출력결과]
subscribe:RxComputationThreadPool-1
map: RxComputationThreadPool-1
on next: RxComputationThreadPool-1


3) observerOn() API만 사용한 경우 동작

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                log("subscribe:" + Thread.currentThread().getName());
                e.onNext("next");
            }
        }).observeOn(Schedulers.computation()).map(s1 -> {
            log("map: " + Thread.currentThread().getName());
            return s1;
        }).subscribe(s -> log("on next: " + Thread.currentThread().getName()));

[출력결과]
subscribe:main
map: RxComputationThreadPool-1
on next: RxComputationThreadPool-1


4) subscribeOn() & observerOn() 1번씩 API 사용한 경우 동작

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                log("subscribe:" + Thread.currentThread().getName());
                e.onNext("next");
            }
        }).subscribeOn(Schedulers.io())
              .observeOn(AndroidSchedulers.mainThread())
                .map(s1 -> {
                        log("map: " + Thread.currentThread().getName());
                        return s1;
         }).subscribe(s -> log("on next: " + Thread.currentThread().getName()));

[출력결과]
subscribe:RxCachedThreadScheduler-1
map: main
on next: main


5) subscribeOn() & observerOn() 다수 API 사용한 경우 동작

 Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                log("subscribe:" + Thread.currentThread().getName());
                e.onNext("next");
            }
        }).subscribeOn(Schedulers.io())
          .subscribeOn(Schedulers.computation())
          .observeOn(AndroidSchedulers.mainThread()).map(s1 -> {
                        log("map1: " + Thread.currentThread().getName());
                        return s1;
         }).observeOn(Schedulers.newThread()).map(s2 -> {
                        log("map2: " + Thread.currentThread().getName());
                        return s2;
         }).observeOn(Schedulers.single()).map(s3 -> {
                        log("map3: " + Thread.currentThread().getName());
                        return s3;
         }).subscribe(s -> log("on next: " + Thread.currentThread().getName()));

[출력결과]
subscribe:RxCachedThreadScheduler-1
map1: main
map2: RxNewThreadScheduler-1
map3: RxSingleScheduler-1
on next: RxSingleScheduler-1

 



 

출처 : 인터넷에서 RxAndroid 검색하여 필요한 정보를 다양한 사이트에서 종합하여 작성된 것입니다. 많은 사이트 내용을 종합하여 공부하여 작성하다보니 일일이 나열하지 못하였습니다. ㅈㅅ(_ _) 이글은 자유롭게 퍼 가셔서 도움이 되었으면 좋겠습니다. 감사합니다. 

 

블로그 이미지

차봉규

IT개발 공부하려고 만든 블로그 입니다^^ 부족한점 많더라고...도움 많이 부탁해요

,
336x280(권장), 300x250(권장), 250x250, 200x200 크기의 광고 코드만 넣을 수 있습니다.

[목차]==================================================

1. PublishSubject

2. BehaviorSubject

3. ReplaySubject

4. AsyncSubject

======================================================

 

 

Subject 는 Observable + Subscriber 로 종종 표현되고는 합니다만, 정확한 표현은 아니라고 생각합니다. (왜냐하면 일반적인 Subscriber 처럼 subscribe()에 직접 사용되는 경우는 거의 없습니다.)
물론 Subject 는 Observable 과 Subscriber 를 모두 implementation 하고 있으니 틀린 이야기는 아닙니다만,
저는 Subject 는 이벤트를 전달받아 구독자들에게 이벤트를 전파하는 중간다리라고 하는게 좀 더 정확한 표현이라고 생각합니다.
onNext()로 전달받은 이벤트를 구독자들에게 전파하며, onCompleted()나 onError()를 받으면 이것 역시 구독자들에게 전파하고 Observable로의 역할을 종료하게 됩니다.
Android 에서는 EventBus 와 같은 형태로도 사용이 가능합니다. 즉 RxJava 를 사용하면 다른 EventBus 라이브러리가 불필요해집니다.

EventBus 언급을 한것에서 살짝 힌트를 받으셨겠지만 Subject 들은 보통 onCompleted 와 같이 종료하는 과정이 없이, 액티비티 라이프사이클(또는 앱 라이프사이클)과 동일하게 살아서 이벤트를 전파하는 역할로 자주 사용됩니다.

여기서는 PublishSubject 와 BehaviorSubject 만 언급하고 넘어가겠습니다만, 다른 Subject 들도 무척 유용하니 기회가 된다면 한번씩 사용해보시는걸 권장해드립니다.

 

1. PublishSubject

PublishSubject 를 구독한 시점으로부터 이후에 발생하는 이벤트들을 전달받음

     public void publishSubject() {
        PublishSubject<String> subject = PublishSubject.create();

        subject.onNext("첫번째 호출");
        subject.onNext("두번째 호출");

 

        subject.subscribe(text -> {
            System.out.println("onNext : " + text);
        });

 

        subject.onNext("세번째 호출");
        subject.onNext("네번째 호출");
    }

     // 결과
    onNext : 세번째 호출
    onNext : 네번째 호출

 

 

2. BehaviorSubject

BehaviorSubject 는 PublishSubject 와 비슷합니다만, 구독전에 한건이라도 이벤트가 발생했다면 구독시점에 해당 이벤트도 같이 전달받음

가장 최근에 관찰된 아이템과 그 후에 관찰된 나머지 아이템을 구독하는 옵저버에게 발행

     public void behaviorSubject() {
        BehaviorSubject<String> subject = BehaviorSubject.create();

        subject.onNext("첫번째 호출");
        subject.onNext("두번째 호출");

        subject.subscribe(text -> {
            System.out.println("onNext : " + text);
        });

        subject.onNext("세번째 호출");
        subject.onNext("네번째 호출");
    }

     // 결과
    onNext : 두번째 호출
    onNext : 세번째 호출
    onNext : 네번째 호출

 

 

3. ReplaySubject

관찰한 모든 아이템을 버퍼에 저장하고 구독하는 옵저버에게 재생

(보완필요)


4. AsyncSubject

옵저버블이 완료됐을 때 구독하고 있는 각 옵저버에게 관찰한 마지막 아이템만을 발행

(보완필요)

 

 

 

출처 : 인터넷에서 RxAndroid 검색하여 필요한 정보를 다양한 사이트에서 종합하여 작성된 것입니다. 많은 사이트 내용을 종합하여 공부하여 작성하다보니 일일이 나열하지 못하였습니다. ㅈㅅ(_ _) 이글은 자유롭게 퍼 가셔서 도움이 되었으면 좋겠습니다. 감사합니다

블로그 이미지

차봉규

IT개발 공부하려고 만든 블로그 입니다^^ 부족한점 많더라고...도움 많이 부탁해요

,
336x280(권장), 300x250(권장), 250x250, 200x200 크기의 광고 코드만 넣을 수 있습니다.

[목차]==================================================

1. Observable 변형 및 가공 

1) map()

2) flatMap()

3) Filter()

4) 기타

2. Observable 합성

1) zip()

2) 기타

======================================================

 

 

1. Observable 변형 및 가공

발생되는 이벤트를 다른 형태로 변형하기를 원하실 수도 있습니다.
가장 많이 사용되는건 map 과 flatMap 입니다.

 

1) map()

map() 함수를 사용하여 전달받은 이벤트를 다른값으로 변경합니다.

     public void map() {
        Observable.from(new String[] { "개미", "매", "마루" })
            .map(text -> "** " + text + " **")
            .subscribe(
                text -> System.out.println("onNext : " + text),
                e -> System.out.println("onError"),
                () -> System.out.println("onCompleted"));
    }

     // 결과
    onNext : ** 개미 **
    onNext : ** 매 **
    onNext : ** 마루 **
    onCompleted

 

Observable.just("hello world")
        .map(new Func1<String, String>() {
            @Override
            public String call(String s) {
                return s + "RxJava";
            }
        })
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.i(TAG, ">>> onNextAction  : " + s);
            }
        })

 

2) flatMap()

flatMap()은 전달받은 이벤트로부터 다른 Observable 들을 생성하고, 그 Observable 들에서 발생한 이벤트들을 쭉 펼쳐서 전파합니다.

    public void flatMap() {
        Observable.from(new String[] { "개미", "매", "마루" })
            .flatMap(
                text -> Observable.from(new String[] { text + " 사랑합니다.", text + " 고맙습니다." })

            )
            .subscribe(
                text -> System.out.println("onNext : " + text),
                e -> System.out.println("onError"),
                () -> System.out.println("onCompleted"));
    }

 

    // 결과
    onNext : 개미 사랑합니다.
    onNext : 개미 고맙습니다.
    onNext : 매 사랑합니다.
    onNext : 매 고맙습니다.
    onNext : 마루 사랑합니다.
    onNext : 마루 고맙습니다.
    onCompleted

 

3) Filter()

데이터 filter 역할

  Observable.just("hello world")
        .filter(new Func1<String, Boolean>() {
            @Override
            public Boolean call(String s) {
                if (s.contains("hello")) {
                    return true;
                } else {
                    return false;
                }
            }
        })
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.i(TAG, ">>> onNextAction  : " + s);
            }
        });

위와 같이 filter 함수를 보면 observable 을 통해서 넘어온 데이터에 hello 가 포함되어있는지를 확인 하고 있다.

포함되어 있지 않으면 false 가 넘어가게 되어서 subscribe 가 호출되지 않는다.

 

4) 기타

map() / 특정 Func 객체를 받아서 옵저버블이 발행하는 모든 값에 Func 객체를 적용
flatMap() / 모든 발행한 값을 하나의 최종 옵저버블로 병합하는 방식으로 시퀀스를 플랫하게 만드는 방법을 제공
concatMap() / 발행한 값을 병합하는 대신, 발행한 값을 연쇄적으로 연결할 수 있는 플랫한 함수를 제공해 flatMap()의 인터리빙 문제를 해결
flatMapIterable() / 소스 아이템과 생성된 옵저버블 간에 쌍이 되는 것이 아니라 소스 아이템과 생성된 이터러블이 한 쌍이 됨
switchMap() / 소스 옵저버블에서 새로운 아이템을 발행할 때마다 먼저 발행한 아이템이 생성한 옵저버블의 구독을 해지하고 미러링을 중지한 다음 새로 발행한 아이템을 미러링
scan() / 옵저버블이 발행하는 각 아이템에 함수를 적용해 함수 결과를 계산한 다음, 결과를 옵저버블 시퀀스에 다시 주입해 다음번에 발행되는 값과 함께 사용하기 위해 대기
groupBy() / 특정 기준에 따라 리스트의 요소를 분류하는 함수
buffer() / 소스 옵저버블을 새로운 옵저버블로 변환하며, 새로운 옵저버블은 하나의 아이템이 아닌 리스트 형태로 값을 발행
window() / buffer()와 유사하지만, 리스트가 아닌 옵저버블을 발행
cast() / map() 연산자의 특수 버전으로 소스 옵저버블의 각 아이템을 다른 Class로 캐스팅해 새로운 타입으로 변환

 

filter(return E) / return E값이 true일 때만 값이 발행되고 옵저버에게 전달
take(n) / 옵저버블을 1번째부터 n번째까지만 발행 -> 쉽게 알 수 있는 아이템의 소규모 그룹을 얻기 위해 사용
takeLast(n) / 옵저버블의 마지막 요소 n개만큼 발행
distinct() / 특정 값을 단 한번만 처리
distinctUntilChanged() / 모든 중복 값은 무시하고 새로운 값만 발행
first() / 옵저버블이 발행한 요소에서 첫번째 요소만을 발행
last() / 옵저버블이 발행한 요소에서 마지막 요소만을 발행
skip(n) / 옵저버블이 발행한 요소에서 처음 n개의 요소를 숨김
skipLast(n) / 옵저버블이 발행한 요소에서 마지막 n개의 요소를 숨김
elementAt(n) / 시퀀스에서 n번째 요소만을 발행한 후 시퀀스를 완료 (elementAtOrDefault() 는 해당 시퀀스가 없을 경우도 포함)
sample(n, time) / n(time)마다 마지막으로 발행된 값을 발행 / 첫 번째 아이템을 발행시키기 위해서는 throttleFirst() 사용
timeout(n, time) / 옵저버블 시퀀스 소스에 반영하며 지정한 시간 내에 아무런 값을 받지 못할 경우 에러를 발행
debounce(n, time) / 옵저버블에서 아이템이 발행된 다음 바로 뒤따라서 발행된 아이템을 필터링하고, 옵저버블에서 일정 시간 동안 다른 아이템이 발행되지 않으면 아이템을 발행

 

 

 

2. Observable 합성

두개 이상의 Observable 을 합성해야 하는 경우도 있다.
data-flow 에 기반한 개발에서 매우 자주 언급되고 사용되기는 하지만, 일반적인 비동기작업에서는 자주 사용하는 개념은 아니다.
하지만 알고 있으면 종종 사용하게되는 유용한 도구들이니 한번쯤 살펴보고 넘어가시는걸 추천

 

1) zip()

네트워크 작업으로 사용자의 프로필과 프로필 이미지를 동시에 요청하고, 그 결과를 합성해서 화면에 표현해준다거나 하는 형태의 작업이 필요한 경우 zip() 유용하게 사용

     public void zip() {
        Observable.zip(
            Observable.just("개미"),
            Observable.just("gaemi.jpg"),
            (profile, image) -> "프로필 : " + profile + ", 이미지 : " + image
        ).subscribe(
            print -> System.out.println("onNext : " + print),
            e -> System.out.println("onError"),
            () -> System.out.println("onCompleted")
        );
    }

     // 결과
    onNext : 프로필 : 개미, 이미지 : gaemi.jpg
    onCompleted

 

2) 기타

merge() / 옵저버블이 발행한 아이템을 병합하는 방식으로 2개 이상의 옵저버블을 결합
zip() / 여러 옵저버블에서 발행한 아이템을 결합하고, 특정한 함수인 Func에 따라 아이템을 변환한 다음 새로운 값을 발행
join() / merge()와 zip()은 발행한 아이템의 도메인에서 동작하기 때문에 값을 처리하는 방법을 결정하기 전에 시간을 고려해야 하는 시나리오가 있을 수 있는데, 해당 함수를 사용함으로 타임 윈도와 함께 동작해 두 옵저버블의 아이템을 결합 / 간단한 상황에서는 문자열과 동작하며, 단순히 발행한 문자열열을 하나의 최종 문자열로 조인하는 연산자도 있다.
combineLatest() / zip()은 두 옵저버블의 가장 최근 언집된 아이템에서 동작하는 대신, 해당 함수는 가장 최근에 발행한 아이템에서 동작
and(), then(), when()  /해당 함수들을 사용함으로써 패턴과 플랜 같은 구조체를 사용해 발행한 아이템을 결합
switch() / 옵저버블을 발행하는 옵저버블을 가장 최근 발행한 옵저버블을 발행하는 옵저버블로 변환
startWith() / 옵저버블이 아이템 발행을 시작하기 전에 인자로 전달받은 아이템의 시퀀스로 발행
 

 

 

출처 : 인터넷에서 RxAndroid 검색하여 필요한 정보를 다양한 사이트에서 종합하여 작성된 것입니다. 많은 사이트 내용을 종합하여 공부하여 작성하다보니 일일이 나열하지 못하였습니다. ㅈㅅ(_ _) 이글은 자유롭게 퍼 가셔서 도움이 되었으면 좋겠습니다. 감사합니다.

블로그 이미지

차봉규

IT개발 공부하려고 만든 블로그 입니다^^ 부족한점 많더라고...도움 많이 부탁해요

,
336x280(권장), 300x250(권장), 250x250, 200x200 크기의 광고 코드만 넣을 수 있습니다.

 

[목차]==================================================

1. Observable을 만드는 Operator 목록

2. Create 

3. Defer
4. Empty/Never/Throw

5. From
6. Interval
7. Just
8. Range
9. Repeat
10. Start
11. Timer

======================================================

 

 

Observable을 좀 더 쉽게 만들 수 있는 방법이 있습니다.
바로 미리  생성된 Operator를 사용하는 것 입니다

 

 

1. Observable을 만드는 Operator 목록

 

just( ) — convert an object or several objects into an Observable that emits that object or those objects
from( ) — convert an Iterable, a Future, or an Array into an Observable
repeat( ) — create an Observable that emits a particular item or sequence of items repeatedly
repeatWhen( ) — create an Observable that emits a particular item or sequence of items repeatedly, depending on the emissions of a second Observable
create( ) — create an Observable from scratch by means of a function
defer( ) — do not create the Observable until a Subscriber subscribes; create a fresh Observable on each subscription
range( ) — create an Observable that emits a range of sequential integers
interval( ) — create an Observable that emits a sequence of integers spaced by a given time interval
timer( ) — create an Observable that emits a single item after a given delay
empty( ) — create an Observable that emits nothing and then completes
error( ) — create an Observable that emits nothing and then signals an error
never( ) — create an Observable that emits nothing at all

 package tiii.com.rxandroid;

import android.os.Bundle;
import android.widget.TextView;

import com.trello.rxlifecycle.components.support.RxAppCompatActivity;

import rx.Observable;
import rx.functions.Action1;

 

public class MainActivity extends RxAppCompatActivity {
    public static final String TAG = MainActivity.class.getSimpleName();

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        Observable
                .just("Hello RxAndroid !!")
                .compose(this.<String>bindToLifecycle())
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        ((TextView) findViewById(R.id.textview)).setText(s);
                    }
                });
    }
}

 

 

2. Create 

  • 수동으로 옵져버 메소드 호출하여 새로운 Observable을 생성
  • 언뜻본다면 큰 문제는 없어 보인다.
    물론 Observable.from() 를 사용한다면 subscriber.onComplete() 와 같은 코드를 신경쓰지 않고 더 편하게 사용이 가능하다
    하지만 Observable.just() 나 Observable.from() 와 같은 경우 발행되는 Item 들이 observable 생성시점에 이미 정해져있어야 한다.
    즉 Database 상에서 데이터를 읽어 오는 작업과 같이 비용이 큰 작업들을 비동기로 처리하고자 할 때에는 적절하지 않다.
  • 이처럼 Observable.create() 만으로는 간단한 비동기 처리 흐름을 만들기는 어려운 작업이며 실수할 여지가 매우 많다.
    그래서 많은 개발자들은 Observable.create() 대신 Observable.defer() 를 사용하는걸 추천한다.


Create

 

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<super String> subscriber) {
        try {
            subscriber.onNext("Hello_Create");
            subscriber.onCompleted();
        } catch (Exception e) {
            subscriber.onError(e);
        }
    }
})
.compose(mMainView.ActivityLifecycleProvider().bindToLifecycle())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(s -> {
            Log.d(TAG, s);
            mMainView.TextChange(s);
        },
        throwable -> throwable.printStackTrace(),
        () -> {
            Log.d(TAG, "onComplete");
            LogTextView();
        }
);

 

 

3. Defer 

  • 구독하기 전까지 Observable을 생성하지 않습니다. 그리고 각각의 옵져버에게 매번 새로운 Observable을 생성합니다.
  • 다른 생성 오퍼레이터와 다른 점이 뭔지 애매했었는데 데이타스트림이 메모리에 할당되는 타이밍이 다른 것이 였습니다. 다른 오퍼레이터들은 오퍼레이터를 선언하는 순간 메모리에 할당 되지만 defer는 subscribe가 호출 될 때에 할당 된다고 합니다.

Defer

 

 Observable.defer(() -> {

    return SomethingLongTask(); //return Observable<String>
})
.compose(mMainView.ActivityLifecycleProvider().bindToLifecycle());
.observeOn(AndroidSchedulers.mainThread())
.subscribe(s -> {
            Log.d(TAG, s);
            mMainView.TextChange(s);
        },
        throwable -> throwable.printStackTrace(),
        () -> {
            Log.d(TAG, "onComplete");
            LogTextView();
        }
);

 

  • Observable.defer() 비동기로 observable 을 생성하고 하위 스트림에서 사용할 수 있도록 해준다.

 Observable.defer(() -> {
    List<Book> books = dao.findAll();
    return Observable.just(books);
  })

  .subscribeOn(Schedulers.io())
  .subscribe(books -> {
    // Next Step
  }, throwable -> {
    // Error handling
  });

 여기서는 dao.findAll() 을 통해 반환된 결과를 Observable.just() 를 사용하여 새로운 작업흐름을 만들고 반환하였다.
observable 을 만들기 위한 observable 이 필요하다는 점에서 추가되는 비용이 있지만 Observable.create() 보다는 훨씬 간결한 코드가 만들어졌다.
명시적으로 onComplete() 나 onError() 를 처리해줄 필요가 없으며, subscriber 의 구독상태를 확인할 필요도 없다.


 

4. Empty/Never/Throw

매우 정확하고 제한적인 행동의 Observable을 생성합니다.
1) Empty

  • 방출하는 아이템이 없고 정상적으로 종료되는 옵저버블을 생성합니다.
    Empty
    2) Never
  • 방출하는 아이템이 없고 종료되지 않는 옵저버블을 생성합니다.
    Never
    3) Throw
  • 방출하는 아이템이 없고 에러를 발생하여 종료되는 옵저버블을 생성합니다.

Throw

 

5. From

  • 배열이나 Iterable의 요소를 순차적으로 방출 시키는 Observable으로 변환

에서

 

6. Interval

  • 특정한 시간 간격으로 아이템을 방출하는 Observable을 생성합니다.
  • 일정시간 마다 반복적인 작업이 필요할 때 사용

Interval

 

  •  

    7. Just

    • 오브젝트나 오브젝트셋을 바로 방출하는 Oservable으로 변환
    • 만약에 아무것도 하지 않는 옵저버블을 만들기 위해 null 을 넣는다면 null을 방출하는 옵저버블이 만들어 집니다.
    • 아무것도 하지 않은 옵저버블을 원하신다면 empty를 사용하시면 됩니다.

    Just

     


    8. Range

    • 정수의 순차적인 범위를 가지고 있는 Observable을 생성
    • Interval과 비슷하지만 반복횟수의 제한이 있습니다. m개 만큼의 반복

    Range

     


    9. Repeat

    • 일정 횟수를 반복하는 Observable을 생성
    • 이 오퍼레이터는 단독으로 사용되지 않고 다른 오퍼레이터 뒤에 붙여서 사용되며 .Repeat(n) 바로 앞 오퍼레이터를 일정횟수 만큼 반복

    Repeat

     

    10. Start

    Start

     

    11. Timer

    • 일정 시간의 딜레이 이후에 단일 항목을 방출하는 Observable을 생성

    Timer

     

  •  

     

    출처 : 인터넷에서 RxAndroid 검색하여 필요한 정보를 다양한 사이트에서 종합하여 작성된 것입니다. 많은 사이트 내용을 종합하여 공부하여 작성하다보니 일일이 나열하지 못하였습니다. ㅈㅅ(_ _) 이글은 자유롭게 퍼 가셔서 도움이 되었으면 좋겠습니다. 감사합니다. 

    블로그 이미지

    차봉규

    IT개발 공부하려고 만든 블로그 입니다^^ 부족한점 많더라고...도움 많이 부탁해요

    ,
    336x280(권장), 300x250(권장), 250x250, 200x200 크기의 광고 코드만 넣을 수 있습니다.

     

    [목차]==================================================

    1. Rxandroid 메모리누수

    2. RxLifecycle 사용법(compose)

    3. RxJava의 Lifecycle 사용방법 - CompositeSubscription

    4. MVP 패턴에서의 CompositeSubscription

    ======================================================

     

    1. Rxandroid 메모리누수

    • Rxandroid 는 메모리누수에 대한 문제점이 있는데,
      Observable이 Context를 복사해서 유지하고 있기 때문에 엑티비티가 종료 될때 unsubscribe하지 않으면 안된다. 기본 예제에서는 명시적으로 onPause 나 onDestory에 명시적으로 unsubscribe()를 호출하도록 되어 있다.(참고로 정상적으로 onComplete()나 onError()콜백으로 진입 하게 되면 알아서 unsubscribe()된다)

    •  => 이러한 부분을 쉽게 사용하기 위해서 RxLifecycle 라이브러리를 사용하면 쉽게 사용 할 수 있다.

    2. RxLifecycle 사용법(compose)

    • Activity의 상속을 RxAppCompatActivity으로 변경하고 compose를 사용하여 Rxlifecycle을 적용해 줍니다.

    • onCreate 에서 subscribe을 하면 onDestory 에서 자동으로 unsubscribe 되고 onResume 에서 subscribe을 하면 onPause 에서 자동으로 unsubscribe 됩니다.

    • 만약에 종료되는 시점은 바꾸고 싶다면 직접 bindUntilEvent 선언하여 조정할 수 있습니다.

    • 프로젝트에 의존성 추가

     

     

     

    package tiii.com.rxandroid;

    import android.os.Bundle;
    import android.util.Log;
    import android.widget.TextView;

    import com.trello.rxlifecycle.ActivityEvent;
    import com.trello.rxlifecycle.components.support.RxAppCompatActivity;

    import rx.Observable;
    import rx.Subscriber;

    public class MainActivity extends RxAppCompatActivity {
        public static final String TAG = MainActivity.class.getSimpleName();

        @Override
        protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_main);

            Observable<String> simpleObservable =
                    Observable.create(new Observable.OnSubscribe<String>() {
                        @Override
                        public void call(Subscriber<? super String> subscriber) {
                            subscriber.onNext("Hello RxAndroid !!");
                            subscriber.onCompleted();
                        }
                    })
                    // `this.<String>` is necessary if you're compiling on JDK7 or below.
                    //
                    // If you're using JDK8+, then you can safely remove
                    .compose(this.<String>bindToLifecycle());
                    //.compose(this.<String>bindUntilEvent(ActivityEvent.DESTROY));


            simpleObservable.subscribe(new Subscriber<String>() {
                @Override
                public void onCompleted() {
                    Log.d(TAG, "complete!");
                }

                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "error: " + e.getMessage());
                }

                @Override
                public void onNext(String text) {
                    ((TextView) findViewById(R.id.textview)).setText(text);
                }
            });
        }
    }

     

     

    3. RxJava의 Lifecycle 사용방법 - CompositeSubscription

    • RxJava에는 CompositeSubscription이라는 클래스를 제공 한다. 이 클래스는 생성된 Subscription인스턴스를 하나로 관리 해주는 클래스 이다.
    • CompositeSubscription클래스의 remove()메소드를 보면, Removes a Subscription from this CompositeSubscription, and unsubscribes the Subscription.이라고 되어 있다. 이 CompositeSubscription에 add()된 Subscription에 remove()메소드를 콜하면 CompositeSubscripnt에서 제거 되면 동시에 unsubscribe된다고 명시 되어 있다
      Activity나 Fragment등의 onCreate()메소드 에서는 멤버변수로 존재 하는 CompositeSubscription인스턴스를 생성 하는 기능이 들어 간다.

    • 어떠한 작업을 위해서 subscribe()를 하게 되면 Subscription인스턴스를 변수로 정의 한 다음 CompositeSubscription에 add()해 준다. Activity나 Fragment등의 onDestroy()메소등에서는 생성된 인스턴스로 존재 하는 CompositeSubscription인스턴스를 unsubscribe()한다. Activity나 Fragment에서 생성되어지고 add()된 모든 Rx의 subscribe들은 이제 unubcribe()될 것 이다.

     public class MainActivity
        extends Activity {
      private CompositeSubscription compositeSubscription;

      @Override
      protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        compositeSubscription = new CompositeSubscription();

        Observable<String> observable = Observable.create(
            (Observable.OnSubscribe<String>) subscriber -> {
              ...
            }
        );
        Subscription subscription = observable
            .observeOn(Schedulers.computation())
            .subscribeOn(AndroidSchedulers.mainThread())
            .subscribe(
                s -> {
                  ...
                }
            );
        compositeSubscription.add(subscription);
      }

      @Override
      protected void onDestroy() {
        if (compositeSubscription != null) {
          compositeSubscription.unsubscribe();
        }

        super.onDestroy();
      }
    }

     

    4. MVP 패턴에서의 CompositeSubscription

    • 위의 3번 방법에서는 Activity나 Fragment에 바로 Subscription을 생성하고 멤버변수로 CompositeSubscription을 생성하여 add(), unsubscribe()하는 것을 알 수 있다.

    • 하지만, MVP패턴에서는 Subscription을 실제로 생성하고 비동기 작업을 요청 하는곳이 Activity나 Fragment가 아닌 Presenter에서 하게 된다. Presenter에 CompositeSubscription을 멤버로 두고 관리하게 하는건 무리가 없을것이다. 하지만 Activity나 Fragment의 Lifecycle에 맞추어 Subscription을 관리 하려 하는 목적에 어긋난다.

    • 아래 소스는 Presenter의 부모 클래스로서 CompositeSubscription에 Subscriber를 등록 하고 destroy()메소드를 액티비티나 프래그먼트의 라이프 사이클에 맞추어 콜 하는 그 예 이다.

     public class BasePresenter {
      private CompositeSubscription compositeSubscriptionl;
     
      public BasePresenter() {
        this.compositeSubscriptionl = new CompositeSubscription();
      }
     
      public <T> void addSubscriber(@NonNull Subscriber<T> subscriber) {
        if (compositeSubscriptionl != null) {
          compositeSubscriptionl.add(subscriber);
        }
      }
     
      public void destroy() {
        if (compositeSubscriptionl != null) {
          compositeSubscriptionl.unsubscribe();
        }
      }
    }

    1) Presenter의 부모 클래스를 만들고 CompositeSubscription의 멤버변수를 추가 한다.
    2) 부모 클래스의 생성자에서는 CompositeSubscription의 인스턴스를 생성 한다.
    3) 부모 클래스에는 만들게 될 Subscriber의 인스턴스를 add()하는 메소드와 lifecycle의 onDestroy()에 맞춰 unsubscribe()하는 메소드인destroy()`메소드를 추가 한다.
    4) 앞으로 만들게 되는 모든 Presenter들은 부모 Presenter를 상속해서 만든다.
    5) 그리고 Activity나 Fragment를 상속한 부모 클래스들을 또 만들고, onCreate()메소드 군 에서는 presenter의 인스턴스를 생성 한다.
    6) 또한 onDestroy()에서는 presenter의 destroy()메소드를 꼭 호출 하여 생성된 모든 Subscription을 unsubscribe()하게 해 준다.

    이러한 부분이 번거롭다면, 그냥 Trello에서 개발 한 RxLifecycle을 사용 하자!

     

     

     

     


     

    출처 : 인터넷에서 RxAndroid 검색하여 필요한 정보를 다양한 사이트에서 종합하여 작성된 것입니다. 많은 사이트 내용을 종합하여 공부하여 작성하다보니 일일이 나열하지 못하였습니다. ㅈㅅ(_ _) 이글은 자유롭게 퍼 가셔서 도움이 되었으면 좋겠습니다. 감사합니다. 

    블로그 이미지

    차봉규

    IT개발 공부하려고 만든 블로그 입니다^^ 부족한점 많더라고...도움 많이 부탁해요

    ,
    336x280(권장), 300x250(권장), 250x250, 200x200 크기의 광고 코드만 넣을 수 있습니다.

    [목차]==================================================

    1. Gradle 설정

    2. Rx의 핵심개념

    Observable

    Subscriber

    Subscription

    Scheduler

    ======================================================

     


     

    1. Gradle 설정

    안드로이드 스튜디오에서 프로젝트를 만들고 app 디렉토리의 build.gradle 파일을 열어 봅시다.

    dependencies {

        compile fileTree(dir: 'libs', include: ['*.jar'])

        testCompile 'junit:junit:4.12'

        compile 'com.android.support:appcompat-v7:23.1.1'

        compile 'io.reactivex:rxandroid:1.1.0'

    }

    Gradle 설정에서 가장 중요한 부분은 의존성입니다.

    안드로이드도 자바 환경이기 때문에 rxjava를 포함하지 않는 것에 의아할 수 있습니다. RxAndorid는 RxJava에 대한 의존성을 가지고 있고 RxAndroid를 의존성에 포함하면 안드로이드 개발 환경에 문제가 없습니다. 혹시 새로운 버전을 적용하길 원한다면 명시적으로 지정할 수 있습니다.

    지금은 Rxandroid는 1.1.0 이고 Rxjava는 1.1.1 이라 별도로 의존성을 추가해 줬다.
    그리고 build.gradle 파일을 동기화 해주면 적용은 끝난다. 
    1.0 이하 버전에서는 각종 컴포넌트의 이벤트를 처리 부분이 포함되어 있었으나 
    라이브러리의 경량화와 유지보수등의 문제로 서드파티 라이브러리로 빼버렸다고 한다. 아래 외에도 다양한 라이브러리가 있으니 gitHub를 찾아보면 됨


    RxLifecycle - Lifecycle handling APIs for Android apps using RxJava
    RxBinding - RxJava binding APIs for Android's UI widgets.
    SqlBrite - A lightweight wrapper around SQLiteOpenHelper and ContentResolver which introduces reactive stream semantics to queries.
    Android-ReactiveLocation - Library that wraps location play services API boilerplate with a reactive friendly API.
    rx-preferences - Reactive SharedPreferences for Android

    2. Rx의 핵심개념

    • Observable 아이템을 발행하는 일
    • Subscriber(Observer) : 발행한 아이템을 소비 / Subscriber는 onNext() , onComplete(), onError()를 각각 가지고 있는데, onNext()는 여러번 호출 될 수 있고 이후에 onComplete(), onError()이 처리 / Observable 안에는 Subscriber를 가지고 있어서 onNext를 호출하면 다음에 있는 Observable 이나 Subscriber가 받을 수 있음
    • Subscription : Observable이 방출한 아이템을 Observer가 구독해서 반응을 합니다. 바로 이 때 Observable과 Observer의 연결을 Subscription을 통해서 이루어 집니다.
    • Scheduler : 멀티쓰레드, 비동기 방식 등에 유용하며, Scheduler는 해당 Observable, Operator, Subscriber를 어떤 스레드에서 수행할지 결정하는 것입니다. 스케줄러가 어떤 부분을 맞게 되는지는 subscribeOn과 observeOn으로 지정합니다
    • Observable과 Subscriber를 주목

    데이터의 강을 만드는 Observable과 강에서 데이터를 하나씩 건지는 Subscriber가 리액티브 프로그래밍의 가장 핵심적인 요소입니다. Observable은 데이터를 제공하는 생산자로 세가지 유형의 행동을 합니다.

    onNext - 새로운 데이터를 전달한다.

    onCompleted - 스트림의 종료.

    onError - 에러 신호를 전달한다


    package io.realm.simpleobservable;


    import android.os.Bundle;

    import android.support.v7.app.ActionBarActivity;

    import android.util.Log;

    import android.widget.TextView;


    import rx.Observable;

    import rx.Subscriber;


    public class MainActivity extends ActionBarActivity {

        private static final String TAG = MainActivity.class.getName();


        @Override

        protected void onCreate(Bundle savedInstanceState) {


            super.onCreate(savedInstanceState);

            setContentView(R.layout.activity_main);


            Observable<String> simpleObservable =

                    Observable.create(new Observable.OnSubscribe<String>() {

                        @Override

                        public void call(Subscriber<? super String> subscriber) {

                            subscriber.onNext("Hello RxAndroid !!");

                            subscriber.onCompleted();

                        }

                    });


            simpleObservable

                    .subscribe(new Subscriber<String>() {

                        @Overridea

                        public void onCompleted() {

                            Log.d(TAG, "complete!");

                        }


                        @Override

                        public void onError(Throwable e) {

                            Log.e(TAG, "error: " + e.getMessage());

                        }


                        @Override

                        public void onNext(String text) {

                            ((TextView) findViewById(R.id.textView)).setText(text);

                        }

                    });

        }

    }


     

     

     

    출처 : 인터넷에서 RxAndroid 검색하여 필요한 정보를 다양한 사이트에서 종합하여 작성된 것입니다. 많은 사이트 내용을 종합하여 공부하여 작성하다보니 일일이 나열하지 못하였습니다. ㅈㅅ(_ _) 이글은 자유롭게 퍼 가셔서 도움이 되었으면 좋겠습니다. 감사합니다. 

    블로그 이미지

    차봉규

    IT개발 공부하려고 만든 블로그 입니다^^ 부족한점 많더라고...도움 많이 부탁해요

    ,
    336x280(권장), 300x250(권장), 250x250, 200x200 크기의 광고 코드만 넣을 수 있습니다.

    [목차]==================================================

    1. Rx(Reactive Extensions)의 등장

    2. Rx을 이해하기 위한 준비

    3. Rx를 어디에 적용하고 사용할 수 있을까?

    ======================================================

     


     

    1. Rx(Reactive Extensions)의 등장


    여러 이슈를 처리해줄 적절한 도구는 닷넷 진영에서 등장했습니다. 마이크로소프트는 옵저버 패턴과 LINQ 스타일 문법을 확장하여 비동기처리와 이벤트 기반 프로그래밍을 할 수 있다는 것을 발견하고 연구진은 이를 정립하여 반응형 확장(Rx, Reactive Extensions)을 공개하였습니다.


    반응성 확장은 곧 여러 기술 기반 회사들의 호응을 얻었다. 넷플릭스(Netflix)는 Rx를 자바(RxJava) 환경에 옮겼고, 사운드클라우드(SoundColud)의 마티아스 캐플러(Matthias Käppler)는 RxJava를 안드로이드까지 (RxAndroid) 확장합니다.



    2. Rx을 이해하기 위한 준비

    • RX는 Observer pattern, Iterator pattern함수형 프로그래밍으로부터 나온 최고의 아이디어 조합이다
    • CREATE(만들고)’, ‘COMBINE(조합하고)’, ‘LISTEN(들어라? - 소비하라가 맞겠죠?)’
    • 일단 Rx를 제대로 이해하기 위해서는 아래 항목을 알고 있다면 쉽게 이해가 될거다.
    observer pattern
    Iterator pattern
    Functional programming
    MVVM model
    Dataflow programming

    3. Rx를 어디에 적용하고 사용할 수 있을까?

    • 비동기 통신을 순차적으로 보내고 싶다. (A작업이 끝나고 B가 시작됐으면 좋겠다.)
    • 콜백을 받았는데 받는 화면이 사라져서 null에러를 뿜으면서 죽는다.
    • 핸들러랑 콜백 지옥에 빠져서 디버깅도 힘들고 햇깔린다.
    • 두개의 비동기 처리가 완료된 후에 결과 값을 합쳐서 하나로 만들고 싶다.
    • 버튼을 연타로 눌러서 이벤트가 중복실행 된다.
    • 쓰레드 관리가 힘듭니다.

     

    출처 : 인터넷에서 RxAndroid 검색하여 필요한 정보를 다양한 사이트에서 종합하여 작성된 것입니다. 많은 사이트 내용을 종합하여 공부하여 작성하다보니 일일이 나열하지 못하였습니다. ㅈㅅ(_ _) 이글은 자유롭게 퍼 가셔서 도움이 되었으면 좋겠습니다. 감사합니다. 

    블로그 이미지

    차봉규

    IT개발 공부하려고 만든 블로그 입니다^^ 부족한점 많더라고...도움 많이 부탁해요

    ,