Apache Spark
-------------
general purpose
in memory
compute engine
spark--a replacement for MapReduce (MR)
--plug and play compute engine.
--needs two things to work with:
--storage: local storage, S3, HDFS
--resource management: YARN, Mesos, Kubernetes
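for example--a rough sketch of the "plug and play" idea (the paths and master values below are just examples, not from the notes):
val rdd = sc.textFile("hdfs:///data/file1")   // same API for "s3a://bucket/file1" or "file:///tmp/file1"
// the resource manager is picked when the job is launched, e.g.
//   spark-submit --master yarn       myapp.jar
//   spark-submit --master k8s://...  myapp.jar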
In memory:
------------------
--ideally only 2 disk I/O operations in spark (read the input once, write the output once); intermediate results stay in memory
--10 to 100 times faster than MR
-------------------------
General purpose:
--------------------
--one engine for everything: cleaning, querying, machine learning, data ingestion
MR--high latency--more disk reads and writes
spark--low latency
--------------------------------
RDD (resilient distributed dataset):
-----------
basic unit--holds the data in spark
transformations on RDDs form a directed acyclic graph (DAG)
2 kinds of operations:
-----------------------------
Transformations are lazy
Actions are not
collect--an action
when spark code executes--calling transformations only builds up the DAG.
in spark--1 driver node and many worker nodes
rdd--no. of blocks in the file = no. of partitions distributed in memory.
--distributed
--in memory
resilient--if we lose an rdd we can recover it
--it can be recovered from the lineage graph.
rdd1
 | map
rdd2
 | filter
rdd3
if rdd3 is lost--look up its parent (rdd2) using the lineage graph and re-apply the filter
lineage graph--keeps track of the transformations--they are executed once an action is called
rdds are immutable--once loaded, the data can't be changed.
if rdds were not immutable--we would not be able to regenerate them after a failure.
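you can print the lineage graph of an rdd in spark-shell with toDebugString (the file path below is made up):
val rdd1 = sc.textFile("file:///tmp/file1")
val rdd2 = rdd1.map(x => x.toUpperCase)
val rdd3 = rdd2.filter(x => x.nonEmpty)
println(rdd3.toDebugString)   // shows the chain of parent rdds spark would replay after a failure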
why are transformations lazy?
if they were not lazy:
rdd1 = load file1 from hdfs (file is 1 GB)
rdd1.print(line1)
the whole 1 GB file would be loaded into memory just to print one line--not optimized.
because they are lazy,
spark waits for the action and only brings what it needs into memory--optimized.
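a small spark-shell sketch of the same idea (the path is hypothetical):
val rdd1 = sc.textFile("hdfs:///data/file1")   // nothing is read yet--this only records the plan
val firstLine = rdd1.take(1)                   // action: spark reads just enough data to return one line
firstLine.foreach(println)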
when an rdd is actually filled with data--it is said to be materialized
map--works on each record
--no. of input records = no. of output records
val rdd1 = sc.textFile("file1")
val rdd2 = rdd1.map(x => x.toLowerCase)
val rdd3 = rdd2.filter(x => x.nonEmpty)
rdd3.collect.foreach(println)
spark can internally move the filter up (before the map) so fewer records flow through the map
this is called predicate pushdown
to write in python--use the pyspark shell
for scala--use spark-shell
for java--there is no interactive shell
--write it in an IDE and bundle it.
sc--SparkContext--when you use sc, the code runs on the spark cluster--to achieve parallelism.
--entry point to the spark cluster.
5+8
13--plain expressions like this just run locally in the shell
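a tiny sketch of the difference:
5 + 8                                  // plain scala--evaluated only in the driver/shell
val nums = sc.parallelize(1 to 100)    // through sc the data is split into partitions across the cluster
nums.reduce((x, y) => x + y)           // action runs in parallel on the executors and returns 5050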
---------------------
flatMap--takes each line as input and can emit many output records.
val rdd1 = sc.textFile()
val rdd2 = rdd1.flatMap(x => x.split(" "))
after the split we get an array--all the words of that line in a single array; flatMap flattens these arrays into one rdd of words.
val rdd3 = rdd2.map(x => (x, 1))
val rdd4 = rdd3.reduceByKey((x, y) => x + y)
the x and y on the left side are two values belonging to the same key.
it shuffles records with the same key together--and adds their values.
reduceByKey--works on two parameters at a time.
(it always works on the values, never the key)
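for example, if the word "big" appears 3 times, rdd3 holds (big,1),(big,1),(big,1) and reduceByKey keeps combining two values at a time: 1+1=2, then 2+1=3. a quick spark-shell check (sample data made up):
val pairs = sc.parallelize(Seq(("big", 1), ("big", 1), ("big", 1), ("data", 1)))
pairs.reduceByKey((x, y) => x + y).collect    // Array((big,3), (data,1))--order may vary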
localhost:4040--spark ui
the ui is up whenever you are connected to spark-shell
base rdd--the first rdd
rdd.collect
(shows the result as an array)
to see one record per line:
rdd.collect.foreach(println)
pyspark
-------------
in python there is no val or var
--just remove the val
scala's anonymous functions are written as lambdas in python
--use lambda x: ... instead of x => ...
rdd4.saveAsTextFile() (the target directory must not already exist)
rdd1 = sc.textFile("file:///home/cloudera/file1")
here file:// means a local path,
followed by the path itself.
in scala a no-argument method can be called without brackets (rdd.collect)--but in python you must keep the brackets (rdd.collect()).
==================================
in eclipse:
either write a main method or use extends App.
in eclipse--sc is not available out of the box--you create it yourself
set the logging level:
---------------------
Logger.getLogger("org").setLevel(Level.ERROR)
(setting logging level to error)
val sc = new SparkContext("local[*]", "wordcount")
(ctrl+shift+O to organize imports / add the external library)
here * means use all the cores--e.g. both CPUs on a dual-core machine
val input = sc.textFile("")
val words = input.flatMap(x => x.split(" "))
val wordMap = words.map(x => (x, 1))
((x, 1) is a tuple)
val finalCount = wordMap.reduceByKey((a, b) => a + b)
finalCount.collect.foreach(println)
spark 2.4.4--scala 2.11
spark 2.4.2--scala 2.12
spark 2.4.3--scala 2.11
(the spark version decides which scala version you build against)
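so in an sbt project the two have to line up--a sketch of a build.sbt (versions shown only as an example):
scalaVersion := "2.11.12"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.4"
// %% appends the scala binary version, so this resolves to spark-core_2.11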
----------------------------
when you run from the ide--the dag is visible in the spark ui only while the job is running.
as soon as the job terminates or finishes--it is no longer visible
--in a company setup--the admin configures a history server--where you can still see the dag (on a separate port)
scala.io.StdIn.readLine()
(holds the screen once the program has completed, so the ui stays reachable)
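putting the ide pieces together, a minimal word count app could look like this (the input path is just a placeholder):
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext

object WordCount extends App {
  Logger.getLogger("org").setLevel(Level.ERROR)
  val sc = new SparkContext("local[*]", "wordcount")
  val input = sc.textFile("file:///home/cloudera/file1")
  val counts = input.flatMap(x => x.split(" "))
                    .map(x => (x, 1))
                    .reduceByKey((a, b) => a + b)
  counts.collect.foreach(println)
  scala.io.StdIn.readLine()   // hold the screen so the spark ui at localhost:4040 stays reachable
}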
if we want to find the top 10 words..
we need to sort on the values (the counts).
there is no transformation called sortByValues, but we do have sortByKey.
so we swap each pair, e.g. (big,2) becomes (2,big), so that sortByKey works on the count.
by default sortByKey sorts in ascending order; to make it descending use
sortByKey(false)
or you can use sortBy(x => x._2)
(sort on the 2nd column.)
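a sketch of the top-10 logic on top of finalCount from the word count above:
// swap (word, count) -> (count, word), sort descending on the key, then take 10
val top10 = finalCount.map(x => (x._2, x._1)).sortByKey(false).take(10)
top10.foreach(println)

// or sort on the 2nd column directly (false = descending)
finalCount.sortBy(x => x._2, false).take(10).foreach(println)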
worditout.com