ping命令是我们检查网络时最常用的命令。作为网络人员,基本上每天都会用到它,它可以很好地帮助我们分析和判定网络故障。但如果有10台设备、100台设备、1000台设备怎么办?一个个ping过去人都要疯掉了,而这种情况在大型网络中很可能遇到。那怎么办呢?今天我们来看一下如何用Python实现批量ping测试主机。
**
代码如下:
#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""Batch ICMP ping: probe every host of a /24 network with raw sockets.

Raw ICMP sockets normally require root (or CAP_NET_RAW) privileges.
"""
import argparse
import errno
import os
import select
import socket
import struct
import time

ICMP_ECHO_REQUEST = 8  # Platform specific
ICMP_ECHO_REPLY = 0
DEFAULT_TIMEOUT = 0.1
DEFAULT_COUNT = 4
DEFAULT_HOST_PREFIX = '192.168.242.'

# type (8 bit), code (8 bit), checksum (16 bit), id (16 bit), sequence (16 bit)
ICMP_HEADER_FORMAT = "bbHHh"
# The code assumes an IPv4 header without options in front of the ICMP data.
IP_HEADER_LEN = 20
ICMP_HEADER_LEN = 8
# Total ICMP payload size; it starts with a packed double (the send time).
PAYLOAD_LEN = 192


class Pinger(object):
    """Pings to a host -- the Pythonic way."""

    def __init__(self, target_host, count=DEFAULT_COUNT, timeout=DEFAULT_TIMEOUT):
        """Remember the target and the ping parameters.

        Args:
            target_host: Host name or IPv4 address to ping.
            count: Number of echo requests sent by ``ping()``.
            timeout: Seconds to wait for each reply.
        """
        self.target_host = target_host
        self.count = count
        self.timeout = timeout
        # Resolved address of the last request sent; used to reject replies
        # that come from a different host.
        self._target_addr = None

    def do_checksum(self, source_string):
        """Compute the 16-bit checksum used to verify packet integrity.

        Args:
            source_string: Bytes-like packet content (header plus payload).

        Returns:
            The checksum as an int in the range 0..0xffff, byte-swapped the
            same way the original implementation did (``send_ping`` passes it
            through ``socket.htons`` before packing).
        """
        data = bytes(source_string)
        total = 0
        # Largest even length: the loop below consumes two bytes per step.
        # Integer division is required here; true division yields a float
        # that never rounds down, so odd-length input would index past the end.
        max_count = (len(data) // 2) * 2
        count = 0
        while count < max_count:
            val = data[count + 1] * 256 + data[count]
            total = (total + val) & 0xffffffff
            count += 2
        if max_count < len(data):
            # Odd trailing byte. Indexing bytes already yields an int.
            total = (total + data[-1]) & 0xffffffff
        # Fold the carries back into the low 16 bits.
        total = (total >> 16) + (total & 0xffff)
        total += total >> 16
        answer = ~total & 0xffff
        return answer >> 8 | (answer << 8 & 0xff00)

    def receive_pong(self, sock, ID, timeout):
        """Receive the echo reply for our request from the socket.

        Args:
            sock: The raw ICMP socket the request was sent on.
            ID: Identifier that was put into the request header.
            timeout: Maximum number of seconds to wait.

        Returns:
            The round-trip delay in seconds, or None on timeout.
        """
        time_remaining = timeout
        expected_addr = getattr(self, "_target_addr", None)
        double_size = struct.calcsize("d")
        payload_start = IP_HEADER_LEN + ICMP_HEADER_LEN
        while True:
            start_time = time.time()
            readable = select.select([sock], [], [], time_remaining)
            time_spent = time.time() - start_time
            if not readable[0]:  # Timeout
                return None
            time_received = time.time()
            recv_packet, addr = sock.recvfrom(1024)
            # A raw ICMP socket sees unrelated ICMP traffic as well, so only
            # accept a complete echo *reply* carrying our ID from the host we
            # pinged. Otherwise a looped-back request or a late reply from a
            # previously probed host would be reported as a reply from the
            # current target.
            from_target = expected_addr is None or addr[0] == expected_addr
            if from_target and len(recv_packet) >= payload_start + double_size:
                icmp_type, _code, _checksum, packet_id, _sequence = struct.unpack(
                    ICMP_HEADER_FORMAT, recv_packet[IP_HEADER_LEN:payload_start]
                )
                if icmp_type == ICMP_ECHO_REPLY and packet_id == ID:
                    time_sent = struct.unpack(
                        "d", recv_packet[payload_start:payload_start + double_size]
                    )[0]
                    return time_received - time_sent
            time_remaining -= time_spent
            if time_remaining <= 0:
                return None

    def send_ping(self, sock, ID):
        """Send one echo request to the target host.

        Args:
            sock: Raw ICMP socket to send on.
            ID: Identifier to place into the ICMP header.

        Raises:
            socket.gaierror: If the target host name cannot be resolved.
        """
        target_addr = socket.gethostbyname(self.target_host)
        self._target_addr = target_addr
        # Create a dummy header with a 0 checksum.
        header = struct.pack(ICMP_HEADER_FORMAT, ICMP_ECHO_REQUEST, 0, 0, ID, 1)
        padding = b"Q" * (PAYLOAD_LEN - struct.calcsize("d"))
        data = struct.pack("d", time.time()) + padding
        # Get the checksum on the data and the dummy header.
        my_checksum = self.do_checksum(header + data)
        header = struct.pack(
            ICMP_HEADER_FORMAT, ICMP_ECHO_REQUEST, 0, socket.htons(my_checksum), ID, 1
        )
        # The port number is irrelevant for ICMP; 1 is a placeholder.
        sock.sendto(header + data, (target_addr, 1))

    def ping_once(self):
        """Send a single echo request and wait for the reply.

        Returns:
            The delay in seconds, or None on timeout.

        Raises:
            OSError: If the raw socket cannot be created (for example when
                the process is not privileged) or sending fails.
        """
        icmp = socket.getprotobyname("icmp")
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
        except OSError as e:
            if e.errno == errno.EPERM:
                # Not superuser, so operation not permitted.
                raise OSError(
                    e.errno,
                    "%s - ICMP messages can only be sent from root user processes"
                    % e.strerror,
                ) from e
            raise
        my_ID = os.getpid() & 0xFFFF
        # The context manager closes the socket even when sending fails.
        with sock:
            self.send_ping(sock, my_ID)
            return self.receive_pong(sock, my_ID, self.timeout)

    def ping(self):
        """Run the ping process: send ``count`` requests and print results."""
        for _ in range(self.count):
            print("Ping to %s..." % self.target_host, end=" ", flush=True)
            try:
                delay = self.ping_once()
            except socket.gaierror as e:
                print("Ping failed. (socket error: '%s')" % (e.strerror or e))
                break
            if delay is None:
                print("Ping failed. (timeout within %ssec.)" % self.timeout)
            else:
                print("Get pong in %0.4fms" % (delay * 1000))


def main(argv=None):
    """Ping hosts <prefix>1 .. <prefix>254 once each and report the results.

    Args:
        argv: Command-line arguments; defaults to ``sys.argv[1:]``.

    Returns:
        The list of hosts that answered.
    """
    parser = argparse.ArgumentParser(
        description="Ping every host of a /24 network once."
    )
    parser.add_argument(
        "--prefix",
        default=DEFAULT_HOST_PREFIX,
        help="network prefix including the trailing dot (default: %(default)s)",
    )
    args = parser.parse_args(argv)

    alive = []
    for i in range(1, 255):
        host = args.prefix + str(i)
        pinger = Pinger(target_host=host)
        delay = pinger.ping_once()
        if delay is None:
            # Report the timeout that was actually used, not a fixed value.
            print("Ping %s 失败,超时%s秒" % (host, pinger.timeout))
        else:
            print("ping %s = %s ms" % (host, round(delay * 1000, 4)))
            alive.append(host)
    return alive


if __name__ == '__main__':
    main()
测试如下:
