5 Star 15 Fork 15

OceanBase / obdeploy

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
ssh.py 29.02 KB
一键复制 编辑 原始数据 按行查看 历史
Rongfeng Fu 提交于 2023-05-19 18:08 . V2.1.0 (#170)
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770
# coding: utf-8
# OceanBase Deploy.
# Copyright (C) 2021 OceanBase
#
# This file is part of OceanBase Deploy.
#
# OceanBase Deploy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OceanBase Deploy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OceanBase Deploy. If not, see <https://www.gnu.org/licenses/>.
from __future__ import absolute_import, division, print_function
import enum
import getpass
import os
import tempfile
import warnings
from glob import glob
from subprocess32 import Popen, PIPE
# paramiko import cryptography 模块在python2下会报不支持警报
warnings.filterwarnings("ignore")
from paramiko import AuthenticationException, SFTPClient
from paramiko.client import SSHClient, AutoAddPolicy
from paramiko.ssh_exception import NoValidConnectionsError, SSHException
from multiprocessing.queues import Empty
from multiprocessing import Queue, Process
from multiprocessing.pool import ThreadPool
from tool import COMMAND_ENV, DirectoryUtil, FileUtil
from _stdio import SafeStdio
from _errno import EC_SSH_CONNECT
from _environ import ENV_DISABLE_RSYNC
__all__ = ("SshClient", "SshConfig", "LocalClient", "ConcurrentExecutor")
class SshConfig(object):
def __init__(self, host, username='root', password=None, key_filename=None, port=22, timeout=30):
self.host = host
self.username = username
self.password = password if password is None else str(password)
self.key_filename = key_filename
self.port = int(port)
self.timeout = int(timeout)
def __str__(self):
return '%s@%s' % (self.username ,self.host)
class SshReturn(object):
def __init__(self, code, stdout, stderr):
self.code = code
self.stdout = stdout
self.stderr = stderr
def __bool__(self):
return self.code == 0
def __nonzero__(self):
return self.__bool__()
class FeatureSshReturn(SshReturn, SafeStdio):
def __init__(self, popen, timeout, stdio):
self.popen = popen
self.timeout = timeout
self.stdio = stdio
self._code = None
self._stdout = None
self._stderr = None
def _get_return(self):
if self._code is None:
try:
p = self.popen
output, error = p.communicate(timeout=self.timeout)
self._stdout = output.decode(errors='replace')
self._stderr = error.decode(errors='replace')
self._code = p.returncode
verbose_msg = 'exited code %s' % self._code
if self._code:
verbose_msg += ', error output:\n%s' % self._stderr
self.stdio.verbose(verbose_msg)
except Exception as e:
self._stdout = ''
self._stderr = str(e)
self._code = 255
verbose_msg = 'exited code 255, error output:\n%s' % self._stderr
self.stdio.verbose(verbose_msg)
self.stdio.exception('')
@property
def code(self):
self._get_return()
return self._code
@property
def stdout(self):
self._get_return()
return self._stdout
@property
def stderr(self):
self._get_return()
return self._stderr
class FutureSshReturn(SshReturn):
def __init__(self, client, command, timeout=None, stdio=None):
self.client = client
self.command = command
self.timeout = timeout
self.stdio = stdio if stdio else client.stdio
if self.stdio:
self.stdio = self.stdio.sub_io()
self.finsh = False
super(FutureSshReturn, self).__init__(127, '', '')
def set_return(self, ssh_return):
self.code = ssh_return.code
self.stdout = ssh_return.stdout
self.stderr = ssh_return.stderr
self.finsh = True
class ConcurrentExecutor(object):
def __init__(self, workers=None):
self.workers = workers
self.futures = []
def add_task(self, client, command, timeout=None, stdio=None):
ret = FutureSshReturn(client, command, timeout, stdio=stdio)
self.futures.append(ret)
return ret
def size(self):
return len(self.futures)
@staticmethod
def execute(future):
client = SshClient(future.client.config, future.stdio)
future.set_return(client.execute_command(future.command, timeout=future.timeout))
return future
def submit(self):
rets = []
pool = ThreadPool(processes=self.workers)
try:
results = pool.map(ConcurrentExecutor.execute, tuple(self.futures))
for r in results:
rets.append(r)
finally:
pool.close()
self.futures = []
return rets
class LocalClient(SafeStdio):
@staticmethod
def init_env(env=None):
if env is None:
return None
env_t = COMMAND_ENV.copy()
env_t.update(env)
return env_t
@staticmethod
def execute_command_background(command, env=None, timeout=None, stdio=None):
stdio.verbose('local background execute: %s ' % command, end='')
try:
p = Popen(command, env=LocalClient.init_env(env), shell=True, stdout=PIPE, stderr=PIPE)
return FeatureSshReturn(p, timeout, stdio)
except Exception as e:
output = ''
error = str(e)
code = 255
verbose_msg = 'exited code 255, error output:\n%s' % error
stdio.verbose(verbose_msg)
stdio.exception('')
return SshReturn(code, output, error)
@staticmethod
def execute_command(command, env=None, timeout=None, stdio=None):
stdio.verbose('local execute: %s ' % command, end='')
try:
p = Popen(command, env=LocalClient.init_env(env), shell=True, stdout=PIPE, stderr=PIPE)
output, error = p.communicate(timeout=timeout)
code = p.returncode
output = output.decode(errors='replace')
error = error.decode(errors='replace')
verbose_msg = 'exited code %s' % code
if code:
verbose_msg += ', error output:\n%s' % error
stdio.verbose(verbose_msg)
except Exception as e:
output = ''
error = str(e)
code = 255
verbose_msg = 'exited code 255, error output:\n%s' % error
stdio.verbose(verbose_msg)
stdio.exception('')
return SshReturn(code, output, error)
@staticmethod
def put_file(local_path, remote_path, stdio=None):
if LocalClient.execute_command('mkdir -p %s && cp -f %s %s' % (os.path.dirname(remote_path), local_path, remote_path), stdio=stdio):
return True
return False
@staticmethod
def put_dir(local_dir, remote_dir, stdio=None):
if os.path.isdir(local_dir):
local_dir = os.path.join(local_dir, '*')
if os.path.exists(os.path.dirname(local_dir)) and not glob(local_dir):
stdio.verbose("%s is empty" % local_dir)
return True
if LocalClient.execute_command('mkdir -p %s && cp -frL %s %s' % (remote_dir, local_dir, remote_dir), stdio=stdio):
return True
return False
@staticmethod
def write_file(content, file_path, mode='w', stdio=None):
stdio.verbose('write {} to {}'.format(content, file_path))
try:
with FileUtil.open(file_path, mode, stdio=stdio) as f:
f.write(content)
f.flush()
return True
except:
stdio.exception('')
return False
@staticmethod
def get_file(local_path, remote_path, stdio=None):
return LocalClient.put_file(remote_path, local_path, stdio=stdio)
@staticmethod
def get_dir(local_path, remote_path, stdio=None):
return LocalClient.put_dir(remote_path, local_path, stdio=stdio)
class RemoteTransporter(enum.Enum):
CLIENT = 0
RSYNC = 1
def __lt__(self, other):
return self.value < other.value
def __gt__(self, other):
return self.value > other.value
class SshClient(SafeStdio):
DEFAULT_PATH = '/sbin:/usr/local/bin:/usr/bin:/usr/local/sbin:/usr/sbin:'
LOCAL_HOST = ['127.0.0.1', 'localhost', '127.1', '127.0.1']
def __init__(self, config, stdio=None):
self.config = config
self.stdio = stdio
self.sftp = None
self.is_connected = False
self.ssh_client = SSHClient()
self.env_str = ''
self._remote_transporter = None
self.task_queue = None
self.result_queue = None
self._is_local = self.is_localhost() and self.config.username == getpass.getuser()
if self._is_local:
self.env = {}
else:
self.env = {'PATH': self.DEFAULT_PATH}
self._update_env()
super(SshClient, self).__init__()
def _init_queue(self):
self.task_queue = Queue()
self.result_queue = Queue()
def _update_env(self):
env = []
for key in self.env:
if self.env[key]:
env.append('export %s=%s$%s;' % (key, self.env[key], key))
self.env_str = ''.join(env)
def add_env(self, key, value, rewrite=False, stdio=None):
if key not in self.env or not self.env[key] or rewrite:
stdio.verbose('%s@%s set env %s to \'%s\'' % (self.config.username, self.config.host, key, value))
if self._is_local:
self._add_env_for_local(key, value, rewrite)
else:
self.env[key] = value
else:
stdio.verbose('%s@%s append \'%s\' to %s' % (self.config.username, self.config.host, value, key))
if self._is_local:
self._add_env_for_local(key, value, rewrite)
else:
self.env[key] += value
self._update_env()
def _add_env_for_local(self, key, value, rewrite=False):
if rewrite:
self.env[key] = value
else:
if key not in self.env:
self.env[key] = COMMAND_ENV.get(key, '')
self.env[key] += value
def get_env(self, key, stdio=None):
return self.env[key] if key in self.env else None
def del_env(self, key, stdio=None):
if key in self.env:
stdio.verbose('%s@%s delete env %s' % (self.config.username, self.config.host, key))
del self.env[key]
self._update_env()
def __str__(self):
return '%s@%s:%d' % (self.config.username, self.config.host, self.config.port)
def is_localhost(self, stdio=None):
return self.config.host in self.LOCAL_HOST
def _login(self, stdio=None):
if self.is_connected:
return True
err = None
try:
self.ssh_client.set_missing_host_key_policy(AutoAddPolicy())
self.ssh_client.connect(
self.config.host,
port=self.config.port,
username=self.config.username,
password=self.config.password,
key_filename=self.config.key_filename,
timeout=self.config.timeout
)
self.is_connected = True
except AuthenticationException:
stdio.exception('')
err = EC_SSH_CONNECT.format(user=self.config.username, ip=self.config.host, message='username or password error')
except NoValidConnectionsError:
stdio.exception('')
err = EC_SSH_CONNECT.format(user=self.config.username, ip=self.config.host, message='time out')
except BaseException as e:
stdio.exception('')
err = EC_SSH_CONNECT.format(user=self.config.username, ip=self.config.host, message=e)
if err:
stdio.critical(err)
return err
return self.is_connected
def _open_sftp(self, stdio=None):
if self.sftp:
return True
if self._login(stdio=stdio):
SFTPClient.from_transport(self.ssh_client.get_transport())
self.sftp = self.ssh_client.open_sftp()
return True
return False
def connect(self, stdio=None):
if self._is_local:
return True
return self._login(stdio=stdio)
def reconnect(self, stdio=None):
self.close(stdio=stdio)
return self.connect(stdio=stdio)
def close(self, stdio=None):
if self._is_local:
return True
if self.is_connected:
self.ssh_client.close()
if self.sftp:
self.sftp = None
def __del__(self):
self.close()
def _execute_command(self, command, timeout=None, retry=3, stdio=None):
if not self._login(stdio):
return SshReturn(255, '', 'connect failed')
try:
stdin, stdout, stderr = self.ssh_client.exec_command(command, timeout=timeout)
output = stdout.read().decode(errors='replace')
error = stderr.read().decode(errors='replace')
if output:
idx = output.rindex('\n')
code = int(output[idx:])
stdout = output[:idx]
verbose_msg = 'exited code %s' % code
else:
code, stdout = 1, ''
if code:
verbose_msg = 'exited code %s, error output:\n%s' % (code, error)
stdio.verbose(verbose_msg)
except SSHException as e:
if retry:
self.close()
return self._execute_command(command, retry-1, stdio)
else:
stdio.exception('')
stdio.critical('%s@%s connect failed: %s' % (self.config.username, self.config.host, e))
raise e
except Exception as e:
stdio.exception('')
code = 255
stdout = ''
error = str(e)
return SshReturn(code, stdout, error)
def execute_command(self, command, timeout=None, stdio=None):
if timeout is None:
timeout = self.config.timeout
elif timeout <= 0:
timeout = None
if self._is_local:
return LocalClient.execute_command(command, self.env if self.env else None, timeout, stdio=stdio)
verbose_msg = '%s execute: %s ' % (self.config, command)
stdio.verbose(verbose_msg, end='')
command = '%s %s;echo -e "\n$?\c"' % (self.env_str, command.strip(';').lstrip('\n'))
return self._execute_command(command, retry=3, timeout=timeout, stdio=stdio)
@property
def disable_rsync(self):
return COMMAND_ENV.get(ENV_DISABLE_RSYNC) == "1"
@property
def remote_transporter(self):
if self._remote_transporter is not None:
return self._remote_transporter
_transporter = RemoteTransporter.CLIENT
if not self._is_local and self._remote_transporter is None:
if not self.config.password and not self.disable_rsync:
ret = LocalClient.execute_command('rsync -h', stdio=self.stdio) and self.execute_command('rsync -h', stdio=self.stdio)
if ret:
_transporter = RemoteTransporter.RSYNC
self._remote_transporter = _transporter
self.stdio.verbose("current remote_transporter {}".format(self._remote_transporter))
return self._remote_transporter
def put_file(self, local_path, remote_path, stdio=None):
if not os.path.isfile(local_path):
stdio.error('path: %s is not file' % local_path)
return False
if self._is_local:
return LocalClient.put_file(local_path, remote_path, stdio=stdio)
if not self._open_sftp(stdio=stdio):
return False
return self._put_file(local_path, remote_path, stdio=stdio)
def write_file(self, content, file_path, mode='w', stdio=None):
if self._is_local:
return LocalClient.write_file(content, file_path, mode, stdio)
return self._write_file(content, file_path, mode, stdio)
def _write_file(self, content, file_path, mode='w', stdio=None):
stdio.verbose('write {} to {}: {}'.format(content, self, file_path))
try:
with tempfile.NamedTemporaryFile(mode=mode) as f:
f.write(content)
f.flush()
return self.put_file(f.name, file_path, stdio=stdio)
except:
stdio.exception('')
return False
@property
def _put_file(self):
if self.remote_transporter == RemoteTransporter.RSYNC:
return self._rsync_put_file
else:
return self._client_put_file
def _client_put_file(self, local_path, remote_path, stdio=None):
if self.execute_command('mkdir -p %s && rm -fr %s' % (os.path.dirname(remote_path), remote_path), stdio=stdio):
stdio.verbose('send %s to %s' % (local_path, remote_path))
if self.sftp.put(local_path, remote_path):
return self.execute_command('chmod %s %s' % (oct(os.stat(local_path).st_mode)[-3:], remote_path))
return False
def _rsync(self, source, target, stdio=None):
identity_option = ""
if self.config.key_filename:
identity_option += '-i {key_filename} '.format(key_filename=self.config.key_filename)
if self.config.port:
identity_option += '-p {}'.format(self.config.port)
cmd = 'rsync -a -W -e "ssh {identity_option}" {source} {target}'.format(
identity_option=identity_option,
source=source,
target=target
)
ret = LocalClient.execute_command(cmd, stdio=stdio)
return bool(ret)
def _rsync_put_dir(self, local_path, remote_path, stdio=None):
stdio.verbose('send %s to %s by rsync' % (local_path, remote_path))
source = os.path.join(local_path, '*')
if os.path.exists(os.path.dirname(source)) and not glob(source):
stdio.verbose("%s is empty" % source)
return True
target = "{user}@{host}:{remote_path}".format(user=self.config.username, host=self.config.host, remote_path=remote_path)
if self._rsync(source, target, stdio=stdio):
return True
else:
return False
def _rsync_put_file(self, local_path, remote_path, stdio=None):
if not self.execute_command('mkdir -p %s' % os.path.dirname(remote_path), stdio=stdio):
return False
stdio.verbose('send %s to %s by rsync' % (local_path, remote_path))
target = "{user}@{host}:{remote_path}".format(user=self.config.username, host=self.config.host, remote_path=remote_path)
if self._rsync(local_path, target, stdio=stdio):
return True
else:
return False
def put_dir(self, local_dir, remote_dir, stdio=None):
if self._is_local:
return LocalClient.put_dir(local_dir, remote_dir, stdio=stdio)
if not self._open_sftp(stdio=stdio):
return False
if not self.execute_command('mkdir -p %s' % remote_dir, stdio=stdio):
return False
stdio.start_loading('Send %s to %s' % (local_dir, remote_dir))
ret = self._put_dir(local_dir, remote_dir, stdio=stdio)
stdio.stop_loading('succeed' if ret else 'fail')
return ret
@property
def _put_dir(self):
if self.remote_transporter == RemoteTransporter.RSYNC:
return self._rsync_put_dir
else:
return self._client_put_dir
def _client_put_dir(self, local_dir, remote_dir, stdio=None):
has_failed = False
ret = LocalClient.execute_command('find %s -type f' % local_dir)
if not ret:
has_failed = True
all_files = ret.stdout.strip().split('\n') if ret.stdout else []
ret = LocalClient.execute_command('find %s -type d' % local_dir)
if not ret:
has_failed = True
all_dirs = ret.stdout.strip().split('\n') if ret.stdout else []
self._filter_dir_in_file_path(all_files, all_dirs)
for local_path in all_files:
remote_path = os.path.join(remote_dir, os.path.relpath(local_path, local_dir))
if not self._client_put_file(local_path, remote_path, stdio=stdio):
stdio.error('Fail to get %s' % remote_path)
has_failed = True
for local_path in all_dirs:
remote_path = os.path.join(remote_dir, os.path.relpath(local_path, local_dir))
stat = oct(os.stat(local_path).st_mode)[-3:]
cmd = '[ -d "{remote_path}" ] || (mkdir -p {remote_path}; chmod {stat} {remote_path})'.format(remote_path=remote_path, stat=stat)
if not self.execute_command(cmd):
has_failed = True
return not has_failed
def get_file(self, local_path, remote_path, stdio=None):
dirname, _ = os.path.split(local_path)
if not dirname:
dirname = os.getcwd()
local_path = os.path.join(dirname, local_path)
if os.path.exists(dirname):
if not os.path.isdir(dirname):
stdio.error('%s is not directory' % dirname)
return False
elif not DirectoryUtil.mkdir(dirname, stdio=stdio):
return False
if os.path.exists(local_path) and not os.path.isfile(local_path):
stdio.error('path: %s is not file' % local_path)
return False
if self._is_local:
return LocalClient.get_file(local_path, remote_path, stdio=stdio)
if not self._open_sftp(stdio=stdio):
return False
return self._get_file(local_path, remote_path, stdio=stdio)
@property
def _get_file(self):
if self.remote_transporter == RemoteTransporter.RSYNC:
return self._rsync_get_file
else:
return self._client_get_file
def _rsync_get_dir(self, local_path, remote_path, stdio=None):
source = "{user}@{host}:{remote_path}".format(user=self.config.username, host=self.config.host, remote_path=remote_path)
if "*" not in remote_path:
source = os.path.join(source, "*")
target = local_path
stdio.verbose('get %s from %s by rsync' % (local_path, remote_path))
if LocalClient.execute_command('mkdir -p {}'.format(local_path), stdio=stdio) and self._rsync(source, target, stdio=stdio):
return True
else:
return False
def _rsync_get_file(self, local_path, remote_path, stdio=None):
source = "{user}@{host}:{remote_path}".format(user=self.config.username, host=self.config.host, remote_path=remote_path)
target = local_path
stdio.verbose('get %s from %s by rsync' % (local_path, remote_path))
if self._rsync(source, target, stdio=stdio):
return True
else:
return False
def _client_get_file(self, local_path, remote_path, stdio=None):
try:
self.sftp.get(remote_path, local_path)
stat = self.sftp.stat(remote_path)
os.chmod(local_path, stat.st_mode)
return True
except Exception as e:
stdio.exception('get %s from %s@%s:%s failed: %s' % (local_path, self.config.username, self.config.host, remote_path, e))
return False
def get_dir(self, local_dir, remote_dir, stdio=None):
dirname, _ = os.path.split(local_dir)
if not dirname:
dirname = os.getcwd()
local_dir = os.path.join(dirname, local_dir)
if "*" in dirname:
stdio.error('Invalid directory {}'.format(dirname))
return False
if os.path.exists(dirname):
if not os.path.isdir(dirname):
stdio.error('%s is not directory' % dirname)
return False
elif not DirectoryUtil.mkdir(dirname, stdio=stdio):
return False
if os.path.exists(local_dir) and not os.path.isdir(local_dir):
stdio.error('%s is not directory' % local_dir)
return False
if self._is_local:
return LocalClient.get_dir(local_dir, remote_dir, stdio=stdio)
if not self._open_sftp(stdio=stdio):
return False
stdio.start_loading('Get %s from %s' % (local_dir, remote_dir))
ret = self._get_dir(local_dir, remote_dir, stdio=stdio)
stdio.stop_loading('succeed' if ret else 'fail')
return ret
@property
def _get_dir(self):
if self.remote_transporter == RemoteTransporter.RSYNC:
return self._rsync_get_dir
else:
return self._client_get_dir
def _client_get_dir(self, local_dir, remote_dir, stdio=None):
task_queue = []
has_failed = False
if DirectoryUtil.mkdir(local_dir, stdio=stdio):
try:
ret = self.execute_command('find %s -type f' % remote_dir)
if not ret:
stdio.verbose(ret.stderr)
has_failed = True
all_files = ret.stdout.strip().split('\n') if ret.stdout else []
ret = self.execute_command('find %s -type d' % remote_dir)
if not ret:
has_failed = True
all_dirs = ret.stdout.strip().split('\n') if ret.stdout else []
self._filter_dir_in_file_path(all_files, all_dirs)
for f in all_files:
task_queue.append(f)
if "*" in remote_dir:
remote_base_dir = os.path.dirname(remote_dir)
else:
remote_base_dir = remote_dir
for remote_path in task_queue:
local_path = os.path.join(local_dir, os.path.relpath(remote_path, remote_dir))
if not self._client_get_file(local_path, remote_path, stdio=stdio):
stdio.error('Fail to get %s' % remote_path)
has_failed = True
for remote_path in all_dirs:
try:
local_path = os.path.join(local_dir, os.path.relpath(remote_path, remote_base_dir))
if not os.path.exists(local_path):
stat = self.sftp.stat(remote_path)
os.makedirs(local_path, mode=stat.st_mode)
except Exception as e:
stdio.exception('Fail to make directory %s in local: %s' % (remote_path, e))
has_failed = True
return not has_failed
except Exception as e:
stdio.exception('Fail to get %s: %s' % (remote_dir, e))
@staticmethod
def _filter_dir_in_file_path(files, directories):
skip_directories = []
for path in files:
dir_name = os.path.dirname(path)
while dir_name not in ["/", ".", ""]:
if dir_name in skip_directories:
break
if dir_name in directories:
directories.remove(dir_name)
skip_directories.append(dir_name)
dir_name = os.path.dirname(dir_name)
def file_downloader(self, local_dir, remote_dir, stdio=None):
try:
client = SshClient(config=self.config, stdio=None)
client._open_sftp(stdio=stdio)
client._remote_transporter = self.remote_transporter
while True:
remote_path = self.task_queue.get(block=False)
local_path = os.path.join(local_dir, os.path.relpath(remote_path, remote_dir))
if client.get_file(local_path, remote_path, stdio=stdio):
self.result_queue.put(remote_path)
else:
stdio.error('Fail to get %s' % remote_path)
except Empty:
return
except:
stdio.exception("")
stdio.exception('Failed to get %s' % remote_dir)
def file_uploader(self, local_dir, remote_dir, stdio=None):
try:
client = SshClient(config=self.config, stdio=None)
client._remote_transporter = self.remote_transporter
while True:
local_path, is_dir = self.task_queue.get(block=False)
remote_path = os.path.join(remote_dir, os.path.relpath(local_path, local_dir))
if is_dir:
stat = oct(os.stat(local_path).st_mode)[-3:]
cmd = '[ -d "{remote_path}" ] || (mkdir -p {remote_path}; chmod {stat} {remote_path})'.format(remote_path=remote_path, stat=stat)
if client.execute_command(cmd):
self.result_queue.put(remote_path)
else:
if client.put_file(local_path, remote_path, stdio=stdio):
self.result_queue.put(remote_path)
else:
stdio.error('Fail to get %s' % remote_path)
except Empty:
return
except:
stdio.exception("")
stdio.verbose('Failed to get %s' % remote_dir)
Python
1
https://gitee.com/oceanbase/obdeploy.git
git@gitee.com:oceanbase/obdeploy.git
oceanbase
obdeploy
obdeploy
master

搜索帮助