1+ import argparse
2+ import socket # for connecting
3+
4+ from threading import Thread
5+ from queue import Queue
6+
7+ # number of threads, feel free to tune this parameter as you wish
8+ N_THREADS = 200
9+ # thread queue
10+ q = Queue ()
11+
12+ def port_scan (port ):
13+ """
14+ Scan a port on the global variable `host`
15+ """
16+ try :
17+ s = socket .socket ()
18+ s .connect ((host , port ))
19+ except :
20+ print (f"{ host :15} :{ port :5} is closed " , end = '\r ' )
21+ else :
22+ print (f"{ host :15} :{ port :5} is open " )
23+ finally :
24+ s .close ()
25+
26+
27+ def scan_thread ():
28+ global q
29+ while True :
30+ # get the port number from the queue
31+ worker = q .get ()
32+ # scan that port number
33+ port_scan (worker )
34+ # tells the queue that the scanning for that port
35+ # is done
36+ q .task_done ()
37+
38+
39+ def main (host , ports ):
40+ global q
41+ for t in range (N_THREADS ):
42+ # for each thread, start it
43+ t = Thread (target = scan_thread )
44+ t .daemon = True
45+ t .start ()
46+
47+ for worker in ports :
48+ # for each port, put that port into the queue
49+ # to start scanning
50+ q .put (worker )
51+
52+ # wait the threads ( port scanners ) to finish
53+ q .join ()
54+
55+
56+ if __name__ == "__main__" :
57+ # parse some parameters passed
58+ parser = argparse .ArgumentParser (description = "Simple port scanner" )
59+ parser .add_argument ("host" , help = "Host to scan." )
60+ parser .add_argument ("--ports" , "-p" , dest = "port_range" , default = "1-65535" , help = "Port range to scan, default is 1-65535 (all ports)" )
61+ args = parser .parse_args ()
62+ host , port_range = args .host , args .port_range
63+
64+ start_port , end_port = port_range .split ("-" )
65+ start_port , end_port = int (start_port ), int (end_port )
66+
67+ ports = [ p for p in range (start_port , end_port )]
68+
69+ main (host , ports )
0 commit comments