defget_m3u8_url(self): # 根据剧集网址,获取m3u8下载索引 res =requests.get(url=self.url,headers=self.headers).text first_m3u8_url = re.findall(';var now="(.*?)"',res,re.S)[0] # 获取m3u8下载地址 self.name = re.findall("<li>正在播放:(.*?)</li>",res,re.S)[0] # 获取下载片名 print(self.name+"准备开始下载") res_second = requests.get(url=first_m3u8_url,headers=self.headers).text with open(self.name+".m3u8",'w',encoding='utf-8') as f: f.write(res_second)
二,获取m3u8播放列表
根据上一步下载的m3u8文件,提取下载地址,代码如下:
defget_ts_url(self): # 根据m3u8下载索引,获取各个下载地址 ts_url_list = [] with open(self.name+".m3u8",'r',encoding='utf-8') as f: for line in f: if line.startswith('#'): continue line = line.strip() ts_url_list.append(line) return ts_url_list
三、编码实现
然后下一步我们就开始下载视频了,完整代码如下:
# -*- coding:utf-8 -*- import time import re import os import requests import asyncio import aiohttp import aiofiles import nest_asyncio nest_asyncio.apply()
defget_m3u8_url(self): # 根据剧集网址,获取m3u8下载索引 res =requests.get(url=self.url,headers=self.headers).text first_m3u8_url = re.findall(';var now="(.*?)"',res,re.S)[0] # 获取m3u8下载地址 self.name = re.findall("<li>正在播放:(.*?)</li>",res,re.S)[0] # 获取下载片名 print(self.name+"准备开始下载") res_second = requests.get(url=first_m3u8_url,headers=self.headers).text with open(self.name+".m3u8",'w',encoding='utf-8') as f: f.write(res_second)
defget_ts_url(self): # 根据m3u8下载索引,获取各个ts文件地址 ts_url_list = [] with open(self.name+".m3u8",'r',encoding='utf-8') as f: for line in f: if line.startswith('#'): continue line = line.strip() ts_url_list.append(line) return ts_url_list
asyncdefmain(self): # 根据m3u8文件中的ts网址,下载ts视频 tasks = [] self.get_m3u8_url() ts_url_list = self.get_ts_url() sem = asyncio.Semaphore(5) # 设置信号量,控制异步数量 asyncwith aiohttp.ClientSession(headers=self.headers) as session: for ts_url in ts_url_list: tasks.append(asyncio.create_task(self.download_ts(url=ts_url,session=session,sem=sem))) await asyncio.wait(tasks)
if __name__ == '__main__': t = time.time() f = RRyy123() f.url = "https://www.r/video/?40479-1-18.html" asyncio.run(f.main()) print(f"{f.name}下载完成,总耗时{int(time.time()-t)}s.")
defmerge_video(name): # 合并视频 ts_url_list=[] with open(name + ".m3u8", 'r', encoding='utf-8') as f: for line in f: if line.startswith('#'): continue line = line.strip() ts_url_list.append(line)
f = open(f"./temp_{name}/filename.txt" ,'w' ,encoding="utf-8") for ts_url in ts_url_list: ts_name = os.path.splitext(ts_url)[0].split("/")[-1] f.write("file "+ ts_name +".ts\n") f.close() print(f"{name}开始合并。。。。") os.system(f"ffmpeg -f concat -i ./temp_{name}/filename.txt -c copy {name}.mp4") print(f"{name}视频合成成功")