点击查看【bilibili】


1 课程目标

1.1 目标1:(初级)熟练使用scala编写Spark程序

1.2 目标2:(中级)动手编写一个简易Spark通信框架

1.3 目标3:(高级)为阅读Spark内核源码做准备

2 Scala概述

2.1 什么是Scala

函数式编程:
函数式编程是一种编程思想,主要的思想把运算过程尽量写成一系列的函数调用。
Scala是一种多范式的编程语言,其设计的初衷是要集成面向对象编程和函数式编程的各种特性。Scala运行于Java平台(Java虚拟机),并兼容现有的Java程序。
scala是对java的进一步封装,基于java来开发的。
part1编程基础 - 图2

2.2 为什么要学Scala

  1. 优雅:这是框架设计师第一个要考虑的问题,框架的用户是应用开发程序员,API是否优雅直接影响用户体验。
  2. 速度快:Scala语言表达能力强,一行代码抵得上Java多行,开发速度快;Scala是静态编译的,所以和JRuby,Groovy比起来速度会快很多。
  3. 能融合到Hadoop生态圈:Hadoop现在是大数据事实标准,Spark并不是要取代Hadoop,而是要完善Hadoop生态。JVM语言大部分可能会想到Java,但Java做出来的API太丑,或者想实现一个优雅的API太费劲。

part1编程基础 - 图3
4,Spark的开发语言,掌握好scala,就能更轻松的学好spark。
python,java,scala,R
原生的API,速度更快。
spark函数式编程初体验

3 Scala开发环境

3.1 安装JDK

因为Scala是运行在JVM平台上的,所以安装Scala之前要安装JDK
使用 # java -version 来验证
确保已安装jdk1.8+

3.2 安装Scala

3.2.1 Windows安装Scala编译器

访问Scala官网http://scala-lang.org/download/2.11.8.html 下载Scala编译器安装包,目前最新版本是2.12.x,但是目前大多数的框架都是用2.11.x编写开发的,Spark2.x使用的就是2.11.x,所以本课程使用2.11.x版本
安装方式一:下载scala-2.11.8.msi后双击打开一路next运行安装。
安装方式二:直接使用免安装版的,解压即可。
part1编程基础 - 图4
安装完成之后,配置环境变量SCALA_HOME和PATH:
part1编程基础 - 图5 part1编程基础 - 图6
可以在cmd窗口下验证: 输入scala -version 查看scala版本
part1编程基础 - 图7
输入scala 可进入scala shell交互模式
part1编程基础 - 图8
输入:q 或:quit退出scala交互命令行。
该交互模式,有一个高大上的名称:REPL
Read Evaluate Print Loop
(读取-求值-打印-循环)

3.2.2 Linux中安装Scala编译器

下载Scala地址https://downloads.lightbend.com/scala/2.11.8/scala-2.11.8.tgz
1,上传并解压Scala到指定目录
# tar -zxvf scala-2.11.8.tgz -C /usr/local/
2,创建一个软连接(可选项)
# ln -s /usr/local/scala-2.11.8 /usr/local/scala
3,配置环境变量,将scala加入到PATH中
# vi /etc/profile

export SCALA_HOME=/usr/local/scala
export PATH=$PATH:$JAVA_HOME/bin:$SCALA_HOME/bin
  1. public class test{
  2. public static void main(String [] args){
  3. System.out.println(“hello java”);
  4. }
  5. }

3.3 Linux下运行第一个scala程序

3.3.1 代码编写:

vim ScalaTest

object ScalaTest {
def main(args: Array[String]) :Unit={
println(“hello scala”)
}
}

3.3.2 代码编译:

scalac ScalaTest

3.3.3 代码运行:

scala ScalaTest
part1编程基础 - 图9

运行流程(类似java):
先编译(scalac),再执行(scala)
scala中,不强制要求源文件和类名一致。

3.4 IDEA安装

目前Scala的开发工具主要有两种:Eclipse和IDEA,这两个开发工具都有相应的Scala插件,如果使用Eclipse,直接到官网下载即可
http://scala-ide.org/download/sdk.html 不推荐使用该种方式

IDEA的Scala插件更优秀,有逼格的Scala攻城狮都选择IDEA(只需一次,就会爱上她)
下载地址:http://www.jetbrains.com/idea/download/
下载社区免费版,点击下一步安装即可,安装时如果有网络可以选择在线安装Scala插件。这里我们使用离线安装Scala插件:

  1. 安装IDEA,点击下一步即可。由于我们离线安装插件,所以点击Skip All and Set Default
  2. 下载IEDA的scala插件,地址http://plugins.jetbrains.com/?idea_ce

