California Housing on Spark

Problem Description

This is a write-up of an earlier hands-on Spark project, developed in Scala. Both the features and the model are fairly simple; readers looking for something more advanced should head over to Kaggle instead…

The data comes from the 1990 California housing census dataset.

The fields are described as:

  • Latitude of the area's center (latitude)
  • Longitude of the area's center (longitude)
  • Median age of all houses in the area (housingMedianAge)
  • Total number of rooms in the area (totalRooms)
  • Total number of bedrooms in the area (totalBedrooms)
  • Total population of the area (population)
  • Total number of households in the area (households)
  • Median income in the area (medianIncome)
  • Median house value in the area (medianHouseValue)

The goal is to predict the median house value of an area using all fields except the last one (medianHouseValue).

Data Preview

  1. Read the data and domain
    First, inspect the dataset to get a feel for its characteristics and try some simple preprocessing, such as null checks, deduplication, and type conversion.
    Put cal_housing.data and cal_housing.domain under the main project's res directory.
    Start the app via SparkSession and print the domain plus the first line of the data to take a look.
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {
  val appName = "California House"
  val master = "local"
  val conf = new SparkConf().setAppName(appName).setMaster(master).set("spark.executor.memory", "512mb")
  val spark = SparkSession.builder()
    .config(conf)
    .getOrCreate()

  californiaHouse(spark)

  spark.stop()
}

def californiaHouse(spark: SparkSession) = {
  val sc = spark.sparkContext
  val sqlc = spark.sqlContext
  val calDataPath = "res/cal_housing.data"
  val calDataDomain = "res/cal_housing.domain"

  //read the raw files as RDDs and peek at them
  val dataRdd = sc.textFile(calDataPath)
  val domainRdd = sc.textFile(calDataDomain)
  domainRdd.collect().foreach(println)
  dataRdd.take(1).foreach(println)
}

The output is:

longitude: continuous.
latitude: continuous.
housingMedianAge: continuous.
totalRooms: continuous.
totalBedrooms: continuous.
population: continuous.
households: continuous.
medianIncome: continuous.
medianHouseValue: continuous.

and the first data row:

-122.230000,37.880000,41.000000,880.000000,129.000000,322.000000,126.000000,8.325200,452600.000000

As we can see, all values in the data are continuous and correspond one-to-one to the fields in the order given by the domain. Having a continuous target is, of course, what makes regression analysis appropriate here.

  2. Build the DataFrame
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{DataType, DoubleType, StringType, StructField, StructType}

    //build dataFrame: rows from the data file, schema column names from the domain file
    val rowRdd = dataRdd.map(l => l.split(",")).map(arr => Row.fromSeq(arr))
    val colNames = domainRdd.map(desc => desc.substring(0, desc.indexOf(":"))).map(name => StructField(name, StringType, true)).collect()
    val dfSchema = StructType(colNames)
    //declared as var because the DataFrame is reassigned during preprocessing below
    var df = changeColumnsType(spark.createDataFrame(rowRdd, dfSchema), Array("households", "housingMedianAge", "latitude", "longitude", "medianHouseValue", "medianIncome", "population", "totalBedRooms", "totalRooms"), DoubleType)
    df.printSchema()

Note that the part of each domain entry before the colon is used as the column name, and the schema is built from StructField objects. Each line of the data file is wrapped in a Row, which represents one row.
The helper changeColumnsType is used here; it relies on withColumn to cast each column from String to Double.

import org.apache.spark.sql.DataFrame

/**
 * call withColumn() to change the data type of the given columns
 * @param df      source DataFrame
 * @param names   names of the columns to cast
 * @param newType target data type
 * @return a DataFrame with the columns cast to newType
 */
def changeColumnsType(df: DataFrame, names: Array[String], newType: DataType) = {
  var ret: DataFrame = df
  for (name <- names) {
    ret = ret.withColumn(name, df(name).cast(newType))
  }
  ret
}

The printed schema is shown below. With the DataFrame in hand it also becomes easy to run SQL against it to trigger Spark jobs; a quick example follows the schema output.

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housingMedianAge: double (nullable = true)
 |-- totalRooms: double (nullable = true)
 |-- totalBedRooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- medianIncome: double (nullable = true)
 |-- medianHouseValue: double (nullable = true)
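
For example, a temporary view can be registered on the DataFrame and queried with Spark SQL. This is just a quick illustration, not part of the original pipeline; the view name houses is arbitrary.

//register a temp view and query it with Spark SQL
df.createOrReplaceTempView("houses")
spark.sql("SELECT medianIncome, medianHouseValue FROM houses WHERE housingMedianAge > 30").show(5)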

Data Preprocessing

  1. Remove bad records
    Drop rows containing null or NaN from the DataFrame.

    //exclude empty rows (drop() returns a new DataFrame, so assign it back)
    df = df.na.drop()
  2. Drop and add features
    Location is certainly a factor in house prices, but without richer information the relationship between latitude/longitude and price is not obvious enough to be useful here, so lat and lon are dropped.
    The house values are large numbers, so divide them by 10000 to express them in units of ten thousand.
    In addition, new features can be engineered from the existing ones, e.g. the average number of rooms per household, roomsPerHousehold = totalRooms / households. Also added: populationPerHousehold, bedroomsPerRoom, bedroomsPerHousehold.

import org.apache.spark.sql.functions.col

//exclude empty rows (drop() returns a new DataFrame, so assign it back)
df = df.na.drop()

//refine columns, feature selection and engineering
df = df
  .withColumn("medianHouseValue", col("medianHouseValue") / 10000)
  .withColumn("roomsPerHousehold", col("totalRooms") / col("households"))
  .withColumn("populationPerHousehold", col("population") / col("households"))
  .withColumn("bedroomsPerRoom", col("totalBedRooms") / col("totalRooms"))
  .withColumn("bedroomsPerHousehold", col("totalBedRooms") / col("households"))
df = df.select("medianHouseValue",
  "totalRooms",
  "totalBedRooms",
  "population",
  "households",
  "medianIncome",
  "roomsPerHousehold",
  "populationPerHousehold",
  "bedroomsPerRoom",
  "bedroomsPerHousehold")
df.show(5)

(screenshot: refined columns from df.show(5))

Next, the data can be prepared for ML. Arrange it into a label / features layout: the first column becomes the label, and all remaining columns become the features, wrapped in a DenseVector.

import org.apache.spark.ml.linalg.{DenseVector, SQLDataTypes}

//prepare label and features: the first column becomes the label, the rest are packed into a dense vector
val input = df.rdd.map(x => {
  val seq = x.toSeq.slice(1, x.size)
  val array = seq.map(_.toString.toDouble).toArray
  Row.fromTuple((x.toSeq.head, new DenseVector(array)))
})
val structType = StructType(Array(StructField("label", DoubleType), StructField("features", SQLDataTypes.VectorType)))
df = spark.createDataFrame(input, structType)
df.show(5)

(screenshot: label and features)
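
As a side note, the same label / features layout can also be built without dropping down to the RDD API. Here is a minimal sketch using Spark ML's VectorAssembler, applied to the refined DataFrame from the previous step (i.e. before the RDD conversion above); the column list simply mirrors the features selected earlier:

import org.apache.spark.ml.feature.VectorAssembler

//assemble the feature columns into a single vector column named "features"
val assembler = new VectorAssembler()
  .setInputCols(Array("totalRooms", "totalBedRooms", "population", "households", "medianIncome",
    "roomsPerHousehold", "populationPerHousehold", "bedroomsPerRoom", "bedroomsPerHousehold"))
  .setOutputCol("features")
val assembled = assembler.transform(df)
  .withColumnRenamed("medianHouseValue", "label")
  .select("label", "features")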

  3. Normalize the data
    To speed up convergence and improve accuracy, the data needs to be normalized. StandardScaler makes this straightforward (see the note after the scaled output below).
    import org.apache.spark.ml.feature.StandardScaler

    //scale the feature vectors
    val scaler = new StandardScaler().setInputCol("features").setOutputCol("features_scaled").fit(df)
    val scaledDf = scaler.transform(df)
    scaledDf.show(5)

(screenshot: scaled features)
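
Worth noting: by default StandardScaler only scales each feature to unit standard deviation. If centering is also wanted, it can be enabled explicitly; a small sketch (not used in this project):

//scale to unit std and also subtract the mean (dense vectors only)
val centeringScaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("features_scaled")
  .setWithMean(true)
  .setWithStd(true)
  .fit(df)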

Model Training and Prediction

A linear regression model is used, with the dataset split 80/20 into train and test sets.
Following the grid search tutorial in the official docs, the grid of candidate parameters is assembled and handed to TrainValidationSplit. The model trained on the train set is then applied to the test set to inspect the predictions.

import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}

val Array(train, test) = scaledDf.randomSplit(Array(0.8d, 0.2d), seed = 12345)
val lr = new LinearRegression().setFeaturesCol("features_scaled").setLabelCol("label")
  .setMaxIter(20)

val paramGrid = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(1.0, 0.1, 0.01, 0.001))
  .addGrid(lr.fitIntercept)
  .addGrid(lr.elasticNetParam, Array(0.0, 0.25, 0.5, 0.75, 1.0))
  .build()

val trainValidationSplit = new TrainValidationSplit()
  .setEstimator(lr)
  .setEvaluator(new RegressionEvaluator)
  .setEstimatorParamMaps(paramGrid)
  // 80% of the data will be used for training and the remaining 20% for validation.
  .setTrainRatio(0.8)

val model = trainValidationSplit.fit(train)
model.transform(test)
  .select("features", "label", "prediction")
  .show()

| features|label| prediction|
|[9975.0,1743.0,68…| 2.25| 4.924380189334925|

The result does not look great; the feature engineering and parameter tuning presumably need more work, or linear regression itself may simply not be a good fit for this problem.
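
To put a number on "not great", the test predictions could be scored with a RegressionEvaluator and the winning hyper-parameters inspected. A small sketch along these lines (not part of the original run):

import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.regression.LinearRegressionModel

//RMSE on the test set; the label is in units of 10k after the earlier rescaling
val rmse = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse")
  .evaluate(model.transform(test))
println(s"test RMSE = $rmse")

//hyper-parameters chosen by TrainValidationSplit
val best = model.bestModel.asInstanceOf[LinearRegressionModel]
println(s"regParam=${best.getRegParam}, elasticNetParam=${best.getElasticNetParam}, fitIntercept=${best.getFitIntercept}")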

//TODO TransmogrifAI