博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
巡风源码阅读与分析---nascan.py
阅读量:5247 次
发布时间:2019-06-14

本文共 16492 字,大约阅读时间需要 54 分钟。

Nascan是巡风主要是做目标的资产识别(信息收集)

nascan.py 文件位于 nascan/nascan.py

# coding:utf-8# author:wolf@YSRCimport threadfrom lib.common import *from lib.start import *if __name__ == "__main__":    try:        CONFIG_INI = get_config()  # 读取配置        log.write('info', None, 0, u'获取配置成功')        STATISTICS = get_statistics()  # 读取统计信息        MASSCAN_AC = [0]        NACHANGE = [0]        thread.start_new_thread(monitor, (CONFIG_INI,STATISTICS,NACHANGE))  # 心跳线程        thread.start_new_thread(cruise, (STATISTICS,MASSCAN_AC))  # 失效记录删除线程        socket.setdefaulttimeout(int(CONFIG_INI['Timeout']) / 2)  # 设置连接超时        ac_data = []        while True:            now_time = time.localtime()            now_hour = now_time.tm_hour            now_day = now_time.tm_mday            now_date = str(now_time.tm_year) + str(now_time.tm_mon) + str(now_day)            cy_day, ac_hour = CONFIG_INI['Cycle'].split('|')            log.write('info', None, 0, u'扫描规则: ' + str(CONFIG_INI['Cycle']))            if (now_hour == int(ac_hour) and now_day % int(cy_day) == 0 and now_date not in ac_data) or NACHANGE[0]:  # 判断是否进入扫描时段                ac_data.append(now_date)                NACHANGE[0] = 0                log.write('info', None, 0, u'开始扫描')                s = start(CONFIG_INI)                s.masscan_ac = MASSCAN_AC                s.statistics = STATISTICS                s.run()            time.sleep(60)    except Exception, e:        print e

  

读取了配置,get_config() 跟进去

nascan/lib/common.py

def get_config():    config = {}    config_info = mongo.na_db.Config.find_one({"type": "nascan"})    for name in config_info['config']:        if name in ['Discern_cms', 'Discern_con', 'Discern_lang', 'Discern_server']:            config[name] = format_config(name, config_info['config'][name]['value'])        else:            config[name] = config_info['config'][name]['value']    return config

  

就是读取了mongodb里面Config表下的内容。

回到nascan.py

get_statistics()则是读取统计信息,返回时间。

也是位于nascan/lib/common.py

def get_statistics():    date_ = datetime.datetime.now().strftime('%Y-%m-%d')    now_stati = mongo.na_db.Statistics.find_one({"date": date_})    if not now_stati:        now_stati = {date_: {"add": 0, "update": 0, "delete": 0}}        return now_stati    else:        return {date_: now_stati['info']}

  

MASSCAN_AC 是系统来判断是否支持masscan扫描。为1的话就是masscan正在扫描。

 NACHANGE 是用来看现在的扫描列表和开始的列表有没有变化,有变化设为1

 

thread.start_new_thread(monitor, (CONFIG_INI,STATISTICS,NACHANGE))  # 心跳线程thread.start_new_thread(cruise, (STATISTICS,MASSCAN_AC))  # 失效记录删除线程socket.setdefaulttimeout(int(CONFIG_INI['Timeout']) / 2)  # 设置连接超时

 

  

进入monitor心跳线程

位于nascan/lib/common.py

def monitor(CONFIG_INI, STATISTICS, NACHANGE):    while True:        try:            time_ = datetime.datetime.now()            date_ = time_.strftime('%Y-%m-%d')            mongo.na_db.Heartbeat.update({"name": "heartbeat"}, {"$set": {"up_time": time_}})            if date_ not in STATISTICS: STATISTICS[date_] = {"add": 0, "update": 0, "delete": 0}            mongo.na_db.Statistics.update({"date": date_}, {"$set": {"info": STATISTICS[date_]}}, upsert=True)            new_config = get_config()            if base64.b64encode(CONFIG_INI["Scan_list"]) != base64.b64encode(new_config["Scan_list"]):NACHANGE[0] = 1            CONFIG_INI.clear()            CONFIG_INI.update(new_config)        except Exception, e:            print e        time.sleep(30)

  

再次调用了get_config(),获取了配置信息,如果Config表的base64编码值如果有变化将NACHANGE[0]改成NACHANGE[1]。系统更新config,然后睡眠30秒,表示需要重新扫描。

 

返回nascan.py

Cruise()函数,位于nascan/lib/common.py

 

