分享

python实现p2p文件传输

 caodaoquan 2018-05-11

一、需求及应用场景

考虑到我手上的服务器逐渐的增多,有时候需要大规模的部署同一个文件,例如因为方便使用systemtap这个工具定位问题,需要把手上几百台服务器同时安装kernel-debuginfo这个包,原有的方式采用一个源服务器,采用rsync或者scp之类的文件传输方式只能做到一个点往下分发这个文件,这个时候下发的速度就会比较的慢,基于以上原因,我写了一个基于bt协议传输文件的小工具,实际测试,传输到10个机房,70多台机器传输一个240M的这个内核文件,到所有的机器,源采用限速2m/s的上传速度,测试的结果大概只要140s,就可以全部传输完毕,这个效率是非常之高,如果不限速的情况下速度会更快,下面把这个程序开源出来。

二、代码

  1. #!/usr/bin/env python
  2. import libtorrent as lt
  3. import sys
  4. import os
  5. import time
  6. from optparse import OptionParser
  7. import socket
  8. import struct
  9. import fcntl
  10. def get_interface_ip(ifname):
  11. s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  12. return socket.inet_ntoa(fcntl.ioctl(s.fileno(), 0x8915, struct.pack('256s',
  13. ifname[:15]))[20:24])
  14. def ip2long(ip):
  15. return reduce(lambda a,b:(a<<8)+b,[int(i) for i in ip.split('.')])
  16. def get_wan_ip_address():
  17. interfaces = set(['eth0', 'eth1', 'eth2', 'eth3', 'em1', 'em2', 'em3', 'em4'])
  18. ip = ''
  19. for i in interfaces:
  20. try:
  21. ip = get_interface_ip(i)
  22. if (ip2long(ip) < ip2long('10.0.0.0') or ip2long(ip) > ip2long('10.255.255.255')) \
  23. and (ip2long(ip) < ip2long('172.16.0.0') or ip2long(ip) > ip2long('172.33.255.255')) \
  24. and (ip2long(ip) < ip2long('192.168.0.0') or ip2long(ip) > ip2long('192.168.255.255')):
  25. return ip
  26. except:
  27. pass
  28. return ip
  29. def make_torrent(path, save):
  30. fs = lt.file_storage()
  31. lt.add_files(fs, path)
  32. if fs.num_files() == 0:
  33. print 'no files added'
  34. sys.exit(1)
  35. input = os.path.abspath(path)
  36. basename = os.path.basename(path)
  37. t = lt.create_torrent(fs, 0, 4 * 1024 * 1024)
  38. t.add_tracker("http://10.0.1.5:8760/announce")
  39. t.set_creator('libtorrent %s' % lt.version)
  40. lt.set_piece_hashes(t, os.path.split(input)[0], lambda x: sys.stderr.write('.'))
  41. sys.stderr.write('\n')
  42. save = os.path.dirname(input)
  43. save = "%s/%s.torrent" % (save, basename)
  44. f=open(save, "wb")
  45. f.write(lt.bencode(t.generate()))
  46. f.close()
  47. print "the bt torrent file is store at %s" % save
  48. def dl_status(handle):
  49. while not (handle.is_seed()):
  50. s = handle.status()
  51. state_str = ['queued', 'checking', 'downloading metadata', \
  52. 'downloading', 'finished', 'seeding', 'allocating', 'checking fastresume']
  53. print '\ractive_time: %d, %.2f%% complete (down: %.1f kb/s up: %.1f kB/s peers: %d, seeds: %d) %s' % \
  54. (s.active_time, s.progress * 100, s.download_rate / 1000, s.upload_rate / 1000, \
  55. s.num_peers, s.num_seeds, state_str[s.state]),
  56. sys.stdout.flush()
  57. time.sleep(1)
  58. def seed_status(handle, seedtime=100):
  59. seedtime = int(seedtime)
  60. if seedtime < 100:
  61. seedtime = 100
  62. while seedtime > 0:
  63. seedtime -= 1
  64. s = handle.status()
  65. state_str = ['queued', 'checking', 'downloading metadata', \
  66. 'downloading', 'finished', 'seeding', 'allocating', 'checking fastresume']
  67. print '\rseed_time: %d, %.2f%% complete (down: %.1f kb/s up: %.1f kB/s peers: %d, seeds: %d) %s' % \
  68. (s.active_time, s.progress * 100, s.download_rate / 1000, s.upload_rate / 1000, \
  69. s.num_peers, s.num_seeds, state_str[s.state]),
  70. sys.stdout.flush()
  71. time.sleep(1)
  72. def remove_torrents(torrent, session):
  73. session.remove_torrent(torrent)
  74. def read_alerts(session):
  75. alert = session.pop_alert()
  76. while alert:
  77. #print alert, alert.message()
  78. alert = session.pop_alert()
  79. def download(torrent, path, upload_rate_limit=0, seedtime=100):
  80. try:
  81. session = lt.session()
  82. session.set_alert_queue_size_limit(1024 * 1024)
  83. sts = lt.session_settings()
  84. sts.ssl_listen = False
  85. sts.user_agent = "Thunder deploy system"
  86. sts.tracker_completion_timeout = 5
  87. sts.tracker_receive_timeout = 5
  88. sts.stop_tracker_timeout = 5
  89. sts.active_downloads = -1
  90. sts.active_seeds = -1
  91. sts.active_limit = -1
  92. sts.auto_scrape_min_interval = 5
  93. sts.udp_tracker_token_expiry = 120
  94. sts.min_announce_interval = 1
  95. sts.inactivity_timeout = 60
  96. sts.connection_speed = 10
  97. sts.allow_multiple_connections_per_ip = True
  98. sts.max_out_request_queue = 128
  99. sts.request_queue_size = 3
  100. sts.use_read_cache = False
  101. session.set_settings(sts)
  102. session.set_alert_mask(lt.alert.category_t.tracker_notification | lt.alert.category_t.status_notification)
  103. session.set_alert_mask(lt.alert.category_t.status_notification)
  104. ipaddr = get_wan_ip_address()
  105. #print ipaddr
  106. if ipaddr == "":
  107. session.listen_on(6881, 6881)
  108. else:
  109. session.listen_on(6881, 6881, ipaddr)
  110. limit = int(upload_rate_limit)
  111. if limit>=100:
  112. session.set_upload_rate_limit(limit*1024)
  113. session.set_local_upload_rate_limit(limit*1024)
  114. print session.upload_rate_limit()
  115. torrent_info = lt.torrent_info(torrent)
  116. add_params = {
  117. 'save_path': path,
  118. 'storage_mode': lt.storage_mode_t.storage_mode_sparse,
  119. 'paused': False,
  120. 'auto_managed': True,
  121. 'ti': torrent_info,
  122. }
  123. handle = session.add_torrent(add_params)
  124. read_alerts(session)
  125. st = time.time()
  126. dl_status(handle)
  127. et = time.time() - st
  128. print '\nall file download in %.2f\nstart to seeding\n' % et
  129. sys.stdout.write('\n')
  130. handle.super_seeding()
  131. seed_status(handle, seedtime)
  132. remove_torrents(handle, session)
  133. assert len(session.get_torrents()) == 0
  134. finally:
  135. print 'download finished'
  136. if __name__ == '__main__':
  137. usage = "usage: %prog [options] \n \
  138. %prog -d -f <torrent file=""> -s <file save="" path="">\n \
  139. or \n \
  140. %prog -m -p <file or="" dir=""> -s <torrent save="" path="">\n"
  141. parser = OptionParser(usage=usage)
  142. parser.add_option("-d", "--download", dest="download",
  143. help="start to download file", action="store_false", default=True)
  144. parser.add_option("-f", "--file", dest="file",
  145. help="torrent file")
  146. parser.add_option("-u", "--upload", dest="upload",
  147. help="set upload rate limit, default is not limit", default=0)
  148. parser.add_option("-t", "--time", dest="time",
  149. help="set seed time, default is 100s", default=100)
  150. parser.add_option("-p", "--path", dest="path",
  151. help="to make torrent with this path")
  152. parser.add_option("-m", "--make", dest="make",
  153. help="make torrent", action="store_false", default=True)
  154. parser.add_option("-s", "--save", dest="save",
  155. help="file save path, default is store to ./", default="./")
  156. (options, args) = parser.parse_args()
  157. #download(sys.argv[1])
  158. if len(sys.argv) != 6 and len(sys.argv) != 4 and len(sys.argv) != 8 and len(sys.argv) != 10:
  159. parser.print_help()
  160. sys.exit()
  161. if options.download == False and options.file !="":
  162. download(options.file, options.save, options.upload, options.time)
  163. elif options.make == False and options.path != "":
  164. make_torrent(options.path, options.save)
  165. </torrent></file></file></torrent>

