分享

用python快速实现一个FTP自动下载功能

 天选小丑 2023-08-11 发布于广西

该功能产生背景

由于客户经常需要从远程服务器上下载大量数据文件到本地进行解析分析,接到这个活后,刚开始用最原始的方法手动从服务器上拉文件,拉了2天才把文件全部拉到本地,真是手拉到抽筋,眼睛盯着屏幕发肿。于是在网上查了点资料,用python实现了一个FTP/SFTP自动下载文件的功能。

下面的代码实现了一个FTP和SFTP下载文件的脚本,主要功能是从指定的FTP或SFTP服务器下载符合条件的文件到本地目录。
首先定义了一个抽象类AbstractFTPClient,其中包含了一些操作FTP的抽象方法,如切换目录cd、获取当前目录pwd、列出目录下的文件列表ls、下载文件get等等。这些方法需要在具体的子类中实现。
然后有两个子类FTPClient和SFTPClient,分别基于ftputil和paramiko库实现了AbstractFTPClient中定义的方法。FTPClient和SFTPClient分别对应FTP和SFTP两种协议的文件传输客户端。
接下来是一个MyFTP类,它根据传入的协议类型(默认为SFTP)选择相应的协议客户端,并封装了常用的操作方法,如列出最近更新的文件列表list_recent、下载文件get、递归上传文件夹put_r等。
最后是一个main函数,读取一个接口配置文件interface_path(csv格式),并通过多进程池Pool并发下载文件。具体操作是先解析接口配置文件得到一组FTP服务器的信息,然后遍历每个FTP服务器,根据设定的条件筛选出符合要求的文件,将其下载到本地目录。
整个脚本的逻辑是:根据接口配置文件的信息,连接到FTP服务器并获取文件列表,然后根据设定的时间条件筛选出需要下载的文件,最后将这些文件下载到本地目录中。

实现流程粗略讲解

开始 -> 读取接口配置文件(interface_path) -> 获取FTP服务器信息(omc_info_list) -> 创建本地目录(local_dir) -> 创建多进程池(Pool) -> 遍历omc_info_list中的每个omc_info ->    -> 创建FTP对象(ftp) -> 列出最近更新的文件列表(filelist) ->    -> 遍历filelist中的每个文件(i) ->        -> 提取文件名(filename)和文件修改时间(filetime) ->        -> 判断文件修改时间是否符合设定条件 ->            -> 如果符合条件 ->                -> 打印'文件下载成功'信息 -> 构建本地文件路径(local_path) ->                -> 下载文件到本地(local_path) ->            -> 否则 ->                -> 继续下一个文件的判断 ->    -> 关闭FTP对象(ftp) -> 结束遍历omc_info_list -> 关闭多进程池(Pool) -> 结束

废话不多说直接上代码

from multiprocessing import Pool

import pandas as pd
import os
from datetime import datetime
import ftputil
import ftputil.session
from abc import ABCMeta, abstractmethod
import paramiko
from dateutil.parser import parse


class AbstractFTPClient(metaclass=ABCMeta):
    """Common interface for file-transfer clients (FTP, SFTP, ...).

    Every abstract method declared here must be overridden by a concrete
    subclass; subclasses are free to add further methods of their own.
    """

    @abstractmethod
    def cd(self, path) -> None:
        """Change the remote working directory to ``path``."""

    @abstractmethod
    def pwd(self) -> str:
        """Return the current remote working directory."""

    @abstractmethod
    def ls(self, path='.') -> list:
        """Return the entries found in the remote directory ``path``."""

    @abstractmethod
    def get(self, remote, local) -> None:
        """Download the remote file ``remote`` to the local path ``local``."""

    @abstractmethod
    def put(self, local, remote) -> None:
        """Upload the local file ``local`` to the remote path ``remote``."""

    @abstractmethod
    def put_r(self, local, remote) -> None:
        """Upload a file, creating missing directories along the way.

        :param local: local path
        :param remote: remote path
        :return: None
        """

    @abstractmethod
    def getsize(self, path) -> int:
        """Return the size of a remote file.

        :param path: remote file path
        :return: file size
        """

    @abstractmethod
    def getmtime(self, path) -> datetime:
        """Return the most recent modification time of a remote file.

        :param path: remote file path
        :return: datetime.datetime
        """

    @abstractmethod
    def close(self) -> None:
        """Close the client."""


