Introduction
There are two LR implementations in MLlib, one in the mllib package and one in the ml package. This post analyzes the ml version, which is the library Spark will keep developing, while the RDD-based mllib API is gradually being phased out. The ml logistic regression is implemented in org.apache.spark.ml.classification.LogisticRegression. Before looking at the code in detail, let's briefly go over the tunable parameters of LR. [This analysis is based on the 1.6.1 code.]
1. Parameters of the LR model
Parameters:
threshold: if the probability of label 1 is greater than threshold, predict 1, otherwise 0.
regParam: regularization parameter (corresponds to $\lambda$).
elasticNetParam: the ElasticNet mixing parameter $\alpha$. If $\alpha = 0$, the penalty is an L2 penalty. If $\alpha = 1$, it is an L1 penalty. If $0 < \alpha < 1$, the penalty is a combination of L1 and L2. The default is 0.0, i.e. an L2 penalty.
maxIter: maximum number of iterations. Default is 100.
tol: convergence tolerance of the iterations. A smaller value gives higher accuracy at the cost of more iterations. Default is 1E-6.
fitIntercept: whether to fit an intercept term. Default is true.
standardization: whether to standardize the training features before fitting the model. The coefficients are always returned on the original scale, so this is transparent to users. Note: with or without standardization, the model should converge to the same solution when no regularization is applied. The default behavior in R's GLMNET package is also true. Default is true.
weightCol: if not set or empty, all instances get a weight of 1. If set, it assigns per-instance weights, which is useful for unbalanced data. Not set by default.
treeAggregate depth: if the feature dimension or the number of partitions is very large, this needs to be increased. Default is 2.
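As a quick, hypothetical illustration of how these parameters are set through the public API (the values below are illustrative, not recommendations), here is a minimal configuration sketch:

import org.apache.spark.ml.classification.LogisticRegression

val lr = new LogisticRegression()
  .setMaxIter(100)            // maxIter
  .setRegParam(0.1)           // regParam (lambda)
  .setElasticNetParam(0.0)    // elasticNetParam (alpha); 0.0 => pure L2 penalty
  .setTol(1e-6)               // tol
  .setFitIntercept(true)      // fitIntercept
  .setStandardization(true)   // standardization
  .setThreshold(0.5)          // threshold
// .setWeightCol("weight")    // weightCol, if the DataFrame has a per-instance weight column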
2. Principle
The theory of logistic regression is not covered in detail here. In short: y = sigmoid(∑wx + b). The corresponding loss and gradient are shown below (arrows denote vectors):
\[l_{total}(\vec{w}, \vec{x})=l_{model}(\vec{w}, \vec{x})+l_{reg}(\vec{w})\]
\[\vec{G} (\vec{w}, \vec{x})_{total}= \vec{G} (\vec{w}, \vec{x})_{model} + \vec{G} (\vec{w})_{reg}\]
$l_{model}$ is the cross-entropy loss; $l_{reg}$ is the regularization term.
The first part, $l_{model}$, depends on the training data; the second part, the regularization term, does not. The two parts are therefore computed independently in the implementation.
The loss and gradient of each training sample are independent of each other.
The i-th component of the model gradient (summed over the N samples):
\[(\vec{G}(\vec{w}, \vec{x})_{model})_i = \frac{\partial l_{model}(\vec{w}, \vec{x})}{\partial w_i} = -\sum_{k=1}^{N}\left(y_{k} - \frac{\exp(\vec{x}_{k} \cdot \vec{w})}{1+\exp(\vec{x}_{k} \cdot \vec{w})}\right) x_{ki}\]
The model loss over all samples:
\[l_{model}(\vec{w}, \vec{x})=-\sum_{k=1}^{N}\left(y_{k}\, \vec{x}_{k} \cdot \vec{w} - \log(1+\exp(\vec{x}_k \cdot \vec{w}))\right)\]
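To make the per-sample computation concrete, here is a small, self-contained sketch (plain Scala, no Spark) of the binary loss and gradient for a single sample, following the formulas above; `dot` and `sampleLossAndGradient` are local helpers introduced for illustration, not Spark API:

// Per-sample binary loss and gradient, label y in {0, 1}.
def dot(w: Array[Double], x: Array[Double]): Double =
  w.zip(x).map { case (wi, xi) => wi * xi }.sum

def sampleLossAndGradient(w: Array[Double], x: Array[Double], y: Double): (Double, Array[Double]) = {
  val z = dot(w, x)                              // x_k . w
  val p = 1.0 / (1.0 + math.exp(-z))             // sigmoid(z) = P(y = 1 | x)
  // -(y * z - log(1 + exp(z))); the real code uses a numerically stable log1pExp instead.
  val loss = -(y * z - math.log1p(math.exp(z)))
  val grad = x.map(_ * (p - y))                  // (sigmoid(z) - y) * x
  (loss, grad)
}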
For Spark, the ml logistic regression implementation is built around treeAggregate: the loss and gradient are computed over each RDD partition on the executors/workers, then reduced on the driver/controller to sum over all training samples, yielding lossSum and gradientSum.
The optimization itself runs on the single driver machine. L1 regularization is handled by the OWLQN optimizer, and L2 by L-BFGS. Both optimizers are provided by the Breeze library (which also provides the Vector/Matrix implementations and the corresponding linear-algebra interfaces in breeze.linalg).
[Figure: loss/gradient are aggregated from the executors via treeAggregate, then the optimizer iterates on the driver.]
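The treeAggregate call used throughout this code follows the usual seqOp/combOp pattern of aggregate, just with a multi-level (tree-shaped) reduce. A minimal, hypothetical sketch of the pattern on a plain RDD[Double] (computing sum and count, not Spark's actual LR code; `sc` is an assumed SparkContext):

val data = sc.parallelize(1 to 1000000).map(_.toDouble)

val (sum, count) = data.treeAggregate((0.0, 0L))(
  // seqOp: fold one element into the partition-local accumulator (runs on the executors)
  { case ((s, c), x) => (s + x, c + 1L) },
  // combOp: merge two accumulators (partially on executors, finally on the driver)
  { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) },
  // depth: same default depth (2) as the LR implementation
  2)

val mean = sum / count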
3. Loss and gradient
The computation lives mainly in the LogisticCostFun class. See the source for the full code; only the core code and comments are shown here, and they can be matched against the formulas above.
private class LogisticCostFun(
    instances: RDD[Instance],
    numClasses: Int,
    fitIntercept: Boolean,
    standardization: Boolean,
    featuresStd: Array[Double],
    featuresMean: Array[Double],
    regParamL2: Double) extends DiffFunction[BDV[Double]] {

  override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = {
    val numFeatures = featuresStd.length
    val coeffs = Vectors.fromBreeze(coefficients)

    // step 1: the data-dependent part of the cost.
    val logisticAggregator = {
      val seqOp = (c: LogisticAggregator, instance: Instance) => c.add(instance)
      val combOp = (c1: LogisticAggregator, c2: LogisticAggregator) => c1.merge(c2)
      // First apply seqOp within each RDD partition (add: accumulate each sample's loss/gradient),
      // then apply combOp on the driver (merge: obtain the total loss/gradient).
      instances.treeAggregate(
        new LogisticAggregator(coeffs, numClasses, fitIntercept, featuresStd, featuresMean)
      )(seqOp, combOp)
    }

    // The total gradient accumulated over all samples.
    val totalGradientArray = logisticAggregator.gradient.toArray

    // step 2: the L2 regularization part => regVal.
    // regVal = 0.5 * regParamL2 * sum_i(w_i^2); the L1 part is handled by the OWL-QN optimizer.
    // regVal is the sum of coefficients squares excluding intercept for L2 regularization.
    val regVal = if (regParamL2 == 0.0) {
      0.0
    } else {
      var sum = 0.0
      coeffs.foreachActive { (index, value) =>
        // If `fitIntercept` is true, the last term which is intercept doesn't
        // contribute to the regularization.
        if (index != numFeatures) {
          // The following code computes the loss of the regularization and also
          // the gradient of the regularization, adding the latter back to totalGradientArray.
          // totalGradientArray: gradient of the data term + gradient of the regularization term
          // sum: accumulates the L2 regularization value
          sum += {
            if (standardization) {
              totalGradientArray(index) += regParamL2 * value
              value * value
            } else {
              if (featuresStd(index) != 0.0) {
                // If `standardization` is false, we still standardize the data
                // to improve the rate of convergence; as a result, we have to
                // perform this reverse standardization by penalizing each component
                // differently to get effectively the same objective function when
                // the training dataset is not standardized.
                val temp = value / (featuresStd(index) * featuresStd(index))
                totalGradientArray(index) += regParamL2 * temp
                value * temp
              } else {
                0.0
              }
            }
          }
        }
      }
      0.5 * regParamL2 * sum
    }
    // Return the loss with the L2 term added, and the gradient with the L2 term added.
    (logisticAggregator.loss + regVal, new BDV(totalGradientArray))
  }
}
This relies on another class, LogisticAggregator. Its role is essentially a map/reduce computation of the loss and gradient.
private class LogisticAggregator(
    coefficients: Vector,
    numClasses: Int,
    fitIntercept: Boolean,
    featuresStd: Array[Double],
    featuresMean: Array[Double]) extends Serializable {

  private var weightSum = 0.0
  private var lossSum = 0.0

  // coefficients => coefficientsArray.
  private val coefficientsArray = coefficients match {
    case dv: DenseVector => dv.values
    case _ =>
      throw new IllegalArgumentException(
        s"coefficients only supports dense vector but got type ${coefficients.getClass}.")
  }

  // Number of feature dimensions (excluding the intercept slot).
  private val dim = if (fitIntercept) coefficientsArray.length - 1 else coefficientsArray.length

  // Accumulated gradient.
  private val gradientSumArray = Array.ofDim[Double](coefficientsArray.length)

  /**
   * Add a new training instance to this LogisticAggregator, and update the loss and gradient
   * of the objective function.
   *
   * @param instance The instance of data point to be added.
   * @return This LogisticAggregator object.
   */
  def add(instance: Instance): this.type = {
    instance match { case Instance(label, weight, features) =>
      require(dim == features.size, s"Dimensions mismatch when adding new instance." +
        s" Expecting $dim but got ${features.size}.")
      require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")

      if (weight == 0.0) return this

      val localCoefficientsArray = coefficientsArray
      val localGradientSumArray = gradientSumArray

      numClasses match {
        case 2 =>
          // For Binary Logistic Regression.
          // step 1: compute z = sum(w * x) + b on the standardized features, then negate it.
          val margin = - {
            var sum = 0.0
            // sum(w * x): each active feature contributes separately, scaled by 1 / featuresStd.
            features.foreachActive { (index, value) =>
              if (featuresStd(index) != 0.0 && value != 0.0) {
                sum += localCoefficientsArray(index) * (value / featuresStd(index))
              }
            }
            // + b (the intercept).
            sum + {
              if (fitIntercept) localCoefficientsArray(dim) else 0.0
            }
          }

          // The label is encoded as 1.0 / 0.0.
          // step 2: multiply by the instance weight (default weight = 1; other values handle unbalanced data).
          val multiplier = weight * (1.0 / (1.0 + math.exp(margin)) - label)

          // step 3: update the local gradient sum.
          // gradientSum_i += multiplier * (x_i / std_i); the averaging happens later in `gradient`.
          features.foreachActive { (index, value) =>
            if (featuresStd(index) != 0.0 && value != 0.0) {
              localGradientSumArray(index) += multiplier * (value / featuresStd(index))
            }
          }

          if (fitIntercept) {
            localGradientSumArray(dim) += multiplier
          }

          // step 4: accumulate the weighted cross-entropy loss of this instance.
          if (label > 0) {
            // The following is equivalent to log(1 + exp(margin)) but more numerically stable.
            lossSum += weight * MLUtils.log1pExp(margin)
          } else {
            lossSum += weight * (MLUtils.log1pExp(margin) - margin)
          }
        case _ =>
          new NotImplementedError("LogisticRegression with ElasticNet in ML package " +
            "only supports binary classification for now.")
      }
      weightSum += weight
      this
    }
  }

  /**
   * Merge another LogisticAggregator, and update the loss and gradient
   * of the objective function. This is what enables the distributed (tree) aggregation.
   * (Note that it's in place merging; as a result, `this` object will be modified.)
   *
   * @param other The other LogisticAggregator to be merged.
   * @return This LogisticAggregator object.
   */
  def merge(other: LogisticAggregator): this.type = {
    require(dim == other.dim, s"Dimensions mismatch when merging with another " +
      s"LogisticAggregator. Expecting $dim but got ${other.dim}.")

    if (other.weightSum != 0.0) {
      weightSum += other.weightSum
      lossSum += other.lossSum

      var i = 0
      val localThisGradientSumArray = this.gradientSumArray
      val localOtherGradientSumArray = other.gradientSumArray
      val len = localThisGradientSumArray.length
      // Add the other aggregator's gradient sum into this one, component by component.
      while (i < len) {
        localThisGradientSumArray(i) += localOtherGradientSumArray(i)
        i += 1
      }
    }
    this
  }

  // Final loss (weighted average over all instances).
  def loss: Double = {
    require(weightSum > 0.0, s"The effective number of instances should be " +
      s"greater than 0.0, but $weightSum.")
    lossSum / weightSum
  }

  // Final gradient (weighted average over all instances).
  def gradient: Vector = {
    require(weightSum > 0.0, s"The effective number of instances should be " +
      s"greater than 0.0, but $weightSum.")
    val result = Vectors.dense(gradientSumArray.clone())
    scal(1.0 / weightSum, result)
    result
  }
}
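To connect the code above back to the formulas in section 2 (a restatement, not new Spark code): writing $z = \vec{w} \cdot \hat{\vec{x}} + b$ for the score on the standardized features $\hat{\vec{x}}$, the code uses margin $= -z$, and for a single instance with weight 1:
\[multiplier = \frac{1}{1+\exp(-z)} - y = \sigma(z) - y\]
\[loss = \begin{cases} \log(1+\exp(-z)) & y = 1 \\ \log(1+\exp(-z)) + z & y = 0 \end{cases} = -\big(y\,z - \log(1+\exp(z))\big)\]
So the per-instance gradient contribution $multiplier \cdot \hat{x}_i$ and the accumulated loss agree with $l_{model}$ and its gradient given earlier; the division by weightSum in loss and gradient turns the sums into weighted averages.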
4. The training process
override protected def train(dataset: DataFrame): LogisticRegressionModel = {
  // Extract columns from data. If dataset is persisted, do not persist oldDataset.
  val w = if ($(weightCol).isEmpty) lit(1.0) else col($(weightCol))
  // Select labelCol, weightCol and featuresCol from each row.
  val instances: RDD[Instance] = dataset.select(col($(labelCol)), w, col($(featuresCol))).map {
    case Row(label: Double, weight: Double, features: Vector) =>
      Instance(label, weight, features)
  }

  // Cache the training instances if the input dataset is not already persisted.
  // (MEMORY_AND_DISK: spills to disk when memory is insufficient, which may hurt performance.)
  val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
  if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)

  // Summary statistics of the features and labels.
  val (summarizer, labelSummarizer) = {
    val seqOp = (c: (MultivariateOnlineSummarizer, MultiClassSummarizer),
      instance: Instance) =>
        (c._1.add(instance.features, instance.weight), c._2.add(instance.label, instance.weight))

    val combOp = (c1: (MultivariateOnlineSummarizer, MultiClassSummarizer),
      c2: (MultivariateOnlineSummarizer, MultiClassSummarizer)) =>
        (c1._1.merge(c2._1), c1._2.merge(c2._2))

    instances.treeAggregate(
      (new MultivariateOnlineSummarizer, new MultiClassSummarizer))(seqOp, combOp)
  }

  val histogram = labelSummarizer.histogram
  val numInvalid = labelSummarizer.countInvalid
  val numClasses = histogram.length
  val numFeatures = summarizer.mean.size

  if (numInvalid != 0) {
    val msg = s"Classification labels should be in {0 to ${numClasses - 1}}. " +
      s"Found $numInvalid invalid labels."
    logError(msg)
    throw new SparkException(msg)
  }

  if (numClasses > 2) {
    val msg = s"Currently, LogisticRegression with ElasticNet in ML package only supports " +
      s"binary classification. Found $numClasses in the input dataset."
    logError(msg)
    throw new SparkException(msg)
  }

  // Mean and standard deviation of each feature.
  val featuresMean = summarizer.mean.toArray
  val featuresStd = summarizer.variance.toArray.map(math.sqrt)

  // Split regParam into its L1 and L2 parts according to elasticNetParam.
  val regParamL1 = $(elasticNetParam) * $(regParam)
  val regParamL2 = (1.0 - $(elasticNetParam)) * $(regParam)

  // The cost function (data loss + L2 regularization).
  val costFun = new LogisticCostFun(instances, numClasses, $(fitIntercept), $(standardization),
    featuresStd, featuresMean, regParamL2)

  // Optimizer:
  //   elasticNetParam == 0 or regParam == 0, i.e. pure L2 or no regularization => L-BFGS.
  //   otherwise (L1 or L1+L2 regularization) => OWL-QN.
  val optimizer = if ($(elasticNetParam) == 0.0 || $(regParam) == 0.0) {
    new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol))
  } else {
    def regParamL1Fun = (index: Int) => {
      // Remove the L1 penalization on the intercept
      if (index == numFeatures) {
        0.0
      } else {
        if ($(standardization)) {
          regParamL1
        } else {
          // If `standardization` is false, we still standardize the data
          // to improve the rate of convergence; as a result, we have to
          // perform this reverse standardization by penalizing each component
          // differently to get effectively the same objective function when
          // the training dataset is not standardized.
          if (featuresStd(index) != 0.0) regParamL1 / featuresStd(index) else 0.0
        }
      }
    }
    new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol))
  }

  // Initialize the coefficients to zero.
  val initialCoefficientsWithIntercept =
    Vectors.zeros(if ($(fitIntercept)) numFeatures + 1 else numFeatures)

  // For binary classification with zero-initialized coefficients, convergence is faster
  // if the intercept is initialized from the label distribution: b = log(P(1)/P(0)).
  if ($(fitIntercept)) {
    /*
      For binary logistic regression, when we initialize the coefficients as zeros,
      it will converge faster if we initialize the intercept such that
      it follows the distribution of the labels.
      P(0) = 1 / (1 + \exp(b)), and
      P(1) = \exp(b) / (1 + \exp(b))
      , hence
      b = \log{P(1) / P(0)} = \log{count_1 / count_0}
    */
    initialCoefficientsWithIntercept.toArray(numFeatures) = math.log(histogram(1) / histogram(0))
  }

  // Run the iterations.
  val states = optimizer.iterations(new CachedDiffFunction(costFun),
    initialCoefficientsWithIntercept.toBreeze.toDenseVector)

  // Collect the result of the iterations.
  val (coefficients, intercept, objectiveHistory) = {
    /*
      Note that in Logistic Regression, the objective history (loss + regularization)
      is log-likelihood which is invariant under feature standardization. As a result,
      the objective history from the optimizer is the same as the one in the original space.
    */
    val arrayBuilder = mutable.ArrayBuilder.make[Double]
    var state: optimizer.State = null
    while (states.hasNext) {
      state = states.next()
      arrayBuilder += state.adjustedValue
    }
    if (state == null) {
      val msg = s"${optimizer.getClass.getName} failed."
      logError(msg)
      throw new SparkException(msg)
    }

    /*
      The coefficients are trained in the scaled (standardized) space; we're converting
      them back to the original space.
      Note that the intercept in scaled space and original space is the same;
      as a result, no scaling is needed.
    */
    val rawCoefficients = state.x.toArray.clone()
    var i = 0
    while (i < numFeatures) {
      rawCoefficients(i) *= { if (featuresStd(i) != 0.0) 1.0 / featuresStd(i) else 0.0 }
      i += 1
    }

    if ($(fitIntercept)) {
      (Vectors.dense(rawCoefficients.dropRight(1)).compressed, rawCoefficients.last,
        arrayBuilder.result())
    } else {
      (Vectors.dense(rawCoefficients).compressed, 0.0, arrayBuilder.result())
    }
  }

  if (handlePersistence) instances.unpersist()

  // Build the model and attach the training summary.
  val model = copyValues(new LogisticRegressionModel(uid, coefficients, intercept))
  val logRegSummary = new BinaryLogisticRegressionTrainingSummary(
    model.transform(dataset),
    $(probabilityCol),
    $(labelCol),
    $(featuresCol),
    objectiveHistory)
  model.setSummary(logRegSummary)
}
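As a hypothetical end-to-end usage sketch (assuming a DataFrame named training with "label" and "features" columns; the names and values are illustrative), this is how train is exercised through the public fit API and how the result can be inspected:

import org.apache.spark.ml.classification.LogisticRegression

// `training` is an assumed DataFrame with "label" and "features" columns.
val lr = new LogisticRegression()
  .setRegParam(0.01)
  .setElasticNetParam(1.0)   // pure L1 penalty => OWL-QN is chosen internally

val model = lr.fit(training)

println(s"coefficients: ${model.coefficients}, intercept: ${model.intercept}")
// The training summary exposes the objectiveHistory collected in the code above.
println(model.summary.objectiveHistory.mkString(", "))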
5. Regularization
L2 regularization -> ridge
L1 regularization -> lasso
mix of L1 and L2 -> elastic net
The corresponding formulas:
\[l_{reg}(\vec{w})=\lambda\sum_{i=1}^{N}{w_{i}}^2\]
\[l_{reg}(\vec{w})=\lambda\sum_{i=1}^{N}|w_i|\]
\[l_{reg}(\vec{w})=\lambda\sum_{i=1}^{N}\left(\alpha|w_i|+\frac{1-\alpha}{2}{w_i}^2\right)\]
Mapping this to the code:
regParam = regParamL1 + regParamL2
val regParamL1 = $(elasticNetParam) * $(regParam)
val regParamL2 = (1.0 - $(elasticNetParam)) * $(regParam)
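For example (hypothetical values): with regParam = 0.3 and elasticNetParam = 0.8, regParamL1 = 0.8 * 0.3 = 0.24 and regParamL2 = (1.0 - 0.8) * 0.3 = 0.06, so regParamL1 + regParamL2 = 0.3 = regParam, as expected.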
There are two regularization methods, L1 and L2. L2 regularization corresponds to a Gaussian prior on the model parameters; the L2 penalty is smoother than L1 and therefore easier to optimize. L1 corresponds to a Laplace prior; it tends to produce sparse solutions and therefore provides a form of feature selection.
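In MAP terms (a standard result, stated here only to make the prior interpretation concrete), the penalties are the negative log-priors up to constants:
\[-\log p(\vec{w}) = \frac{1}{2\sigma^2}\sum_{i}{w_i}^2 + const \quad (\text{Gaussian prior} \Rightarrow \text{L2})\]
\[-\log p(\vec{w}) = \frac{1}{b}\sum_{i}|w_i| + const \quad (\text{Laplace prior} \Rightarrow \text{L1})\]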
L-BFGS and OWL-QN
OK, so the core of the training essentially comes down to these two methods: L-BFGS and OWL-QN. Both are quasi-Newton variants, with the core update:
\[\vec{w}_{n+1}=\vec{w_n}-H^{-1}\vec{G}\]
The computation of the (approximate) Hessian is not explained here; if you want to dig into the math, you can also look at the implementations of these two methods in the Breeze library:
L-BFGS: Limited-memory BFGS, where BFGS stands for its four authors: Broyden, Fletcher, Goldfarb and Shanno.
OWL-QN: Orthant-Wise Limited-memory Quasi-Newton.
A few more words about the Breeze library:
Breeze is a library for numerical computing. Its goal is to be generic, clean and powerful without sacrificing much performance. The current version is 0.12. Spark's MLlib is built on top of it (you will frequently see its linear-algebra package breeze.linalg and its optimization package breeze.optimize). It also provides plotting via breeze.plot.
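As a minimal, self-contained sketch (not Spark code; written against the standard Breeze 0.12 API, so details may differ across versions) of how a Breeze optimizer consumes a DiffFunction, here is L-BFGS minimizing a simple quadratic; the LR training code plugs LogisticCostFun into the same interface:

import breeze.linalg.DenseVector
import breeze.optimize.{DiffFunction, LBFGS}

// f(x) = ||x - 3||^2, with gradient 2 * (x - 3).
val f = new DiffFunction[DenseVector[Double]] {
  def calculate(x: DenseVector[Double]): (Double, DenseVector[Double]) = {
    val diff = x - 3.0
    (diff dot diff, diff * 2.0)
  }
}

val lbfgs = new LBFGS[DenseVector[Double]](maxIter = 100, m = 10, tolerance = 1e-6)
val xOpt = lbfgs.minimize(f, DenseVector.zeros[Double](5))
// xOpt should be close to DenseVector(3.0, 3.0, 3.0, 3.0, 3.0)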
Summary
That covers the whole process. Whether to use L1 or L2 depends on your own tuning. A few additional remarks:
Features are standardized by default. For some scenarios (e.g. recommendation with discretized features), standardization is meaningless and may even hurt the results.
The intercept b is initialized from the positive/negative label ratio as log(P(1)/P(0)), which makes convergence faster.
The caching mechanism uses StorageLevel.MEMORY_AND_DISK. When memory is insufficient it spills to disk, which may hurt performance; this is a drawback.
References:
1. LogisticRegression source code
2. breeze
3. breeze documentation