Spark #6 - Spark Dönüşümler, Aksiyon ve Fonksiyonlar
RDD’ler üzerinde gerçekleştirilen bir operasyon sonunda yeni bir RDD elde ettiğimiz operasyonlara transfer/dönüşüm (transformation) operasyonları adı verilir. Tamamı için geçerli olmasa da, çoğu dönüşüm RDD üzerindeki her bir veri üzerinde tek tek çalışır. Dönüşümler tembel (lazy) olarak hesaplanır. Yani dönüşüm işlemi bir aksiyon (action) uygulandığında gerçekleştirilir. Bir RDD üzerinde gerçekleştirilen dönüşüm, kaynak RDD ‘ yi değiştirmez. Farklı yeni bir RDD nesnesi oluşturulur. (Ancak hesaplanmaz!) Kaynak RDD başka operasyonlar için kullanılabilir. Örneğin: inputRDD=sc.textFile(“log.tct”) errorsRDD=inputRDD.filter(lambda x:”error” in x) warningsRDD=inputRDD.filter(lambda x:”error” in x) errAndwarnRDD=errorsRDD.union(warningsRDD)
Yukarıdaki örnekte inputRDD kullanılarak iki farklı RDD oluşturulmuştur. Union ile bu iki farklı RDD birleştirilmiştir. Aslında Union yerine aynı anda he error hem de warningleri filtreleyen tek bir lambda yazmak mümkündür. Fakat Union “lineage” kavramının anlaşılması açısından önemlidir. Spark RDD’ ler arasındaki tüm ilişki ve bağımlılıkları saklar. Bu ilişkileri gösteren graf’a lineage (bağımlılık) graf adı verilir.
Dönüşüm sonrasında oluşan RDD’ler üzerinde çalıştırılabilecek iki önemli operasyon take() ve collect() aksiyonlarıdır. Take ile büyük bir veri setinden oluşan RDD içerisinden küçük/az sayıda örnek almak mümkündür. Python: inputRDD=sc.textFile(“log.tct”) errorsRDD=inputRDD.filter(lambda x:”error” in x) warningsRDD=inputRDD.filter(lambda x:”error” in x) errAndwarnRDD=errorsRDD.union(warningsRDD) print “Filtrelenmiş RDD “ + errAndwarnRDD.count() + “ satirdan oluşmaktadır.” print “Hatalı satırların 10 tanesi:” for satir in errAndwarnRDD.take(10) print satir
Aynı örnek Java ile geliştirildiğinde (önceki yazılarımızdan RDD nin javada nasıl oluşturulduğuna bakabilirsiniz) System.out.Println(“ Hatali satirlarin 10 tanesi…”); for(String satir: errAndwarnRDD.take(10)) System.out.Println(satir);
Benzer şekilde RDD üzerinde collect() fonksiyonu çağırılarak, düğümler üzerindeki tüm bilgiler tek düğüm (Driver çalıştıran düğüm) üzerinde birleştirilebilir. Collect kullanarak RDD nin tamamının hafızaya aktarılması mümkündür. Ancak collect kullanıldığında driver düğümünü çalıştıran makinenin hafızasının ilgili RDD ‘ nin sığabileceği kadar büyük olması gereklidir. Çoğu durumda RDD’ ler driver hafızasına sığmayacak kadar büyüktür. Eğer programımız RDD’ yi filtreliyor ve gerçekten küçük boyutlu bir veri setine indirgiyor ise, bu bilgileri sürekli elimizin altında erişilebilir durumda tutmak için collect kullanılabilir. Hafızaya sığmayacak RDD’ leri saklanması isteniyor ise saveasTextFile(), saveasSequenceFile() gibi komutlar kullanılarak RDD’ nin Amazon s3 veya HDFS gibi dağıtık bir dosya sisteminde saklanması mümkündür. Son not olarak daha önce bahsi geçen bir konunun altını çizelim: RDD’ler üzerinde çalıştırılan her aksion RDD’ nin en baştan hesaplanması anlamına gelir. Eğer bazı ara değerleri saklamak ve daha hızlı kullanmak istiyorsak, .persist() veya .cache() kullanılabiliriz.









