一个ssh实用工具

需求

公司电脑连接很多环境都是通过./ssh/config配置的,并且用到了ssh tunnel。

每次on duty时,经常需要到日志服务器上抓一些特定的日志。例如拉取一些失败的交易数据,包括请求和响应,给运营去找银行线下处理。

当然,运营希望看到的是关心的字段都要有,并且最好是excel格式。

写excel格式还好,关键是有些字段可能要匹配多个不通的日志(例如不同服务的日志匹配),简而言之,需要对日志进行类似join的操作。

在没有专门用于日志分析的基础设施,并且没有实时性要求的情况下,写个脚本能很有效率地解决这类问题。这时,一个实用的ssh工具就很有用,连接服务器,远程awk几次就ok。

这本来是一个很小的需求,为什么要单独写一篇博客呢? 忍不住吐槽下,靠谱的ssh库其实并不多。刚开始想找java的库,试了几个,要么几年没人维护,要么star很多但是解决不了都ssh_config配置和tunnel的问题。

最后不得已,很不情愿但是没办法,还是回到python,使用paramiko。

简单封装

import os

import paramiko


class SSHSession(object):
    def __init__(
            self,
            host,
            port=22,
            username=None,
            password=None,
            key_file_path=None,
            config_file_path='~/.ssh/config'
    ):
        self._host = host
        self._port = port
        self._username = username
        self._password = password
        self._key_file_path = [key_file_path] if key_file_path else None
        self._config_file_path = config_file_path
        self.client = self._connect()

    def exec_command(self, command, timeout=None):
        stdin, stdout, stderr = self.client.exec_command(command, timeout=timeout)
        return stdout.readlines(), stderr.readlines()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.client is not None:
            self.client.close()

    def _connect(self):
        client = paramiko.SSHClient()
        client._policy = paramiko.WarningPolicy()
        client.load_system_host_keys()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        ssh_config = paramiko.SSHConfig()
        user_config_file = os.path.expanduser(self._config_file_path)
        with open(user_config_file) as f:
            ssh_config.parse(f)

        user_config = ssh_config.lookup(self._host)

        cfg = {
            'username': self._username or user_config.get('user', None),
            'password': self._password,
            'port': self._port or user_config.get('port', None),
            'key_filename': self._key_file_path or user_config.get('identityfile', None),
            'hostname': user_config.get('hostname', None) or self._host,
            'sock': paramiko.ProxyCommand(user_config['proxycommand']) if 'proxycommand' in user_config else None
        }

        client.connect(**cfg)
        return client

交互式shell(作者:misha):

import re

from ssh.client import SSHSession



class ShellHandler:

    def __init__(self, host, **kwargs):
        self.session = SSHSession(host, **kwargs)
        self.ssh = self.session.client
        channel = self.ssh.invoke_shell()
        self.stdin = channel.makefile('wb')
        self.stdout = channel.makefile('r')

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.ssh is not None:
            self.ssh.close()

    def execute(self, cmd):
        """

        :param cmd: the command to be executed on the remote computer
        :examples:  execute('ls')
                    execute('finger')
                    execute('cd folder_name')
        """
        cmd = cmd.strip('\n')
        self.stdin.write(cmd + '\n')
        finish = 'End-Of-Command-by-ShellHandler'
        echo_cmd = 'echo {} $?'.format(finish)
        self.stdin.write(echo_cmd + '\n')
        shin = self.stdin
        self.stdin.flush()

        shout = []
        sherr = []
        for line in self.stdout:
            if str(line).startswith(cmd) or str(line).startswith(echo_cmd):
                shout = []
            if str(line).startswith(finish):
                exit_status = int(str(line).rsplit(None, 1)[1])
                if exit_status:
                    sherr = shout
                    shout = []
                break
            else:
                s = re.compile(r'(\x9B|\x1B\[)[0-?]*[ -/]*[@-~]').sub('', line).replace('\b', '').replace('\r', '')
                if not (finish in s):
                    shout.append(s)

        if shout and echo_cmd in shout[-1]:
            shout.pop()
        if shout and cmd in shout[0]:
            shout.pop(0)
        if sherr and echo_cmd in sherr[-1]:
            sherr.pop()
        if sherr and cmd in sherr[0]:
            sherr.pop(0)

        return shin, shout, sherr

使用

# SSHSession
with SSHSession("server0") as session:
    print session.exec_command("ls")

# ShellHandler
with ShellHandler("server2") as shell:
    shell.execute("cd /home")
    shell.execute("ls -al")
    _, out, err = shell.execute("date")
    print out
2021-03-27 14:49:15 +0800 yajw Copy old posts A