def cruise(STATISTICS,MASSCAN_AC):    while True:        now_str = datetime.datetime.now()        week = int(now_str.weekday())        hour = int(now_str.hour)        if week >= 1 and week <= 5 and hour >= 9 and hour <= 18:  # 非工作时间不删除            try:                data = mongo.NA_INFO.find().sort("time", 1)                for history_info in data:                    while True:                        if MASSCAN_AC[0]:  # 如果masscan正在扫描即不进行清理                            time.sleep(10)                        else:                            break                    ip = history_info['ip']                    port = history_info['port']                    try:                        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)                        sock.connect((ip, int(port)))                        sock.close()                    except Exception, e:                        time_ = datetime.datetime.now()                        date_ = time_.strftime('%Y-%m-%d')                        mongo.NA_INFO.remove({"ip": ip, "port": port})                        log.write('info', None, 0, '%s:%s delete' % (ip, port))                        STATISTICS[date_]['delete'] += 1                        del history_info["_id"]                        history_info['del_time'] = time_                        history_info['type'] = 'delete'                        mongo.NA_HISTORY.insert(history_info)            except:                pass        time.sleep(3600)

 

  

记录失效目标并删除线程,对目标(ip:port)进行sock连接,如果连接不上就删除INFO里面ipport。然后写进history表里。

 

回到nascan.py

if (now_hour == int(ac_hour) and now_day % int(cy_day) == 0 and now_date not in ac_data) or NACHANGE[0]:  # 判断是否进入扫描时段

  

(now_hour == int(ac_hour) and now_day % int(cy_day) == 0 and now_date not in ac_data)

是判断是否到达扫描的周期时间。

或者就是NACHANGE[0]的值为1,任何一个成立都可以重新扫描。

进入Start()函数

nascan/lib/start.py

start类中,__init__初始化了传递过来的配置信息。直接看run(),处理目标IP地址和使用masscan进行初步扫描等。

 

def run(self):    global AC_PORT_LIST    all_ip_list = []    for ip in self.scan_list:        if "/" in ip: ip = cidr.CIDR(ip)        if not ip:continue        ip_list = self.get_ip_list(ip)        for white_ip in self.white_list:            if white_ip in ip_list:                ip_list.remove(white_ip)        if self.mode == 1:            self.masscan_path = self.config_ini['Masscan'].split('|')[2]            self.masscan_rate = self.config_ini['Masscan'].split('|')[1]            ip_list = self.get_ac_ip(ip_list)            self.masscan_ac[0] = 1            AC_PORT_LIST = self.masscan(ip_list)  # 如果安装了Masscan即使用Masscan进行全端口扫描            if not AC_PORT_LIST: continue            self.masscan_ac[0] = 0            for ip_str in AC_PORT_LIST.keys(): self.queue.put(ip_str)  # 加入队列            self.scan_start()  # 开始扫描        else:            all_ip_list.extend(ip_list)    if self.mode == 0:        if self.icmp: all_ip_list = self.get_ac_ip(all_ip_list)        for ip_str in all_ip_list: self.queue.put(ip_str)  # 加入队列        self.scan_start()  # TCP探测模式开始扫描

 

  

if "/" in ip: ip = cidr.CIDR(ip) ,支持这样的格式:127.0.0.1/24

if self.mode == 1 判断是否支持masscan扫描,如果支持就使用Masscan进行全端口扫描。如果没有开启,将ip添加到all_ip_list这个列表中。

 

masscan()函数

nascan/lib/start.py

def masscan(self, ip):    try:        if len(ip) == 0: return        sys.path.append(sys.path[0] + "/plugin")        m_scan = __import__("masscan")        result = m_scan.run(ip, self.masscan_path, self.masscan_rate)        return result    except Exception, e:        print e        print 'No masscan plugin detected'

  

调用了/plugin/masscan.py

def run(ip_list,path,rate):    try:        ip_file = open('target.log','w')        ip_file.write("\n".join(ip_list))        ip_file.close()        path = str(path).translate(None, ';|&')        rate = str(rate).translate(None, ';|&')        if not os.path.exists(path):return        os.system("%s -p1-65535 -iL target.log -oL tmp.log --randomize-hosts --rate=%s"%(path,rate))        result_file = open('tmp.log', 'r')        result_json = result_file.readlines()        result_file.close()        del result_json[0]        del result_json[-1]        open_list = {}        for res in result_json:            try:                ip = res.split()[3]                port = res.split()[2]                if ip in open_list:                    open_list[ip].append(port)                else:                    open_list[ip] = [port]            except:pass        os.remove('target.log')        os.remove('tmp.log')        return open_list    except:        pass

  

先过滤了;|&三个特殊字符。然后拼接到命令中

masscan -p1-65535 -iL target.log -oL tmp.log --randomize-hosts --rate=20000

