RxJs Notlarım

Cem Topkaya
6 min readAug 2, 2020

--

Özetle bilmeniz gerekenler:

2009 Yılında verilerin artması hayatın akışının hızlanmasına karşı Reactive Extensions (Rx) for .NET diye bir yaklaşımla başlıyor hikaye. Önce Microsoft bunu .Net için geliştiriyor ve sonra diğer diller için uygulanıyor. Tafsilatlı anlatım için şu videoyu izleyebilirsiniz. Ayrıca bu makale de çok güzel anlatıyor Reactive Programlamayı Javascript örnekleriyle.

Çizim GPL lisanslıdır, özgürce kullanabilirsiniz.

Microsoft tarafındaki Reactive Extensions konusunu anlatan videoda dikkatimi çekenlerin bir kısmı:

  • Cold Observable: Bir akışa abone olduğunuz andan sonraki akışları görmeniz, geçmişi değil sonraki akışları alabilmenizi sağlayan nesneler.
  • Hot Observable: Aboneliğiniz başladığında dahi sizden önceki tüm akışları alabilmenizi sağlayan nesneler.
  • Önceden IEnumerable vardı ve var
  • Şimdi IObservable var
  • Abonelikten çıkmak için IDisposable arayüzü kullanılmış
const observer = {  next: nesne => console.log(`Gelen nesne: ${nesne}`) ,  error: hata => console.log(`Hata oluştu: ${hata}`) ,  complete: () => console.log(`Akış tamamlandı`) ,}

Marble Diagram

Yukarıdaki gibi gösterim şekline marble diyagramı deniyor. Tüm rxjs operatörlerinin etkileşimli (interactive) diyagramını görmek için bu adreste bekleniyorsunuz.

RxJs Operatörlerinden Hangisini Kullanacağız

Çok ama çooook operatör var RxJs içinde:

Tonla operatör var ve bunların arasında hangisini tercih edeceğinizi bulmak için bir kolaylık sağlanmış bu adreste.

Operatörler

timer

import { timer } from 'rxjs';ilk_yayim_kac_saniye_sonra_yapilacak = 1000
ilkinden_sonra_kac_saniye_araliklarla_yayim_yapilacak = 2000
source = timer(
ilk_yayim_kac_saniye_sonra_yapilacak,
ilkinden_sonra_kac_saniye_araliklarla_yayim_yapilacak
);

Veya tarih verilebilir

import { timer } from 'rxjs';const d1 = new Date()
d1.setMilliseconds(d1.getMilliseconds()+10000)
// şimdiden 10 saniye sonra başlasın
// 2 saniyede bir yayım yapsın
const source = timer(d1,2000);

merge

Şart gözetmeksizin akışların hangisine veri gelirse çıkış akışına eklenir. Böylece akışlar tek bir çıkışta birleştirilmiş olur.

combineLatest

“Sonuncuyla Birleştir” anlamındaki CombineLatest, akışlardan birine (burada sayı veya harf) herhangi bir eleman düştüğü anda birleştirme yapar ve yayımlar.

// a, b ve c akışlarına her eleman geldiğinde 
// o anki son elemanları yayımlar
combineLatest([a$, b$, c$])

Fermuarda her tırnağın karşısındaki tırnakla birleştirilmesi gibidir ancak tırnakların karşılıklı olması gerekmez !

  1. Önce A harfi alttaki akışa düştü ama birleştirme için üstteki akışa da bir eleman düşmesini bekledi.
  2. Üstteki akışa 1 gelince her iki akışta eleman oldu. 1 ve A elemanları akışlarda o zamana kadarki son elemanlar oldukları için birleştirdi ve 1A yazdı.
  3. Alttaki akışa C harfi düştüğü anda her iki akışta o an bulunan son elemanları birleştirdi ve 1C yazdı.
  4. Alttaki akışa B harfi düştüğü anda her iki akışta o an bulunan son elemanları birleştirdi ve 1B yazdı.
  5. Üstteki akışa 2 rakamı düştüğü anda her iki akışta o an bulunan son elemanları birleştirdi ve 2B yazdı.

withLatesFrom

CombineLatest’a benzer ancak farkı şudur: herhangi bir akışa eleman düşünce değil, “istediğiniz bir akışa eleman düşünce” tetiklenir.

// yayımlama işi, tetikleyici akışa eleman düşünce olacak
// tüm akışların tetiklendiği andaki son elemanlarını yayımlar
tetikleyiciAkis$.pipe( withLatestFrom(akis1$, akis2$) )

zip

