ftppythonスクリプトを使用したファイルのマルチスレッドダウンロード

なぜこれが必要なのですか?

かつて、ftpサーバーから多数のファイルをコピーするというタスクに直面しました。バックアップを作成する必要がありました。それはもっと簡単かもしれないようです!しかし、残念ながら、私の状態に対してこれほど迅速に機能する準備ができているものは何も見つかりませんでした。





状況

Windowsでは、ftpサーバーから定期的に数百のファイルをフェッチする必要がありました。たくさんの小さなことといくつかの非常に大きなファイル。合計約500GB。サーバーはかなり海外にあるvpsです。日中は車の負荷が高く、夜の早い時間帯に定期的なメンテナンスが行われ、合計で最大5時間のダウンロードが可能です。





私がレビューしたユーティリティはどれも、割り当てられた時間内に効率的に対処することができませんでした。まあ、行き場がなく、通常のバックアップシステムは、まだ我々が意味し、購入されていないエディタやPython IDEで自分自身をして行きます!冒険!





構成

便宜上、スクリプトのすべてのパラメーターを別のファイルに入れます。





構成テンプレート:





host = 'ip.ip.ip.ip'
user = 'ftpusername'
passwd = 'ftppassword'
basepath = '/path/to/backup/folder'  # ,        
max_threads = 20 #     
log_path = '\path\to\logfile'
statusfilepath = '\path\to\statusfile'
      
      



設定を.py拡張子で保存し、スクリプトの最初にインポートします。スクリプトの名前名に直接インポートすることもできますが、スクリプトの主要部分で、構造をクラッチのように少し作成しました。





if __name__ == "__main__":
    host = config.host
    user = config.user
    passwd = config.passwd
    basepath = config.basepath  # ,        
    max_threads = config.max_threads
    log_path = config.log_path
    statusfilepath = config.statusfilepath
    main()
      
      



最初はリストがありました

ftp , , , ftp- . , , - .





. , , , -.





- ftp:





class MyFtp (ftplib.FTP):
    """  ,        """
    def __init__(self):
        self.host = host
        self.user = user
        self.passwd = passwd
        self.timeout = 1800
        super(MyFtp, self).__init__()

    def connect(self):
        super(MyFtp, self).connect(self.host, timeout=self.timeout)

    def login(self):
        super(MyFtp, self).login(user=self.user, passwd=self.passwd)

    def quit(self):
        super(MyFtp,self).quit()
      
      



. ftplib, .





:





class FileList:
    """      """
    def __init__(self):
        self.ftp = None
        self.file_list = []

    def connect_ftp(self):
        import sys
        self.ftp = MyFtp()
        self.ftp.connect()
        self.ftp.login()
        self.ftp.__class__.encoding = sys.getfilesystemencoding()

    def get_list(self, name):
        """       ftp-."""
        import os
        for dirname in self.ftp.mlsd(str(name), facts=["type"]):
            if dirname[1]["type"] == "file":
                entry_file_list = {}
                entry_file_list['remote_path'] = name  #  
                entry_file_list['filename'] = dirname[0]  # 
                self.file_list.append(entry_file_list)
            else:
                path = os.path.join(name, dirname[0])
                self.get_list(path)

    def get_next_file(self):
        return self.file_list.pop()

    def len(self):
        return len(self.file_list)
      
      



, , , , , .





logging. , .





class MyLogger:
    """   """
    def __init__(self):
        self.logger = None

    def start_file_logging(self, logger_name, log_path):
        """   """
        import logging
        self.logger = logging.getLogger(logger_name)
        self.logger.setLevel(logging.INFO)
        try:
            fh = logging.FileHandler(log_path)
        except FileNotFoundError:
            log_path = "downloader.log"
            fh = logging.FileHandler(log_path)
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        fh.setFormatter(formatter)
        self.logger.addHandler(fh)

    def start_rotate_logging(self, logger_name, log_path, max_bytes=104857600, story_backup=5):
        """     """
        import logging
        from logging.handlers import RotatingFileHandler
        self.logger = logging.getLogger(logger_name)
        self.logger.setLevel(logging.INFO)
        try:
            fh = RotatingFileHandler(log_path, maxBytes=max_bytes, backupCount=story_backup)
        except FileNotFoundError:
            log_path = "downloader.log"
            fh = RotatingFileHandler(log_path, maxBytes=max_bytes, backupCount=story_backup)
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        fh.setFormatter(formatter)
        self.logger.addHandler(fh)

    def add(self, msg):
        self.logger.info(str(msg))

    def add_error(self, msg):
        self.logger.error(str(msg))

      
      



, .





. , :





class BaseFileDownload(threading.Thread):
    """     """
    count = 0

    def __init__(self, rpath, filename, log):
        threading.Thread.__init__(self)
        self.remote_path = rpath
        self.filename = filename
        self.ftp = None
        self.command = None
        self.currentpath = None
        self.log = log
        self.__class__.count += 1 #     

    def __del__(self):
        self.__class__.count -= 1

    def connect(self):
        """    ftp"""
        import sys
        self.ftp = MyFtp()
        self.ftp.connect()
        self.ftp.login()
        self.ftp.__class__.encoding = sys.getfilesystemencoding()


    def run(self):
        """  """
        import os
        self.connect()
        self.command = str(bytes('RETR ', encoding='latin-1'), encoding='utf-8')
        self.currentpath = os.path.join(basepath, self.remote_path[3:])
        self.ftp.cwd(self.remote_path)
        if not os.path.exists(self.currentpath):
            os.makedirs(self.currentpath, exist_ok=True)
        self.host_file = os.path.join(self.currentpath, self.filename)
        try:
            with open(self.host_file, 'wb') as local_file:
                self.log.add("Start downloading " + self.filename)
                self.ftp.retrbinary(self.command + self.filename, local_file.write)
                self.log.add("Downloading " + self.filename + " complete")
        except ftplib.error_perm:
            self.log.add_error('Perm error')
        self.ftp.quit()
      
      



count. : , , , .





run - threading ( !), .





, os.makedirs.





-

. zabbix, , - , .





このファイルを操作するためのクラスは次のようになります。





class StatusFile:
    """          ."""
    def __init__(self):
        self.msg = ''

    def setstatus(self, msg):
        global statusfilepath
        with open(statusfilepath, 'w') as status_file:
            status_file.write(msg)

      
      



マルチスレッド

そして最後に、ダウンロードストリームで動作するメインスクリプト関数自体:





def main():
    import os
    import datetime
    import time

    log = MyLogger()
    log.start_rotate_logging("DownloaderLog", os.path.join(log_path, "download_backup.log")) #  
    now = datetime.datetime.today().strftime("%Y%m%d")
    global basepath
    basepath = os.path.join(basepath, now)  #  ,   
    list_file = FileList()
    list_file.connect_ftp()
    list_file.get_list("..")
    for i in range(list_file.len()):
        flag = True
        while flag:   #         
            if BaseFileDownload.count < max_threads:
                curfile = list_file.get_next_file()
                threadid = BaseFileDownload(curfile["remote_path"], curfile["filename"], log)
                threadid.start()
                flag = False
            else:
                time.sleep(20)
    log.add("Downloading files complete")
    statusfile = StatusFile()
    statusfile.setstatus("Downloading at " + str(datetime.datetime.now()) + " finishing successful")
      
      



ここでロギングを開始し、ファイルのリストを取得します(メモリに保存されます)。





永遠のwhileループでは、同時に実行されているダウンロードの数を確認し、必要に応じて追加のスレッドを開始します。





ソースコード全体はここにあります












All Articles