当前位置 : 首页 > 徐州软件开发技术论坛 > 基于大数据和机器学习的Web异常参数检测系统Demo实现

基于大数据和机器学习的Web异常参数检测系统Demo实现

作者:徐州软件公司  文章来源:徐州软件公司   阅读次数:441 次
2017-06-26

此文章是徐州软件公司整理的技术资料,由徐州软件公司总务部发布,邮箱:zongwu@xuzhousoft.com

江苏徐软信息科技有限公司江苏徐软信息科技有限公司

一、算法一般过程

隐马尔可夫模型是一个统计模型,可以利用这个模型解决三类基本问题:

Ø  学习问题:给定观察序列,学习出模型参数

Ø  评估问题:已知模型参数,评估出观察序列出现在这个模型下的概率

Ø  解码问题:已知模型参数和给出的观察序列,求出可能性最大的隐藏状态序列

这里我们是要解决前两类问题,使用白样本数据学习出模型和参数基线,计算检测数据在该模型下出现的可能性,如果得分低于基线就可以认为这个参数异常,产出告警。算法可分为训练过程和检测过程,算法本身我这里不在细说,这里重点讲一下参数的抽取和泛化。

1.png2.png

参数的抽取

对http请求数据进行拆解,提取如下参数,这部分的难点在于如何正确的识别编码方式并解码:

Ø  GET、POST、Cookie请求参数

Ø  GET、POST、Cookie参数名本身

Ø  请求的URL路径

Ø  http请求头,如Content_type、Content-Length(对应strust2-045)

参数泛化

需要将参数值泛化为规律性的观测经验,并取字符的unicode数值作为观察序列,泛化的方法如下:

Ø  大小写英文字母泛化为”A”,对应的unicode数值为65

Ø  数字泛化为”N”,对应的unicode数值为78

Ø  中文或中文字符泛化为“C”,对应的unicode数值为67

Ø  特殊字符和其他字符集的编码不作泛化,直接取unicode数值

Ø  参数值为空的取0

三、系统架构

在训练过程中要使用尽可能多的历史数据进行训练,这显然是一个批(batch)计算过程;在检测过程中我们希望能够实时的检测数据,及时的发现攻击,这是一个流(streaming)计算过程。典型的批+流式框架如Cisco的Opensoc使用开源大数据架构,kafka作为消息总线,Storm进行实时计算,Hadoop存储数据和批量计算。但是这样的架构有一个缺点,我们需要维护Storm和MapReduce两套不同的代码。考虑到学习成本,使用Spark作为统一的数据处理引擎,即可以实现批处理,也可以使用spark streaming实现近实时的计算。

3.png

系统架构如上图,需要在spark上运行三个任务,sparkstreaming将kafka中的数据实时的存入hdfs;训练算法定期加载批量数据进行模型训练,并将模型参数保存到Hdfs;检测算法加载模型,检测实时数据,并将告警保存到ES。

四、Spark简介

Apache Spark是一个快速通用的大数据计算框架,由Scala语言实现,同时提供Java、python、R语言的API接口。相比于Hadoop的Mapreduce,Spark可以实现在内存中计算,具有更高的计算速度,并且spark streaming提供流数据计算框架,以类似批处理的方式处理流数据。

RDD

RDD是Spark中抽象的数据结构类型,是一个弹性分布式数据集,数据在Spark中被表示为RDD。RDD提供丰富的API接口,实现对数据的操作,如map、flatmap、reduce、filter、groupby等等。

DStream

DStream(离散数据流)是Spark Streaming中的数据结构类型,它是由特定时间间隔内的数据RDD构成,可以实现与RDD的互操作,Dstream也提供与RDD类似的API接口。

DataFrame

DataFrame是spark中结构化的数据集,类似于数据库的表,可以理解为内存中的分布式表,提供了丰富的类SQL操作接口。

五、数据采集与存储

获取http请求数据通常有两种方式,第一种从web应用中采集日志,使用logstash从日志文件中提取日志并泛化,写入Kafka,第二种可以从网络流量中抓包提取http信息。我这里使用第二种,用python结合Tcpflow采集http数据,在数据量不大的情况下可稳定运行。

数据采集

与Tcpdump以包单位保存数据不同,Tcpflow是以流为单位保存数据内容,分析http数据使用tcpflow会更便捷。Tcpflow在linux下可以监控网卡流量,将tcp流保存到文件中,因此可以用python的pyinotify模块监控流文件,当流文件写入结束后提取http数据,写入Kafka,Python实现的过程如下图。

