问题
首先定义问题,我要用线上一段时间的行为日志做逻辑回归训练,行为数据中主要包括物品id和用户id以及场景信息,我要把他们处理成一些用户相关特征,物品相关特征以及场景特征,物品特征在物品模型中,大约2.9G,大约1426w条,而用户特征包含在用户模型中,大约2.6G,共计171w条,这些都是通过id做关联的,我现在要做的是把他们关联在一起,变成一个spark的 LabeledPoint 特征矩阵。
初始解决方案
初始想的是,我只要把用户模型和歌曲模型做成两个广播变量,然后直接在行为日志中处理就可以了。
大概代码是这样
usermodel_broad = sc.broadcast(usermodel.collectAsMap())
songinfo_broad = sc.broadcast(songinfo.collectAsMap())
if usermodel_broad.value.has_key(cuid):
usermodel = usermodel_broad.value[cuid]
if songinfo_broad.value.has_key(sid):
songinfo = songinfo_broad.value[sid]
后面就直接报错,
spark.yarn.executor.memoryOverhead
Container killed by YARN for exceeding memory limits. 18.1 GB of 18 GB physics
基本上这个量级别跑不起来,很明显就是广播变量太大了,然后分发到每个executor节点上,占用内存过多,加上处理需要的内存,直接被kill了。
首先想到的是通过序列化来压缩广播变量,看了下说是Kryo节省空间相当可观,但是一个是kryo需要用Java或者Scala,另一个是考虑序列化之后到查找的时候还需要反序列化,可能内存并不能节省多少。
然后就想到通过调整参数,让executor的memory变大一点。参数包括
–executor-memory 20g –conf spark.default.parallelism=1000 –conf spark.storage.memoryFraction=0.6 –conf spark.shuffle.memoryFraction =0.35
大概策略是让executor的内存变大一点,处理的时候parallelism更大,这样相当于把处理数据分发到跟多的executor上,处理内存变大,shuffle内存变小一点,后面发现这些调整基本上没用,通过反复尝试发现在所有数据抽取30%能跑起来,但是达不到我的要求
其实后面发现了这样一段
数据比较大的时候(实际上,spark支持非常大的广播变量,甚至广播变量中的元素数超过java/scala中Array的最大长度限制(2G,约21.5亿)都是可以的)。
感觉2G就是广播变量的最大限制吧,超过2G就别用广播变量了,以后算是涨个经验。
通过join优化
刚开始执着于广播变量,把物品模型搞成广播变量,用户模型和行为一起join,代码大概这样
songinfo_broad = sc.broadcast(songinfo.collectAsMap())
usermodel = sc.textFile(BASE_DIR + "input/sim").map(parserUsermodel).partitionBy(1000)
points = sc.textFile(BASE_DIR + "action").map(parserWireless).filter(lambda x:(x[0] != -1)).partitionBy(1000)
actionSlope = points.join(usermodel).map(parserAction).filter(lambda x:(x.label != -1))
基本上还是memoryOverhead,然后就想到了是数据倾斜了,通过统计行为数据中uid的出现次数,某段时间的行为数据共计5103w条,单个uid最大出现12w条,超过5w的有5条,然后就准备过滤掉试试,然后发现还是memoryOverhead
然后就考虑彻底解决,把uid量大的分成一份,其他的一份,两份单独join,然后union到一起,日志用uid共计107w,前8w的uid占了一般的行为,把这8w单拿出来作为一份数据,大概代码这样
songinfo_broad = sc.broadcast(songinfo.collectAsMap())
usermodel = sc.textFile(BASE_DIR + "input/sim").map(parserUsermodel).partitionBy(1000)
points = sc.textFile(BASE_DIR + "action").map(parserWireless).filter(lambda x:(x[0] != -1)).partitionBy(1000)
pointsCount = points.map(lambda x:(x[0], 1)).reduceByKey(add).collectAsMap()
userTop = sc.parallelize(sorted(pointsCount.items(), lambda x, y: cmp(x[1], y[1]), reverse=True)[0:80000])
filterUser_borad = sc.broadcast(userTop.map(lambda x:(x[0], 1)).collectAsMap())
filterPoints = points.filter(lambda x: (x[0] in filterUser_borad.value))
normalPoints = points.filter(lambda x: (x[0] not in filterUser_borad.value))
filterUsermodel = usermodel.filter(lambda x: (x[0] in filterUser_borad.value))
normalUsermodel = usermodel.filter(lambda x: (x[0] not in filterUser_borad.value))
filterAction = filterPoints.join(filterUsermodel)
normalAction = normalPoints.join(normalUsermodel)
action = normalAction.union(filterAction).map(parserAction).filter(lambda x:(x.label != -1))
然后发现还是memoryOverhead,有点无语,感觉肯定不是join的问题了,就单独把用户模型和行为往一起join,先不用广播变量的用户模型,发现没有可以join,而且很快十几分钟就出来结果了。
于是就有了思路了,歌曲也是存在top效应,然后拆成两份,也通过先join后union,然后就都成功了。
内存释放
等把所有的优化放一起跑的时候,又出问题了
Exit status: 143. Diagnostics: Container killed on request. Exit code is 143
查了一下,大概意思是我们跑出来的结果很多都缓存在内存中的,而前面的跑完后面的也不会用了,但是内存没有释放,然后GC的压力比较大,就会出这个错误,所以我们用的时候要在恰当的时候
usermodel.unpersist()
或者对广播变量恰当的
borad.destroy()
理论上能够解决这个问题。
当然我用了一个更简单粗暴的方法,就是分两个任务,第一个任务分两拨join数据,union数据,相当于数据准备,然后第二个任务训练,这样训练的时候基本上只要跑第二个任务了,当然上线要一起跑。
总结
spark的优化还是有一些坑要踩的,踩过了就好了,spark经典的书不太多,有两本数可以看看,第一本入门,第二本遇到优化问题可以读读,有大量源码和代码逻辑分析。
- spark快速大数据分析
- Spark技术内幕 深入解析Spark内核架构设计与实现原理