需求

公司电脑连接很多环境都是通过./ssh/config配置的,并且用到了ssh tunnel。

每次on duty时,经常需要到日志服务器上抓一些特定的日志。例如拉取一些失败的交易数据,包括请求和响应,给运营去找银行线下处理。

当然,运营希望看到的是关心的字段都要有,并且最好是excel格式。

写excel格式还好,关键是有些字段可能要匹配多个不通的日志(例如不同服务的日志匹配),简而言之,需要对日志进行类似join的操作。

在没有专门用于日志分析的基础设施,并且没有实时性要求的情况下,写个脚本能很有效率地解决这类问题。这时,一个实用的ssh工具就很有用,连接服务器,远程awk几次就ok。

这本来是一个很小的需求,为什么要单独写一篇博客呢?
忍不住吐槽下,靠谱的ssh库其实并不多。刚开始想找java的库,试了几个,要么几年没人维护,要么star很多但是解决不了都ssh_config配置和tunnel的问题。

最后不得已,很不情愿但是没办法,还是回到python,使用paramiko。

简单封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import os

import paramiko


class SSHSession(object):
def __init__(
self,
host,
port=22,
username=None,
password=None,
key_file_path=None,
config_file_path='~/.ssh/config'
):
self._host = host
self._port = port
self._username = username
self._password = password
self._key_file_path = [key_file_path] if key_file_path else None
self._config_file_path = config_file_path
self.client = self._connect()

def exec_command(self, command, timeout=None):
stdin, stdout, stderr = self.client.exec_command(command, timeout=timeout)
return stdout.readlines(), stderr.readlines()

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
if self.client is not None:
self.client.close()

def _connect(self):
client = paramiko.SSHClient()
client._policy = paramiko.WarningPolicy()
client.load_system_host_keys()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

ssh_config = paramiko.SSHConfig()
user_config_file = os.path.expanduser(self._config_file_path)
with open(user_config_file) as f:
ssh_config.parse(f)

user_config = ssh_config.lookup(self._host)

cfg = {
'username': self._username or user_config.get('user', None),
'password': self._password,
'port': self._port or user_config.get('port', None),
'key_filename': self._key_file_path or user_config.get('identityfile', None),
'hostname': user_config.get('hostname', None) or self._host,
'sock': paramiko.ProxyCommand(user_config['proxycommand']) if 'proxycommand' in user_config else None
}

client.connect(**cfg)
return client

交互式shell(作者:misha):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import re

from ssh.client import SSHSession



class ShellHandler:

def __init__(self, host, **kwargs):
self.session = SSHSession(host, **kwargs)
self.ssh = self.session.client
channel = self.ssh.invoke_shell()
self.stdin = channel.makefile('wb')
self.stdout = channel.makefile('r')

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
if self.ssh is not None:
self.ssh.close()

def execute(self, cmd):
"""

:param cmd: the command to be executed on the remote computer
:examples: execute('ls')
execute('finger')
execute('cd folder_name')
"""
cmd = cmd.strip('\n')
self.stdin.write(cmd + '\n')
finish = 'End-Of-Command-by-ShellHandler'
echo_cmd = 'echo {} $?'.format(finish)
self.stdin.write(echo_cmd + '\n')
shin = self.stdin
self.stdin.flush()

shout = []
sherr = []
for line in self.stdout:
if str(line).startswith(cmd) or str(line).startswith(echo_cmd):
shout = []
if str(line).startswith(finish):
exit_status = int(str(line).rsplit(None, 1)[1])
if exit_status:
sherr = shout
shout = []
break
else:
s = re.compile(r'(\x9B|\x1B\[)[0-?]*[ -/]*[@-~]').sub('', line).replace('\b', '').replace('\r', '')
if not (finish in s):
shout.append(s)

if shout and echo_cmd in shout[-1]:
shout.pop()
if shout and cmd in shout[0]:
shout.pop(0)
if sherr and echo_cmd in sherr[-1]:
sherr.pop()
if sherr and cmd in sherr[0]:
sherr.pop(0)

return shin, shout, sherr

使用

1
2
3
4
5
6
7
8
9
10
# SSHSession
with SSHSession("server0") as session:
print session.exec_command("ls")

# ShellHandler
with ShellHandler("server2") as shell:
shell.execute("cd /home")
shell.execute("ls -al")
_, out, err = shell.execute("date")
print out