Spark实战(5)_Spark Core核心编程

in #cn7 years ago

Spark版本

cdh5.9.0集成的spark的版本1.6.0,集成的hadoop版本2.6.0。查看的网址:

http://archive.cloudera.com/cdh5/redhat/6/x86_64/cdh/5.9.0/

如果用cdh5.9.0 parcels离线安装自带的spark(on yarn),启动时提示缺少包,需要修改spark-env.sh的配置SPARK_DIST_CLASSPATH,里面默认的配置为在线用rpm方式安装的配置,修改为/opt/clouderra/parcels/CDH/lib。

Spark运行模式

Spark 的执行模式有 local、Yarn、Standalone、Mesos四类。

开发和测试用 local 模式,其实就是用多线程模似分布式执行。

如果业务部门较少且不需要对部门或组之间的资源做划分和优先级调度的话,可以使用 Standalone 模式来部署。

当如果有多个部门或组,且希望每个组织可以限制固定运行的最大资源,另外组或者任务需要有优先级执行的话,可以选择 Yarn 或 Mesos。

Standalone模式,即独立模式,master/slave(worker),自带完整的服务,可单独部署到一个集群中,无需依赖任何其他资源管理系统。需要启动Master和Worker守护进程,即服务端进程,就好比Mapreduce的JobTracker和TaskTracker。

Spark on Yarn: 把Spark作业的调度和资源分配交给Yarn,Yarn相当于Spark集群的Master,Spark无自己的守护进程,仅仅作为客户端存在。

MR(Hive)、Storm、Tez、Spark,期望这些作业有统一的调度和资源分配的角色,Yarn(MR2)。

Spark组件

Cluster Manager:在Standalone模式中即为Master(主节点),控制整个集群,监控Worker。在YARN模式中为资源管理器。

Worker:从节点,负责控制计算节点,启动Executor或Driver。在YARN模式中为NodeManager,负责计算节点的控制。

Driver:运行Application的main()函数并创建SparkContext。

Executor:执行器,在worker node上执行任务的组件、用于启动线程池运行任务。

SparkContext:整个应用的上下文,控制应用的生命周期,提交作业的入口。

RDD:Spark的基本计算单元,一组RDD可形成执行的有向无环图RDD Graph。

DAG Scheduler:实现将Spark作业分解成一到多个Stage,每个Stage根据RDD的Partition个数决定Task的个数,然后生成相应的Task set放到TaskScheduler(NodeManager)中。

TaskScheduler:将任务(Task)分发给Executor执行。

Stage:一个Spark作业一般包含一到多个Stage。

Task:一个Stage包含一到多个Task,通过多个Task实现并行运行的功能。

Spark集群部署

配置免密钥登录

CDH中用CM进行集群管理,集群直接互联是通过ssh协议,但我们不需要配置ssh免密匙访问,因为CM中配置了通过相同帐户密码访问。用Apache Spark的话,必须配置ssh免密匙访问。

安装Scala和Spark

修改环境变量

vi /etc/profile

export HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoop
export SPARK_HOME=/opt/soft/spark2.0/spark-2.2.0-bin-hadoop2.6
export SCALA_HOME=/opt/soft/spark2.0/scala-2.11.8
export JAVA_HOME=/opt/soft/jdk1.8.0_131
export HADOOP_CONF_DIR=/etc/hadoop/conf

修改$SPARK_HOME/conf
mv slaves.template slaves,slaves里配置工作节点主机名列表。

mv spark-env.sh.template spark-env.sh,spark-env.sh配置一些环境变量,由于我们用Yarn模式,这里面不用配置。如果是standalone模式呢?

分布式模式运行测试,

spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn \
--num-executors 1 \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 1 \
--conf "spark.app.name=SparkPi" \
/export/servers/spark-2.0.2-bin-hadoop2.6/examples/jars/spark-examples_2.11-2.0.2.jar

报错信息,设置HADOOP_CONF_DIR环境变量即可。

