使用方法
(py-ssh) ➜ py-ssh python tmp.py -husage: tmp.py [-h] [--user USER] [--password PASSWORD] [--port PORT] [--addresses IPS [IPS ...]] [--cmd CMD] [--sem SEM] [--down_host DOWN_HOST] [--files FILES [FILES ...]] [--dst_path DST_PATH]optional arguments: -h, --help show this help message and exit --user USER, -u USER Specify ssh username --password PASSWORD, -p PASSWORD Specify ssh password --port PORT, -P PORT Specify ssh port;default 22 --addresses IPS [IPS ...], -a IPS [IPS ...] Specify ssh hosts --cmd CMD, -c CMD Specify cmd for execute --sem SEM, -s SEM Number of threads; default 5 --down_host DOWN_HOST Download file server address --files FILES [FILES ...], -f FILES [FILES ...] Specify files use for copy --dst_path DST_PATH, -d DST_PATH File storage directory --dir DIR Specify storage files directory
参数解释
- user ssh登陆用户
- password ssh登陆密码
- port ssh登陆端口号
- addresses 需要登陆的主机清单,支持单主机和多主机, —addresses 192.168.50.163 192.168.50.160:192.168.50.163
- cmd 执行的命令
- sem 并发的信号量
- down_host 用于下载文件的地址, 示例: 192.168.1.10:8000
- files 需要进行拷贝到远程主机的文件, 必须搭配 dst_path 使用
- dst_path 拷贝到远程主机的目录
- dir 用于存放需要被拷贝的文件,先拷贝
files 然后 dir 里面的文件,统一放到 dstz_path下面
用法示例
python tmp.py --user cortex \ --addresses 192.168.50.163 192.168.50.160:192.168.51.163 \ --password cortex \ --files /Users/wuxing/install.sh /Users/wuxing/minicom.log \ --dst_path /tmp \ --down_host 192.168.50.101:8000 \ --cmd 'ls -l /tmp' \ --dir files \ --sem 10
脚本
import getpassimport hashlibimport osfrom concurrent.futures import ThreadPoolExecutorfrom time import perf_counterimport sysimport threadingimport paramikofrom time import sleepfrom loguru import logger as logimport argparsefrom ipaddress import ip_addresslog.remove()log.add(sys.stderr, level="INFO")log.add("run.log", level="INFO", mode="w")log.add("error.log", level="ERROR", mode="w")class SSHConnection(threading.Thread): def __init__(self, **kwargs): super(SSHConnection, self).__init__() self.username = kwargs.get('user') self.host = kwargs.get('host') self.port = kwargs.get('port') self.password = kwargs.get('password') self.keyfile = self.get_keyfile() self.sem = kwargs.get('sem') self.files = kwargs.get('files') self.down_host = kwargs.get('down_host') self.dst_path = kwargs.get('dst_path') self.cmd = kwargs.get('cmd') self.dir = kwargs.get('dir') log.info(f"directory: {self.dir}") if self.dir: if not os.path.exists(self.dir): log.error(f"{self.dir} is not exists") sys.exit(0) def get_keyfile(self, path=os.getcwd()): default_keyfile = os.path.join( os.environ.get('HOME', 'D:/'), '.ssh', 'id_rsa') if 'id_rsa' in os.listdir(path): keyfile = os.path.join(path, 'id_rsa') elif os.path.isfile(default_keyfile): keyfile = default_keyfile else: keyfile = '' return keyfile def connect(self): try: transport = paramiko.Transport((self.host, self.port)) except Exception: log.error(f"{self.host} is down!") exit(0) if self.password: transport.connect(username=self.username, password=self.password) elif self.keyfile: transport.connect( username=self.username, pkey=paramiko.RSAKey.from_private_key_file(self.keyfile)) else: password = getpass.getpass("Password for %s@%s: " % (self.username, self.host)) transport.connect(username=self.username, password=password) self._transport = transport log.info("Connected to %s as %s" % (self.host, self.username)) def close(self): try: self._transport.close() except AttributeError: pass def run_cmd(self, command): ssh = paramiko.SSHClient() ssh._transport = self._transport stdin, stdout, stderr = ssh.exec_command(command) res = stdout.read().decode() error = stderr.read().decode() if error.strip(): # return error raise Exception(error) else: return res def trans_file(self, localpath, remotepath, method=''): sftp = paramiko.SFTPClient.from_transport(self._transport) if method == 'put': sftp.put(localpath, remotepath) print("File %s has uploaded to %s" % (localpath, remotepath)) elif method == 'get': sftp.get(remotepath, localpath) print("File %s has saved as %s" % (remotepath, localpath)) else: print('usage: trans_file(localpath, remotepath, method="get/put"') def generter_md5(self, filepath): with open(filepath, 'rb') as f: data = f.read() return hashlib.md5(data).hexdigest() def load_directory(self): files = [] for root, dirs, filenames in os.walk(self.dir): for f in filenames: filepath = os.path.join(root, f) files.append(filepath) return files def donwload_files(self, filepath, down_host): key = os.path.basename(filepath) download_script = """ if [ -f {path}/{name} ];then echo "{path}/{name} exists!" rm -rf {path}/{name} else echo "{path}/{name} not exists!" fi wget -q http://{host}/{name} -O {path}/{name} """.format(path=self.dst_path, name=key, host=down_host) self.run_cmd(download_script) dst_md5 = self.run_cmd( "md5sum {path}/{name} | cut -d ' ' -f 1".format(path=self.dst_path, name=key)).strip() src_md5 = self.generter_md5(filepath) log.debug(f"remote ssh file md5:{dst_md5}, local file md5:{src_md5}") if dst_md5 != src_md5: log.error(f"md5校验失败, {self.dst_path}/{key} md5: {dst_md5}, local md5: {src_md5}") sys.exit(0) log.info(f"{self.host}, {self.dst_path}/{key} 下载完成,md5校验成功!") @staticmethod def find_ips(start, end): start = ip_address(start) end = ip_address(end) + 1 result = [] while start < end: result.append(str(start)) start += 1 return result def run(self) -> None: try: self.sem.acquire() self.connect() log.debug(f"files: {self.files}") if self.files: for filepath in self.files: self.donwload_files(filepath, self.down_host) log.info(f"directory: {self.dir}") if self.dir: log.info(f"run dir: {self.dir}") self.files = self.load_directory() for filepath in self.files: self.donwload_files(filepath, f"{self.down_host}/{self.dir}") if self.cmd: log.info(self.run_cmd(self.cmd)) finally: self.sem.release() def __del__(self): self.close()def run(**kwargs): begin = perf_counter() ips = kwargs.get('ips', None) if not ips: log.error("Must specify hosts, use '-a' or '--addresses'") sys.exit(0) user, password = kwargs.get('user', None), kwargs.get('password', None) threads = [] for ip in ips: log.debug(f"ip:{ip}") if ':' in ip: start, end = ip.split(':') for i in SSHConnection.find_ips(start, end): kwargs['host'] = i threads.append(SSHConnection(**kwargs)) else: kwargs['host'] = ip threads.append(SSHConnection(**kwargs)) for t in threads: t.start() for t in threads: t.join() end = perf_counter() print("Cost time:", end - begin)if __name__ == '__main__': # run() parser = argparse.ArgumentParser() parser.add_argument('--user', '-u', dest="user", action='store', help='Specify ssh username') parser.add_argument('--password', '-p', dest="password", action="store", help='Specify ssh password') parser.add_argument('--port', '-P', dest="port", action='store', default=22, type=int, help='Specify ssh port;default 22') parser.add_argument('--addresses', '-a', dest="ips", action='store', nargs='+', type=str, help='Specify ssh hosts') parser.add_argument('--cmd', '-c', dest="cmd", action='store', type=str, help="Specify cmd for execute") parser.add_argument('--sem', '-s', dest="sem", action='store', type=int, help="Number of threads; default 5", default=5) parser.add_argument('--down_host', dest='down_host', action='store', help="Download file server address") copy_group = parser.add_argument_group() copy_group.add_argument('--files', '-f', dest="files", action='store', nargs='+', type=str, help="Specify files use for copy") copy_group.add_argument('--dst_path', '-d', dest="dst_path", action="store", help="File storage directory") copy_group.add_argument('--dir', dest="dir", action='store', help='Specify storage files directory') args = parser.parse_args() print(args) sem = threading.Semaphore(args.sem) params = { "user": args.user, "password": args.password, "ips": args.ips, "port": args.port, "sem": sem, "files": args.files, "dst_path": args.dst_path, "cmd": args.cmd, "down_host": args.down_host, "dir": args.dir } run(**params)