4.png

核心代码:

#子进程,处理数据到kafka
queue = Queue()
threadKafka=Process(target=processKafka,args=(queue,options.kafka,options.topic))
threadKafka.start()
#子线程,开启并监控TCPFLOW
tempDir=tempfile.mkdtemp()
threadPacp=threading.Thread(target=processPcap,args=(tempDir,tcpFlowPath,tcpflow_args))
threadPacp.start()
#主进程,监控文件并生成数据
wm=pyinotify.WatchManager()
wm.add_watch(tempDir,pyinotify.ALL_EVENTS)
eventHandler=MonitorFlow(queue)
notifier=pyinotify.Notifier(wm,eventHandler)
notifier.loop()
#主进程,监控文件并生成数据
wm=pyinotify.WatchManager()
wm.add_watch(tempDir,pyinotify.ALL_EVENTS)
eventHandler=MonitorFlow(queue)
notifier=pyinotify.Notifier(wm,eventHandler)
notifier.loop()

数据储存:

开启一个SparkStreaming任务,从kafka消费数据写入Hdfs,Dstream的python API没有好的入库接口,需要将Dstream的RDD转成DataFrame进行保存,保存为json文件。

 

核心代码:

topic = {in_topic: in_topic_partitions}

#从kafka获取数据生成Dstream
dstream = KafkaUtils.createStream(ssc, zookeeper,app_conf["app_name"], topic)
dstream = dstream.map(lambda record: json.loads(record[1]))
dstream.foreachRDD(lambda rdd: self.save(rdd))
#将RDD转成DataFrame存入Hdfs
def save(self, rdd):
    if rdd.take(1):
        df = sqlcontext.createDataFrame(rdd)
        df.write.json(app_conf["savedir"], mode="append")
    else:
        pass

六、算法实现

抽取器(Extractor)

抽取器实现原始数据的参数提取和数据泛化,传入一条json格式的http请求数据,可以返回所有参数的id、参数类型、参数名、参数的观察状态序列。

代码示例:

class Extractor(object):
    def __init__(self,data):
        self.parameter={}
        self.data=data
        self.uri = urllib.unquote(data["uri"].encode("utf-8"))
        self.path = decode(get_path(self.uri))
        self.payload = get_payload(self.uri).strip("?")
        self.get_parameter()
#提取post参数
def post(self):
    post_data=urllib.unquote(urllib.unquote(self.data["data"]))
    content_t=self.data["content_type"]
#提取urlencode编码的参数
    def ex_urlencoded(post_data):

        for p in post_data.split("&"):

            p_list = p.split("=")
            p_name = p_list[0]
            if len(p_list) > 1:

                p_value = reduce(operator.add, p_list[1:])
              #取md5作为参数id
                p_id = get_md5(self.data["host"] + self.path + decode(p_name) + self.data["method"])
                p_state = self.get_Ostate(p_value)
                p_type = "post"
                yield (p_id, p_state, p_type, p_name)
#提取json格式的参数
    def ex_json(post_data):
        post_data=json.loads(post_data)
        for p_name,p_value in post_data.items():
            p_id = get_md5(self.data["host"] + self.path + decode(p_name) + self.data["method"])
            p_state=self.get_Ostate(str(p_value))
            p_type="post"
            yield (p_id, p_state, p_type, p_name)

 

训练器(Trainer)

训练器完成对参数的训练,传入参数的所有观察序列,返回训练好的模型和profile,HMM模型使用python下的hmmlearn模块,profile取观察序列的最小得分。

核心代码:

class Trainer(object):
    def __init__(self,data):
        self.p_id=data["p_id"]
        self.p_state=data["p_states"]
    def train(self):
        Hstate_num=range(len(self.p_state))
        Ostate_num=range(len(self.p_state))
        Ostate = []
        for (index,value) in enumerate(self.p_state):
            Ostate+=value     #观察状态序列
            Hstate_num[index]=len(set(np.array(value).reshape(1,len(value))[0]))
            Ostate_num[index]=len(value)
        self.Ostate=Ostate
        self.Hstate_num=Hstate_num
        self.n=int(round(np.array(Hstate_num).mean()))#隐藏状态数
        model = GaussianHMM(n_components=self.n, n_iter=1000, init_params="mcs",covariance_type="full")
        model.fit(np.array(Ostate),lengths=Ostate_num)
#计算基线
    def get_profile(self):
        scores=np.array(range(len(self.p_state)),dtype="float64")
        for (index,value) in enumerate(self.p_state):
            scores[index]=self.model.score(value)
        self.profile=float(scores.min())
        self.scores=scores

