ping命令是我们检查网络时最常用的命令。作为网络人员,基本上每天都会用到它,它可以很好地帮助我们分析和判定网络故障。但如果有10台设备、100台设备、1000台设备怎么办?一个个ping过去人都要疯掉了,而这种情况在大型网络中很可能遇到。那怎么办呢?我们今天来看下如何用python来实现批量ping测试主机。
    **

    代码如下:

    1. #!/usr/bin/python3
    2. # -*- coding: utf-8 -*-
    3. import os
    4. import argparse
    5. import socket
    6. import struct
    7. import select
    8. import time
    ICMP_ECHO_REQUEST = 8  # Platform specific
    DEFAULT_TIMEOUT = 0.1  # seconds to wait for a reply
    DEFAULT_COUNT = 4  # number of pings sent by Pinger.ping()


    class Pinger(object):
        """Ping a single host with ICMP echo requests sent over a raw socket.

        Creating the raw socket normally requires elevated privileges; see
        ``ping_once`` for the exception raised when that is not permitted.
        """

        def __init__(self, target_host, count=DEFAULT_COUNT, timeout=DEFAULT_TIMEOUT):
            """Store the target and the ping parameters.

            Args:
                target_host: Host name or IPv4 address to ping.
                count: Number of echo requests ``ping`` sends.
                timeout: Seconds to wait for each reply.
            """
            self.target_host = target_host
            self.count = count
            self.timeout = timeout

        def do_checksum(self, source_string):
            """Return the 16-bit checksum of a bytes-like packet.

            The data is summed as little-endian 16-bit words, the carries are
            folded back in, the result is complemented and finally its two
            bytes are swapped. ``send_ping`` undoes that swap with
            ``socket.htons`` before packing the value into the header.

            Args:
                source_string: Bytes-like object (the name is historical); an
                    odd trailing byte is added as the low byte of a last word.

            Returns:
                The checksum as an int in the range 0..0xffff.
            """
            data = bytes(source_string)
            # Floor division: "/" yields a float on Python 3 and let the loop
            # run past the end of odd-length input.
            even_length = len(data) // 2 * 2
            total = 0
            for i in range(0, even_length, 2):
                total = (total + data[i + 1] * 256 + data[i]) & 0xffffffff
            if even_length < len(data):
                # Indexing bytes already yields an int, so no ord() here.
                total = (total + data[-1]) & 0xffffffff
            total = (total >> 16) + (total & 0xffff)
            total += total >> 16
            answer = ~total & 0xffff
            return answer >> 8 | (answer << 8 & 0xff00)

        def receive_pong(self, sock, ID, timeout):
            """Wait for the reply that carries our ID and return its delay.

            Args:
                sock: Socket the echo request was sent on.
                ID: Identifier that was packed into the request header.
                timeout: Total number of seconds to wait.

            Returns:
                Seconds between the timestamp embedded in the packet and its
                reception, or None if no matching packet arrived in time.
            """
            bytes_in_double = struct.calcsize("d")
            time_remaining = timeout
            while True:
                start_time = time.time()
                readable = select.select([sock], [], [], time_remaining)
                time_spent = time.time() - start_time
                if not readable[0]:  # Timeout
                    return None
                time_received = time.time()
                recv_packet, addr = sock.recvfrom(1024)
                # The fixed offsets assume a 20-byte IP header in front of the
                # 8-byte ICMP header. Packets too short to hold the header and
                # the timestamp are skipped instead of raising struct.error.
                if len(recv_packet) >= 28 + bytes_in_double:
                    icmp_type, code, checksum, packet_ID, sequence = struct.unpack(
                        "bbHHh", recv_packet[20:28]
                    )
                    # Ignore echo requests: the socket may also deliver a copy
                    # of our own request (same ID), e.g. when the target is a
                    # local address, and that is not a reply.
                    # NOTE(review): packets are matched on ID only, not on the
                    # sender address in `addr`.
                    if icmp_type != ICMP_ECHO_REQUEST and packet_ID == ID:
                        time_sent = struct.unpack(
                            "d", recv_packet[28:28 + bytes_in_double]
                        )[0]
                        return time_received - time_sent
                time_remaining = time_remaining - time_spent
                if time_remaining <= 0:
                    return None

        def send_ping(self, sock, ID):
            """Build one ICMP echo request and send it to the target host.

            The payload is the current time packed as a double followed by
            "Q" padding, 192 bytes in total; ``receive_pong`` reads the
            timestamp back to compute the round-trip delay.

            Args:
                sock: Socket to send the packet on.
                ID: Identifier to pack into the ICMP header.

            Raises:
                socket.gaierror: If the target host name cannot be resolved.
            """
            target_addr = socket.gethostbyname(self.target_host)
            # Create a dummy header with a 0 checksum.
            header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, 0, 0, ID, 1)
            bytes_in_double = struct.calcsize("d")
            data = struct.pack("d", time.time()) + b"Q" * (192 - bytes_in_double)
            # Get the checksum on the data and the dummy header.
            my_checksum = self.do_checksum(header + data)
            header = struct.pack(
                "bbHHh", ICMP_ECHO_REQUEST, 0, socket.htons(my_checksum), ID, 1
            )
            sock.sendto(header + data, (target_addr, 1))

        def ping_once(self):
            """Send one echo request and wait for its reply.

            Returns:
                The delay in seconds, or None on timeout.

            Raises:
                PermissionError: If the process may not create a raw socket.
                OSError: For any other socket failure (``socket.gaierror``
                    when the host name cannot be resolved).
            """
            icmp = socket.getprotobyname("icmp")
            try:
                sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
            except PermissionError as e:
                # PermissionError is an OSError (== socket.error), so callers
                # catching socket.error keep working. Other creation errors
                # now propagate instead of leaving `sock` unbound.
                raise PermissionError(
                    e.errno,
                    "%s - ICMP messages can only be sent from root user processes"
                    % e.strerror,
                ) from e
            # The context manager closes the socket even if sending fails.
            with sock:
                my_ID = os.getpid() & 0xFFFF
                self.send_ping(sock, my_ID)
                return self.receive_pong(sock, my_ID, self.timeout)

        def ping(self):
            """Ping the target ``self.count`` times and print each result.

            Stops early if the host name cannot be resolved.
            """
            for _ in range(self.count):
                print("Ping to %s..." % self.target_host)
                try:
                    delay = self.ping_once()
                except socket.gaierror as e:
                    # Exceptions are not subscriptable on Python 3; strerror
                    # holds the message that e[1] used to return.
                    print("Ping failed. (socket error: '%s')" % (e.strerror or e))
                    break
                if delay is None:
                    print("Ping failed. (timeout within %ssec.)" % self.timeout)
                else:
                    print("Get pong in %0.4fms" % (delay * 1000))
    if __name__ == '__main__':
        # Ping every host address (.1 to .254) under the prefix once and
        # collect the ones that answer.
        alive = []
        host_prefix = '192.168.242.'
        for i in range(1, 255):
            host = host_prefix + str(i)
            pinger = Pinger(target_host=host)
            delay = pinger.ping_once()
            if delay is None:
                # Report the timeout actually in use rather than a fixed "2".
                print("Ping %s 失败,超时%s秒" % (host, pinger.timeout))
            else:
                print("ping %s = %s ms" % (host, round(delay * 1000, 4)))
                alive.append(host)
        # Summarise the sweep; the list was previously built but never shown.
        print("存活主机 %d 台: %s" % (len(alive), ", ".join(alive)))

    测试如下:
    6、实现快速Ping一个IP网段地址 - 图1