masscan扫描好了后保存tmp.log文件里然后读取结果。

不管开没开masscan,都会进入scan_start()

跟进到ThreadNum,位于/nascan/lib/start.py 

 

class ThreadNum(threading.Thread):    def __init__(self, queue):        threading.Thread.__init__(self)        self.queue = queue    def run(self):        while True:            try:                task_host = self.queue.get(block=False)            except:                break            try:                if self.mode:                    port_list = AC_PORT_LIST[task_host]                else:                    port_list = self.config_ini['Port_list'].split('|')[1].split('\n')                _s = scan.scan(task_host, port_list)                _s.config_ini = self.config_ini  # 提供配置信息                _s.statistics = self.statistics  # 提供统计信息                _s.run()            except Exception, e:                print e            finally:                self.queue.task_done()

 

  

run()函数,把IP地址和端口号列表传到另一个scan()函数中。

位于/nascan/lib/scan.py

class scan:    def __init__(self, task_host, port_list):        self.ip = task_host        self.port_list = port_list        self.config_ini = {}    def run(self):        self.timeout = int(self.config_ini['Timeout'])        for _port in self.port_list:            self.server = ''            self.banner = ''            self.port = int(_port)            self.scan_port()  # 端口扫描            if not self.banner:continue            self.server_discern()  # 服务识别            if self.server == '':                web_info = self.try_web()  # 尝试web访问                if web_info:                    log.write('web', self.ip, self.port, web_info)                    time_ = datetime.datetime.now()                    mongo.NA_INFO.update({'ip': self.ip, 'port': self.port},                                         {"$set": {'banner': self.banner, 'server': 'web', 'webinfo': web_info,                                                   'time': time_}})

  

scan类的run函数。先进行了端口扫描,scan_port()函数

位于/nascan/lib/scan.py

def scan_port(self):    try:        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)        sock.connect((self.ip, self.port))        time.sleep(0.2)    except Exception, e:        return    try:        self.banner = sock.recv(1024)        sock.close()        if len(self.banner) <= 2:            self.banner = 'NULL'    except Exception, e:        self.banner = 'NULL'    log.write('portscan', self.ip, self.port, None)    banner = ''    hostname = self.ip2hostname(self.ip)    time_ = datetime.datetime.now()    date_ = time_.strftime('%Y-%m-%d')    try:        banner = unicode(self.banner, errors='replace')        if self.banner == 'NULL': banner = ''        mongo.NA_INFO.insert({"ip": self.ip, "port": self.port, "hostname": hostname, "banner": banner, "time": time_})        self.statistics[date_]['add'] += 1    except:        if banner:            history_info = mongo.NA_INFO.find_and_modify(                query={"ip": self.ip, "port": self.port, "banner": {"$ne": banner}}, remove=True)            if history_info:                mongo.NA_INFO.insert(                    {"ip": self.ip, "port": self.port, "hostname": hostname, "banner": banner, "time": time_})                self.statistics[date_]['update'] += 1                del history_info["_id"]                history_info['del_time'] = time_                history_info['type'] = 'update'                mongo.NA_HISTORY.insert(history_info)

  

通过socket连接,获得端口服务返回的banner信息,然后进入server_discern()函数,通过正则表达式,依次比较,获得服务类型。

server_discern()函数

位于/nascan/lib/scan.py

def server_discern(self):    for mark_info in self.config_ini['Discern_server']: # 快速识别        try:            name, default_port, mode, reg = mark_info            if mode == 'default':                if int(default_port) == self.port:                    self.server = name            elif mode == 'banner':                matchObj = re.search(reg, self.banner, re.I | re.M)                if matchObj:                    self.server = name            if self.server:break        except:            continue    if not self.server and self.port not in [80,443,8080]:        for mark_info in self.config_ini['Discern_server']:  # 发包识别            try:                name, default_port, mode, reg = mark_info                if mode not in ['default','banner']:                    dis_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)                    dis_sock.connect((self.ip, self.port))                    mode = mode.decode('string_escape')                    reg = reg.decode('string_escape')                    dis_sock.send(mode)                    time.sleep(0.3)                    dis_recv = dis_sock.recv(1024)                    matchObj = re.search(reg, dis_recv, re.I | re.M)                    if matchObj:                        self.server = name                        break            except:                pass    if self.server:        log.write("server", self.ip, self.port, self.server)        mongo.NA_INFO.update({"ip": self.ip, "port": self.port}, {"$set": {"server": self.server}})

  

对于没识别出来的服务类型,端口号又不是常见端口号,会重新发包,发送特定包才会返回应答banner的服务类型。

 