Exception in thread "main" java.lang.Exception: When running with master 'yarn' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.
        at org.apache.spark.deploy.SparkSubmitArguments.validateSubmitArguments(SparkSubmitArguments.scala:256)
        at org.apache.spark.deploy.SparkSubmitArguments.validateArguments(SparkSubmitArguments.scala:233)
        at org.apache.spark.deploy.SparkSubmitArguments.<init>(SparkSubmitArguments.scala:110)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:117)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

如果内存不足,报错的话,在cm里进行yarn的配置,如下2个设置为2g:
yarn.scheduler.maximum-allocation-mbyarn.nodemanager.resource.memory-mb
保存后,部署客户端配置,把cm界面里修改过的参数同步到每个节点的xml配置文件里,重启Yarn服务。

RDD

Spark 对数据的核心抽象——弹性分布式数据集(Resilient Distributed Dataset,简称RDD)。在Spark 中,对数据的所有操作不外乎创建RDD和操作RDD 。

对数据(Seq)的操作,比如List:转换(Transform),map、filter,返回List类型,数据转换/加工过程。Action:head、tail、count ,返回不同类型,即我们需要的结果。

RDD就是类似集合类(Iterable),具有和集合类几乎完全相同的操作(Transform和Action)。而在这一切背后,Spark 会自动将RDD 中的数据分发到集群上,并将操作并行化执行。

Spark中的RDD就是一个不可变的分布式对象集合。每个RDD 都被分为多个分区,这些分区运行在集群中的不同节点上。RDD 可以包含Python、Java、Scala 中任意类型的对象,甚至可以包含用户自定义的对象。

RDD是类似Iterable的数据结构。

idea中进行spark core开发,需要在工程中新建一个lib目录,把spark的包复制进去,在Project Structure中的Libraries中把包加进来。

创建RDD

用户可以使用两种方法创建RDD:

  • 用SparkContext的parallelize(Seq)把Seq转为RDD。该方式常用于学习和实验。
  • 读外部数据,通常是读HDFS、消息队列等。
def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism) : RDD[T]

numSlices是并行度,具有初始值所以调用时可以只给一个参数。比如可以parallelize(seq),可以parallelize(seq,10),并行度为10意味着Spark把数据分割为10份,放在集群上运行。defaultParallelism是机器CPU个数。

# 查看CPU的个数
cat /proc/cpuinfo| grep "processor"| wc -l

RDD的操作

RDD有两种类型的操作:Transform操作和Action操作。注:就是Iterable类中的函数,Transform返回Iterable本身类型,Action返回新类型。

Iterable: Seq、Map

对应到

RDD:单元素RDD、PairRDD

Transform操作会由一个RDD生成一个新的RDD,这个过程中不进行实质计算,只有当第一次Action操作时才会真正计算。称作Lazy计算,惰性计算。

Action操作会对RDD计算出一个结果,可以把结果返回,或把结果存储到外部存储系统(如HDFS)中。

RDD是类似Iterable的数据结构,也具有Iterable类的Map()、filter()、flatMap()等高阶函数。

Action操作

collect():把数据返回驱动器程序中最简单、最常见的操作, 通常在单元测试中使用,数据量不能太大,因为放在内存中,数据量大会内存溢出。

reduce():类似sum(),如:val sum = rdd.reduce((x, y) => x + y),结果同sum。

fold():和reduce()类似,多一个“初始值”,当初始值为零时效果同reduce()。fold(0) = reduce()

take(n):返回RDD中的n个元素,并且尝试只访问尽量少的分区。

top(n):从RDD中获取前几个元素。

count():用来返回元素的个数。

countByValue():返回一个从各值到值对应的计数的映射表。

sum():返回汇总。

fold(n)的执行原理:每个分区里进行这样的计算:初始值+sum(元素),最后进行:初始值+sum(分区sum值),初始值累加次数为分区数+1次。

// 集群模式执行的话,通常用数据之前需要调rdd.collect()
rdd.collect().foreach(println)

// 集群模式用它可能拿不全
rdd.foreach(println)

持久化函数persist()

