從2015年開始,國內(nèi)大數(shù)據(jù)市場(chǎng)繼續(xù)保持高速的發(fā)展態(tài)勢(shì),作者在與地方政府、證券金融公司的項(xiàng)目合作中發(fā)現(xiàn),他們對(duì)大數(shù)據(jù)技術(shù)很感興趣,并希望從大數(shù)據(jù)技術(shù)、大數(shù)據(jù)采集、管理、分析以及可視化等方面得到指導(dǎo)和應(yīng)用幫助。因此編寫了這本大數(shù)據(jù)技術(shù)的快速入門書。 本書共12章,以Hadoop和Spark框架為線索,比較地介紹了Hadoop技術(shù)、Spark技術(shù)、大數(shù)據(jù)存儲(chǔ)、大數(shù)據(jù)訪問、大數(shù)據(jù)采集、大數(shù)據(jù)管理、大數(shù)據(jù)分析等內(nèi)容。還給出兩個(gè)案例:環(huán)保大數(shù)據(jù)和公安大數(shù)據(jù),供讀者參考。 本書適合大數(shù)據(jù)技術(shù)初學(xué)者,政府、金融機(jī)構(gòu)的大數(shù)據(jù)應(yīng)用決策和技術(shù)人員,IT經(jīng)理,CTO,CIO等快速學(xué)數(shù)據(jù)技術(shù)。本書也可以作為高等院校和培訓(xùn)學(xué)校相關(guān)專業(yè)的培訓(xùn)教材。
本書作者楊正洪是國內(nèi)知名大數(shù)據(jù)專家,是華中科技大學(xué)和中國地質(zhì)大學(xué)客座教授,擁有國家專利,是湖北省2013年海外引進(jìn)的科技人才,受武漢市政府邀請(qǐng),成立武漢市云升科技發(fā)展有限公司,在浙江和上海分別有全資子公司,在美國硅谷設(shè)有研發(fā)中心。作者在與地方政府、證券金融公司的項(xiàng)目合作中發(fā)現(xiàn),他們對(duì)大數(shù)據(jù)技術(shù)很感興趣,并希望從大數(shù)據(jù)技術(shù)、大數(shù)據(jù)采集、管理、分析以及可視化等方面得到指導(dǎo)和應(yīng)用幫助。因此編寫了這本大數(shù)據(jù)技術(shù)的快速入門書。本書以Hadoop和Spark框架為線索,比較地介紹了Hadoop技術(shù)、Spark技術(shù)、大數(shù)據(jù)存儲(chǔ)、大數(shù)據(jù)訪問、大數(shù)據(jù)采集、大數(shù)據(jù)管理、大數(shù)據(jù)分析等內(nèi)容。后還給出兩個(gè)案例:環(huán)保大數(shù)據(jù)和公安大數(shù)據(jù),供讀者參考。
本書作者楊正洪是國內(nèi)知名大數(shù)據(jù)專家,畢業(yè)于美國State University of New York at Stony Brook,在IBM公司從事大數(shù)據(jù)相關(guān)研發(fā)工作12年多。從2003~2013年,楊正洪在美國加州的IBM硅谷實(shí)驗(yàn)室(IBM Silicon Valley Lab)負(fù)責(zé)IBM大數(shù)據(jù)平臺(tái)的設(shè)計(jì)、研發(fā)和實(shí)施,主持了保險(xiǎn)行業(yè)、金融行業(yè)、政府行業(yè)的大數(shù)據(jù)系統(tǒng)的架構(gòu)設(shè)計(jì)和實(shí)施。楊正洪是華中科技大學(xué)和中國地質(zhì)大學(xué)客座教授,擁有國家專利,是湖北省2013年海外引進(jìn)人才。受武漢市政府邀請(qǐng),楊正洪于2012年12月發(fā)起成立武漢市云升科技發(fā)展有限公司,并獲得東湖高新技術(shù)開發(fā)區(qū)辦公場(chǎng)所和資金支持。目前公司在浙江和上海分別有全資子公司,在美國硅谷設(shè)有研發(fā)中心。公司的核心產(chǎn)品是大數(shù)據(jù)管理平臺(tái)EasyDoop,并以EasyDoop為基礎(chǔ)研發(fā)了公安大數(shù)據(jù)產(chǎn)品和環(huán)保大數(shù)據(jù)產(chǎn)品。這些產(chǎn)品在公安和環(huán)保行業(yè)得到成功實(shí)施,三次被中央電視臺(tái)新聞聯(lián)播節(jié)目播報(bào),省部長級(jí)政府領(lǐng)導(dǎo)親自考察,并給予了很高的評(píng)價(jià)。楊正洪參與了多項(xiàng)大數(shù)據(jù)相關(guān)標(biāo)準(zhǔn)的制定工作,曾受邀參與了公安部主導(dǎo)的“信息安全技術(shù)-大數(shù)據(jù)平臺(tái)安全管理產(chǎn)品安全技術(shù)要求”的國家標(biāo)準(zhǔn)制定。
目 錄
第1章 大數(shù)據(jù)時(shí)代 1
1.1 什么是大數(shù)據(jù) 1
1.2 大數(shù)據(jù)的四大特征 2
1.3 大數(shù)據(jù)的商用化 3
1.4 大數(shù)據(jù)分析 5
1.5 大數(shù)據(jù)與云計(jì)算的關(guān)系 5
1.6 大數(shù)據(jù)的國家戰(zhàn)略 6
1.6.1 政府大數(shù)據(jù)的價(jià)值 7
1.6.2 政府大數(shù)據(jù)的應(yīng)用場(chǎng)景 8
1.7 企業(yè)如何迎接大數(shù)據(jù) 8
1.7.1 評(píng)估大數(shù)據(jù)方案的維度 9
1.7.2 業(yè)務(wù)價(jià)值維度 10
1.7.3 數(shù)據(jù)維度 11
1.7.4 現(xiàn)有IT環(huán)境和成本維度 12
1.7.5 數(shù)據(jù)治理維度 13
1.8 大數(shù)據(jù)產(chǎn)業(yè)鏈分析 14
1.8.1 技術(shù)分析 14
1.8.2 角色分析 15
1.8.3 大數(shù)據(jù)運(yùn)營 17
1.9 大數(shù)據(jù)交易 18
1.10 大數(shù)據(jù)之我見 19
第2章 大數(shù)據(jù)軟件框架 20
2.1 Hadoop框架 20
2.1.1 HDFS(分布式文件系統(tǒng)) 21
2.1.2 MapReduce(分布式計(jì)算框架) 22
2.1.3 YARN(集群資源管理器) 25
2.1.4 Zookeeper(分布式協(xié)作服務(wù)) 28
2.1.5 Ambari(管理工具) 29
2.2 Spark(內(nèi)存計(jì)算框架) 29
2.2.1 Scala 31
2.2.2 Spark SQL 32
2.2.3 Spark Streaming 33
2.3 實(shí)時(shí)流處理框架 34
2.4 框架的選擇 35
第3章 安裝與配置大數(shù)據(jù)軟件 36
3.1 Hadoop發(fā)行版 36
3.1.1 Cloudera 36
3.1.2 HortonWorks 37
3.1.3 MapR 38
3.2 安裝Hadoop前的準(zhǔn)備工作 39
3.2.1 Linux主機(jī)配置 40
3.2.2 配置Java環(huán)境 41
3.2.3 安裝NTP和python 42
3.2.4 安裝和配置openssl 43
3.2.5 啟動(dòng)和停止特定服務(wù) 44
3.2.6 配置SSH無密碼訪問 44
3.3 安裝Ambari 和 HDP 45
3.3.1 配置安裝包文件 45
3.3.2 安裝 Ambari 46
3.3.3 安裝和配置HDP 47
3.4 初識(shí)Hadoop 49
3.4.1 啟動(dòng)和停止服務(wù) 50
3.4.2 使用HDFS 51
3.5 Hadoop的特性 52
第4章 大數(shù)據(jù)存儲(chǔ):文件系統(tǒng) 53
4.1 HDFS shell命令 53
4.2 HDFS配置文件 55
4.3 HDFS API編程 57
4.3.1 讀取HDFS文件內(nèi)容 57
4.3.2 寫HDFS文件內(nèi)容 60
4.4 HDFS API總結(jié) 62
4.4.1 Configuration類 62
4.4.2 FileSystem抽象類 62
4.4.3 Path類 63
4.4.4 FSDataInputStream類 63
4.4.5 FSDataOutputStream類 63
4.4.6 IOUtils類 63
4.4.7 FileStatus類 64
4.4.8 FsShell類 64
4.4.9 ChecksumFileSystem抽象類 64
4.4.10 其他HDFS API實(shí)例 64
4.4.11 綜合實(shí)例 67
4.5 HDFS文件格式 69
4.5.1 SequenceFile 70
4.5.2 TextFile(文本格式) 70
4.5.3 RCFile 70
4.5.4 Avro 72
第5章 大數(shù)據(jù)存儲(chǔ):數(shù)據(jù)庫 73
5.1 NoSQL 73
5.2 HBase管理 74
5.2.1 HBase表結(jié)構(gòu) 75
5.2.2 HBase系統(tǒng)架構(gòu) 78
5.2.3 啟動(dòng)并操作HBase數(shù)據(jù)庫 80
5.2.4 HBase Shell工具 82
5.3 HBase編程 86
5.3.1 增刪改查API 86
5.3.2 過濾器 90
5.3.3 計(jì)數(shù)器 93
5.3.4 原子操作 94
5.3.5 管理API 94
5.4 其他NoSQL數(shù)據(jù)庫 95
第6章 大數(shù)據(jù)訪問:SQL引擎層 97
6.1 Phoenix 97
6.1.1 安裝和配置Phoenix 98
6.1.2 在eclipse上開發(fā)phoenix程序 104
6.1.3 Phoenix SQL工具 108
6.1.4 Phoenix SQL 語法 109
6.2 Hive 111
6.2.1 Hive架構(gòu) 111
6.2.2 安裝Hive 112
6.2.3 Hive和MySQL的配置 114
6.2.4 Hive CLI 115
6.2.5 Hive數(shù)據(jù)類型 115
6.2.6 HiveQL DDL 119
6.2.7 HiveQL DML 121
6.2.8 Hive編程 123
6.2.9 HBase集成 125
6.2.10 XML和JSON數(shù)據(jù) 127
6.2.11 使用Tez 128
6.3 Pig 130
6.3.1 Pig語法 131
6.3.2 Pig和Hive的使用場(chǎng)景比較 134
6.4 ElasticSearch(全文搜索引擎) 136
6.4.1 全文索引的基礎(chǔ)知識(shí) 136
6.4.2 安裝和配置ES 138
6.4.3 ES API 140
第7章 大數(shù)據(jù)采集和導(dǎo)入 143
7.1 Flume 145
7.1.1 Flume架構(gòu) 145
7.1.2 Flume事件 146
7.1.3 Flume源 147
7.1.4 Flume攔截器(Interceptor) 148
7.1.5 Flume通道選擇器(Channel Selector) 149
7.1.6 Flume通道 150
7.1.7 Flume接收器 151
7.1.8 負(fù)載均衡和單點(diǎn)失敗 153
7.1.9 Flume監(jiān)控管理 153
7.1.10 Flume實(shí)例 154
7.2 Kafka 155
7.2.1 Kafka架構(gòu) 156
7.2.2 Kafka與JMS的異同 158
7.2.3 Kafka性能考慮 158
7.2.4 消息傳送機(jī)制 159
7.2.5 Kafka和Flume的比較 159
7.3 Sqoop 160
7.3.1 從數(shù)據(jù)庫導(dǎo)入HDFS 160
7.3.2 增量導(dǎo)入 163
7.3.3 將數(shù)據(jù)從Oracle導(dǎo)入Hive 163
7.3.4 將數(shù)據(jù)從Oracle導(dǎo)入HBase 164
7.3.5 導(dǎo)入所有表 165
7.3.6 從HDFS導(dǎo)出數(shù)據(jù) 165
7.3.7 數(shù)據(jù)驗(yàn)證 165
7.3.8 其他Sqoop功能 165
7.4 Storm 167
7.4.1 Storm基本概念 168
7.4.2 spout 169
7.4.3 bolt 171
7.4.4 拓?fù)?173
7.4.5 Storm總結(jié) 175
7.5 Splunk 175
第8章 大數(shù)據(jù)管理平臺(tái) 177
8.1 大數(shù)據(jù)建設(shè)總體架構(gòu) 177
8.2 大數(shù)據(jù)管理平臺(tái)的必要性 178
8.3 大數(shù)據(jù)管理平臺(tái)的功能 179
8.3.1 推進(jìn)數(shù)據(jù)資源整合共享 179
8.3.2 增強(qiáng)數(shù)據(jù)管理水平 180
8.3.3 支撐創(chuàng)新大數(shù)據(jù)分析 180
8.4 數(shù)據(jù)管理平臺(tái)(DMP) 180
8.5 EasyDoop案例分析 182
8.5.1 大數(shù)據(jù)建模平臺(tái) 183
8.5.2 大數(shù)據(jù)交換和共享平臺(tái) 184
8.5.3 大數(shù)據(jù)云平臺(tái) 185
8.5.4 大數(shù)據(jù)服務(wù)平臺(tái) 186
8.5.5 EasyDoop平臺(tái)技術(shù)原理分析 188
第9章 Spark技術(shù) 192
9.1 Spark框架 192
9.1.1 安裝Spark 193
9.1.2 配置Spark 194
9.2 Spark Shell 195
9.3 Spark編程 198
9.3.1 編寫Spark API程序 198
9.3.2 使用sbt編譯并打成jar包 199
9.3.3 運(yùn)行程序 200
9.4 RDD 200
9.4.1 RDD算子和RDD依賴關(guān)系 201
9.4.2 RDD轉(zhuǎn)換操作 203
9.4.3 RDD行動(dòng)(Action)操作 204
9.4.4 RDD控制操作 205
9.4.5 RDD實(shí)例 205
9.5 Spark SQL 208
9.5.1 DataFrame 209
9.5.2 RDD轉(zhuǎn)化為DataFrame 213
9.5.3 JDBC數(shù)據(jù)源 215
9.5.4 Hive數(shù)據(jù)源 216
9.6 Spark Streaming 217
9.6.1 DStream編程模型 218
9.6.2 DStream操作 221
9.6.3 性能考慮 223
9.6.4 容錯(cuò)能力 224
9.7 GraphX圖計(jì)算框架 224
9.7.1 屬性圖 226
9.7.2 圖操作符 228
9.7.3 屬性操作 231
9.7.4 結(jié)構(gòu)操作 231
9.7.5 關(guān)聯(lián)(join)操作 233
9.7.6 聚合操作 234
9.7.7 計(jì)算度信息 235
9.7.8 緩存操作 236
9.7.9 圖算法 236
第10章 大數(shù)據(jù)分析 238
10.1 數(shù)據(jù)科學(xué) 239
10.1.1 探索性數(shù)據(jù)分析 240
10.1.2 描述統(tǒng)計(jì) 241
10.1.3 數(shù)據(jù)可視化 241
10.2 預(yù)測(cè)分析 244
10.2.1 預(yù)測(cè)分析實(shí)例 244
10.2.2 回歸(Regression)分析預(yù)測(cè)法 246
10.3 機(jī)器學(xué)習(xí) 247
10.3.1 機(jī)器學(xué)習(xí)的市場(chǎng)動(dòng)態(tài) 248
10.3.2 機(jī)器學(xué)習(xí)分類 249
10.3.3 機(jī)器學(xué)習(xí)算法 251
10.4 Spark MLib 252
10.4.1 MLib架構(gòu) 253
10.4.2 MLib算法庫 253
10.4.3 決策樹 257
10.5 深入了解算法 261
10.5.1 分類算法 262
10.5.2 預(yù)測(cè)算法 263
10.5.3 聚類分析 263
10.5.4 關(guān)聯(lián)分析 264
10.5.5 異常值分析算法 266
10.5.6 協(xié)同過濾(推薦引擎)算法 267
10.6 Mahout簡(jiǎn)介 267
第11章 案例分析:環(huán)保大數(shù)據(jù) 268
11.1 環(huán)保大數(shù)據(jù)管理平臺(tái) 268
11.2 環(huán)保大數(shù)據(jù)應(yīng)用平臺(tái) 269
11.2.1 環(huán)境自動(dòng)監(jiān)測(cè)監(jiān)控服務(wù) 270
11.2.2 綜合查詢服務(wù) 272
11.2.3 統(tǒng)計(jì)分析服務(wù) 272
11.2.4 GIS服務(wù) 274
11.2.5 視頻服務(wù) 274
11.2.6 預(yù)警服務(wù) 275
11.2.7 應(yīng)急服務(wù) 276
11.2.8 電子政務(wù)服務(wù) 277
11.2.9 智能化運(yùn)營管理系統(tǒng) 279
11.2.10 環(huán)保移動(dòng)應(yīng)用系統(tǒng) 279
11.2.11 空氣質(zhì)量系統(tǒng) 280
11.3 環(huán)保大數(shù)據(jù)分析系統(tǒng) 280
第12章 案例分析:公安大數(shù)據(jù) 281
12.1 總體架構(gòu)設(shè)計(jì) 281
12.2 建設(shè)內(nèi)容 282
12.3 建設(shè)步驟 284
附錄 1 數(shù)據(jù)量的單位級(jí)別 285
附錄 2 Linux Shell常見命令 286
附錄 3 Ganglia(分布式監(jiān)控系統(tǒng)) 289
附錄 4 auth-ssh腳本 290
附錄 5 作者簡(jiǎn)介 292
第 9 章 Spark技術(shù)
Apache Spark 是一個(gè)新興的大數(shù)據(jù)處理通用引擎,提供了分布式的內(nèi)存抽象。Spark較大的特點(diǎn)就是快(Lightning-Fast),可比 Hadoop MapReduce 的處理速度快 100 倍。此外,Spark 提供了簡(jiǎn)單易用的 API,幾行代碼就能實(shí)現(xiàn) WordCount。本章介紹Spark 的框架,Spark Shell 、RDD、Spark SQL、Spark Streaming 等的基本使用。
9.1 Spark框架
Spark作為新一代大數(shù)據(jù)快速處理平臺(tái),集成了大數(shù)據(jù)相關(guān)的各種能力。Hadoop的中間數(shù)據(jù)需要存儲(chǔ)在硬盤上,這產(chǎn)生了較高的延遲。而Spark基于內(nèi)存計(jì)算,解決了這個(gè)延遲的速度問題。Spark本身可以直接讀寫Hadoop上任何格式數(shù)據(jù),這使得批處理更加快速。
圖9-1是以Spark為核心的大數(shù)據(jù)處理框架。最底層為大數(shù)據(jù)存儲(chǔ)系統(tǒng),如:HDFS、HBase等。在存儲(chǔ)系統(tǒng)上面是Spark集群模式(也可以認(rèn)為是資源管理層),這包括Spark自帶的獨(dú)立部署模式、YARN和Mesos集群資源管理模式,也可以是Amazon EC2。Spark內(nèi)核之上是為應(yīng)用提供各類服務(wù)的組件。Spark內(nèi)核API支持Java、Python、Scala等編程語言。Spark Streaming提供高性、高吞吐量的實(shí)時(shí)流式處理服務(wù),能夠滿足實(shí)時(shí)系統(tǒng)要求;MLib提供機(jī)器學(xué)習(xí)服務(wù),Spark SQL提供了性能比Hive快了很多倍的SQL查詢服務(wù),GraphX提供圖計(jì)算服務(wù)。
圖9-1 Spark 框架
從上圖看出,Spark有效集成了Hadoop組件,可以基于Hadoop YARN作為資源管理框架,并從HDFS和HBase數(shù)據(jù)源上讀取數(shù)據(jù)。YARN是Spark目前主要使用的資源管理器。Hadoop能做的,Spark基本都能做,而且做的比Hadoop好。Spark依然是Hadoop生態(tài)圈的一員,它替換的主要是MR的計(jì)算模型而已。資源調(diào)度依賴于YARN,存儲(chǔ)則依賴于HDFS。
Spark的大數(shù)據(jù)處理平臺(tái)是建立在統(tǒng)一抽象的RDD之上。RDD是彈性分布式數(shù)據(jù)集(Resilient Distributed Dataset)的英文簡(jiǎn)稱,它是一種特殊數(shù)據(jù)集合,支持多種來源,有容錯(cuò)機(jī)制,可以被緩存,支持并行操作。Spark的一切都是基于RDD的。RDD就是Spark輸入的數(shù)據(jù)。
Spark應(yīng)用程序在集群上以獨(dú)立進(jìn)程集合的形式運(yùn)行。如圖9-2所示,主程序(叫做Driver程序)中的SparkContext對(duì)象協(xié)調(diào)Spark應(yīng)用程序。SparkContext對(duì)象首先連接到多種集群管理器(如:YARN),然后在集群節(jié)點(diǎn)上獲得Executor。SparkContext把應(yīng)用代碼發(fā)給Executor,Executor負(fù)責(zé)應(yīng)用程序的計(jì)算和數(shù)據(jù)存儲(chǔ)。
圖9-2 集群模式
每個(gè)應(yīng)用程序都擁有自己的Executor。Executor為應(yīng)用程序提供了一個(gè)隔離的運(yùn)行環(huán)境,以Task的形式執(zhí)行作業(yè)。對(duì)于Spark Shell來說,這個(gè)Driver就是與用戶交互的進(jìn)程。
9.1.1 安裝Spark
近期的Spark版本是1.6.1。它可以運(yùn)行在Windows或Linux機(jī)器上。運(yùn)行 Spark 需要 Java JDK 1.7,CentOS 6.x 系統(tǒng)默認(rèn)只安裝了 Java JRE,還需要安裝 Java JDK,并確保配置好 JAVA_HOME、PATH和CLASSPATH變量。此外,Spark 會(huì)用到 HDFS 與 YARN,因此讀者要先安裝好 Hadoop。我們可以從Spark官方網(wǎng)站spark.apache.org/downloads.html上下載Spark,如圖9-3所示。
圖9-3 下載安裝包
有幾種Package type,分別為:
l Source code:Spark 源碼,需要編譯才能使用。
l Pre-build with user-provided Hadoop:“Hadoop free”版,可應(yīng)用到任意 Hadoop 版本。
l Pre-build for Hadoop 2.6 and later:基于 Hadoop 2.6 的預(yù)編譯版,需要與本機(jī)安裝的 Hadoop 版本對(duì)應(yīng)。可選的還有 Hadoop 2.4 and later、Hadoop 2.3、Hadoop 1.x,以及 CDH 4。
本書選擇的是 Pre-build with user-provided Hadoop,簡(jiǎn)單配置后可應(yīng)用到任意 Hadoop 版本。下載后,執(zhí)行如下命令進(jìn)行安裝:
sudo tar -zxf spark-1.6.1-bin-without-hadoop.tgz -C /usr/local/
cd /usr/local
sudo mv ./spark-1.6.1-bin-without-hadoop/ ./spark
sudo chown -R hadoop:hadoop ./spark
9.1.2 配置Spark
安裝后,進(jìn)入conf目錄,以spark-env.sh.template文件為模塊創(chuàng)建spark-env.sh文件,然后修改其配置信息,命令如下:
cd /usr/local/spark
cp ./conf/spark-env.sh.template ./conf/spark-env.sh
編輯 ./conf/spark-env.sh(vim ./conf/spark-env.sh),在文件的加上如下一行:
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath
保存后,Spark 就可以啟動(dòng)和運(yùn)行了。在 ./examples/src/main 目錄下有一些 Spark 的示例程序,有 Scala、Java、Python、R 等語言的版本。我們可以先運(yùn)行一個(gè)示例程序 SparkPi(即計(jì)算 π 的近似值),執(zhí)行如下命令:
cd /usr/local/spark
./bin/run-example SparkPi
執(zhí)行時(shí)會(huì)輸出非常多的運(yùn)行信息,輸出結(jié)果不容易找到,可以通過 grep 命令進(jìn)行過濾(命令中的 2>&1 可以將所有的信息都輸出到 stdout 中):
./bin/run-example SparkPi 2>&1 | grep "Pi is roughly"
過濾后的運(yùn)行結(jié)果為 π 的 5 位小數(shù)近似值 。
9.2 Spark Shell
以前的統(tǒng)計(jì)和機(jī)器學(xué)習(xí)依賴于數(shù)據(jù)抽樣。從統(tǒng)計(jì)的角度來看,抽樣如果足夠隨機(jī),其實(shí)可以很精準(zhǔn)地反應(yīng)全集的結(jié)果,但事實(shí)上往往很難做到隨機(jī),所以通常做出來也會(huì)不準(zhǔn)。現(xiàn)在大數(shù)據(jù)解決了這個(gè)問題,它不是通過優(yōu)化抽樣的隨機(jī)來解決,而是通過全量數(shù)據(jù)來解決。要解決全量的數(shù)據(jù)就需要有強(qiáng)大的處理能力,Spark首先具備強(qiáng)大的處理能力,其次Spark Shell帶來了即席查詢。做算法的工程師,以前經(jīng)常是在小數(shù)據(jù)集上跑個(gè)單機(jī),然后看效果不錯(cuò),一到全量上,就可能和單機(jī)效果很不一樣。有了Spark后就不一樣了,尤其是有了Spark Shell。可以邊寫代碼,邊運(yùn)行,邊看結(jié)果。Spark提供了很多的算法,最常用的是貝葉斯、word2vec、線性回歸等。作為算法工程師,或者大數(shù)據(jù)分析師,一定要學(xué)會(huì)用Spark Shell。
Spark Shell 提供了簡(jiǎn)單的方式來學(xué)習(xí) Spark API,也提供了交互的方式來分析數(shù)據(jù)。Spark Shell 支持 Scala 和 Python,本書選擇使用 Scala 來進(jìn)行介紹。Scala集成了面向?qū)ο蠛秃瘮?shù)語言的特性,并運(yùn)行于Java 虛擬機(jī)之上,兼容現(xiàn)有的 Java 程序。Scala 是 Spark 的主要編程語言,如果僅僅是寫 Spark 應(yīng)用,并非一定要用 Scala,用Java和Python都是可以的。使用 Scala 的優(yōu)勢(shì)是開發(fā)效率更高,代碼更精簡(jiǎn),并且可以通過 Spark Shell 進(jìn)行交互式實(shí)時(shí)查詢,方便排查問題。執(zhí)行如下命令啟動(dòng) Spark Shell:
./bin/spark-shell
啟動(dòng)成功后會(huì)有“scala >”的命令提示符。這表明已經(jīng)成功啟動(dòng)了Spark Shell。在 Spark Shell 啟動(dòng)時(shí),輸出日志的有這么幾條信息:
16/04/16 17:25:47 INFO repl.SparkILoop: Created spark context..
Spark context available as sc.
這些信息表明 SparkContext已經(jīng)初始化好了,可通過對(duì)應(yīng)的sc變量直接進(jìn)行訪問。Spark 的主要抽象是分布式的數(shù)據(jù)集合RDD,它可被分發(fā)到集群各個(gè)節(jié)點(diǎn)上,進(jìn)行并行操作。一個(gè)RDD可以通過 Hadoop InputFormats 創(chuàng)建(如 HDFS),或者從其他 RDDs轉(zhuǎn)化而來。下面我們從 ./README 文件新建一個(gè) RDD,代碼如下:
scala>val textFile = sc.textFile("file:///usr/local/spark/README.md"
上述的sc是Spark創(chuàng)建的SparkContext,我們使用SparkContext對(duì)象加載本地文件README.md來創(chuàng)建RDD。輸出結(jié)果如下:
textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at :27
上述返回結(jié)果為一個(gè)MapPartitionsRDD文件。需要說明的是,加載HDFS文件和本地文件都是使用textFile ,區(qū)別在于前綴“hdfs://”為HDFS文件,而“file:// ”為本地文件。上述代碼中通過“file://”前綴指定讀取本地文件,直接返回MapPartitionsRDD。Spark Shell默認(rèn)方式是讀取HDFS中的文件。從HDFS讀取的文件先轉(zhuǎn)換為HadoopRDD,然后隱式轉(zhuǎn)換成MapPartitionsRDD。
上面的例子使用Spark中的文本文件README.md創(chuàng)建一個(gè)RDD textFile,文件中包含了若干文本行。將該文本文件讀入RDD textFile時(shí),其中的文本行將被分區(qū),以便能夠分發(fā)到集群中并行化操作。我們可以想象,RDD有多個(gè)分區(qū),每個(gè)分區(qū)上有多行的文本內(nèi)容。RDDs 支持兩種類型的操作:
l actions:在數(shù)據(jù)集上運(yùn)行計(jì)算后返回結(jié)果值。
l transformations:轉(zhuǎn)換。從現(xiàn)有RDD創(chuàng)建一個(gè)新的RDD。
下面我們演示count()和first()操作:
scala>textFile.count() // RDD 中的 item 數(shù)量,對(duì)于文本文件,就是總行數(shù)
輸出結(jié)果為:
res0: Long = 95
scala>textFile.first() // RDD 中的及時(shí)個(gè) item,對(duì)于文本文件,就是及時(shí)行內(nèi)容
輸出結(jié)果為:
res1: String = # Apache Spark
上面這兩個(gè)例子都是action的例子。接著演示 transformation,通過 filter transformation來篩選出包含 Spark 的行,返回一個(gè)新的RDD,代碼如下:
scala>val linesWithSpark = textFile.filter(line => line.contains("Spark"
scala>linesWithSpark.count() // 統(tǒng)計(jì)行數(shù)
上面的linesWithSpark RDD有多個(gè)分區(qū),每個(gè)分區(qū)上只有包含了Spark的若干文本行。輸出結(jié)果為:
res4: Long = 17
上述結(jié)果表明一共有17行內(nèi)容包含“Spark”,這與通過 Linux 命令 cat ./README.md | grep "Spark" -c 得到的結(jié)果一致,說明是正確的。action 和 transformation 可以用鏈?zhǔn)讲僮鞯姆绞浇Y(jié)合使用,使代碼更為簡(jiǎn)潔:
scala>textFile.filter(line => line.contains("Spark")).count() // 統(tǒng)計(jì)包含 Spark 的行數(shù)
RDD的actions和transformations可用在更復(fù)雜的計(jì)算中。例如,通過如下代碼可以找到包含單詞最多的那一行內(nèi)容共有幾個(gè)單詞:
scala>textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b
輸出結(jié)果為:
res5: Int = 14
上述代碼將每一行文本內(nèi)容使用split進(jìn)行分詞,并統(tǒng)計(jì)分詞后的單詞數(shù)。將每一行內(nèi)容map為一個(gè)整數(shù),這將創(chuàng)建一個(gè)新的 RDD,并在這個(gè) RDD 中執(zhí)行reduce操作,找到較大的數(shù)。map()、reduce()中的參數(shù)是Scala的函數(shù)字面量(function literals),并且可以使用Scala/Java的庫。例如,通過使用 Math.max() 函數(shù)(需要導(dǎo)入Java的Math庫),可以使上述代碼更容易理解:
scala>import java.lang.Math
scala>textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b
詞頻統(tǒng)計(jì)(WordCount)是Hadoop MapReduce的入門程序,Spark可以更容易地實(shí)現(xiàn)。首先結(jié)合flatMap、map和reduceKey來計(jì)算文件中每個(gè)單詞的詞頻:
scala>val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a b)
輸出結(jié)果為(string,int)類型的鍵值對(duì)ShuffledRDD。這是因?yàn)閞educeByKey操作需要進(jìn)行Shuffle操作,返回的是一個(gè)Shuffle形式的ShuffleRDD:
wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at :29
然后使用collect聚合單詞計(jì)算結(jié)果:
scala>wordCounts.collect
輸出結(jié)果為:
res7: Array[(String, Int)] = Array((package,1), (For,2), (Programs,1), (processing,1), (Because,1), (The,1)...
Spark 支持將數(shù)據(jù)緩存在集群的內(nèi)存緩存中,當(dāng)數(shù)據(jù)需要反復(fù)訪問時(shí)這個(gè)特征非常有用。調(diào)用 cache(),就可以將數(shù)據(jù)集進(jìn)行緩存:
scala>textFilter.cache
9.3 Spark編程
無論Windows或Linux操作系統(tǒng),都是基于Eclipse或Idea構(gòu)建開發(fā)環(huán)境,通過Java、Scala或Python語言進(jìn)行開發(fā)。根據(jù)開發(fā)語言的不同,我們需要預(yù)先準(zhǔn)備好JDK、Scala或Python環(huán)境,然后在Eclipse中下載安裝Scala或Python插件。
下面我們通過一個(gè)簡(jiǎn)單的應(yīng)用程序 SimpleApp 來演示如何通過 Spark API 編寫一個(gè)獨(dú)立應(yīng)用程序。不同于使用Spark Shell自動(dòng)初始化的SparkContext,獨(dú)立應(yīng)用程序需要自己初始化一個(gè)SparkContext,將一個(gè)包含應(yīng)用程序信息的SparkConf對(duì)象傳遞給SparkContext構(gòu)造函數(shù)。對(duì)于獨(dú)立應(yīng)用程序,使用 Scala 編寫的程序需要使用 sbt 進(jìn)行編譯打包,相應(yīng)地,Java 程序使用 Maven 編譯打包,而 Python 程序通過 spark-submit 直接提交。
在終端中執(zhí)行如下命令,創(chuàng)建一個(gè)文件夾 sparkapp 作為應(yīng)用程序根目錄:
cd ~ # 進(jìn)入用戶主文件夾
mkdir ./sparkapp # 創(chuàng)建應(yīng)用程序根目錄
mkdir -p ./sparkapp/src/main/scala # 創(chuàng)建所需的文件夾結(jié)構(gòu)
9.3.1 編寫Spark API程序
在./sparkapp/src/main/scala下建立一個(gè)名為SimpleApp.scala 的文件(vim ./sparkapp/src/main/scala/SimpleApp.scala),添加代碼如下:
/ SimpleApp.scala /
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object SimpleApp {
//使用關(guān)鍵字def聲明函數(shù),必須為函數(shù)指定參數(shù)類型
def main(args: Array[String]) {
val logFile = "file:///usr/local/spark/README.md" // 一個(gè)本地文件
//創(chuàng)建SparkConf對(duì)象,該對(duì)象包含應(yīng)用程序的信息
val conf = new SparkConf().setAppName("Simple Application"
//創(chuàng)建SparkContext對(duì)象,該對(duì)象可以訪問Spark集群
val sc = new SparkContext(conf
val logData = sc.textFile(logFile, 2).cache
//line=>line.contains(..)是匿名函數(shù)的定義,line是參數(shù)
val numAs = logData.filter(line => line.contains("a")).count
val numBs = logData.filter(line => line.contains("b")).count
println("Lines with a: %s, Lines with b: %s".format(numAs, numBs
}
}
上述程序計(jì)算 /usr/local/spark/README 文件中包含 “a” 的行數(shù)和包含 “b” 的行數(shù)。不同于 Spark Shell,獨(dú)立應(yīng)用程序需要通過“val sc = new SparkContext(conf)”初始化 SparkContext,SparkContext 的參數(shù) SparkConf 包含了應(yīng)用程序的信息。
9.3.2 使用sbt編譯并打成jar包
該程序依賴 Spark API,因此我們需要通過sbt(或mvn)進(jìn)行編譯打包。我們以sbt為例,創(chuàng)建一個(gè)包含應(yīng)用程序代碼的jar包。在 ./sparkapp 中新建文件 simple.sbt(vim ./sparkapp/simple.sbt),添加如下內(nèi)容,聲明該獨(dú)立應(yīng)用程序的信息以及與 Spark 的依賴關(guān)系:
name := "Simple Project"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies = "org.apache.spark" %% "spark-core" % "1.6.1"
文件 simple.sbt 需要指明Spark和Scala的版本。上述版本信息可以從Spark Shell獲得。我們啟動(dòng)Spark Shell的過程中,當(dāng)輸出到 Spark 的符號(hào)圖形時(shí),可以看到相關(guān)的版本信息。
Spark中沒有自帶sbt,需要手動(dòng)安裝sbt,我們選擇安裝在/usr/local/sbt中:
sudo mkdir /usr/local/sbt
sudo chown -R hadoop /usr/local/sbt # 此處的hadoop為你的用戶名
cd /usr/local/sbt
下載sbt后,拷貝至 /usr/local/sbt 中。接著在 /usr/local/sbt 中創(chuàng)建 sbt 腳本(vim ./sbt),添加如下內(nèi)容:
#!/bin/bash
SBT_OPTS="-Xms512M -Xmx1536M -Xss1M -XX: CMSClassUnloadingEnabled -XX:MaxPermSize=256M"
java $SBT_OPTS -jar `dirname
非常好,我很喜歡
好
價(jià)格很優(yōu)惠,書更是好書,買書來當(dāng)當(dāng)是不二選擇。
剛到手,還沒來得及看,應(yīng)該還可以吧!