مفاهیم Observable و Observer در RxJava ، ما چندین نوع از Observable ها یا انواع Observable در RxJava داریم که نحوه فراهم کردن داده توسط هرکدام از آنها متفاوت با دیگری ست.با اینکه تمامی Observable ها داده ها را منتشر یا emit میکنند اما بسته به استفاده مورد نیاز بهتر است که بهترین و نزدیک ترین گزینه را انتخاب کنیم تا کدهای بهینه تری داشته باشیم.
ما در RxJava2 انواع Observable های زیر را داریم.
در مقابل هرکدام از تولید کننده های داده فوق ، ما مشاهده گر یا observer های زیر را داریم
هر Observeble یا تولید کننده داده در تعداد داده تولیدی با یکدیگر متفاوت است.
میتوانیم تفاوت را در جدول زیر مشاهده کنیم.
خب حالا بریم سراغ مقایسه های ملموس تر از مفهوم هر Observable تا دقیق تر متوجه تفاوت emit داده ها شویم.
در RxJava میتوان گفت بیشترین استفاده به Observable تعلق میگیرد. Observable میتواند یک یا چند داده emit یا منتشر کند.همچنین میتوانیم یک لیست را بصورت یکجا و یکباره در Observable دریافت کنیم اما اگر میخواهید که یک یا تعدادی operator برروی داده های emit شده اعمال کنید بهتر است که بصورت تکی یا single داده ها را emit کنید.
// emitting single Note
Observable
// emitting list of notes at once, but in this case considering Single Observable is best option
Observable>
import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import java.util.ArrayList;
import java.util.List;
import info.androidhive.rxandroidexamples.R;
import info.androidhive.rxandroidexamples.observers.model.Note;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
public class ObserverActivity extends AppCompatActivity {
private static final String TAG = ObserverActivity.class.getSimpleName();
private Disposable disposable;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_observer);
Observable<Note> notesObservable = getNotesObservable();
Observer<Note> notesObserver = getNotesObserver();
notesObservable.observeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread())
.subscribeWith(notesObserver);
}
private Observer<Note> getNotesObserver() {
return new Observer<Note>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
disposable = d;
}
@Override
public void onNext(Note note) {
Log.d(TAG, "onNext: " + note.getNote());
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
}
private Observable<Note> getNotesObservable() {
final List<Note> notes = prepareNotes();
return Observable.create(new ObservableOnSubscribe<Note>() {
@Override
public void subscribe(ObservableEmitter<Note> emitter) throws Exception {
for (Note note : notes) {
if (!emitter.isDisposed()) {
emitter.onNext(note);
}
}
// all notes are emitted
if (!emitter.isDisposed()) {
emitter.onComplete();
}
}
});
}
private List<Note> prepareNotes() {
List<Note> notes = new ArrayList<>();
notes.add(new Note(1, "Buy tooth paste!"));
notes.add(new Note(2, "Call brother!"));
notes.add(new Note(3, "Watch Narcos tonight!"));
notes.add(new Note(4, "Pay power bill!"));
return notes;
}
@Override
protected void onDestroy() {
super.onDestroy();
disposable.dispose();
}
}
کلاس قالب
public class Note {
int id;
String note;
// getters an setters
}
خروجی
همچنین بخوانید: کتابخانه RxJava را کاربردی بیاموزیم
onSubscribe
! onNext: Buy tooth paste
! onNext: Call brother
! onNext: Watch Narcos tonight
! onNext: Pay power bill
onComplete
برخلاف Observable در Single ما تنها یک داده را منتشر یا emit میکنیم و یا خطای رخ داده را اعلام میکنیم.یکی از موارد استفاده Single را میتوان زمان اتصال به وب سرویس ها دانست. چرا که در این درخواست ها تنها response گرفتن یا نگرفتن و مشاهده خطا در یک مرحله مدنظر است.
نکته
در مفهوم SingleObserver ما دیگر onNext نخواهیم داشت و در عوض آن ما از onSuccess برای گرفتن ریسپانس استفاده میکنیم
import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import info.androidhive.rxandroidexamples.R;
import info.androidhive.rxandroidexamples.observers.model.Note;
import io.reactivex.Single;
import io.reactivex.SingleEmitter;
import io.reactivex.SingleObserver;
import io.reactivex.SingleOnSubscribe;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
public class SingleObserverActivity extends AppCompatActivity {
private static final String TAG = SingleObserverActivity.class.getSimpleName();
private Disposable disposable;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_single_observer);
Single<Note> noteObservable = getNoteObservable();
SingleObserver<Note> singleObserver = getSingleObserver();
noteObservable
.observeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(singleObserver);
}
private SingleObserver<Note> getSingleObserver() {
return new SingleObserver<Note>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
disposable = d;
}
@Override
public void onSuccess(Note note) {
Log.e(TAG, "onSuccess: " + note.getNote());
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}
};
}
private Single<Note> getNoteObservable() {
return Single.create(new SingleOnSubscribe<Note>() {
@Override
public void subscribe(SingleEmitter<Note> emitter) throws Exception {
Note note = new Note(1, "Buy milk!");
emitter.onSuccess(note);
}
});
}
@Override
protected void onDestroy() {
super.onDestroy();
disposable.dispose();
}
}
خروجی
onSubscribe
onSuccess: Buy milk!
زمانی از Maybe Observable استفاده کنید که مطمئن نیستید داده ای حتما emit می شود.چرا که Maybe Observable زمانی استفاده می شود که داده ای که شما انتظارش را دارید که emit یا منتشر شود کاملا اختیاری یا Optional است.
import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;
import info.androidhive.rxandroidexamples.R;
import info.androidhive.rxandroidexamples.observers.model.Note;
import io.reactivex.Maybe;
import io.reactivex.MaybeEmitter;
import io.reactivex.MaybeObserver;
import io.reactivex.MaybeOnSubscribe;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
public class MaybeObserverActivity extends AppCompatActivity {
private static final String TAG = MaybeObserverActivity.class.getSimpleName();
private Disposable disposable;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_maybe_observer);
Maybe<Note> noteObservable = getNoteObservable();
MaybeObserver<Note> noteObserver = getNoteObserver();
noteObservable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(noteObserver);
}
private MaybeObserver<Note> getNoteObserver() {
return new MaybeObserver<Note>() {
@Override
public void onSubscribe(Disposable d) {
disposable = d;
}
@Override
public void onSuccess(Note note) {
Log.d(TAG, "onSuccess: " + note.getNote());
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete");
}
};
}
/**
* Emits optional data (0 or 1 emission)
* But for now it emits 1 Note always
*/
private Maybe<Note> getNoteObservable() {
return Maybe.create(new MaybeOnSubscribe<Note>() {
@Override
public void subscribe(MaybeEmitter<Note> emitter) throws Exception {
Note note = new Note(1, "Call brother!");
if (!emitter.isDisposed()) {
emitter.onSuccess(note);
}
}
});
}
@Override
protected void onDestroy() {
super.onDestroy();
disposable.dispose();
}
}
زمانی از Completable Observable استفاده کنید که انتظار دریافت هیچ داده ای را ندارید. و تنها می خواهیم مطلع شوید که درخواست موفق بوده یا شکست خورده.از موارد استفاده Completable Observable میتوان به درخواست هایی مثل به روز رسانی داده های سرور با PUT اشاره کرد که نیازی به دریافت داده خاصی از وب سرویس نیست و تنها اطلاع از موفق بودن یا نبودن درخواست کافی ست.
import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;
import info.androidhive.rxandroidexamples.R;
import info.androidhive.rxandroidexamples.observers.model.Note;
import io.reactivex.Completable;
import io.reactivex.CompletableEmitter;
import io.reactivex.CompletableObserver;
import io.reactivex.CompletableOnSubscribe;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
public class CompletableObserverActivity extends AppCompatActivity {
private static final String TAG = CompletableObserverActivity.class.getSimpleName();
private Disposable disposable;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_completable_observer);
Note note = new Note(1, "Home work!");
Completable completableObservable = updateNote(note);
CompletableObserver completableObserver = completableObserver();
completableObservable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(completableObserver);
}
/**
* Assume this making PUT request to server to update the Note
*/
private Completable updateNote(Note note) {
return Completable.create(new CompletableOnSubscribe() {
@Override
public void subscribe(CompletableEmitter emitter) throws Exception {
if (!emitter.isDisposed()) {
Thread.sleep(1000);
emitter.onComplete();
}
}
});
}
private CompletableObserver completableObserver() {
return new CompletableObserver() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
disposable = d;
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: Note updated successfully!");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " + e.getMessage());
}
};
}
@Override
protected void onDestroy() {
super.onDestroy();
disposable.dispose();
}
}
خروجی
همچنین بخوانید: آموزش الگوی MVVM و RxJava به صورت پروژه محور + سورس کد
onSubscribe
! onComplete: Note updated successfully
با استفاده از Flowable observable شما می توانید مقدار بسیار بیشتری از آنچه Observer ها میتوانند بررسی کنند را مدیریت کنید.طبق داکیومنت اصلی ، Flowable میتواند برای سورس هایی که داده ی تولیدی آنها بیش از 10K است مورد استفاده قرار گیرد.
import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import info.androidhive.rxandroidexamples.R;
import io.reactivex.Flowable;
import io.reactivex.SingleObserver;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.BiFunction;
import io.reactivex.schedulers.Schedulers;
public class FlowableObserverActivity extends AppCompatActivity {
private static final String TAG = FlowableObserverActivity.class.getSimpleName();
private Disposable disposable;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_flowable_observer);
Flowable<Integer> flowableObservable = getFlowableObservable();
SingleObserver<Integer> observer = getFlowableObserver();
flowableObservable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.reduce(0, new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer result, Integer number) {
//Log.e(TAG, "Result: " + result + ", new number: " + number);
return result + number;
}
})
.subscribe(observer);
}
private SingleObserver<Integer> getFlowableObserver() {
return new SingleObserver<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
disposable = d;
}
@Override
public void onSuccess(Integer integer) {
Log.d(TAG, "onSuccess: " + integer);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " + e.getMessage());
}
};
}
private Flowable<Integer> getFlowableObservable() {
return Flowable.range(1, 100);
}
@Override
protected void onDestroy() {
super.onDestroy();
disposable.dispose();
}
}
}
خروجی
onSubscribe
onSuccess: 5050
امیدوارم مقاله مفهوم های Observable و Observer در RxJava برای شما مفید بوده باشد.
1 Comment
واقعا عالی بود.
ممنون از شما