Spark提供rdd的persist()函数来解决这个重复计算的问题,persist()把需要重复使用的rdd存起来,这样仅第一个Action操作才会计算,其他Action操作不需要再计算。

当我们执行rdd的persist()时,计算出RDD的节点会分别保存它们所求出的分区数据。如果一个有持久化数据的节点发生故障,Spark 会在需要用到缓存的数据时重算丢失的数据分区。

rdd的persist()有5种持久化级别,分别是:来自org.apache.spark.storage.StorageLevel的定义。

级别使用的空间CPU时间是否在内存中是否在磁盘上备注
MEMORY_ONLY
MEMORY_ONLY_SER
MEMORY_AND_DISK中等部分部分如果数据在内存中放不下,则溢写到磁盘上
MEMORY_AND_DISK_SER部分部分如果数据在内存中放不下,则溢写到磁盘上,在内存中存放序列化后的数据
DISK_ONLY
val rdd1 = rdd.map(x => x+1)
rdd1.persist(StorageLevel.DISK_ONLY)
println(rdd1.first())
println(rdd1.count())
println(rdd1.sum())
println(rdd1.collect().mkString(","))
rdd1.unpersist()  //释放缓存,必须手工释放

如果觉得数据过于重要,怕存一份有风险,则可以存2份:

rdd1.persist(StorageLevel.MEMORY_ONLY_2)

注意

如果要缓存的数据太多,内存中放不下,Spark会自动利用最近最少使用(LRU)的缓存策略把最老的分区从内存中移除。但是对于仅把数据存放在内存中的缓存级别,下一次要用到已经被移除的分区时,这些分区就需要重新计算。不必担心你的作业因为缓存了太多数据而被打断。

如果MEMORY_ONLY内存不足的时候,Spark会自动用硬盘来承载。

WordCount案例

package com.padluo.spark.core

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("WordCount")
      .set("spark.testing.memory", "2147480000")
    val sc = new SparkContext(conf)

    //    val lines = sc.textFile("hdfs://spark01:9000/user/spark/spark.txt", minPartitions = 1)
    //    val lines = sc.textFile("/home/hadoop/spark.txt", minPartitions = 1)
    //    val lines = sc.textFile("C:/Users/Administrator/Desktop/spark.txt", minPartitions = 1)
    val lines = sc.textFile("file:///D:/Java/idea/IdeaProjects/spark-study/spark-core/resources/spark.txt", minPartitions = 1)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map((_, 1))
    val wordCount = pairs.reduceByKey(_ + _)
    wordCount.foreach(wordCount => println(wordCount._1 + " appeared " + wordCount._2 + " times."))
    println(lines.count())
    println(lines.first())
  }
}

List方式创建RDD,

从本地文件/HDFS文件读取文件创建RDD,注意本地文件路径和HDFS文件路径的写法。

val rdd = sc.textFile("file:///c:/spark.txt")
val rdd = sc.textFile("hdfs://ip:8020/...")

PairRDD

通常多列RDD会转为PairRDD进行操作,这样就可以用PairRDD的Transform和Action操作。

PairRDD的创建

  • 可以通过sc.parallelize创建
  • 程序中其他RDD转的

pairRDD的元素不是Map,而是Tuple2。

PairRDD的操作

reduceByKey(func),合并具有相同键的值。

groupByKey(),对具有相同键的值进行分组。

mapValues(func),对pariRDD中的每个值应用一个函数而不改变键。mapValues(func)函数,功能类似于map{case (x, y): (x,func(y))}

// pariRDD使用map和filter,应结合case
pairRdd.map{
  case (k, v) => (k, v + 1)
}

// key保持不变,value加1
pairRdd.mapValues(v => v + 1)

// key进行分组,value两两相加
pairRdd.reduceByKey(_ + _)
package com.padluo.spark.core

import org.apache.spark.{SparkConf, SparkContext}

object PairRddTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("PairRddTest")
      .setMaster("local")
      .set("spark.testing.memory", "471859200")
    val sc = new SparkContext(conf) //Driver类

    val rdd = sc.parallelize(List((2, 3), (4, 5), (3, 2), (2, 1)))
    val rdd2 = sc.parallelize(List(2, 3, 5)).map(i => (i, i + 2)) // 创建pairRdd主要方式

    rdd2.map {
      case (k, v) => (k, v + 1)
    } // i=>i+1 返回单元素,rdd2执行map函数需要返回和rdd2相同(PairRDD)类型
    rdd2.filter {
      case (k, v) => v % 2 == 0
    }
    // (2,3),(4,5),(3,2),(2,1)  -》(2,4),(4,6),(3,3),(2,2)
    rdd.mapValues(i => i + 1).foreach(println _)
    rdd.reduceByKey(_ + _).foreach(println _) // (2,3),(4,5),(3,2),(2,1) ->(2,4),(4,5),(3,2)
    rdd.groupByKey().foreach(println)
    println(rdd.groupByKey().getClass)
  }
}

案例:计算每个键对应的平均值

package com.padluo.spark.core

import org.apache.spark.{SparkConf, SparkContext}

object AvgTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("PairRddTest")
      .setMaster("local")
      .set("spark.testing.memory", "471859200")
    val sc = new SparkContext(conf) //Driver类

    val rdd = sc.parallelize(List((3, 4), (2, 4), (3, 2), (2, 6)))
    rdd.mapValues(i => (i, 1)) // (3, (4, 1)
      .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
      .mapValues(i => i._1 / i._2)
      .foreach(println)
  }
}

Join案例

join,对两个RDD进行内连接。

leftOuterJoin,对两个RDD进行左连接。

rightOuterJoin,对两个RDD进行右连接。

效果同sql里,可以两个rdd想象成2张数据表(包含2个字段)。

package com.padluo.spark.core

import org.apache.spark.{SparkConf, SparkContext}

object JoinTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("PairRddTest")
      .setMaster("local")
      .set("spark.testing.memory", "471859200")
    val sc = new SparkContext(conf) //Driver类

    val table1 = sc.parallelize(List(("k1", 1), ("k2", 2), ("k3", 3))) // 看做两个字段
    val table2 = sc.parallelize(List(("k1", 10), ("k2", 20), ("k4", 30)))

    table1.join(table2).foreach(println) //("k1",(1,10)),("k2",(2,20))
    println("-----")
    table1.leftOuterJoin(table2).foreach(println)
    println("-----")
    table1.rightOuterJoin(table2).foreach(println)
  }
}

电商流量统计案例实战

注意点,

  • cache()和persist()的区别。
  • 分区key的设置后后面RDD操作key的对应关系。
  • 结果存储到HDFS。
  • 缓存的释放。
package com.padluo.spark.sql

import org.apache.spark.storage.StorageLevel
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object VisitCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("VisitCount")
      .setMaster("local")
      .set("spark.testing.memory", "471859200")
    val sc = new SparkContext(conf)
    val linesRDD = sc.textFile("D:\\Java\\idea\\IdeaProjects\\spark-study\\spark-sql\\src\\main\\datasource\\2015082818")
      .cache() // cache和persist的区别
      .filter(line => line.length > 0)
      .map { line =>
        val arr = line.split("\t") // 需要转义吗?"\t"还是"\\t"
      val date = arr(17).substring(0, 10)
        val provinceId = arr(23)
        val url = arr(1)
        val guid = arr(5)
        (date + "_" + provinceId, (guid, url))
      }.filter(line => line._2._2.length > 5) // length(url) > 5
      // 把数据通过key【date + "_" + provinceId】【date_provinceId_guid】???进行分区
      // 分区字段要注意,和后面groupByKey/reduceByKey的key对应
      // 相同key【date + "_" + provinceId】的数据会存储在同一个节点上
      .partitionBy(new HashPartitioner(10))
      .persist(StorageLevel.MEMORY_ONLY)

    linesRDD.map(line => (line._1 + "_" + line._2._1, 1)) // (date_provinceId_guid, 1)
      .reduceByKey(_ + _) // (date_provinceId_guid, pv)
      .map { i =>
      val arr = i._1.split("_")
      val dateProvinceId = arr(0) + "_" + arr(1)
      val guidCount = 1
      val pv = i._2
      (dateProvinceId, (guidCount, pv))
    }.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
      .map { i =>
        val arr = i._1.split("_")
        val date = if (arr.length >= 1) arr(0) else "Nil"
        val provinceId = if (arr.length >= 2) arr(1) else "Nil"
        val pv = i._2._2
        val uv = i._2._1
        (date, provinceId, pv, uv)
      }.foreach(println)
    // .saveAsTextFile("hdfs://master:8020/user/root/VisitCount")

    linesRDD.unpersist() // 释放缓存
    sc.stop()
  }
}

