该功能产生背景 实现流程粗略讲解 开始 -> 读取接口配置文件(interface_path) -> 获取FTP服务器信息(omc_info_list) -> 创建本地目录(local_dir) -> 创建多进程池(Pool) -> 遍历omc_info_list中的每个omc_info -> -> 创建FTP对象(ftp) -> 列出最近更新的文件列表(filelist) -> -> 遍历filelist中的每个文件(i) -> -> 提取文件名(filename)和文件修改时间(filetime) -> -> 判断文件修改时间是否符合设定条件 -> -> 如果符合条件 -> -> 打印'文件下载成功'信息 -> 构建本地文件路径(local_path) -> -> 下载文件到本地(local_path) -> -> 否则 -> -> 继续下一个文件的判断 -> -> 关闭FTP对象(ftp) -> 结束遍历omc_info_list -> 关闭多进程池(Pool) -> 结束 废话不多说直接上代码 from multiprocessing import Pool import pandas as pdimport os from datetime import datetime import ftputil import ftputil.session from abc import ABCMeta, abstractmethod import paramiko from dateutil.parser import parse class AbstractFTPClient(metaclass=ABCMeta): ''' ftp/sftp/... client(file transfer client) abstract class 抽象方法需要全部实现,其它方法,子类自行扩展。 ''' @abstractmethod def cd(self, path) -> None: pass @abstractmethod def pwd(self) -> str: pass @abstractmethod def ls(self, path='.') -> list: pass @abstractmethod def get(self, remote, local) -> None: pass @abstractmethod def put(self, local, remote) -> None: pass @abstractmethod def put_r(self, local, remote) -> None: ''' recursive put, mkdir if directory does not exist :param local: local path :param remote: remote path :return: ''' pass @abstractmethod def getsize(self, path) -> int: ''' get file size :param path: remote file path :return: file size ''' pass @abstractmethod def getmtime(self, path) -> datetime: ''' get file recently modify time :param path: remote file path :return: datetime.datetime ''' pass @abstractmethod def close(self) -> None: pass class FTPClient(AbstractFTPClient): ''' base on ftputil ''' def __init__(self, ip, user, password, port=xx, mode='passive'): my_session_factory = ftputil.session.session_factory(port=port, use_passive_mode=None) self._ftp = ftputil.FTPHost(ip, user, password, session_factory=my_session_factory) def cd(self, path): self._ftp.chdir(path) def pwd(self): return self._ftp.getcwd() def ls(self, path='.'): return self._ftp.listdir(path) def get(self, remote, local): self._ftp.download(remote, local) def put(self, local, remote): self._ftp.upload(self, local, remote) def put_r(self, local, remote): try: self.put(local, remote) print('put: {}'.format(local)) except: parent = local dirs = [] while parent != '/': parent = os.path.dirname(parent) dirs.append(parent) for d in reversed(dirs): try: self._ftp.mkdir(d) print('mkdir: {}'.format(d)) except: pass self.put(local, remote) print('put after mkdir: {}'.format(local)) def getsize(self, path): return self._ftp.path.getsize(path) def getmtime(self, path): return datetime.fromtimestamp(self._ftp.path.getmtime(path)) def close(self): self._ftp.close() class SFTPClient(AbstractFTPClient): ''' base on paramiko ''' def __init__(self, ip, user, password, port=xx, mode='passive'): t = paramiko.Transport(sock=(ip, port)) t.connect(username=user, password=password) self._ftp = paramiko.SFTPClient.from_transport(t) def cd(self, path): self._ftp.chdir(path) def pwd(self): return self._ftp.getcwd() def ls(self, path='.'): return self._ftp.listdir(path) def get(self, remote, local): self._ftp.get(remote, local) def getfo(self, remote, fo): ''' :param fo: file-like obj ''' self._ftp.getfo(remote, fo) def put(self, local, remote): self._ftp.put(local, remote) def put_r(self, local, remote): try: self.put(local, remote) print('put: {}'.format(local)) except: parent = local dirs = [] while parent != '/': parent = os.path.dirname(parent) dirs.append(parent) for d in reversed(dirs): try: self._ftp.mkdir(d) print('mkdir: {}'.format(d)) except: pass self.put(local, remote) print('put after mkdir: {}'.format(local)) def getsize(self, path): return self._ftp.stat(path).st_size def getmtime(self, path): return datetime.fromtimestamp(self._ftp.stat(path).st_mtime) def close(self): self._ftp.close() class MyFTP(): def __init__(self, ip, user, password, protocol='sftp'): # 部分ftp服务器需要用不同的ftp模型类 on_off = 0 if on_off == 0: print('FTP') self.ftp = FTPClient(ip, user, password, port=21) else: print('SFTP') self.ftp = SFTPClient(ip, user, password, port=22) def list_recent(self, path): filelist = [] dirlist = [path] while dirlist != []: absdir = dirlist.pop(0) try: names = self.ftp.ls(absdir) except: names = [] for name in names: fullname = absdir + '/' + name if os.path.splitext(name)[1] == '': dirlist.append(fullname) else: filelist.append(fullname) return filelist def get(self, remote, local): self.ftp.get(remote, local) def put_r(self, local, remote): self.ftp.put_r(local, remote) def close(self): self.ftp.close() def get_interface(interface_path): df = pd.read_csv(interface_path,encoding='gbk') omc_info_lists = df.to_dict(orient='records') # print(omc_info_lists) return omc_info_lists def download(omc_info, local_dir): try: ftp = MyFTP(omc_info['ip'], omc_info['ftpuser'], str(omc_info['ftppassword'])) ip_dir = os.path.join(local_dir, omc_info['ip']) if not os.path.exists(ip_dir): os.mkdir(ip_dir) except Exception as e: print(e) print('{} connection fail '.format(omc_info['ip'])) print(omc_info['ip'], omc_info['ftpuser'], str(omc_info['ftppassword'])) else: filelist = ftp.list_recent(omc_info['ftppath']) # print(filelist[0]) if filelist: ) for i in filelist: filename = i.split('/')[-1] print(filename) filetime = parse(filename.split('_')[-3]) if filetime >= datetime.strptime('2022-12-27 00:00:00','%Y-%m-%d %H:%M:%S') and filetime < datetime.strptime('2022-12-30 00:00:00','%Y-%m-%d %H:%M:%S'): print('{} is downloaded :{}'.format(omc_info['ip'], filename)) local_path = os.path.join(ip_dir, filename) try: ftp.get(i, local_path) except Exception as e: print('{} download fail '.format(filename)) else: continue else: print('IP{} directory or path is not exists!'.format(omc_info['ip'])) ftp.close() def main(interface_path): # 结果路径 local_dir = './house-8.24-8.27' #local_dir = './house_0429' if not os.path.exists(local_dir): |
|