训练任务

Spark训练任务抽取所有http请求数据的参数,并按照参数ID分组,分别进行训练,将训练模型保存到Hdfs。

核心代码:

#读取原始数据  
  df =sqlcontext.read.json(self.app_conf["data_dir"])
    rdd=df.toJSON()
#过滤出请求数据
    p_rdd=rdd.filter(self.filter).cache()
#抽取数据参数
    p_rdd=p_rdd.flatMap(self.extract).cache()
    p_list=p_rdd.collect()
    p_dict={}
#按照参数ID分组
    for p in p_list:
        if p.keys()[0] not in p_dict.keys():
            p_dict[p.keys()[0]]={}
            p_dict[p.keys()[0]]["p_states"]=[p.values()[0]["p_state"]]
            p_dict[p.keys()[0]]["p_type"]=p.values()[0]["p_type"]
            p_dict[p.keys()[0]]["p_name"] = p.values()[0]["p_name"]
        p_dict[p.keys()[0]]["p_states"].append(p.values()[0]["p_state"])
    for key in p_dict.keys():
        if len(p_dict[key]["p_states"]) <self.app_conf["min_train_num"]:

            p_dict.pop(key)

    models=[] #训练参数模型      for p_id in p_dict.keys():          data={}          data["p_id"]=p_id          data["p_states"]=p_dict[p_id]["p_states"]          trainer=Trainer(data)          (m,p)=trainer.get_model()          model = {}          model["p_id"] = p_id          model["p_type"]=p_dict[p_id]["p_type"]          model["p_name"] = p_dict[p_id]["p_name"]          model["model"] = pickle.dumps(m)          model["profile"] = p          models.append(model)          logging.info("[+]Trained:%s,num is %s"%(p_id,trained_num))          trained_num+=1
#保存模型参数到Hdfs,保存为Json文件
    model_df=sqlcontext.createDataFrame(models)
    date=time.strftime("%Y-%m-%d_%H-%M")
    path="hdfs://%s:8020%smodel%s.json"%(self.app_conf["namenode_model"],self.app_conf["model_dir"],date)
    model_df.write.json(path=path)

检测任务

Spark Streaming检测任务实时获取kafka流数据,抽取出数据的参数,如果参数有训练模型,就计算参数得分,小于基线输出告警到Elasticsearch。

核心代码:

#获取模型参数
    model_data = sqlcontext.read.json(self.app_conf["model_dir"]).collect()
    model_keys=[0]*len(model_data)
    for index,model_d in enumerate(model_data):
        model_keys[index]=model_d["p_id"]
    ssc=StreamingContext(sc,20)
    model_data = ssc._sc.broadcast(model_data)
    model_keys = ssc._sc.broadcast(model_keys)
    zookeeper = self.app_conf["zookeeper"]
    in_topic = self.app_conf["in_topic"]
    in_topic_partitions = self.app_conf["in_topic_partitions"]
    topic = {in_topic: in_topic_partitions}
#获取kafka数据
    dstream = KafkaUtils.createStream(ssc, zookeeper, self.app_conf["app_name"], topic)
#过滤出请求数据
    dstream=dstream.filter(self.filter)
#对每条数据进行检测
    dstream.foreachRDD(
       lambda rdd: rdd.foreachPartition(
           lambda iter:self.detector(iter,model_data,model_keys)
       )
    )
    ssc.start()
    ssc.awaitTermination()
def detector(self, iter,model_data,model_keys):
    es = ES(self.app_conf["elasticsearch"])
    index_name = self.app_conf["index_name"]
    type_name = self.app_conf["type_name"]
    model_data=model_data.value
    model_keys=model_keys.value
    for record in iter:
        record=json.loads(record[1])
        try:
#抽取数据参数
parameters = Extractor(record).parameter
            for (p_id, p_data) in parameters.items():
                if p_id in model_keys:
                    model_d = model_data[model_keys.index(p_id)]
                    model = pickle.loads(model_d.model)
                    profile = model_d.profile
                    score = model.score(np.array(p_data["p_state"]))
                    if score < profile:
#小于profile的参数数据输出告警到es
alarm = ES.pop_null(record)
                        alarm["alarm_type"] = "HmmParameterAnomaly "
                        alarm["p_id"] = p_id
                        alarm["p_name"] = model_d.p_name
                        alarm["p_type"] = model_d.p_type
                        alarm["p_profile"] = profile
                        alarm["score"] = score
                        es.write_to_es(index_name, type_name, alarm)
        except (UnicodeDecodeError, UnicodeEncodeError):

