Spark

Apache Spark

-------------

A general-purpose, in-memory compute engine.


Spark -- a replacement for MapReduce (MR)

--a plug-and-play compute engine

--needs two things to work with (see the sketch below):

--storage: local storage, S3, HDFS

--resource management: YARN, Mesos, Kubernetes
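
A minimal sketch of the plug-and-play idea: the same code can point at a different resource manager or storage system just by changing the master URL and the input path (the paths and bucket name below are hypothetical examples).

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("demo").setMaster("local[*]")   // or "yarn", "mesos://...", "k8s://..."
val sc = new SparkContext(conf)

val localRdd = sc.textFile("file:///tmp/input.txt")         // local storage
// val s3Rdd = sc.textFile("s3a://my-bucket/input.txt")     // S3 (needs the hadoop-aws connector)
// val hdfsRdd = sc.textFile("hdfs:///data/input.txt")      // HDFS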


In memory:

------------------


Ideally only 2 disk I/O operations are required in Spark (read the input, write the final output); intermediate results stay in memory.


10 to 100 times faster than MR.


-------------------------


General purpose:

--------------------


--handles all kinds of workloads: cleaning, querying, machine learning, data ingestion


MR: high latency (many disk reads and writes)

Spark: low latency

--------------------------------


RDD (Resilient Distributed Dataset):

-----------

--the basic unit that holds data in Spark


--transformations on RDDs form a directed acyclic graph (DAG)


Two kinds of operations:

-----------------------------

Transformations are lazy.

Actions are not.


collect -- an action


When you execute Spark code, the transformations you call only build up a DAG; nothing runs until an action is called.
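
A minimal sketch of the difference (the numbers are just made-up example data):

val nums = sc.parallelize(1 to 10)       // create an RDD
val doubled = nums.map(_ * 2)            // transformation: lazy, only extends the DAG
val total = doubled.reduce(_ + _)        // action: this is what actually triggers a job
println(total)                           // 110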


In Spark there is 1 driver node and many worker nodes.


rdd -- number of blocks in the file = number of partitions distributed in memory.


--distributed 

--in memory

resilient -- if we lose an RDD we can recover it


--it can be recovered from the lineage graph


rdd1

  | map

rdd2

  | filter

rdd3


If rdd3 is lost, Spark finds its parent (rdd2) using the lineage graph and re-applies the filter.


The lineage graph keeps track of the transformations; they are only executed once an action is called.
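
You can inspect the lineage of an RDD with toDebugString; a minimal sketch of the rdd1 -> rdd2 -> rdd3 chain above (made-up data):

val rdd1 = sc.parallelize(Seq("big data", "spark", "big compute"))
val rdd2 = rdd1.map(_.toUpperCase)
val rdd3 = rdd2.filter(_.contains("BIG"))
println(rdd3.toDebugString)   // prints the chain: filter <- map <- parallelize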


RDDs are immutable -- once loaded, the data cannot be changed.


If they were not immutable, we would not be able to regenerate them in case of a failure.


Why are transformations lazy?

If they were not lazy, then:


rdd1 = load file1 from HDFS (the file is 1 GB)

rdd1.print(line1)


would load the whole file into memory and then print one line -- not optimized.


Because transformations are lazy,

only the line that is actually needed comes into memory -- optimized.
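
A minimal sketch of the same idea (the HDFS path is a hypothetical example):

val rdd1 = sc.textFile("hdfs:///data/file1")   // lazy: nothing is read yet
println(rdd1.first())                          // action: Spark reads only enough of the file to return one line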


When an RDD is actually filled with data, it is said to be materialized.


map works on each record:

--number of inputs = number of outputs
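
A minimal sketch (made-up numbers):

val nums = sc.parallelize(Seq(1, 2, 3))
val squared = nums.map(x => x * x)     // 3 records in, 3 records out
squared.collect().foreach(println)     // 1, 4, 9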


rdd1 = load file

rdd2 = map

rdd3 = filter

rdd3.println


Conceptually the filter can be moved up (earlier) and the map pushed down (later), so fewer records flow through the map.


This is called predicate pushdown. (Spark's Catalyst optimizer does this automatically for the DataFrame/SQL API; with plain RDDs the operations run in the order you write them.)
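
With plain RDDs you can get the same benefit by filtering early yourself; a minimal sketch (the path and the ERROR condition are hypothetical examples):

val lines = sc.textFile("hdfs:///data/logs")
val cleaned = lines.filter(_.contains("ERROR")).map(_.toUpperCase)   // filter first, so map touches fewer records
cleaned.collect().foreach(println)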


If you want to write in Python, use:

pyspark

For Scala:

spark-shell


For Java there is no interactive shell:

write the code in an IDE and bundle it as a jar.


sc -- SparkContext -- when you use sc, the code runs on the Spark cluster, to achieve parallelism.

--the entry point to the Spark cluster.


5 + 8

13 -- a plain expression like this runs locally in the driver, not on the cluster.
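
A minimal sketch of the difference:

val x = 5 + 8                         // plain Scala, evaluated only in the driver
println(x)                            // 13

val r = sc.parallelize(1 to 100)      // goes through sc, so the work is distributed
println(r.sum())                      // 5050.0, computed across the partitions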


---------------------


flatMap -- takes each line as input and can emit several output records per line.


val rdd1 = sc.textFile()

val rdd2 = rdd1.flatMap(x => x.split(" "))


split returns an array of the words on each line; flatMap flattens these, so all the words end up in a single RDD.
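
A minimal sketch of map vs flatMap on the same data (made-up lines):

val lines = sc.parallelize(Seq("big data", "spark"))

val arrays = lines.map(_.split(" "))       // 2 records: Array("big","data") and Array("spark")
val words = lines.flatMap(_.split(" "))    // 3 records: "big", "data", "spark" -- flattened

println(words.count())                     // 3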


val rdd3 = rdd2.map(x => (x, 1))


val rdd4 = rdd3.reduceByKey((x, y) => x + y)


On the left side, x and y are two values belonging to the same key.

Spark shuffles the records with the same key together and then adds their values.


reduceByKey works on two parameters at a time

(it always operates on the values, never on the key)
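
A minimal worked example (made-up pairs):

val pairs = sc.parallelize(Seq(("big", 1), ("big", 1), ("data", 1)))
val counts = pairs.reduceByKey((x, y) => x + y)   // values of the same key are combined two at a time
counts.collect().foreach(println)                 // (big,2) and (data,1)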

localhost:4040 -- the Spark UI


The UI is only up while you are connected to spark-shell (i.e. while the application is running).


base RDD -- the first RDD (the one loaded directly from the source)


rdd.collect

(shows the result as an array)


To print each element on a new line:

rdd.collect.foreach(println)


pyspark

-------------


In Python there is no val or var:

remove the val.


What Scala calls an anonymous function is called a lambda in Python:

write lambda x: ... instead of x => ...


rdd4.saveAsTextFile()  (the output directory must not already exist)


rdd1 = sc.textFile("file:///home/cloudera/file1")


Here file:// marks a local path,

followed by the actual path.


In Scala, when a function takes no parameters you can drop the brackets; in Python you must keep the parentheses.

==================================


In Eclipse:


Either write a main method or use extends App.


In Eclipse, sc is not available by default -- you create the SparkContext yourself.


Set the logging level:

---------------------


Logger.getLogger("org").setLevel(Level.ERROR)

(setting logging level to error)



val sc = new SparkContext("local[*]","wordcount")

(Ctrl+Shift+O imports the required classes, after the Spark library is added as an external library)

Here * means use all the cores of the CPU (e.g. both cores on a dual-core machine).


val input = sc.textFile("")


val words = input.flatMap(x => x.split(" "))


val wordMap = words.map(x => (x,1))

((x,1) is a tuple)


val finalCount = wordMap.reduceByKey((a,b) => a+b)


finalCount.collect.foreach(println)
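
Putting it together, a minimal self-contained sketch of how the word-count program might look in the IDE (the object name and input path are hypothetical examples):

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext

object WordCount extends App {
  Logger.getLogger("org").setLevel(Level.ERROR)        // keep the console output readable

  val sc = new SparkContext("local[*]", "wordcount")   // local mode, all cores

  val input = sc.textFile("file:///home/cloudera/file1")   // hypothetical input path
  val words = input.flatMap(x => x.split(" "))
  val wordMap = words.map(x => (x, 1))
  val finalCount = wordMap.reduceByKey((a, b) => a + b)

  finalCount.collect.foreach(println)

  scala.io.StdIn.readLine()                            // hold the screen so the Spark UI stays up
}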



Spark 2.4.4 -- compatible with Scala 2.11

Spark 2.4.2 -- Scala 2.12


Spark 2.4.3 -- Scala 2.11



----------------------------

When you run from the IDE, the DAG (in the Spark UI) is visible only while the job is running.

As soon as the job terminates or finishes, it is no longer visible.


--In a company setup, the admin will set up a history server (on its own port) where you can still see the DAG after the job ends.


scala.io.StdIn.readLine()

(placed at the end of the program, it holds the screen once the program completes)


If we want to find the top 10 words, we need to sort on the values (the counts).


There is no sortByValues transformation, but we have sortByKey.


So we swap the pair and give the input (big,2) as (2,big).


By default sortByKey sorts in ascending order; to make it descending use:


sortByKey(false)


Or you can use sortBy(x => x._2)

(sorts on the 2nd element of the tuple)
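
A minimal sketch of the top-10 idea, continuing from finalCount above:

val swapped = finalCount.map { case (word, count) => (count, word) }   // (big,2) becomes (2,big)
val top10 = swapped.sortByKey(ascending = false).take(10)              // sort counts descending, take the first 10
top10.foreach(println)

// or, without swapping:
// finalCount.sortBy(x => x._2, ascending = false).take(10).foreach(println)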


worditout.com

