A DataFrame is a distributed dataset that can be understood as a table in a relational database: values are organized by column, and each column has a name and a type. The DataFrame API is available in four languages (Scala, Java, Python and R). In the Scala API, a DataFrame can be understood as: DataFrame = Dataset[Row]

Note: DataFrame appeared in Spark 1.3 (before 1.3 it was called SchemaRDD); Dataset was added in 1.6.
2. DataFrame vs RDD: differences
- Concept: both are distributed collections. A DataFrame can be thought of as a table: besides the data an RDD holds, it also carries a schema, and it supports complex column types (map, array, struct, ...).
- API: the DataFrame API is richer than the RDD API; besides map, filter, flatMap and so on, it adds relational operations such as select, filter and groupBy.
- Data structure: an RDD knows its element type but has no column structure, while a DataFrame's schema gives the engine the information it needs to optimize, so performance is better.
- Runtime: RDD programs written against the Java/Scala API run directly on the JVM, whereas DataFrame operations in Spark SQL are translated into a logical plan and then a physical plan, with the optimizer applied in between; this accounts for a large performance difference.
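As a rough illustration of "RDD data plus a schema", here is a plain-Scala sketch (no Spark involved; `Field`, `rddStyle` and `dfStyle` are invented names for this example only):

```scala
// A column description, loosely mirroring Spark's StructField(name, dataType).
case class Field(name: String, dataType: String)

// RDD view of a record: to the engine it is just opaque values.
val rddStyle: Seq[Any] = Seq("Andy", 30)

// DataFrame view: the same values plus a schema (column names and types).
// Knowing names and types is what lets Spark SQL optimize column pruning,
// filter pushdown, and so on.
val schema = Seq(Field("name", "string"), Field("age", "int"))
val dfStyle: Map[String, Any] = schema.map(_.name).zip(rddStyle).toMap

println(dfStyle("age"))  // columns are addressable by name: prints 30
```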
3. Working with JSON files
```
[hadoop@hadoop001 bin]$ ./spark-shell --master local[2] --jars ~/software/mysql-connector-java-5.1.34-bin.jar

-- Read a JSON file
scala> val df = spark.read.json("file:///home/hadoop/data/people.json")
18/09/02 11:47:20 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

-- Print the schema
scala> df.printSchema
root
 |-- age: long (nullable = true)     -- column name, type, nullable
 |-- name: string (nullable = true)

-- Print the rows
scala> df.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
```
```
-- Select a column
scala> df.select("name").show
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

-- A single-quoted symbol also works, via an implicit conversion
scala> df.select('name).show
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

-- An unclosed double quote is simply a syntax error
scala> df.select("name).show
<console>:1: error: unclosed string literal
df.select("name).show
          ^
```
```
-- Arithmetic on a column: null cannot be computed with, so null + 1 stays null
scala> df.select($"name", $"age" + 1).show
+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+
```
```
-- Filter by age
scala> df.filter($"age" > 21).show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

-- Group by age and count
scala> df.groupBy("age").count.show
+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+

-- Register a temporary view and query it with SQL
scala> df.createOrReplaceTempView("people")
scala> spark.sql("select * from people").show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
```
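The null propagation seen in the `$"age" + 1` example (Michael's missing age stayed null) mirrors how `Option` behaves in plain Scala; a small Spark-free sketch:

```scala
// Ages from people.json, with Michael's missing age modeled as None.
val ages: Seq[Option[Long]] = Seq(None, Some(30L), Some(19L))

// The analogue of select($"age" + 1): incrementing inside the Option
// leaves None (null) untouched and increments the defined values.
val incremented = ages.map(_.map(_ + 1))

println(incremented)  // List(None, Some(31), Some(20))
```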
4. Actions on a DataFrame object
```
-- Define a case class to describe the schema
scala> case class Student(id: String, name: String, phone: String, Email: String)

-- RDD to DataFrame by reflection; note that split takes a regex, so the pipe must be escaped
scala> val students = sc.textFile("file:///home/hadoop/data/student.data")
         .map(_.split("\\|"))
         .map(x => Student(x(0), x(1), x(2), x(3)))
         .toDF()

-- Print the schema
scala> students.printSchema

-- show(numRows: Int, truncate: Boolean): by default the first 20 rows are shown and
-- strings longer than 20 characters are truncated; show(5, false) prints 5 rows untruncated
scala> students.show
```
```
+---+--------+--------------+--------------------+
| id|    name|         phone|               Email|
+---+--------+--------------+--------------------+
|  1|   Burke|1-300-746-8446|ullamcorper.velit...|
|  2|   Kamal|1-668-571-5046|pede.Suspendisse@...|
|  3|    Olga|1-956-311-1686|Aenean.eget.metus...|
|  4|   Belle|1-246-894-6340|vitae.aliquet.nec...|
|  5|  Trevor|1-300-527-4967|dapibus.id@acturp...|
|  6|  Laurel|1-691-379-9921|adipiscing@consec...|
|  7|    Sara|1-608-140-1995|Donec.nibh@enimEt...|
|  8|  Kaseem|1-881-586-2689|cursus.et.magna@e...|
|  9|     Lev|1-916-367-5608|Vivamus.nisi@ipsu...|
| 10|    Maya|1-271-683-2698|accumsan.convalli...|
| 11|     Emi|1-467-270-1337|        est@nunc.com|
| 12|   Caleb|1-683-212-0896|Suspendisse@Quisq...|
| 13|Florence|1-603-575-2444|sit.amet.dapibus@...|
| 14|   Anika|1-856-828-7883|euismod@ligulaeli...|
| 15|   Tarik|1-398-171-2268|turpis@felisorci.com|
| 16|   Amena|1-878-250-3129|lorem.luctus.ut@s...|
| 17| Blossom|1-154-406-9596|Nunc.commodo.auct...|
| 18|     Guy|1-869-521-3230|senectus.et.netus...|
| 19| Malachi|1-608-637-2772|Proin.mi.Aliquam@...|
| 20|  Edward|1-711-710-6552|lectus@aliquetlib...|
+---+--------+--------------+--------------------+
only showing top 20 rows
```
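A detail in the `students` definition above that is easy to trip on: `String.split` takes a regular expression, and `|` is the regex alternation operator, so the pipe separator has to be escaped as `\\|`. A plain-Scala sketch of that parsing step, runnable without Spark:

```scala
case class Student(id: String, name: String, phone: String, email: String)

val line = "1|Burke|1-300-746-8446|ullamcorper.velit.in@ametnullaDonec.co.uk"

// split("|") would treat | as regex alternation and split between every
// character; escaping it as \\| splits on the literal pipe.
val parts = line.split("\\|")
val s = Student(parts(0), parts(1), parts(2), parts(3))

println(s.name)  // Burke
```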
```
-- head(n) returns the first n rows
scala> students.head(5).foreach(println)
[1,Burke,1-300-746-8446,ullamcorper.velit.in@ametnullaDonec.co.uk]
[2,Kamal,1-668-571-5046,pede.Suspendisse@interdumenim.edu]
[3,Olga,1-956-311-1686,Aenean.eget.metus@dictumcursusNunc.edu]
[4,Belle,1-246-894-6340,vitae.aliquet.nec@neque.co.uk]
[5,Trevor,1-300-527-4967,dapibus.id@acturpisegestas.net]

-- Select specific columns
scala> students.select("id","name").show(5)
+---+------+
| id|  name|
+---+------+
|  1| Burke|
|  2| Kamal|
|  3|  Olga|
|  4| Belle|
|  5|Trevor|
+---+------+

-- Rename a column with an alias
scala> students.select($"name".as("new_name")).show(5)
+--------+
|new_name|
+--------+
|   Burke|
|   Kamal|
|    Olga|
|   Belle|
|  Trevor|
+--------+

-- Filter rows with id > 5
scala> students.filter("id>5").show(5)
```
```
+---+------+--------------+--------------------+
| id|  name|         phone|               Email|
+---+------+--------------+--------------------+
|  6|Laurel|1-691-379-9921|adipiscing@consec...|
|  7|  Sara|1-608-140-1995|Donec.nibh@enimEt...|
|  8|Kaseem|1-881-586-2689|cursus.et.magna@e...|
|  9|   Lev|1-916-367-5608|Vivamus.nisi@ipsu...|
| 10|  Maya|1-271-683-2698|accumsan.convalli...|
+---+------+--------------+--------------------+

-- Rows whose name is empty or the literal string 'NULL' (filter is an alias of where)
scala> students.filter("name='' or name='NULL'").show(false)
+---+----+--------------+--------------------------+
|id |name|phone         |Email                     |
+---+----+--------------+--------------------------+
|21 |    |1-711-710-6552|lectus@aliquetlibero.co.uk|
|22 |    |1-711-710-6552|lectus@aliquetlibero.co.uk|
|23 |NULL|1-711-710-6552|lectus@aliquetlibero.co.uk|
+---+----+--------------+--------------------------+

-- id > 5 combined with a LIKE pattern on the name
scala> students.filter("id>5 and name like 'M%'").show(5)
```
```
+---+-------+--------------+--------------------+
| id|   name|         phone|               Email|
+---+-------+--------------+--------------------+
| 10|   Maya|1-271-683-2698|accumsan.convalli...|
| 19|Malachi|1-608-637-2772|Proin.mi.Aliquam@...|
+---+-------+--------------+--------------------+

-- Sort by name ascending and exclude empty names
scala> students.sort($"name").select("id","name").filter("name <> ''").show(3)
+---+-----+
| id| name|
+---+-----+
| 16|Amena|
| 14|Anika|
|  4|Belle|
+---+-----+

-- Sort by name descending (sort is an alias of orderBy)
scala> students.sort($"name".desc).select("name").show(5)
```
```
+------+
|  name|
+------+
|Trevor|
| Tarik|
|  Sara|
|  Olga|
|  NULL|
+------+

-- Aggregate functions: max and sum over the id column
scala> students.agg("id" -> "max", "id" -> "sum").show(false)
```
```
+-------+-------+
|max(id)|sum(id)|
+-------+-------+
|9      |276.0  |
+-------+-------+

-- Note: id was declared as a String, so max(id) is the lexicographic maximum
-- ("9" sorts after "20"), while sum(id) implicitly casts the strings to doubles

-- join in USING mode: Seq names the join columns shared by both sides
students.join(students2, Seq("id", "name"), "inner")

-- DataFrame joins support the types inner, outer, left_outer, right_outer and leftsemi
-- To specify the join condition and the type explicitly:
```
```
students.join(students2, students("id") === students2("t1_id"), "inner")
```
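The agg output above looks odd at first: max(id) is 9 even though larger ids exist. The cause is that `id` was declared as String in the `Student` case class, so `max` compares strings lexicographically. The effect is easy to reproduce in plain Scala, without Spark:

```scala
// ids as strings, the way the Student case class declares them.
val ids = (1 to 20).map(_.toString)

// Lexicographic max: "9" sorts after "10", "19", "20", ... because
// comparison is character by character ('9' > '1' and '9' > '2').
val stringMax = ids.max

// Numeric max after casting, analogous to how sum(id) coerces to a number.
val numericMax = ids.map(_.toInt).max

println(stringMax)   // 9
println(numericMax)  // 20
```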
5. File operations with the DataFrame API
1. Maven dependencies
```xml
<spark.version>2.3.1</spark.version>

<!-- Spark Core dependency -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>

<!-- Spark SQL dependency -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
```
2. Implementation in IDEA:
```scala
package com.zrc.ruozedata.sparkSQL

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object SparkSQL001 extends App {
  /*
   * Approach 1: RDD to DataFrame by reflection
   * Create an RDD, then convert it to a DataFrame,
   * using a case class to describe the schema of each input line.
   */
  val spark = SparkSession.builder()
    .master("local[2]")
    .appName("SparkSQL001")
    .getOrCreate() // add enableHiveSupport() here when working with Hive

  val infos = spark.sparkContext.textFile("file:///F:/infos.txt")

  /*
  import spark.implicits._
  val infoDF = infos.map(_.split(",")).map(x => Info(x(0).toInt, x(1), x(2).toInt)).toDF()
  infoDF.show()
  */

  /*
   * Approach 2: RDD to DataFrame with StructType
   * Each column is described by a StructField(name, dataType, nullable);
   * the schema (a StructType) and the RDD[Row] are combined with createDataFrame.
   */
  // Note: values placed in a Row must be converted to the declared types
  val infoss = infos.map(_.split(",")).map(x => Row(x(0).trim.toInt, x(1), x(2).trim.toInt))
  val schema = StructType(
    Array(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)
    )
  )
  val infoDF = spark.createDataFrame(infoss, schema)
  infoDF.show()
  spark.stop()
}

// case class for approach 1 (must be defined outside the object):
// case class Info(id: Int, name: String, age: Int)
```
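Why the `trim.toInt` in the Row conversion matters: if infos.txt contains spaces after the commas, `toInt` on the raw field throws a NumberFormatException. A plain-Scala sketch of that parsing step (the sample line is invented for illustration, assuming an id,name,age format):

```scala
// A sample line in the assumed infos.txt format: id,name,age
val line = "1, zhang, 20"

val cols = line.split(",")

// " 20".toInt would throw NumberFormatException; trim strips the padding first
val id   = cols(0).trim.toInt
val name = cols(1).trim
val age  = cols(2).trim.toInt

println(s"$id $name $age")  // 1 zhang 20
```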