class FTPClient(AbstractFTPClient):
    """FTP implementation of AbstractFTPClient, built on ``ftputil``."""

    def __init__(self, ip, user, password, port=21, mode='passive'):
        """Open an FTP session.

        :param ip: server host name or address
        :param user: login name
        :param password: login password
        :param port: control-connection port (21 is the standard FTP port)
        :param mode: 'passive' or 'active'; any other value leaves the
            choice to the underlying FTP library
        """
        # Map the textual mode onto ftputil's tri-state flag; None means
        # "let the base session class decide".
        use_passive = {'passive': True, 'active': False}.get(mode)
        my_session_factory = ftputil.session.session_factory(
            port=port, use_passive_mode=use_passive)
        self._ftp = ftputil.FTPHost(
            ip, user, password, session_factory=my_session_factory)

    @staticmethod
    def _parent_dirs(remote):
        """Return the ancestor directories of ``remote``, shallowest first.

        The root directory itself is never included, and the loop is bounded
        by the number of path components, so it terminates for relative
        paths as well as absolute ones.
        """
        prefix = '/' if remote.startswith('/') else ''
        dirs = []
        for name in remote.split('/')[:-1]:
            if not name:
                continue
            prefix += name
            dirs.append(prefix)
            prefix += '/'
        return dirs

    def cd(self, path):
        """Change the remote working directory to ``path``."""
        self._ftp.chdir(path)

    def pwd(self):
        """Return the current remote working directory."""
        return self._ftp.getcwd()

    def ls(self, path='.'):
        """Return the names found in the remote directory ``path``."""
        return self._ftp.listdir(path)

    def get(self, remote, local):
        """Download ``remote`` to the local path ``local``."""
        self._ftp.download(remote, local)

    def put(self, local, remote):
        """Upload the local file ``local`` to the remote path ``remote``."""
        # upload() takes (source, target); passing ``self`` as well shifted
        # every argument by one position.
        self._ftp.upload(local, remote)

    def put_r(self, local, remote):
        """Upload a file, creating missing remote directories if needed.

        A plain upload is attempted first. If it fails, every ancestor
        directory of ``remote`` is created (already-existing ones are
        ignored) and the upload is retried; a failure of the retry
        propagates to the caller.

        :param local: local path
        :param remote: remote path
        """
        try:
            self.put(local, remote)
            print('put: {}'.format(local))
        except Exception:
            for d in self._parent_dirs(remote):
                try:
                    self._ftp.mkdir(d)
                    print('mkdir: {}'.format(d))
                except OSError:
                    # Best effort: the directory most likely exists already.
                    pass
            self.put(local, remote)
            print('put after mkdir: {}'.format(local))

    def getsize(self, path):
        """Return the size of the remote file ``path``."""
        return self._ftp.path.getsize(path)

    def getmtime(self, path):
        """Return the modification time of ``path`` as a datetime."""
        return datetime.fromtimestamp(self._ftp.path.getmtime(path))

    def close(self):
        """Close the FTP session."""
        self._ftp.close()


class SFTPClient(AbstractFTPClient):
    """SFTP implementation of AbstractFTPClient, built on ``paramiko``."""

    def __init__(self, ip, user, password, port=22, mode='passive'):
        """Open an SFTP session.

        :param ip: server host name or address
        :param user: login name
        :param password: login password
        :param port: SSH port (22 is the standard SSH port)
        :param mode: unused here; kept so the signature matches FTPClient
        """
        self._transport = paramiko.Transport(sock=(ip, port))
        try:
            self._transport.connect(username=user, password=password)
            self._ftp = paramiko.SFTPClient.from_transport(self._transport)
        except Exception:
            # Do not leave a half-open transport behind when login fails.
            self._transport.close()
            raise

    @staticmethod
    def _parent_dirs(remote):
        """Return the ancestor directories of ``remote``, shallowest first.

        The root directory itself is never included, and the loop is bounded
        by the number of path components, so it terminates for relative
        paths as well as absolute ones.
        """
        prefix = '/' if remote.startswith('/') else ''
        dirs = []
        for name in remote.split('/')[:-1]:
            if not name:
                continue
            prefix += name
            dirs.append(prefix)
            prefix += '/'
        return dirs

    def cd(self, path):
        """Change the remote working directory to ``path``."""
        self._ftp.chdir(path)

    def pwd(self):
        """Return the current remote working directory."""
        return self._ftp.getcwd()

    def ls(self, path='.'):
        """Return the names found in the remote directory ``path``."""
        return self._ftp.listdir(path)

    def get(self, remote, local):
        """Download ``remote`` to the local path ``local``."""
        self._ftp.get(remote, local)

    def getfo(self, remote, fo):
        """Download ``remote`` into an open file-like object.

        :param fo: file-like obj
        """
        self._ftp.getfo(remote, fo)

    def put(self, local, remote):
        """Upload the local file ``local`` to the remote path ``remote``."""
        self._ftp.put(local, remote)

    def put_r(self, local, remote):
        """Upload a file, creating missing remote directories if needed.

        A plain upload is attempted first. If it fails, every ancestor
        directory of ``remote`` is created (already-existing ones are
        ignored) and the upload is retried; a failure of the retry
        propagates to the caller.

        :param local: local path
        :param remote: remote path
        """
        try:
            self.put(local, remote)
            print('put: {}'.format(local))
        except Exception:
            for d in self._parent_dirs(remote):
                try:
                    self._ftp.mkdir(d)
                    print('mkdir: {}'.format(d))
                except OSError:
                    # Best effort: the directory most likely exists already.
                    pass
            self.put(local, remote)
            print('put after mkdir: {}'.format(local))

    def getsize(self, path):
        """Return the size of the remote file ``path``."""
        return self._ftp.stat(path).st_size

    def getmtime(self, path):
        """Return the modification time of ``path`` as a datetime."""
        return datetime.fromtimestamp(self._ftp.stat(path).st_mtime)

    def close(self):
        """Close the SFTP channel and the SSH transport underneath it."""
        try:
            self._ftp.close()
        finally:
            self._transport.close()


class MyFTP():
    """Thin facade that picks an FTP or SFTP client and forwards to it."""

    def __init__(self, ip, user, password, protocol='sftp'):
        """Connect to ``ip`` using the requested protocol.

        :param ip: server host name or address
        :param user: login name
        :param password: login password
        :param protocol: 'ftp' (port 21) or 'sftp' (port 22), compared
            case-insensitively
        :raises ValueError: if ``protocol`` is neither 'ftp' nor 'sftp'
        """
        # Some servers need a different client class, so the choice is
        # driven by the argument rather than a hard-coded switch.
        kind = str(protocol).strip().lower()
        if kind == 'ftp':
            print('FTP')
            self.ftp = FTPClient(ip, user, password, port=21)
        elif kind == 'sftp':
            print('SFTP')
            self.ftp = SFTPClient(ip, user, password, port=22)
        else:
            raise ValueError(
                "unsupported protocol {!r}; expected 'ftp' or 'sftp'".format(protocol))

    def list_recent(self, path):
        """Return the full remote paths of all files found under ``path``.

        The tree is walked breadth-first. An entry whose name has no
        extension is treated as a directory and descended into; everything
        else is reported as a file. A directory that cannot be listed is
        treated as empty, which also covers extension-less names that turn
        out to be plain files.

        :param path: remote directory to start from
        :return: list of remote file paths, in breadth-first order
        """
        from collections import deque

        filelist = []
        pending = deque([path])
        while pending:
            absdir = pending.popleft()
            try:
                names = self.ftp.ls(absdir)
            except Exception:
                names = []
            for name in names:
                fullname = absdir + '/' + name
                if os.path.splitext(name)[1] == '':
                    pending.append(fullname)
                else:
                    filelist.append(fullname)
        return filelist

    def get(self, remote, local):
        """Download ``remote`` to the local path ``local``."""
        self.ftp.get(remote, local)

    def put_r(self, local, remote):
        """Upload ``local`` to ``remote`` via the client's ``put_r``."""
        self.ftp.put_r(local, remote)

    def close(self):
        """Close the underlying client."""
        self.ftp.close()