七、总结

所有的机器学习算法都大致可分为训练、检测阶段,基于HMM的web参数异常检测是其中的典型代表,本文尝试将机器学习算法在大数据环境下使用,所有用到的代码都会在Github上公开(其实数据抽取部分并不完美,欢迎提出好的建议)。

江苏徐软信息科技有限公司(简称徐州软件公司)是徐州软件公司中成立时间最长、技术能力最强、经济实力最雄厚的徐州软件开发公司之一,专业的徐州软件开发团队,从事徐州软件开发10年,一直保持着徐州软件开发行业排头兵的地位。徐州软件公司徐州软件开发行业内的众多徐州软件开发公司保持着良好的合作关系,是徐州软件开发行业的领航者之一。徐州软件公司立足徐州软件开发市场,主攻徐州软件开发徐州APP开发徐州软件公司徐州ERP软件开发徐州OA软件开发徐州CRM软件开发等领域拥有大量经典案例。更多信息请访问徐州软件公司官方网站:

徐软com:http://www.xuzhousoft.com  徐软cn:http://www.xuzhousoft.com.cn
徐软app:http://app.xuzhousoft.com  淮北徐软:http://huaibei.xuzhousoft.com.cn
济宁徐软:http://jining.xuzhousoft.com.cn  亳州徐软:http://bozhou.xuzhousoft.com.cn
菏泽徐软:http://heze.xuzhousoft.com.cn  宿州徐软:http://suzhou.xuzhousoft.com.cn
枣庄徐软:http://zaozhuang.xuzhousoft.com.cn  宿迁徐软:http://suqian.xuzhousoft.com.cn
商丘徐软:http://shangqiu.xuzhousoft.com.cn  连云港徐软:http://lianyungang.xuzhousoft.com.cn
莱芜徐软:http://laiwu.xuzhousoft.com.cn  泰安徐软:http://taian.xuzhousoft.com.cn
日照徐软:http://rizhao.xuzhousoft.com.cn  开封徐软:http://kaifeng.xuzhousoft.com.cn
周口徐软:http://zhoukou.xuzhousoft.com.cn  盐城徐软:http://yancheng.xuzhousoft.com.cn
淮安徐软:http://huaian.xuzhousoft.com.cn  阜阳徐软:http://fuyang.xuzhousoft.com.cn
蚌埠徐软:http://bengbu.xuzhousoft.com.cn  临沂徐软:http://linyi.xuzhousoft.com.cn
邳州徐软:http://pizhou.xuzhousoft.com.cn  新沂徐软:http://xinyi.xuzhousoft.com.cn
沛县徐软:http://peixian.xuzhousoft.com.cn  睢宁徐软:http://suining.xuzhousoft.com.cn
丰县徐软:http://fengxian.xuzhousoft.com.cn  萧县徐软:http://xiaoxian.xuzhousoft.com.cn
砀山徐软:http://dangshan.xuzhousoft.com.cn  微山徐软:http://weishan.xuzhousoft.com.cn
永城徐软:http://yongcheng.xuzhousoft.com.cn  网络营销:http://www.f168yingxiao.com
徐州系统集成公司:http://www.0516app.com

关键字标签:徐州软件公司 徐州软件开发公司 徐州APP软件开发公司 徐州ERP软件开发公司 徐州CRM软件开发公司 徐州OA软件开发公司

下载DOC版 下载PDF版

* 以上内容由 徐州软件公司 整理


关于我们

    江苏徐软信息科技有限公司(简称徐州软件)位于国家大学科技园内,成立于2005年,注册资金1000万元,是徐州地区最具实力的集软件开发、电子商务技术服务、门户网站建设、系统集成、网络工程为一体的高科技IT技术公司之一。

技术支持

  • 售后服务电话:0516-83003411
  • 售后服务QQ:412110939
  • 售后服务邮箱:
    service@xuzhousoft.com
  • 售后投诉电话:18795428064
徐州软件公司
    扫描微信二维码即可获得
    免费信息化咨询服务

Copyright© 2005 江苏徐软信息科技有限公司 All Rights Reserved.
苏公网安备 32030302000144号  苏ICP备11059116号-5

地址:江苏省徐州市云龙区和平路57号江苏师范大学科技园4F  徐州软件公司
电话:0516-83737996 邮箱:sales@xuzhousoft.com

江苏徐软信息科技有限公司地图
江苏徐软信息科技有限公司地图
点这里关闭本窗口
×