问题描述
作为之前练手Spark的项目,在此做个小结。开发语言为Scala。特征和模型都比较简单,需要进阶的读者请止步前往Kaggle…
数据源于1990年加州房屋价格普查数据集。
数据描述为:
- 该地区中心的纬度(latitude)
- 该地区中心的经度(longitude)
- 区域内所有房屋屋龄的中位数(housingMedianAge)
- 区域内总房间数(totalRooms)
- 区域内总卧室数(totalBedrooms)
- 区域内总人口数(population)
- 区域内总家庭数(households)
- 区域内人均收入中位数(medianIncome)
- 该区域房价的中位数(medianHouseValue)
目标是利用除最后一行的房屋信息来预测该区域房价。
数据预览
- 读取data和domain
首先观察数据集,了解数据的特性,并尝试对它做一些简单的预处理,如判空去重类型转换等。
我们将cal_housing.data和cal_housing.domain放入主工程res
目录下。
SparkSession方式启动app,输出domain和data的第一行,观察数据
1 | def main(args: Array[String]): Unit = { |
输出结果为
longitude: continuous.
latitude: continuous.
housingMedianAge: continuous.
totalRooms: continuous.
totalBedrooms: continuous.
population: continuous.
households: continuous.
medianIncome: continuous.
medianHouseValue: continuous.
以及
-122.230000,37.880000,41.000000,880.000000,129.000000,322.000000,126.000000,8.325200,452600.000000
可以看到data中数据全部为连续值,且按domain顺序一一对应。当然只有target是连续值才便于做回归预测分析。
- 构建DataFrame
1
2
3
4
5
6//build dataFrame
val rowRdd = dataRdd.map(l => l.split(",")).map(arr => Row.fromSeq(arr))
val colNames = domainRdd.map(desc => desc.substring(0, desc.indexOf(":"))).map(name => StructField(name, StringType, true)).collect()
val dfSchema = StructType(colNames)
val df = changeColumnsType(spark.createDataFrame(rowRdd, dfSchema), Array("households", "housingMedianAge", "latitude", "longitude", "medianHouseValue", "medianIncome", "population", "totalBedRooms", "totalRooms"), DoubleType)
df.printSchema()
注意这里使用了domain定义的前半部分作为col name,并通过StructField建立schema。data部分则利用Row
,其代表一行。
其中用到了changeColumnsType,主要是利用withColumn来将列类型由String改变为Double。1
2
3
4
5
6
7
8
9
10
11
12
13
14/**
* call withColumn(), change data type of column
* @param df
* @param names
* @param newType
* @return dataFrame
*/
def changeColumnsType(df: DataFrame, names: Array[String], newType: DataType) = {
var ret: DataFrame = df
for (name <- names) {
ret = ret.withColumn(name, df(name).cast(newType))
}
ret
}
输出schema见下。有了df就能够方便的使用SQL来触发Spark Job了。
root
|– longitude: double (nullable = true)
|– latitude: double (nullable = true)
|– housingMedianAge: double (nullable = true)
|– totalRooms: double (nullable = true)
|– totalBedRooms: double (nullable = true)
|– population: double (nullable = true)
|– households: double (nullable = true)
|– medianIncome: double (nullable = true)
|– medianHouseValue: double (nullable = true)
数据预处理
去除错误数据
去除dataFrame中的null、NaN1
2//exclude empty
df.na.drop()删除、添加特征
地理位置虽然也是房价的决定因素,但由于这里缺少足够多的信息,经纬度和房价的关联不是那么明显好用。故去除lat、lon。
房价数字很大,可以除以10000将其单位转为万。
此外,可以根据现有特征人工制造特征,如每家平均房屋数,即roomsPerHousehold=totalRooms/households
。另添加:populationPerHousehold, bedroomsPerRoom, bedroomsPerHousehold
1 | //exclude empty |
接下来就可以准备给ML使用了。整理成label、features的格式,其中label取第一列,后面所有列作features,使用denseVector包装。
1 | //prepare label and features |
- 数据归一化
为了加速算法收敛、提高精度,需要做数据归一化。借助StandScaler可以很方便的做到。1
2
3
4//scale
val scaler = new StandardScaler().setInputCol("features").setOutputCol("features_scaled").fit(df)
val scaledDf = scaler.transform(df)
scaledDf.show(5)
模型训练和预测
利用线性回归模型,其中数据集八二分为train和test。
借助官档
)gridSearch的教程,封装好待选参数矩阵,并交给trainValidationSplit使用。最终经train训练的model在test上使用,观察prediction
1 | val Array(train, test) = scaledDf.randomSplit(Array(0.8d, 0.2d), seed = 12345) |
| features|label| prediction|
|[9975.0,1743.0,68…| 2.25| 4.924380189334925|
貌似并不理想,应该是特征工程和参数tunning还需努力。也可能是线性回归LR本身不够适合。
//TODO TransmogrifAI