Spark优化

数据分区

在分布式集群里,网络通信的代价很大,减少网络传输可以极大提升性能。

Mapreduce框架的性能开支主要在哪里?

  • IO:大量读写文件
  • 网络传输:压缩(大文件变小文件从而减少网络传输,但是增加CPU计算负载)。

网络传输主要在shuffle阶段,shuffle的原因是相同的key存在不同的节点上,按key进行聚合的时候不得不进行shuffle。

Spark把RDD进行分片(分区),放在集群上并行计算。同一个RDD,分片100个,集群有10个节点,平均一个节点10个分区。

对于sum型的计算:先进行每个分区的sum,然后把sum值shuffle传输到主程序进行全局sum,所以此时shuffle过程中只需要传输分区sum,网络开销很小。

但对于Join类的计算,需要把数据本身进行shuffle,网络开销很大。

Spark是如何优化这个问题的?Spark把key-value RDD通过key的hashcode进行分区,且保证相同的key存储在同一个节点上。key的分布不均衡决定了有的分区大,有的分区小。这样对该RDD进行key聚合时,不需要shuffle过程。

Join操作时,通常把用的频繁的大表事先进行分区,如:

val linesRDD = sc.textFile("D:\\Java\\idea\\IdeaProjects\\spark-study\\spark-sql\\src\\main\\datasource\\2015082818")
      .cache() // cache和persist的区别
      .filter(line => line.length > 0)

      .map { line =>
        val arr = line.split("\t") // 需要转义吗?"\t"还是"\\t"
      val date = arr(17).substring(0, 10)
        val provinceId = arr(23)
        val url = arr(1)
        val guid = arr(5)
        (date + "_" + provinceId, (guid, url))
      }.filter(line => line._2._2.length > 5) // length(url) > 5
      // 把数据通过key【date + "_" + provinceId】【date_provinceId_guid】???进行分区
      // 分区字段要注意,和后面groupByKey/reduceByKey的key对应
      // 相同key【date + "_" + provinceId】的数据会存储在同一个节点上
      .partitionBy(new HashPartitioner(10))
      .persist(StorageLevel.MEMORY_ONLY)

进行join时,仅需要对另一个小数据量的表进行shuffle过程。

能够从数据分区中获益的操作有cogroup()、groupWith()、join()、leftOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、combineByKey()以及lookup(),基于key的操作都会获益。

而对于诸如join()这样的二元操作,预先进行数据分区会让其中至少一个RDD(使用已知分区器的那个RDD)不发生数据shuffle。如果两个RDD使用同样的分区方式,并且它们还缓存在同样的机器上(比如一个RDD是通过mapValues()从另一个RDD中创建出来的,这两个RDD就会拥有相同的键和分区方式),跨节点的数据shuffle就不会发生了。

参数优化

进行spark-submit时,会给每个作业分配资源。处理的数据量越大,需要分配的资源越多。


本文首发于steem,感谢阅读,转载请注明。

https://steemit.com/@padluo


微信公众号「padluo」,分享数据科学家的自我修养,既然遇见,不如一起成长。

数据分析


读者交流电报群

https://t.me/sspadluo


知识星球交流群

知识星球读者交流群

Sort:  

@padluo, 佩服!佩服!

@padluo, steemit上我觉得只需要静静读你的贴就值了~~~ img

BTW, @cn-naughty.boy 淘气包,听说你给我准备了巧克力...