这里只用到了 Hadoop 的 Hadoop Distributed File System (HDFS),即分布式文件系统。而数据处理是交给 Spark 了。
# 安装
Hadoop (opens new window) 和 Spark (opens new window) 在 Linux 下的安装方法
注意 Hadoop 的 namenode 默认管理 Web 页面是 http://localhost:9870/
,而从 hdfs 协议访问 namenode 是从 9000 端口。
# Hadoop 分布式文件管理
管理 HDFS 的命令和 Linux 的命令很像。
cd $HADOOP_HOME
./sbin/start-dfs.cmd # 启动
hdfs dfs -mkdir /mydata # 创建目录
hdfs dfs -put /path/to/file /mydata # 上传
hdfs dfs -cat /mydata/file # 读取
hdfs dfs -rm -r /mydata # 删除
为配合后面的测试,将爬虫爬取的二手价格数据下载后,上传到 HDFS:
cd $HADOOP_HOME
./sbin/start-dfs.cmd
hdfs dfs -mkdir /mydata
hdfs dfs -put ershoufang_price.txt /mydata
# Spark
还需要安装 pyspark
。
pip3 install pyspark
然后将环境变量写入 /etc/profile
:
export PYSPARK_PYTHON=python3
export PYSPARK_DRIVER_PYTHON=python3
需要注销才能生效。
顺便一提,安装过程中还会自动安装 py4j
,因为 Spark 是运行于 Java 之上,需要用 Python 读取 JVM 中的对象。
# 读取文本文件
尝试读取上面上传的 ershoufang_price.txt
。这里的 localhost:9000
可以在 Web 端看到。然后编写一下的 Python 代码,尝试从 HDFS 读取文本文件的内容。
from pyspark.sql import SparkSession
# SparkSession 操作 SparkSQL DataFrame,读取结构化数组
spark = SparkSession \
.builder \
.appName("Spark SQL basic example") \
#.config("spark.some.config.option", "some-value") \
.getOrCreate()
# SparkContext 用于读取文本文件,读取分结构化数据
sc = spark.sparkContext
filePath = "hdfs://localhost:9000/mydata/ershoufang_price.txt"
textFile = sc.textFile(filePath)
# 返回的 textFile 是 RDD (resilient distributed dataset),是文件在 Spark 中的表示方法
data = textFile.collect() # collect 是将所有 datanode 的数据收集整合到 namenode
# 返回的 data 是一个 list,其中每个元素对应 txt 的每一行
print(data)
如果输出了 44', '杭州,下沙,77,17382,44', '杭州,下沙,77,17382,44', '杭州,下沙,77,17382,44', '杭州,下沙,77,17382,44', '杭州,下沙,92,20924,43']
等,则正常。
如果报错:Python in worker has different version 2.7 than that in driver 3.7, PySpark cannot run with different minor versions.Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.
,则检查一下是否把环境变量加入 /etc/profile
。
# 统计城市房源数并存入临时数据库
接下来尝试将数据存到数据库。为测试语法,这里直接输出到 Spark SQL 的全局临时表中,然后做一个简单的查询。
这里的代码是紧接着上面的 textFile = sc.textFile(filePath)
。
from pyspark.sql import SparkSession, Row
filePath = "hdfs://localhost:9000/mydata/ershoufang_price.txt"
textFile = sc.textFile(filePath)
# 返回的 textFile 是 RDD (resilient distributed dataset),是文件在 Spark 中的一个表示
# data = textFile.collect() # collect 是将所有 datanode 的数据收集整合到 namenode
# # 返回的 data 是一个 list,其中每个元素对应 txt 的每一行
# print(data)
# 将 item 变为 (北京,1)
def to_pair(item):
item_list = item.split(',')
return item_list[1], 1
# 将 (北京,1)(北京,1)(成都,1) 整合为 (北京,2)(成都,1)
rdd = textFile.map(to_pair).reduceByKey(lambda x, y: x + y)
# 将上述 rdd 转为 DataFrame,放进临时表
rowRdd = rdd.map(lambda x: Row(city=x[0], count=x[1]))
dataFrame = spark.createDataFrame(rowRdd)
dataFrame.createGlobalTempView('city')
spark.sql('SELECT * FROM global_temp.city ORDER BY count desc LIMIT 5').show()
输出结果如下:
+----+-----+
|city|count|
+----+-----+
|高新| 5717|
|江北| 5100|
|渝中| 4920|
|和平| 4770|
|南岸| 4710|
+----+-----+
重新运行代码,并在 dataFrame.createGlobalTempView('city')
一行打断点暂停。暂停以后可进入 localhost:4040
,即 Spark 的管理页面。
# 将数据存入 MySQL 数据库
这里首先需要提前配好 MySQL 数据库。
然后为 Spark 下载额外的 jar 包,用以操作 MySQL 数据库。
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.20/mysql-connector-java-8.0.20.jar # 根据数据库版本而定
mv mysql-connector-java-8.0.20.jar /usr/local/spark/jars
接着上文的 dataFrame = spark.createDataFrame(rowRdd)
写:
dataFrame = spark.createDataFrame(rowRdd)
# dataFrame.createGlobalTempView('city')
# spark.sql('SELECT * FROM global_temp.city ORDER BY count desc LIMIT 5').show()
MySQL_Conn = 'jdbc:mysql://localhost:3306/test_db?serverTimezone=UTC'
conn_param = {'user': 'root',
'password': 'lyh54333',
'driver': 'com.mysql.cj.jdbc.Driver'}
dataFrame.write.jdbc(MySQL_Conn, 'city_count', 'overwrite', conn_param)