三、使用

1、环境准备

需要在所有的os上面安装一个libtorrent的库,下载地址:http://code.google.com/p/libtorrent/downloads/list (国内需使用代理访问)

记得编译的时候带上./configure –enable-python-binding,然后mak,make install,进入binding目录,make,make install就可以运行这个小的工具。当然大规模部署不可能采用每一台都去编译安装的方式,只要把编译出来的libtorrent.so libtorrent-rasterbar.so.7的文件跟bt.py这个文件放到同一个目录,另外写一个shell脚本。

  1. lib=`dirname $0`
  2. export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$lib
  3. python bt.py -d -f <种子文件> -s <文件保存路径> -t <做种时间> -u <限制上传速度>

2、使用方法

在源服务器上生成种子文件:

  1. python bt.py -m -p <要发布的文件或者文件夹> -s <种子保存地址>

在源服务器上发布文件:

  1. python bt.py -d -f <种子文件> -s <文件保存路径> -t <做种时间> -u <限制上传速度>

其中做种时间默认设置是100s,上传速度默认不限制,限制速度的单位是kb 。

只要有一台机器完成了,就自动作为种子,在下载的过程中也会上传,任何一台机器都可以作为源服务器,当然了这里面还有中心的tracker服务器,脚本当中,我搭建了一个tracker源服务器,放到10.0.1.5端口是8760上面,当然大家也可以采用opentracker这个软件自己搭建一个tracker服务器,修改其中的源代码对应部分,另外考虑到发布都是私有文件,代码当作已经禁止了dht,如果还想更安全,就自己搭建一个私有的tracker server,具体搭建方法就使用一下搜索引擎,查找一下搭建的方法!

目前基本做到可以使用,后续考虑更简单一点,采用磁力链接的方式,这样就可以做到不用每台都要拷贝一个种子文件,采用一个单独的命令行就可以发布整个文件。

注:该篇内容来自于网络,非原创,本篇摘录过来做为学习之用。在使用实用中进行大文件传输时,可以再结合ansible、saltstack等自动化运维工具需要在多台主机之间传大文件时,可以通过该文件加快传输速度,增加网络利用率。

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多