最后如果还没识别出来,进入try_web()函数

位于/nascan/lib/scan.py

def try_web(self):    title_str, html = '', ''    try:        if self.port == 443:            info = urllib2.urlopen("https://%s:%s" % (self.ip, self.port), timeout=self.timeout)        else:            info = urllib2.urlopen("http://%s:%s" % (self.ip, self.port), timeout=self.timeout)        html = info.read()        header = info.headers    except urllib2.HTTPError, e:        html = e.read()        header = e.headers    except:        return    if not header: return    if 'Content-Encoding' in header and 'gzip' in header['Content-Encoding']:  # 解压gzip        html_data = StringIO.StringIO(html)        gz = gzip.GzipFile(fileobj=html_data)        html = gz.read()    try:        html_code = self.get_code(header, html).strip()        if html_code and len(html_code) < 12:            html = html.decode(html_code).encode('utf-8')    except: pass    try:        title = re.search(r'(.*?)', html, flags=re.I | re.M)        if title: title_str = title.group(1)    except: pass    try:        web_banner = str(header) + "\r\n\r\n" + html        self.banner = web_banner        history_info = mongo.NA_INFO.find_one({"ip": self.ip, "port": self.port})        if 'server' not in history_info:            tag = self.get_tag()            web_info = {'title': title_str, 'tag': tag}            return web_info        else:            if abs(len(history_info['banner'].encode('utf-8')) - len(web_banner)) > len(web_banner) / 60:                del history_info['_id']                history_info['del_time'] = datetime.datetime.now()                mongo.NA_HISTORY.insert(history_info)                tag = self.get_tag()                web_info = {'title': title_str, 'tag': tag}                date_ = datetime.datetime.now().strftime('%Y-%m-%d')                self.statistics[date_]['update'] += 1                log.write('info', None, 0, '%s:%s update web info'%(self.ip, self.port))                return web_info    except:        returndef ip2hostname(self,ip):    try:        hostname = socket.gethostbyaddr(ip)[0]        return hostname    except:        pass    try:        query_data = "\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x20\x43\x4b\x41\x41" + \                     "\x41\x41\x41\x41\x41\x41\x41\x41\x41\x41\x41\x41\x41\x41\x41\x41\x41" + \                     "\x41\x41\x41\x41\x41\x41\x41\x41\x41\x41\x41\x00\x00\x21\x00\x01"        dport = 137        _s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)        _s.settimeout(3)        _s.sendto(query_data, (ip, dport))        x = _s.recvfrom(1024)        tmp = x[0][57:]        hostname = tmp.split("\x00", 2)[0].strip()        hostname = hostname.split()[0]        return hostname    except:        passdef get_code(self, header, html):    try:        m = re.search(r'
| |/)', html, flags=re.I) if m: return m.group(1).replace('"', '') except: pass try: if 'Content-Type' in header: Content_Type = header['Content-Type'] m = re.search(r'.*?charset=(.*?)(;|$)', Content_Type, flags=re.I) if m: return m.group(1) except: pass

  

这个函数就是尝试用web访问,如果有结果的话就保存下来,没有的话就不管了。

 

回到nascan

大概每隔一分钟探测是否要进行扫描。

 

 

参考文章:

 

转载于:https://www.cnblogs.com/zhengjim/p/9436008.html

你可能感兴趣的文章
新知道一个 端对端加密 Signal protocol
查看>>
Docker的镜像 导出导入
查看>>
wordpress 获取所有管理员的邮箱
查看>>
学习mysql水平分区和实践笔记
查看>>
Laravel Illuminate\Http\Exceptions\PostTooLargeException
查看>>
SQLSTATE[HY000]: General error: 1366 Incorrect string value
查看>>
phpStorm 激活
查看>>
win7 ss 启动缺少文件
查看>>
Chrome DNS_PROBE_FINISHED_NXDOMAIN
查看>>
Yii2 在php 7.2环境下运行,提示 Cannot use ‘Object’ as class name
查看>>
Ionic POST提交使用普通表单提交数据
查看>>
vue type check failed for prop . Expected Number, got String
查看>>
vagrant 导入已导出的包
查看>>
[笔记]Laravel TDD 胡乱记录
查看>>
swagger.yaml转换为swagger.json文件
查看>>
Linux sed -i 字符串替换
查看>>
学习修复Laravel The only supported ciphers are AES-128-CBC and AES-256-CBC
查看>>
git 去除本地所有没有保存的修改
查看>>
PHP Catchable fatal error: Argument 2 passed to Illuminate\Routing\UrlGenerator::__construct()
查看>>
[190308]Ubuntu 安装完之后,安装的软件小记
查看>>