#!/usr/bin/python2 # -*- coding:utf-8 -*- #========================================= # Filename : agent.py # Filetype : Python # Author : Colben # Create : 2016-07-28 13:36:04 #========================================= import os, sys, paramiko, re, csv from optparse import OptionParser from multiprocessing import Process, cpu_count, Lock from time import sleep, time from threading import Thread def print_process_result(): output_fmt = {} output_fmt['ERROR'] = '\033[31;5mERROR\033[0m' output_fmt['WRONG'] = '\033[33;1mWRONG\033[0m' output_fmt['RIGHT'] = '\033[32mRIGHT\033[0m' output_fmt['LOCAL'] = '\033[32mLOCAL\033[0m' csv_rfp = open('/tmp/agent_result.csv', 'rb') for record in csv.reader(csv_rfp): record[0] = record[0] + '-'*(15-len(record[0])) record[1] = output_fmt[record[1]] print '-> '.join(record) csv_rfp.close() return def write_csv(record): process_lock.acquire() csv_writer.writerow(record) process_lock.release() return def time_out(record, timeout): detail = record[2] begin_time = int(time()) while int(time()) - begin_time < timeout: if detail != record[2]: return sleep(0.5) write_csv(record) os.kill(os.getpid(), 9) return def start_timer(record, timeout): if 0 >= options.timeout: return thread_timer = Thread(target = time_out, args = (record, timeout, )) thread_timer.setDaemon(1) thread_timer.start() return def ssh_command(ip): ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) record = [ip, 'ERROR', 'CONN TIMEOUT'] start_timer(record, 10) try: ssh.connect(ip, port=22, username='root', pkey=ssh_key) #ssh.connect(ip, 22, 'root', 'password') except Exception, e: record[2] = str(e) write_csv(record) return record[2], record[1] = 'TASK TIMEOUT', 'WRONG' command = re.sub(r'IIPP', ip, options.remote_cmd) start_timer(record, options.timeout) stdin, stdout, stderr = ssh.exec_command(command) err = stderr.readlines() out = stdout.readlines() if err: record[2] = ' '.join([x.strip() for x in err]) else: record[2], record[1] = ' '.join([x.strip() for x in out]), 'RIGHT' write_csv(record) return def ssh_transport(ip): record = [ip, 'ERROR', 'CONN TIMEOUT'] start_timer(record, 10) try: t = paramiko.Transport((ip, 22)) t.connect(username='root', pkey=ssh_key) #t.connect(username='root', password='password') except Exception, e: record[2] = str(e) write_csv(record) return record[2], record[1] = 'TASK TIMEOUT', 'WRONG' remote_file = re.sub(r'IIPP',ip,os.path.join(options.dst_path, os.path.basename(options.src_file))) start_timer(record, options.timeout) try: sftp = paramiko.SFTPClient.from_transport(t) sftp.put(options.src_file, remote_file) record[2], record[1] = '', 'RIGHT' except Exception, e: record[2] = str(e) write_csv(record) return def local_command(ip): command = re.sub(r'IIPP', ip, options.local_cmd) record = [ip, 'WRONG', 'TASK TIMEOUT'] start_timer(record, options.timeout) result = os.popen(command) record[2], record[1] = ' '.join([x.strip() for x in result.readlines()]), 'LOCAL' write_csv(record) return def check_pool_state(empty=False): while True: print "\033[2J\033[0;0H" for ip, process in process_list.items(): if process.is_alive(): print 'Handling', ip, '...' else: #print ip, '\033[32mfinished\033[0m' process_list.pop(ip) if not empty and options.process > len(process_list): break if empty and 0 == len(process_list):break sleep(0.5) return def start_process_pool(): for ip in ips: if options.remote_cmd: sub_process = Process(target=ssh_command, args=(ip,)) elif options.src_file: sub_process = Process(target=ssh_transport, args=(ip,)) elif options.local_cmd: sub_process = Process(target=local_command, args=(ip,)) else: print 'Unknown parameter ...' sys.exit(1) sub_process.start() process_list[ip] = sub_process check_pool_state() check_pool_state(empty=True) return def get_hosts(hosts_file): if '/dev/stdin' == hosts_file: print 'Specify no host file, enter ip manually here ...' try: fp = open(hosts_file, 'r') except Exception, e: print 'Failed to read hosts_file,', e sys.exit(1) ips = [] for line in fp: matchs = re.match(r'^(10\.\d{1,3}\.\d{1,3}\.\d{1,3})', line) if matchs: ips.append(matchs.group(1)) if 0 == len(ips): print 'No ips found, quit ...' return ips def initial(): global csv_writer, process_lock, ssh_key, ips, csv_fp, process_list try: ssh_key = paramiko.RSAKey.from_private_key_file('/root/.ssh/id_rsa') csv_fp = open('/tmp/agent_result.csv', 'wb', 0) csv_writer = csv.writer(csv_fp) except Exception, e: print e sys.exit(1) ips = get_hosts(options.host) process_lock = Lock() process_list = {} return def get_options(): global options usage = '%s\n\t-R remote-cmd -H ip-file -P process-num -T timeout'%sys.argv[0] usage += '\n\t-S local-file -D remote-dir -H ip-file -P process-num -T timeout' usage += '\n\t-L local-cmd -H ip-file -P process-num -T timeout' usage += '\nThe pattern "IIPP" in options "RSDL" will be replaced by the ip contained in each process.' usage += '\nThe result file is /tmp/agent_result.csv' parser = OptionParser(usage) parser.add_option('-R', '--remote_cmd', action='store', help='Run a shell command in remote servers.') parser.add_option('-S', '--src_file', action='store', help='specify the file to remote servers.') parser.add_option('-D', '--dst_path', action='store', help='specify the path in remote servers.') parser.add_option('-L', '--local_cmd', action='store', help='Run a shell command in localhost.') parser.add_option('-H', '--host', action='store', default='/dev/stdin', help='Specify the file contains ip.') parser.add_option('-P', '--process', action='store', default=cpu_count(), type='int', help='Specify the num of processes.') parser.add_option('-T', '--timeout', action='store', default=0, type='int', help='Specify the seconds of timeout.') options, args = parser.parse_args() for opt in [options.src_file, options.dst_path, options.local_cmd]: if options.remote_cmd and opt: parser.print_help() sys.exit(1) for opt in [options.src_file, options.dst_path]: if options.local_cmd and opt: parser.print_help() sys.exit(1) if options.src_file and not options.dst_path or options.dst_path and not options.src_file: parser.print_help() sys.exit(1) if not options.remote_cmd and not options.src_file and not options.local_cmd: parser.print_help() sys.exit(1) return def main(): get_options() initial() start_process_pool() print_process_result() return 0 if '__main__' == __name__: main()