def get_interface(interface_path, encoding='gbk'):
    """Load the server list from a CSV file.

    :param interface_path: path of the CSV file, one server per row
    :param encoding: text encoding of the file (defaults to 'gbk')
    :return: list of dicts, one per row, keyed by column name
    """
    # Read the credential columns as text: with type inference a value such
    # as "007" would be parsed as the integer 7 and the login would fail.
    df = pd.read_csv(
        interface_path,
        encoding=encoding,
        dtype={'ftpuser': str, 'ftppassword': str},
    )
    return df.to_dict(orient='records')


def download(omc_info, local_dir,
             start=datetime(2022, 12, 27), end=datetime(2022, 12, 30)):
    """Download the files of one server whose name timestamp is in a window.

    Files are stored under ``<local_dir>/<ip>/``. The timestamp is taken
    from the third-from-last ``_``-separated field of each file name; files
    whose name carries no parsable timestamp there are skipped.

    :param omc_info: dict with the keys 'ip', 'ftpuser', 'ftppassword' and
        'ftppath'; an optional 'protocol' key ('ftp' or 'sftp') selects the
        client and falls back to 'ftp' when absent or empty
    :param local_dir: existing local directory that receives the files
    :param start: inclusive lower bound of the time window
    :param end: exclusive upper bound of the time window
    """
    ip = omc_info['ip']

    protocol = omc_info.get('protocol')
    if not isinstance(protocol, str) or not protocol.strip():
        # Missing column or empty cell (pandas yields NaN): keep plain FTP.
        protocol = 'ftp'

    try:
        ftp = MyFTP(ip, omc_info['ftpuser'], str(omc_info['ftppassword']),
                    protocol=protocol)
    except Exception as e:
        print(e)
        print('{} connection fail '.format(ip))
        # Only the account name is shown; the password is deliberately not
        # echoed to the console.
        print(ip, omc_info['ftpuser'])
        return

    try:
        ip_dir = os.path.join(local_dir, ip)
        try:
            os.makedirs(ip_dir, exist_ok=True)
        except OSError as e:
            print(e)
            print('{} cannot create local directory {}'.format(ip, ip_dir))
            return

        filelist = ftp.list_recent(omc_info['ftppath'])
        if not filelist:
            print('IP{} directory or path is not exists!'.format(ip))
            return

        for remote_path in filelist:
            filename = remote_path.split('/')[-1]
            print(filename)

            try:
                filetime = parse(filename.split('_')[-3])
                in_window = start <= filetime < end
            except (IndexError, ValueError, OverflowError, TypeError):
                # Too few '_' fields, an unparsable date, or a timezone
                # mismatch: skip this file instead of aborting the server.
                print('{} skipped: no usable timestamp in name'.format(filename))
                continue
            if not in_window:
                continue

            local_path = os.path.join(ip_dir, filename)
            try:
                ftp.get(remote_path, local_path)
            except Exception as e:
                print('{} download fail '.format(filename))
                print(e)
            else:
                # Report success only once the transfer has completed.
                print('{} is downloaded :{}'.format(ip, filename))
    finally:
        # Release the connection on every exit path.
        ftp.close()


def main(interface_path, local_dir='./house-8.24-8.27'):
    """Download files from every server listed in the interface CSV.

    Each server is handled by ``download`` in a pool of 5 worker processes.

    :param interface_path: path of the CSV file describing the servers
    :param local_dir: directory that receives the results; created if
        missing
    """
    os.makedirs(local_dir, exist_ok=True)
    omc_info_list = get_interface(interface_path)

    pool = Pool(5)
    pending = []
    try:
        for omc_info in omc_info_list:
            # Only the address is printed; the full record also holds the
            # login credentials.
            print(omc_info['ip'])
            task = pool.apply_async(download, (omc_info, local_dir))
            pending.append((omc_info['ip'], task))
        pool.close()
        pool.join()
    finally:
        pool.terminate()

    # apply_async keeps a worker's exception inside its result object;
    # fetch every result so that a failed task is reported instead of lost.
    for ip, task in pending:
        try:
            task.get()
        except Exception as e:
            print('{} download task failed: {}'.format(ip, e))


if __name__ == '__main__':
    # Script entry point: process the servers listed in the interface CSV.
    # An alternative list used previously: 'interface_yichang.csv'.
    main('ipinterface.csv')

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多