part1编程基础 - 图10

3.5 Scala插件离线安装

  1. 安装Scala插件:Configure -> Plugins -> Install plugin from disk -> 选择Scala插件 -> OK -> 重启IDEA

part1编程基础 - 图11
part1编程基础 - 图12
part1编程基础 - 图13

选择好插件后,重启IDEA:
part1编程基础 - 图14

3.6 IDEA 创建Scala工程

安装完成后, 双击打开IDEA, 创建一个新的项目(Create New Project)
part1编程基础 - 图15
选中左侧的Scala -> IDEA -> Next
part1编程基础 - 图16
输入项目名称 -> 点击Finish完成即可
part1编程基础 - 图17

3.7 IDEA常用配置:

Ctrl+Alt + s 进入到settings配置页面。
修改字体:
part1编程基础 - 图18
修改字符集:
part1编程基础 - 图19

如果每次启动IDEA,直接进入到项目页面,没有引导页面,修改配置如下:
part1编程基础 - 图20

修改安装目录的idea.properties文件

# idea.config.path=${user.home}/.IdeaIC/config

idea.config.path=d:/profiles/develop/IDEACache/config

#——————————————————————————————————-
# Uncomment this option if you want to customize path to IDE system folder. Make sure you’re using forward slashes.
#——————————————————————————————————-
# idea.system.path=${user.home}/.IdeaIC/system

idea.system.path=d:/profiles/develop/IDEACache/system

maven修改本地仓库地址:
part1编程基础 - 图21

4 Scala基础

4.1 函数式编程初体验Spark-Shell之WordCount

part1编程基础 - 图22
Q1: 对上述文件内容使用Spark进行单词个数统计?
part1编程基础 - 图23
Q2: 对上述输出结果进行降序 ?
part1编程基础 - 图24
上述代码,暂不需要练习

4.2 常用类型

Scala和java一样,
有7种数值类型:Byte、Char、Short、Int、Long、Float和Double(没有基本类型和包装类型的区分)
2种非数值类型: Boolean 和 Unit
注意:Unit表示无值,相当于java中的void。用作不返回任何结果的类型。Unit对象只有一个实例值,写成()。(小括号)
String是属于引用类型
part1编程基础 - 图25

4.3 声明变量

String a = “”
ab=””
val var
定义变量使用var 或者 val关键字
语法: var|val 变量名称 (: 数据类型) = 变量值
使用val定义的变量是不可变的,相当于java中用final修饰的变量
使用var定义的变量是可变的,在Scala中推荐使用val
Scala编译器会自动推断变量的类型,必要的时候可以指定类型
lazy val 变量名
lazy关键字修饰的变量,是一个惰性变量,实现延迟加载(懒加载),在使用的时候才会加载。
part1编程基础 - 图26
lazy关键字不能修饰 var类型的变量
part1编程基础 - 图27

object VariableTest {
def main(args: Array[String]) {
// 使用val定义的变量值是不可变的,相当于java里用final修饰的变量
//变量名在前,类型在后
val name: String = “nvshen”
// 使用var定义的变量是可变的,在Scala中鼓励使用val
var age = 18
//Scala编译器会自动推断变量的类型,可以省略变量类型
val str = “world”
// 声明多个变量
var age,fv = 18
val str = “world”;val str2= “hello”
}
}

可以同时声明多个变量,可以使用通配符声明变量:
java中的通配符是*,scala中的通配符是
定义一个变量,必须赋予初始值,如果没有初始值,可以使用
占位符代替,但是变量必须指定类型。而且占位符变量不能定义在main方法内部。

4.4 条件表达式

条件表达式的值可以赋值给一个变量
支持混合类型的表达式。

Scala的条件表达式比较简洁,例如:

