Python网络安全基础:二、批量扫描和嗅探主机端口

在上一篇文章《Python网络安全基础:一、编写一个简单的端口扫描器》 中,我们介绍了端口扫描的作用,并使用Python编写了一个简单的端口识别扫描器。那个端口识别扫描器实在是过于简单,所以在本篇文章中,我们来完善一下它。

本篇文章所使用的靶机还是来自于metasploitable2虚拟机,没有下载安装的小伙伴可以提前下载安装好。

一、端口与端口范围

在上一篇我们知道,计算机上的每一个服务都有对应的端口号,比如Web服务使用80端口号,SSH服务使用22端口号,DNS服务使用25端口号等等。

一般而言,我们要是知道了计算机上有什么服务,就很容易得知其使用的端口是多少,进而通过对端口进行连接获得相应服务的旗标。但是如果我们不知道一台计算机上运行了什么服务呢?

那就需要对所有的端口号进行扫描了。安装计算机网络的定义和规范,网络端口从1开始,到65535为止。其中:
– 1~1023号端口为公共端口,这个区间段的端口号是众所周知、约定俗成定义好的指定给了特定的网络服务,默认被分配给了各个特定的网络服务,比如Web服务的80端口,FTP服务的21端口等。当然,这些服务也可以使用其他的端口号进行服务的绑定。
– 1024~49151,这个区间段的端口为注册端口,用于分配给用户进程或应用程序。
– 49152~65535,这个区间段的端口为动态端口,这些端口号一般不固定分配某种服务,而是根据实际的使用情况动态地进行分配。

二、批量扫描端口

在了解了端口范围的相关知识后,我们如果想要对一个主机所有的端口进行扫描,最简单的做法就是,生成一个端口列表,然后对端口列表进行遍历,将遍历出来的每一个端口进行扫描。

上一篇文章中,我们的端口扫描器代码如下所示:

# coding:utf-8
import socket

s = socket.socket()
s.settimeout(3)

port = input("请输入端口号:")

try:
    s.connect(('192.168.194.131',int(port)))
    print(s.recv(1024))
    s.close()
except Exception as e:
    print(">>>扫描错误:",e)

上面的代码要我们输入一个端口号,就扫描一个端口号。下面我们将其改造成一次扫描一个区间的端口号:

# coding:utf-8

'''
    批量扫描 - 州的先生 zmister.com
'''
import socket
start_port = int(input("请输入起始端口,最小为1:"))
end_port = int(input("请输入结束端口号,最大为65535:"))
print(">>>州的先生zmister.com 端口扫描器 Beta0.1")
for port in range(start_port,end_port):
    try:
        s = socket.socket()
        s.settimeout(3)
        s.connect(('192.168.194.131',int(port)))
        print(">>>端口号:", port)
        print(s.recv(1024))
        s.close()
    except Exception as e:
        pass
        # print(">>>扫描错误:",e)
print("扫描完成!")

运行这份代码,我们需要输入起始的端口号和截止的端口号,然后通过range()方法生成一个列表进行遍历,对每一个端口号进行扫描。

我们使用20~30区间的端口号进行测试,看上去效果还不错,但是大家有没有想过,如果我们需要从端口号1扫描到端口号65535呢?

简单的for循环肯定是低效的,非常地耗时间,要想提高扫描速度,又不想漏过一些端口的扫描,我们只能使用并发手段来提高扫描效率。

三、使用多线程提高扫描效率

在Python中可以使用的并发手段有很多,比如多进程、多线程、协程、异步等等。在这里,我们使用线程池来multiprocessing库中的ThreadPool线程池。修改后的代码如下所示:

# coding:utf-8
from multiprocessing.pool import ThreadPool
import socket
import time

def scan_port(port):
    try:
        s = socket.socket()
        s.settimeout(2)
        s.connect(('192.168.194.131',int(port)))
        print("+++端口号:", port)
        print(s.recv(1024))
        s.close()
    except Exception as e:
        # print(">>>端口号关闭:",port)
        pass
if __name__ == '__main__':
    print(">>>州的先生zmister.com 端口扫描器 Beta0.2")
    start_port = int(input("请输入起始端口,最小为1:"))
    end_port = int(input("请输入结束端口号,最大为65535:"))
    start_time = time.time()
    pool = ThreadPool(processes=500)
    pool.map_async(scan_port,range(start_port,end_port))
    pool.close()
    pool.join()
    end_time = time.time()
    print("扫描耗时:",end_time - start_time)

我们在线程池中使用了500个线程来对端口进行扫描处理,当然也可以使用input()函数接收一个值作为指定的线程数,在这里作为演示,我们就写死了。运行后效果如下所示:

在程序里面我们简单的使用了time.time()计算了一下扫描所有的耗时,结果为一百多秒:

65535个端口只花了两分多钟就扫描完了,还是很快的。

这样,我们的端口批量扫描器就完成了。有什么问题欢迎留言讨论。

猜你也喜欢

  1. caiji说道:

    大佬 我改成这个样子但是有待点不对 你抽时间看下吗 能运行 但是输入后直接显示时间差没有端口扫描
    from multiprocessing.pool import ThreadPool
    import socket
    import time
    def scan_port(ip,start_port,end_port):

    for port in range(start_port,end_port):
    try:
    s = socket.socket()
    s.settimeout(3)

    s.connect((str(ip),int(port)))
    print(">>>端口号:",port)
    print(s.recv(1024))
    s.close()
    except Exception as e:
    pass
    #print(">>>扫描错误: ",e)
    if __name__ == '__main__':
    print(">>ssssssssssssssssssssss<<")
    ip = input("请输入起始ip:")
    start_port = int(input("请输入起始端口:"))
    end_port = int(input("请输入结束端口:"))
    start_time = time.time()
    pool = ThreadPool(processes=500)
    pool.map_async(scan_port,(ip,start_port,end_port))
    pool.close()
    pool.join()
    end_time = time.time()
    print("扫描耗时:",end_time- start_time)

  2. caiji说道:

    大佬 我把代码底下改了下 但是好像不支持多参数,map_async 有什么其他解决办法可以支持多参数
    from multiprocessing.pool import ThreadPool
    import socket
    import time
    def scan_port(ip,start_port,end_port):

    for port in range(start_port,end_port):
    try:
    s = socket.socket()
    s.settimeout(3)

    s.connect((str(ip),int(port)))
    print(">>>端口号:",port)
    print(s.recv(1024))
    s.close()
    except Exception as e:
    pass
    #print(">>>扫描错误: ",e)
    if __name__ == '__main__':
    print(">>ssssssssssssssssssssss<<")
    ip = input("请输入起始ip:")
    start_port = int(input("请输入起始端口:"))
    end_port = int(input("请输入结束端口:"))
    start_time = time.time()
    pool = ThreadPool(processes=500)
    pool.map_async(scan_port,(ip,start_port,end_port))
    pool.close()
    pool.join()
    end_time = time.time()
    print("扫描耗时:",end_time- start_time)

  3. 东方说道:

    请问怎么解决使用ThreadPool线程池多线程,导致端口与banner信息不一一对应的问题。

    1. 州的先生说道:

      用一个字典存端口,然后子线程中端口和banner信息一同返回

州的先生进行回复 取消回复

邮箱地址不会被公开。