Birden fazla akışı birleştirirken akışların her bir elemanına karşılık gelen elemanı eşleştirir

  1. Üst akıştaki eleman 1 ancak aynı anda alt akışta eleman yok.
  2. Alt akışa A gelince üst akıştaki 1 elemanıyla eşleştirilir.
  3. Üst akışa 2 gelince alt akışta karşılığı gelinceye kadar bekliyor.
  4. Alt akışa B gelince üst akıştaki karşılığı 2 ile eşleştirilir 2B olur.
  5. Alt akışa C gelir ama üstte karşılığı yok. Bekleyecek!
  6. Alt akışa D gelir ama üstte karşılığı yok. Bekleyecek!
  7. Üst akışa 3 gelince alttaki karşılığı C ile eşleştirilir. 3C olur ama D beklemeye devam eder.

forkJoin

Eldeki akışların tamamlanmasını bekler ve son elemanlarını birleştirir.

// Tüm akışlar tamamlandığında son elemanlardan yayım yapar
forkJoin([a$, b$, c$, d$])

filter

Javascript içinde dizilerin filter fonksiyonu gibi çalışır. Süzgeçten geçebilenleri bırakır.

scan & reduce

unicast — multicast

Observable varsayılan olarak bir tek noktaya yayın yapar. Her abone için aşağıdaki örnekte keyfi bir değer üretilmiş oldu.

import * as Rx from "rxjs"; const observable = Rx.Observable.create((observer) => {
observer.next(Math.random());
});
// subscription 1
observable.subscribe((data) => {
console.log(data); // 0.24957144215097515 (random number)
});
// subscription 2
observable.subscribe((data) => {
console.log(data); // 0.004617340049055896 (random number)
});

Ancak Subject kullandığımızda tüm aboneler aynı veriye ulaşmış oldular.

/**
* MultiCast için Subject veya BehaviorSubject kullanılır.
*/
const subject = new Rx.Subject();// 1. abone
subject.subscribe(d=>console.log('MultiCast: ',d));
// 2. abone
subject.subscribe(d=>console.log('MultiCast: ',d));
subject.next(Math.random());

concatMap (SERİ), mergeMap (PARALEL) ve switchMap

import { of } from 'rxjs'; 
import { map,concatMap,mergeMap,switchMap, tap, delay } from 'rxjs/operators';
function getRandomInt(max) {
return Math.floor(Math.random() * Math.floor(max));
}
function divLog(val, target){
const app = document.getElementById(target);
app.innerHTML += val+"<br>"
}
const a = of(1,2,3).pipe(
map(x => `Hello ${x}!`)
).subscribe(x=>divLog(x,"normal"));
/*****************************************************************
* concatMap (SERİ Olarak işlem yapar)
*
* Akışa düşen elemanın (observable)
* işlemi tamamlanmadan diğerine geçmez
*/
of(1,2,3).pipe(
tap(x=>divLog("Tap: "+x,"concatMap")),
concatMap(x => of(`Hello ${x}!`).pipe(delay(getRandomInt(5000))))
).subscribe(x=>divLog(x,"concatMap"));
/*
Tap: 1
Tap: 2
Tap: 3
Hello 1!
Hello 2!
Hello 3!
*/
/*****************************************************************
* mergeMap (PARALEL olarak işlem yapar)
* Akışa düşen observable elemanın
* işleminin tamamlanmasını beklemeden diğerine geçer
*/
of(1,2,3).pipe(
tap(x=>divLog("Tap: "+x,"mergeMap")),
mergeMap(x => of(`Hello ${x}!`).pipe(delay(getRandomInt(5000))))
).subscribe(x=>divLog(x,"mergeMap"));
/*
Tap: 1
Tap: 2
Tap: 3
Hello 2!
Hello 3!
Hello 1!
*/
/*****************************************************************
* switchMap ()
* Akışa düşen elemanın (observable) işlemini başlatır
* ancak yeni bir eleman düştüğünde önceki işlemi iptal eder
* ve yenisini işlemeye başlar.
* Bu yüzden sadece sonuncu elemanı göreceğiz çünkü 1 ve 2. işler
* 3. eleman düştüğü anda iptal edilecektir.
*/
of(1,2,3).pipe(
tap(x=>divLog("Tap: "+x,"switchMap")),
switchMap(x => of(`Hello ${x}!`).pipe(delay(getRandomInt(5000))))
).subscribe(x=>divLog(x,"switchMap"));
/*
switchMap
Tap: 1
Tap: 2
Tap: 3
Hello 3!
*/

--

--

Cem Topkaya
Cem Topkaya

Written by Cem Topkaya

Evlat, kardeş, ağabey, eş, baba, müzik sever, öğrenmek ister, paylaşmaya can atar, iyi biri olmaya çalışır, hakkı geçenlerden helallik ister vs.

No responses yet