| object ConditionTest {
def main(args: Array[String]) {
val x = 1
//判断x的值,将结果赋给y
val y = if (x > 0) 1 else -1
//打印y的值
println
(y)
//支持混合类型表达式
val z = if (x > 1) 1 else “error”
//打印z的值
println
(z)
混合类型会返回父类类型。
//如果缺失else,相当于if (x > 2) 1 else ()
val m = if (x > 2) 1
println(m)

  1. _//在scala中每个表达式都有返回值,scala中有个Unit类,写做(),相当于Java中的void<br /> _**val **n = **if **(x > 2) 1 **else **()<br /> _println_(n)
  2. _//if和else if<br /> _**val **k = **if **(x < 0) 0 **else if **(x >= 1) 1 **else **-1<br /> _println_(k)<br /> }<br />} |

| —- |

表达式都是有返回值的。
混合类型中,返回值类型是多个分支的返回值类型的父类(超类)
每一个条件语句,都有返回值,默认返回值是最后一行的值,如果没有内容或者内容为空,那么返回值类型就是Unit,值是()。
多条件分支中,缺少了某一个分支,默认会返回Unit。
条件表达式,可以使用{},写在一行时,或者只有一行内容时,可以省略{}。
if(x>2) 3 else 5

4.5 块表达式

{} 称为块表达式,块表达式中可以包含一系列的表达式,最后一个表达式的值就是块的值。

object BlockExpressionTest {
def main(args: Array[String]) {
val x = 0
//在scala中{}中可包含一系列表达式,块中最后一个表达式的值就是块的值
//下面就是一个块表达式
val result = {
if (x < 0){
-1
} else if(x >= 1) {
1
} else {
“error”
}
}
// result的值就是块表达式的结果
println
(result)
}
}

4.6 循环

在scala中有for循环和while循环, for循环最常用

4.6.1 for循环

for(i=0;ifor(Int I :arr){}
语法结构:for (i <- 表达式/数组/集合)
java for循环方式:
// for(i=1;i<10;i++) // 传统for循环
// for(Int I :arr) // 增强for循环

| object ForTest {
def main(args: Array[String]) {
//for(i <- 数组)
val arr = Array(“a”, “b”, “c”)
// 遍历打印数组中的每个元素 for (i <- arr) // 类似Java中的增强for println(i)
// 通过角标获取数组中的元素
val index = Array(0,1,2)
// 遍历打印数组中的每个元素 for (i <- index) // 类似Java中的传统for println(arr(i)) // 获取元素的方式是(),java中是[]


//for(i <- 表达式),表达式1 to 10返回一个Range(区间)
//每次循环将区间中的一个值赋给i
for (i <- 1 to 6)
println(i)
println(arr(i)) // 报错,如果不加{},只会把for后面的一行当做循环的内容。

for (i <- 1 to 6){
println(i)
println(arr(i))
}

  1. **for**(i <- 1 until 6) { _// 0 until 6 => 会生成一个范围集合Range(0,1,2,3,4,5)<br /> println_(_array_(i))<br /> }_ // 打印数组中的偶数元素_<br />_// 在for循环中,通过添加守卫来实现输出满足特定条件的数据<br /> _**for**(e <- _arr _**if **e % 2 == 0) { _// for表达式中可以增加守卫<br /> println_(e)<br /> }
  2. _//高级for循环<br /> //每个生成器都可以带一个条件<br /> _**for**(i <- 1 to 3; j <- 1 to 3 **if **i != j){ <br /> _print_((10 * i + j) + **" "**)<br />}
  3. _//for推导式:如果for循环的循环体以yield开始,则该循环会构建出一个集合_<br />_ //每次迭代生成集合中的一个值<br /> _**val **v = **for **(i <- 1 to 10) **yield **i * 10<br /> _println_(v)<br /> }<br />} |

| —- |

两个生成器: to until
1 to 10 生成 Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) 左闭右闭区间
1 until 10 生成 Range(1, 2, 3, 4, 5, 6, 7, 8, 9) 左闭右开区间

4.6.2 while循环

Scala 的 while 循环和其它语言如 Java 功能一样,它含有一个条件,和一个循环体,只有条件满足,就一直执行循环体的代码。
语法结构:while(condition){ 循环体内容 }

var i = 0while(i<5) {
println(i)
i += 1 // i = i + 1}

Scala 也有 do-while 循环,它和 while 循环类似,只是检查条件是否满足在循环体执行之后检查。

i = 0// while 直接判断while(i>0 && i<=5) {
println(i)
i += 1}
i = 0// do while 先执行一次循环,再进行判断do{
println(i)
i += 1}while(i>0 && i<=5)

循环也有返回值,只不过都是Unit

val res = while(i>0 && i<=5) {
println(i)
i += 1}
println(s”res = $res)

插值法

4.7 函数式编程再体验:

map:对集合或者数组中的每一个元素进行操作,该方法接收一个函数,具体的业务逻辑是自己定义的。
filter: 过滤,过滤出满足条件的元素。
part1编程基础 - 图28

4.8 调用方法(运算符重载为方法)

Scala中的+ - * / %等操作符的作用与Java一样。只是有一点特别的:
这些操作符实际上是方法。操作符被重载为方法。
例如:
a + b
是如下方法调用的简写:

  1. +(b)

a 方法 b可以写成 a.方法(b)

5 方法和函数

方法用于做什么:
业务逻辑中,一段逻辑代码,被使用超过1次,就可以把这段代码抽取出方法。为了代码的复用性。
方法:一段业务逻辑的综合。

5.1 定义方法

java中的方法:
public int add(int a,int b){
return a + b;
}
def methodName ([list of parameters]) : [return type] = {}
part1编程基础 - 图29

| /*
方法的定义及调用

定义方法的格式为:
def methodName ([list of parameters]) : [return type] = {}
如果不使用等号和方法体,则隐式声明抽象(abstract)方法。
*/
object ScalaMethod extends App{

  1. _// 定义个sum方法, 该方法有2个参数, 参数类型为整型, 方法的返回值为整型<br /> _**def **sum(a:Int, b: Int): Int = {<br /> a + b<br /> }<br /> _// 定义有可变参数的方法,_<br /> **def **sumAll(b: Int*): Int = {<br /> **var **v = 0<br /> **for **(i<- b){<br /> v += i<br /> }<br /> v _// 返回值_ }<br /><br /> _// 调用<br /> _**val **_result1 _= _sum_(1, 5)<br /> _println_(_result1_)<br />println(sumAll(1,11,13))
  2. _// 该方法没有任何参数, 也没有返回值<br /> _**def **sayHello1 = _println_(**"Say BB1"**)<br /> **def **sayHello2() = _println_(**"Say BB2"**)<br /> _sayHello1 // 如果方法没有() 调用时不能加()<br /> sayHello2 // 可是省略(), 也可以不省略_} |

| —- |

总结:
调用空参方法时,可以省略参数列表的(),但是如果定义空参方法时没有添加参数列表(),则在调用时,不能加()。
方法不能做为最终的表达式单独存在,必须显示调用。(无参的方法调用时,可以省略())
方法的返回值类型,可以省略,编译器会自动推断出来,但是两种情况下必须指定
(1,递归调用的方法,2,有return返回值的方法,必须显示声明返回值类型。)
对于递归方法,必须指定返回类型
对于有return 关键字的,必须指定返回类型
def rec(n:Int):Int= if (n==0) 0 else nrec(n-1)
def meth1(x:Int,y:Double):Double = return x
y
part1编程基础 - 图30

5.2 定义函数

val| var 函数名称=(函数的参数列表) => 函数体
val | var 函数名称:(输入参数类型)=> 返回值类型 = (参数的引用)=> 函数体
part1编程基础 - 图31

函数可以作为最终的表达式存在,返回的内容就是函数的签名
签名(函数的名称,函数的参数,函数的返回值类型)
这种定义方式不需要指定返回值类型,编译器会自动推断

第二种定义方式:
复杂全面的定义
val | var 函数名称:(输入参数类型)=> 返回值类型 = (参数的引用)=> 函数体
part1编程基础 - 图32
定义一个无参的函数
不同于方法,没有参数的函数定义,也必须加()
val f2:()=>Unit =() => println(123) val f2 =() => println(123) 返回值类型为Unit
val f2:()=>Int =() => 123 val f2=() => 123 返回值类型为Int

5.3 方法和函数的区别

在函数式编程语言中,函数是“头等公民”,它可以像任何其他数据类型一样被传递和操作
函数和变量,类,对象,一个级别,方法,要归属于类或者对象
区别和联系:
1,方法使用def关键字定义 ,函数使用 => 定义
2,方法不能作为单独的表达式存在,函数可以作为最终的表达式,返回函数签名
3,都可以显示的使用参数来增加参数列表
4,函数可以作为参数,传递给方法

案例:首先定义一个方法,再定义一个函数,然后将函数传递到方法里面
part1编程基础 - 图33

| object MethodAndFunctionTest {
//定义一个方法
//方法m2参数要求是一个函数,函数的参数必须是两个Int类型

//返回值类型也是Int类型
def m1(f: (Int, Int) => Int) : Int = {
f(2, 6)
}

//定义一个函数f1,参数是两个Int类型,返回值是一个Int类型
val f1 = (x: Int, y: Int) => x + y
//再定义一个函数f2
val f2 = (m: Int, n: Int) => m * n

//main方法
def main(args: Array[String]) {

  1. _//调用m1方法,并传入f1函数<br /> _**val **r1 = _m1_(_f1_)<br /> _println_(r1)
  2. _//调用m1方法,并传入f2函数<br /> _**val **r2 = _m1_(_f2_)<br /> _println_(r2)<br /> }<br />} |

| —- |

| // 定义一个普通方法def max(x:Int,y:Int) = if(x>y)x else y
// 定义一个方法,参数是一个函数,参数只需要函数签名,在调用的时候具体再传入函数体def max1(f:(Int,Int)=>Int) = f(20,10)def max2(f:(Int,Int) => Int,x:Int,y:Int)= f(x,y)
// 定义一个方法,方法返回值是函数def max3()= (x:Int,y:Int)=> if (x>y) x else ydef main(args: Array[String]): Unit = {

println(max(10,20))
println(max1((x:Int,y:Int)=>if(x>y) x else y))
println(max2((x:Int,y:Int)=>if(x>y) x else y,10,20))
println(max3()(10,20))
} | | —- |

5.4 将方法转换成函数(神奇的下划线)

part1编程基础 - 图34

一个集合类型:
元组Tuple,是一组数据的集合,类型可以是任意的
(1,2.0,”xx”)
想要获取元组的值
通过下划线,加上下标,角标从1开始

6 元组Tuple

与数组或列表不同,元组可以容纳不同类型的对象,但它们也是不可变的。
元组是不同类型元素的集合

6.1 创建元组

定义元组时,使用小括号将多个元素括起来,元素之间使用逗号分隔,元素的类型可以不同,元素的个数任意多个(不超过22个)
注意:元组没有可变和不可变之分,都是不可变的。

val t = (12.3, 1000, “spark”)
val t4 = new Tuple4(1,2.0,””,3) // 必须4个元素

6.2 获取元组中的值

获取元组的值使用下标获取,但是元组的下标时从1开始的,而不是0
part1编程基础 - 图35

6.3 将对偶的集合转换成映射

part1编程基础 - 图36

6.4 拉链操作

zip命令可以将多个值绑定在一起,生成元组

val name=Array(“xx1”,”xx2”,”xx3”,”xx4”)
val values=Array(1,2,3)
name.zip(values)

part1编程基础 - 图37
多个zip # name zip values zip values.map(_*10)
注意:如果两个数组的元素个数不一致,拉链操作后生成的数组的长度为较小的那个数组的元素个数

6.5 元素交换

可以使用 Tuple.swap 方法来交换对偶元组的元素。
part1编程基础 - 图38

| // 定义元组var t = (1, “hello”, true)// 或者val tuple3 = new Tuple3(1, “hello”, true)
// 访问tuple中的元素println(t.2) // 访问元组总的第二个元素

// 迭代元组t.productIterator.foreach(_println)
// 对偶元组val tuple2 = (1, 3)// 交换元组的元素位置, tuple2没有变化, 生成了新的元组val swap = tuple2.swap | | —- |

元组类型Tuple1,Tuple2,Tuple3等等。目前在Scala中只能有22个上限,如果需要更多个元素,那么可以使用集合而不是元组。

7 案例wordCount

| // 原始数据
val arr = ArrayString

  1. _// 一行完全可以实现,但是不推荐<br />// arr.map(x=>x.split(" ")).flatten.map(x=>(x,1)).groupBy(x=>x._1).map(x=>(x._1,x._2.length)).toList.sortBy(x=> -x._2)<br />// .foreach(x=>println(x))
  2. // 目标: 做wordcount
  3. // 数据切分<br />// val lines: Array[Array[String]] = arr.map(x=>x.split(" "))<br /> // 数据压平<br />// val flatLines: Array[String] = lines.flatten<br /> // 有一个替代的方法,flatMap 先map 再flatten<br /> _**val **flatLines = arr.flatMap(x=>x.split(**" "**))<br /> _// 和1 组装起来<br /> _**val **wordsWithOne: Array[(String, Int)] = flatLines.map(w=>(w,1))<br /> _// 按照单词分组 就是元组的第一个元素<br /> _**val **grouped: Map[String, Array[(String, Int)]] = wordsWithOne.groupBy(t=>t._1)<br /> _// 统计单词出现的次数 正常情况下是求和,但是这里是一个特例,所以可以使用长度来判断次数<br /> _**val **result: Map[String, Int] = grouped.map(t=>(t._1,t._2.length))<br /> _// 排序<br />// map不能直接排序,那么就转变成list,然后调用List集合上的sortBy方法<br /> _**val **sortedResult: List[(String, Int)] = result.toList.sortBy(t=> -t._2)<br />_// sortedResult.foreach(x=>println(x))<br /> _sortedResult.foreach(_println_) |

| —- |