本文以WordCount为例, 画图说明spark程序的执行过程
WordCount就是统计一段数据中每个单词出现的次数, 例如hello spark hello you
这段文本中hello
出现2次, spark
出现1次, you
出现1次. 先上完整代码: object WordCount { def main(args: Array[String]) { val conf = new SparkConf().setAppName("WordCount"); val sc = new SparkContext(conf) val lines = sc.textFile("hdfs://xxx:9000/spark.txt", 3); val words = lines.flatMap { line => line.split("\s+") } val pairs = words.map { word => (word, 1) } val wordCounts = pairs.reduceByKey { _ + _ } wordCounts.foreach(wordCount => println(wordCount._1 + " appeared " + wordCount._2 + " times.")) }}
上面几行代码就把hdfs上的spark.txt
中每个单词出现的个数计算完成.
val conf = new SparkConf().setAppName("WordCount");val sc = new SparkContext(conf)
创建完SparkContext之后, spark.txt的文件数如何被spark处理的呢,让我们一起看一下:
首先我们假设spark.txt在hdfs上对应着3个文件,文件内容都一样,sc.textFile("hdfs://xxx:9000/spark.txt", 3)
也执行了最小分区数为3. 然后wordcount执行过程如下: 
说明:- 绿,红,黄色箭头的地方发生了`Shuffer,把整个任务分成了2个Stage(2个蓝色虚线框)
- 红色虚线框代表一个Partition窄依赖(每个分区只被子RDD的一个分区所使用)的运行过程, 多个partition是并行执行的
- reduceByKey会先把每个Partition中的数据预聚合(groupByKey不会)
- Stage中的数据都是在内存中,不像MapReduce会频繁写磁盘,速度很快.
- 补充:其实
textFile,flatMap,map,reduceByKey
等transformation操作都是lazy的,程序执行到这里不会立即执行,只有再触发action操作的时候才会执行,此例中为wordCounts.foreach
这个action操作.
--Posted from Rpc