|
磁力链接(MagnetLinks)是一种流行的用于共享文件的链接格式。与传统的种子文件不同,磁力链接直接包含文件的哈希值,使得用户无需下载种子文件就能开始下载文件。在这篇教程中,我们将介绍如何使用Python和Aria2异步下载和解析磁力链接的元数据,并提取出文件的详细信息。 步骤一:准备工作 在开始之前,请确保你已经安装了以下工具和库: 1.**Aria2**:一个轻量级多协议和多源命令行下载工具。你可以从[Aria2官方网站](https://aria2.github.io/)下载并安装。2.**Python**:本教程使用Python3.7及以上版本。你可以从[Python官方网站](https://www.python.org/)下载并安装。3.**bencodepy**:用于解析`.torrent`文件的Python库。你可以使用以下命令安装它: pipinstallbencodepy步骤二:编写Python脚本接下来,我们将编写一个Python脚本,分为以下几个部分:1.**导入必要的库**: importasyncio importsubprocess importbencodepy importos importre 2.**定义判断磁力链接的函数**: defis_magnet_link(msg:str)->bool: magnet_pattern=re.compile(r'magnet:\?xt=urn:[a-zA-Z0-9]+:[a-zA-Z0-9]{32,}') returnbool(magnet_pattern.search(msg)) 3.**定义异步下载磁力链接元数据的函数**: asyncdefdownload_torrent_from_magnet(magnet_link,save_path='.'): aria2c_path=r'E:\Aria2\aria2c.exe' aria2c_command=[ aria2c_path, magnet_link, '--dir='+save_path, '--bt-metadata-only=true', '--bt-save-metadata=true' ] process=awaitasyncio.create_subprocess_exec( *aria2c_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) stdout,stderr=awaitprocess.communicate() ifprocess.returncode==0: print(f"元数据下载成功:{magnet_link}") btih=magnet_link.split(':')[3].split('&')[0].replace('btih=','').upper() torrent_file=os.path.join(save_path,f"{btih}.torrent") returntorrent_file else: print(f"下载元数据失败:{magnet_link}") print(f"错误信息:{stderr.decode()}") returnNone 4.**定义解析`.torrent`文件的函数**: defparse_torrent_file(torrent_file): withopen(torrent_file,'rb')asf: torrent_data=bencodepy.decode(f.read()) info=torrent_data[b'info'] name=info[b'name'].decode('utf-8') ifb'files'ininfo: files=[{'path's.path.join(name,*[part.decode('utf-8')forpartinfile[b'path']]),'size':file[b'length']}forfileininfo[b'files']] else: files=[{'path':name,'size':info[b'length']}] return{'name':name,'files':files} 5.**定义格式化文件大小的函数** defformat_size(size_in_bytes): ifsize_in_byteslist: tasks=[download_torrent_from_magnet(magnet_link,save_path)formagnet_linkinmagnet_links] torrent_files=awaitasyncio.gather(*tasks) data_l=[] formagnet_link,torrent_fileinzip(magnet_links,torrent_files): iftorrent_file: metadata=parse_torrent_file(torrent_file) name=metadata['name'].replace('\\','/') bt_data={name:[]} forfileinmetadata['files']: files={'size':format_size(file['size']),'path':file['path'].replace('\\','/')} bt_data[name].append(files) data_l.append([magnet_link,bt_data]) else: data_l.append([magnet_link,None]) returndata_l 7.**编写主程序入口**: if__name__=="__main__": magnet_links=[ 'magnet:?xt=urn:btih:磁力链接' #你可以添加更多磁力链接 ] save_path=r"E:\Aria2\dows"#替换为你希望保存bt种子的路径 results=asyncio.run(mage_main(magnet_links,save_path)) forresultinresults: print(result) 步骤三:运行脚本 保存以上代码为`torrent_downloader.py`文件。在命令行中导航到该文件所在的目录,并运行以下命令:pythontorrent_downloader.py 如果你设置的磁力链接有效,脚本将下载磁力链接的元数据,并解析`.torrent`文件,输出文件的详细信息。完整代码如下importasyncioimportsubprocessimportbencodepyimportosimportredefis_magnet_link(msg:str)->bool: """ 判断给定字符串是否为磁力链接 参数: msg(str):待检测的字符串 返回: bool:如果是磁力链接返回True,否则返回False """ magnet_pattern=re.compile(r'magnet:\?xt=urn:[a-zA-Z0-9]+:[a-zA-Z0-9]{32,}') returnbool(magnet_pattern.search(msg))defextract_magnet_links(text:str)->list: """ 从给定文本中提取所有磁力链接 参数: text(str):包含磁力链接的文本 返回: list:提取出的磁力链接列表 """ magnet_pattern=re.compile(r'magnet:\?xt=urn:btih:[a-zA-Z0-9]{32,}') magnet_links=magnet_pattern.findall(text) returnmagnet_linksasyncdefdownload_torrent_from_magnet(magnet_link:str,save_path:str='.')->str: """ 使用Aria2下载磁力链接的种子文件并保存 参数: magnet_link(str):磁力链接 save_path(str):保存种子文件的路径,默认为当前目录 返回: str:下载的种子文件路径,如果下载失败返回None """ aria2c_path=r'E:\Aria2\aria2c.exe' #Aria2的路径 aria2c_command=[ aria2c_path, magnet_link, '--dir='+save_path, '--bt-metadata-only=true', '--bt-save-metadata=true' ] process=awaitasyncio.create_subprocess_exec( *aria2c_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) stdout,stderr=awaitprocess.communicate() ifprocess.returncode==0: print(f"元数据下载成功:{magnet_link}") btih=magnet_link.split(':')[3].split('&')[0].replace('btih=','').upper() torrent_file=os.path.join(save_path,f"{btih}.torrent") returntorrent_file else: print(f"下载元数据失败:{magnet_link}") print(f"错误信息:{stderr.decode()}") returnNonedefparse_torrent_file(torrent_file:str)->dict: """ 解析种子文件,提取文件信息 参数: torrent_file(str):种子文件路径 返回: dict:种子文件包含的文件信息 """ withopen(torrent_file,'rb')asf: torrent_data=bencodepy.decode(f.read()) info=torrent_data[b'info'] name=info[b'name'].decode('utf-8') ifb'files'ininfo: files=[{'path's.path.join(name,*[part.decode('utf-8')forpartinfile[b'path']]),'size':file[b'length']}forfileininfo[b'files']] else: files=[{'path':name,'size':info[b'length']}] return{'name':name,'files':files}defformat_size(size_in_bytes:int)->str: """ 格式化文件大小 参数: size_in_bytes(int):文件大小(字节) 返回: str:格式化后的文件大小字符串 """ ifsize_in_byteslist: """ 主函数,处理一组磁力链接,下载种子文件并提取文件信息 参数: magnet_links(list):磁力链接列表 save_path(str):保存种子文件的路径 返回: list:包含每个磁力链接对应的文件信息 """ tasks=[download_torrent_from_magnet(magnet_link,save_path)formagnet_linkinmagnet_links] torrent_files=awaitasyncio.gather(*tasks) data_l=[] formagnet_link,torrent_fileinzip(magnet_links,torrent_files): iftorrent_file: metadata=parse_torrent_file(torrent_file) name=metadata['name'].replace('\\','/') bt_data={name:[]} forfileinmetadata['files']: files={'size':format_size(file['size']),'path':file['path'].replace('\\','/')} bt_data[name].append(files) data_l.append([magnet_link,bt_data]) else: data_l.append([magnet_link,None]) returndata_lif__name__=="__main__": magnet_links=[ 'magnet:?xt=urn:btih:你的磁力链' #你可以添加更多磁力链接 ] save_path=r"E:\Aria2\dows" results=asyncio.run(mage_main(magnet_links,save_path)) forresultinresults: print(result)
|
|