尊敬的开发大佬们,我试图用ai写了一个利用alistAPI进行不同盘之间文件同步的脚本 但是失败了
Please make sure of the following things
- [X] I have read the documentation.
- [X] I'm sure there are no duplicate issues or discussions.
- [X] I'm sure this feature is not implemented.
- [X] I'm sure it's a reasonable and popular requirement.
Description of the feature / 需求描述
`import time import requests import logging from typing import List, Set, Dict from dataclasses import dataclass
logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(), logging.FileHandler('alist_sync.log') ] ) logger = logging.getLogger(name)
@dataclass class TaskInfo: name: str status: str progress: float
class AListAPI: def init(self, base_url: str, username: str, password: str): self.base_url = base_url.rstrip('/') self.username = username self.password = password self.token = None self.session = requests.Session() self._login()
def _login(self) -> None:
try:
response = self.session.post(
f"{self.base_url}/api/auth/login",
json={
"username": self.username,
"password": self.password
}
)
response.raise_for_status()
self.token = response.json()["data"]["token"]
self.session.headers.update({
"Authorization": self.token,
"Content-Type": "application/json"
})
logger.info("成功登录 AList")
except Exception as e:
logger.error(f"登录失败: {str(e)}")
raise
def list_files(self, path: str) -> Set[str]:
"""获取目录下的所有文件名,强制刷新缓存"""
try:
response = self.session.post(
f"{self.base_url}/api/fs/list",
json={
"path": path,
"refresh": True,
"password": "",
"page": 1,
"per_page": 0
}
)
response.raise_for_status()
files = {item['name'] for item in response.json()["data"]["content"]}
return files
except Exception as e:
logger.error(f"获取目录列表失败 {path}: {str(e)}")
return set()
def copy(self, src_dir: str, dst_dir: str, names: List[str], batch_size: int = 10) -> bool:
"""分批复制文件"""
if not names:
return True
success = True
for i in range(0, len(names), batch_size):
batch = names[i:i + batch_size]
try:
response = self.session.post(
f"{self.base_url}/api/fs/copy",
json={
"src_dir": src_dir,
"dst_dir": dst_dir,
"names": batch
}
)
response.raise_for_status()
logger.info(f"成功发起复制任务 批次 {i//batch_size + 1}: {batch}")
except Exception as e:
logger.error(f"发起复制任务失败 批次 {i//batch_size + 1}: {str(e)}")
success = False
return success
def delete(self, path: str, names: List[str]) -> bool:
"""删除文件"""
try:
response = self.session.post(
f"{self.base_url}/api/fs/remove",
json={
"dir": path,
"names": names
}
)
response.raise_for_status()
logger.info(f"成功删除文件: {names}")
return True
except Exception as e:
logger.error(f"删除文件失败 {names}: {str(e)}")
return False
def get_tasks(self) -> List[TaskInfo]:
"""获取所有正在进行的复制任务"""
try:
response = self.session.post(
f"{self.base_url}/api/fs/task/list",
json={
"page": 1,
"per_page": 0,
"type": "copy",
"status": ["pending", "running", "copying"]
}
)
response.raise_for_status()
data = response.json()
tasks = []
for task in data.get("data", {}).get("content", []):
tasks.append(TaskInfo(
name=task.get("name", ""),
status=task.get("status", ""),
progress=float(task.get("progress", 0))
))
return tasks
except requests.exceptions.JSONDecodeError:
logger.debug("任务列表为空")
return []
except Exception as e:
logger.debug(f"获取任务状态: {str(e)}")
return []
class AListSyncManager: def init(self, alist_api: AListAPI, source_path: str, dest_path: str): self.api = alist_api self.source_path = source_path.rstrip('/') self.dest_path = dest_path.rstrip('/')
def is_file_in_transfer(self, filename: str) -> bool:
"""检查文件是否正在传输中"""
tasks = self.api.get_tasks()
for task in tasks:
if (task.name == filename and
task.status in ["pending", "running", "copying"]):
logger.info(f"文件 {filename} 正在传输中 (进度: {task.progress}%)")
return True
return False
def sync_directory(self) -> None:
try:
logger.debug("开始检查目录...")
# 获取源目录和目标目录的文件列表
source_files = self.api.list_files(self.source_path)
dest_files = self.api.list_files(self.dest_path)
# 找出需要添加和删除的文件
files_to_add = source_files - dest_files
files_to_delete = dest_files - source_files
# 处理需要添加的文件
if files_to_add:
# 过滤掉正在传输的文件
files_to_copy = [f for f in files_to_add
if not self.is_file_in_transfer(f)]
if files_to_copy:
logger.info(f"发现 {len(files_to_copy)} 个新文件需要同步: {files_to_copy}")
self.api.copy(self.source_path, self.dest_path, files_to_copy)
else:
logger.debug("所有新文件都在传输中")
# 处理需要删除的文件
if files_to_delete:
logger.info(f"发现 {len(files_to_delete)} 个文件需要删除: {files_to_delete}")
self.api.delete(self.dest_path, list(files_to_delete))
if not files_to_add and not files_to_delete:
logger.debug("目录已同步,无需操作")
except Exception as e:
logger.error(f"同步过程中发生错误: {str(e)}")
def start_sync(self, interval: int = 5):
logger.info(f"开始监控目录: {self.source_path}")
logger.info(f"目标同步目录: {self.dest_path}")
try:
while True:
self.sync_directory()
time.sleep(interval)
except KeyboardInterrupt:
logger.info("正在停止同步...")
logger.info("同步已停止")
if name == "main": config = { "source_path": "/InfiniCloud/html保存", "dest_path": "/本地存储/html存档", "alist_config": { "base_url": "http://localhost:5244", "username": "admin", "password": "password" }, "sync_interval": 5 }
alist_api = AListAPI(
config["alist_config"]["base_url"],
config["alist_config"]["username"],
config["alist_config"]["password"]
)
sync_manager = AListSyncManager(
alist_api,
config["source_path"],
config["dest_path"]
)
sync_manager.start_sync(config["sync_interval"])`
Suggested solution / 实现思路
这个功能可以很方便的实现本地到云端,或者云端到本地,或者云端到另一个云端的智能同步,我觉得是很实用的一个功能,类似群晖的cloudsync,简直是轻nas神器,这样,比如我本地的文件,需要动态备份,就可以使用这个功能,或者云端的视频,需要下载到本地观看,也可以用nas执行这个操作,一旦云端网盘保存了新的分享,然后脚本就会将文件同步到本地,然后就能很方便的观看了,这个是非常实用的功能,我能力有限无法实现,希望开发者大佬们可以考虑实现一下这个功能
Additional context / 附件
No response
问问gpt为什么失败了试试?
问问gpt为什么失败了试试? GPT只会一直错
直接用rclone不就行了
直接用rclone不就行了
alist好像没有很方便动态检测文件变动的方法,用alistAPI操作应该会稳定点吧,至少不会重复执行传输任务
直接用rclone不就行了
alist好像没有很方便动态检测文件变动的方法,用alistAPI操作应该会稳定点吧,至少不会重复执行传输任务
api也一样要走网盘接口的,一般限制qps速率也很稳定的 https://rclone.org/docs/#tpslimit-float
直接用rclone不就行了
alist好像没有很方便动态检测文件变动的方法,用alistAPI操作应该会稳定点吧,至少不会重复执行传输任务
api也一样要走网盘接口的,一般限制qps速率也很稳定的 https://rclone.org/docs/#tpslimit-float
还有一点就是,rclone是不支持百度 阿里 夸克等国内网盘的,即使使用rclone,还是要通过webdav来挂载alist的形式,来操作alist上挂载的网盘的。我不太懂其中是怎么转换的,不过感觉是转了几层,应该是不如之间用alist实现同步来的好吧
直接用rclone不就行了
alist好像没有很方便动态检测文件变动的方法,用alistAPI操作应该会稳定点吧,至少不会重复执行传输任务
api也一样要走网盘接口的,一般限制qps速率也很稳定的 https://rclone.org/docs/#tpslimit-float
还有一点就是,rclone是不支持百度 阿里 夸克等国内网盘的,即使使用rclone,还是要通过webdav来挂载alist的形式,来操作alist上挂载的网盘的。我不太懂其中是怎么转换的,不过感觉是转了几层,应该是不如之间用alist实现同步来的好吧
你可以直接抄我的命令参数 ,吊打你自己不知道咋写问ai的同步代码
复制:rclone -P --timeout 0 --retries 5244 --transfers 1 --ignore-existing --tpslimit 0.5 copy alist:源路径 alist:目标路径 同步:rclone -P --timeout 0 --retries 5244 --transfers 1 --tpslimit 0.5 sync alist:源路径 alist:目标路径
直接用rclone不就行了
alist好像没有很方便动态检测文件变动的方法,用alistAPI操作应该会稳定点吧,至少不会重复执行传输任务
api也一样要走网盘接口的,一般限制qps速率也很稳定的 https://rclone.org/docs/#tpslimit-float
还有一点就是,rclone是不支持百度 阿里 夸克等国内网盘的,即使使用rclone,还是要通过webdav来挂载alist的形式,来操作alist上挂载的网盘的。我不太懂其中是怎么转换的,不过感觉是转了几层,应该是不如之间用alist实现同步来的好吧
你可以直接抄我的命令参数 ,吊打你自己不知道咋写问ai的同步代码
复制:rclone -P --timeout 0 --retries 5244--transfers 1 --ignore-existing --tpslimit 0.5 copy alist:源路径 alist:目标路径 同步:rclone -P --timeout 0 --retries 5244--transfers 1 --tpslimit 0.5 sync alist:源路径 alist:目标路径
好的,谢谢,,怎么实现实时的文件同步呢,只能采用时间轮询的方式吗
直接用rclone不就行了
alist好像没有很方便动态检测文件变动的方法,用alistAPI操作应该会稳定点吧,至少不会重复执行传输任务
api也一样要走网盘接口的,一般限制qps速率也很稳定的 https://rclone.org/docs/#tpslimit-float
还有一点就是,rclone是不支持百度 阿里 夸克等国内网盘的,即使使用rclone,还是要通过webdav来挂载alist的形式,来操作alist上挂载的网盘的。我不太懂其中是怎么转换的,不过感觉是转了几层,应该是不如之间用alist实现同步来的好吧
你可以直接抄我的命令参数 ,吊打你自己不知道咋写问ai的同步代码 复制:rclone -P --timeout 0 --retries 5244--transfers 1 --ignore-existing --tpslimit 0.5 copy alist:源路径 alist:目标路径 同步:rclone -P --timeout 0 --retries 5244--transfers 1 --tpslimit 0.5 sync alist:源路径 alist:目标路径
好的,谢谢,,怎么实现实时的文件同步呢,只能采用时间轮询的方式吗
你api不也是时间轮询,都一样的,可以另外写油猴脚本,只要转存就向服务器发送同步命令
直接用rclone不就行了
alist好像没有很方便动态检测文件变动的方法,用alistAPI操作应该会稳定点吧,至少不会重复执行传输任务
api也一样要走网盘接口的,一般限制qps速率也很稳定的 https://rclone.org/docs/#tpslimit-float
还有一点就是,rclone是不支持百度 阿里 夸克等国内网盘的,即使使用rclone,还是要通过webdav来挂载alist的形式,来操作alist上挂载的网盘的。我不太懂其中是怎么转换的,不过感觉是转了几层,应该是不如之间用alist实现同步来的好吧
你可以直接抄我的命令参数 ,吊打你自己不知道咋写问ai的同步代码 复制:rclone -P --timeout 0 --retries 5244--transfers 1 --ignore-existing --tpslimit 0.5 copy alist:源路径 alist:目标路径 同步:rclone -P --timeout 0 --retries 5244--transfers 1 --tpslimit 0.5 sync alist:源路径 alist:目标路径
好的,谢谢,,怎么实现实时的文件同步呢,只能采用时间轮询的方式吗
你api不也是时间轮询,都一样的,可以另外写油猴脚本,只要转存就向服务器发送同步命令
嗯,方案确实实现了需求,谢谢,不过还是希望开放团队能考虑加入这个功能,有图形化界面会好很多
直接用rclone不就行了
alist好像没有很方便动态检测文件变动的方法,用alistAPI操作应该会稳定点吧,至少不会重复执行传输任务
api也一样要走网盘接口的,一般限制qps速率也很稳定的 https://rclone.org/docs/#tpslimit-float
还有一点就是,rclone是不支持百度 阿里 夸克等国内网盘的,即使使用rclone,还是要通过webdav来挂载alist的形式,来操作alist上挂载的网盘的。我不太懂其中是怎么转换的,不过感觉是转了几层,应该是不如之间用alist实现同步来的好吧
你可以直接抄我的命令参数 ,吊打你自己不知道咋写问ai的同步代码 复制:rclone -P --timeout 0 --retries 5244--transfers 1 --ignore-existing --tpslimit 0.5 copy alist:源路径 alist:目标路径 同步:rclone -P --timeout 0 --retries 5244--transfers 1 --tpslimit 0.5 sync alist:源路径 alist:目标路径
好的,谢谢,,怎么实现实时的文件同步呢,只能采用时间轮询的方式吗
你api不也是时间轮询,都一样的,可以另外写油猴脚本,只要转存就向服务器发送同步命令
嗯,方案确实实现了需求,谢谢,不过还是希望开放团队能考虑加入这个功能,有图形化界面会好很多
这年头要啥图形化,命令行多舒服
直接用rclone不就行了
alist好像没有很方便动态检测文件变动的方法,用alistAPI操作应该会稳定点吧,至少不会重复执行传输任务
api也一样要走网盘接口的,一般限制qps速率也很稳定的 https://rclone.org/docs/#tpslimit-float
还有一点就是,rclone是不支持百度 阿里 夸克等国内网盘的,即使使用rclone,还是要通过webdav来挂载alist的形式,来操作alist上挂载的网盘的。我不太懂其中是怎么转换的,不过感觉是转了几层,应该是不如之间用alist实现同步来的好吧
你可以直接抄我的命令参数 ,吊打你自己不知道咋写问ai的同步代码 复制:rclone -P --timeout 0 --retries 5244--transfers 1 --ignore-existing --tpslimit 0.5 copy alist:源路径 alist:目标路径 同步:rclone -P --timeout 0 --retries 5244--transfers 1 --tpslimit 0.5 sync alist:源路径 alist:目标路径
好的,谢谢,,怎么实现实时的文件同步呢,只能采用时间轮询的方式吗
你api不也是时间轮询,都一样的,可以另外写油猴脚本,只要转存就向服务器发送同步命令
嗯,方案确实实现了需求,谢谢,不过还是希望开放团队能考虑加入这个功能,有图形化界面会好很多
这年头要啥图形化,命令行多舒服
哈哈,稍微优雅一点,{狗头}
Please make sure of the following things
- [x] I have read the 文档。[x] I'm sure there are no duplicate issues or discussions.[x] I'm sure this feature is not implemented.[x] I'm sure it's a reasonable 和 popular requirement.
Description of the feature / 需求描述
`import time import requests import logging from typing import List, Set, Dict from dataclasses import dataclass
logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(), logging.FileHandler('alist_sync.log') ] ) logger = logging.getLogger(name)
@DataClass class TaskInfo: name: str status: str progress: float
class AListAPI: def init(self, base_url: str, username: str, password: str): self.base_url = base_url.rstrip('/') self.username = username self.password = password self.token = None self.session = requests.Session() self._login()
def _login(self) -> None: try: response = self.session.post( f"{self.base_url}/api/auth/login", json={ "username": self.username, "password": self.password } ) response.raise_for_status() self.token = response.json()["data"]["token"] self.session.headers.update({ "Authorization": self.token, "Content-Type": "application/json" }) logger.info("成功登录 AList") except Exception as e: logger.error(f"登录失败: {str(e)}") raise def list_files(self, path: str) -> Set[str]: """获取目录下的所有文件名,强制刷新缓存""" try: response = self.session.post( f"{self.base_url}/api/fs/list", json={ "path": path, "refresh": True, "password": "", "page": 1, "per_page": 0 } ) response.raise_for_status() files = {item['name'] for item in response.json()["data"]["content"]} return files except Exception as e: logger.error(f"获取目录列表失败 {path}: {str(e)}") return set() def copy(self, src_dir: str, dst_dir: str, names: List[str], batch_size: int = 10) -> bool: """分批复制文件""" if not names: return True success = True for i in range(0, len(names), batch_size): batch = names[i:i + batch_size] try: response = self.session.post( f"{self.base_url}/api/fs/copy", json={ "src_dir": src_dir, "dst_dir": dst_dir, "names": batch } ) response.raise_for_status() logger.info(f"成功发起复制任务 批次 {i//batch_size + 1}: {batch}") except Exception as e: logger.error(f"发起复制任务失败 批次 {i//batch_size + 1}: {str(e)}") success = False return success def delete(self, path: str, names: List[str]) -> bool: """删除文件""" try: response = self.session.post( f"{self.base_url}/api/fs/remove", json={ "dir": path, "names": names } ) response.raise_for_status() logger.info(f"成功删除文件: {names}") return True except Exception as e: logger.error(f"删除文件失败 {names}: {str(e)}") return False def get_tasks(self) -> List[TaskInfo]: """获取所有正在进行的复制任务""" try: response = self.session.post( f"{self.base_url}/api/fs/task/list", json={ "page": 1, "per_page": 0, "type": "copy", "status": ["pending", "running", "copying"] } ) response.raise_for_status() data = response.json() tasks = [] for task in data.get("data", {}).get("content", []): tasks.append(TaskInfo( name=task.get("name", ""), status=task.get("status", ""), progress=float(task.get("progress", 0)) )) return tasks except requests.exceptions.JSONDecodeError: logger.debug("任务列表为空") return [] except Exception as e: logger.debug(f"获取任务状态: {str(e)}") return []class AListSyncManager: def init(self, alist_api: AListAPI, source_path: str, dest_path: str): self.api = alist_api self.source_path = source_path.rstrip('/') self.dest_path = dest_path.rstrip('/')
def is_file_in_transfer(self, filename: str) -> bool: """检查文件是否正在传输中""" tasks = self.api.get_tasks() for task in tasks: if (task.name == filename and task.status in ["pending", "running", "copying"]): logger.info(f"文件 {filename} 正在传输中 (进度: {task.progress}%)") return True return False def sync_directory(self) -> None: try: logger.debug("开始检查目录...") # 获取源目录和目标目录的文件列表 source_files = self.api.list_files(self.source_path) dest_files = self.api.list_files(self.dest_path) # 找出需要添加和删除的文件 files_to_add = source_files - dest_files files_to_delete = dest_files - source_files # 处理需要添加的文件 if files_to_add: # 过滤掉正在传输的文件 files_to_copy = [f for f in files_to_add if not self.is_file_in_transfer(f)] if files_to_copy: logger.info(f"发现 {len(files_to_copy)} 个新文件需要同步: {files_to_copy}") self.api.copy(self.source_path, self.dest_path, files_to_copy) else: logger.debug("所有新文件都在传输中") # 处理需要删除的文件 if files_to_delete: logger.info(f"发现 {len(files_to_delete)} 个文件需要删除: {files_to_delete}") self.api.delete(self.dest_path, list(files_to_delete)) if not files_to_add and not files_to_delete: logger.debug("目录已同步,无需操作") except Exception as e: logger.error(f"同步过程中发生错误: {str(e)}") def start_sync(self, interval: int = 5): logger.info(f"开始监控目录: {self.source_path}") logger.info(f"目标同步目录: {self.dest_path}") try: while True: self.sync_directory() time.sleep(interval) except KeyboardInterrupt: logger.info("正在停止同步...") logger.info("同步已停止")if name == "main": config = { "source_path": "/InfiniCloud/html保存", "dest_path": "/本地存储/html存档", "alist_config": { "base_url": "http://localhost:5244", "username": "admin", "password": "password" }, "sync_interval": 5 }
alist_api = AListAPI( config["alist_config"]["base_url"], config["alist_config"]["username"], config["alist_config"]["password"] ) sync_manager = AListSyncManager( alist_api, config["source_path"], config["dest_path"] ) sync_manager.start_sync(config["sync_interval"])`Suggested solution / 实现思路
这个功能可以很方便的实现本地到云端,或者云端到本地,或者云端到另一个云端的智能同步,我觉得是很实用的一个功能,类似群晖的cloudsync,简直是轻nas神器,这样,比如我本地的文件,需要动态备份,就可以使用这个功能,或者云端的视频,需要下载到本地观看,也可以用nas执行这个操作,一旦云端网盘保存了新的分享,然后脚本就会将文件同步到本地,然后就能很方便的观看了,这个是非常实用的功能,我能力有限无法实现,希望开发者大佬们可以考虑实现一下这个功能
Additional context / 附件
No response
看一下这个项目alist-sync
如果只是简单的调用复制,可以试一下我之前写的简单demo https://github.com/SunRain/AListCopyEnhanced/tree/master
直接用rclone不就行了
使用 WebDAV并不好用
直接用rclone不就行了
使用 WebDAV并不好用
不是直接传,是通过webdav的复制功能进行复制
直接用rclone不就行了
使用 WebDAV并不好用
不是直接传,是通过webdav的复制功能进行复制
您好 你并没有了解ALIST API的功能性 还请您了解一下API和 WebDAV的区别在做回答 https://alist.nn.ci/zh/guide/api/ 楼主说的同步问题还请转至另一个项目 https://github.com/dr34m-cn/taosync
直接用rclone不就行了
使用 WebDAV并不好用
不是直接传,是通过webdav的复制功能进行复制
您好 你并没有了解ALIST API的功能性 还请您了解一下API和 WebDAV的区别在做回答 https://alist.nn.ci/zh/guide/api/ 楼主说的同步问题还请转至另一个项目 https://github.com/dr34m-cn/taosync
目前正在使用tao-sync这个项目 还不错,能实现同步需求,不过现在发现短板出现在alist这边了 ,在同步大量文件时,比如几千个,taosync会一次性通过alist的复制api发给alist 这时候alist运行一段时间后就自己奔溃了 必须手动重启alist主程序才能访问,重试了很多次,包括在不同平台机器上试了都有这个问题,可以确定的是不是机器性能问题导致的,,这个问题出现在alist上挂载的云to云 的复制任务 ,,云to本地貌似没这个问题
直接用rclone不就行了
使用 WebDAV并不好用
不是直接传,是通过webdav的复制功能进行复制
您好 你并没有了解ALIST API的功能性 还请您了解一下API和 WebDAV的区别在做回答 https://alist.nn.ci/zh/guide/api/ 楼主说的同步问题还请转至另一个项目 https://github.com/dr34m-cn/taosync
目前正在使用tao-sync这个项目 还不错,能实现同步需求,不过现在发现短板出现在alist这边了 ,在同步大量文件时,比如几千个,taosync会一次性通过alist的复制api发给alist 这时候alist运行一段时间后就自己奔溃了 必须手动重启alist主程序才能访问,重试了很多次,包括在不同平台机器上试了都有这个问题,可以确定的是不是机器性能问题导致的,,这个问题出现在alist上挂载的云to云 的复制任务 ,,云to本地貌似没这个问题
service/alist/alistClient.py
请修改这部分代码!
直接用rclone不就行了
使用 WebDAV并不好用
不是直接传,是通过webdav的复制功能进行复制
您好 你并没有了解ALIST API的功能性 还请您了解一下API和 WebDAV的区别在做回答 https://alist.nn.ci/zh/guide/api/ 楼主说的同步问题还请转至另一个项目 https://github.com/dr34m-cn/taosync
目前正在使用tao-sync这个项目 还不错,能实现同步需求,不过现在发现短板出现在alist这边了 ,在同步大量文件时,比如几千个,taosync会一次性通过alist的复制api发给alist 这时候alist运行一段时间后就自己奔溃了 必须手动重启alist主程序才能访问,重试了很多次,包括在不同平台机器上试了都有这个问题,可以确定的是不是机器性能问题导致的,,这个问题出现在alist上挂载的云to云 的复制任务 ,,云to本地貌似没这个问题
服务/alist/alistClient.py
请修改这部分代码!
不太懂开发编译,这个要等大佬们完善了
Please make sure of the following things
- [x] I have read the documentation.[x] I'm sure there are no duplicate issues or discussions.[x] I'm sure this feature is not implemented.[x] I'm sure it's a reasonable and popular requirement.
Description of the feature / 需求描述
`import time import requests import logging from typing import List, Set, Dict from dataclasses import dataclass
logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(), logging.FileHandler('alist_sync.log') ] ) logger = logging.getLogger(name)
@DataClass class TaskInfo: name: str status: str progress: float
class AListAPI: def init(self, base_url: str, username: str, password: str): self.base_url = base_url.rstrip('/') self.username = username self.password = password self.token = None self.session = requests.Session() self._login()
def _login(self) -> None: try: response = self.session.post( f"{self.base_url}/api/auth/login", json={ "username": self.username, "password": self.password } ) response.raise_for_status() self.token = response.json()["data"]["token"] self.session.headers.update({ "Authorization": self.token, "Content-Type": "application/json" }) logger.info("成功登录 AList") except Exception as e: logger.error(f"登录失败: {str(e)}") raise def list_files(self, path: str) -> Set[str]: """获取目录下的所有文件名,强制刷新缓存""" try: response = self.session.post( f"{self.base_url}/api/fs/list", json={ "path": path, "refresh": True, "password": "", "page": 1, "per_page": 0 } ) response.raise_for_status() files = {item['name'] for item in response.json()["data"]["content"]} return files except Exception as e: logger.error(f"获取目录列表失败 {path}: {str(e)}") return set() def copy(self, src_dir: str, dst_dir: str, names: List[str], batch_size: int = 10) -> bool: """分批复制文件""" if not names: return True success = True for i in range(0, len(names), batch_size): batch = names[i:i + batch_size] try: response = self.session.post( f"{self.base_url}/api/fs/copy", json={ "src_dir": src_dir, "dst_dir": dst_dir, "names": batch } ) response.raise_for_status() logger.info(f"成功发起复制任务 批次 {i//batch_size + 1}: {batch}") except Exception as e: logger.error(f"发起复制任务失败 批次 {i//batch_size + 1}: {str(e)}") success = False return success def delete(self, path: str, names: List[str]) -> bool: """删除文件""" try: response = self.session.post( f"{self.base_url}/api/fs/remove", json={ "dir": path, "names": names } ) response.raise_for_status() logger.info(f"成功删除文件: {names}") return True except Exception as e: logger.error(f"删除文件失败 {names}: {str(e)}") return False def get_tasks(self) -> List[TaskInfo]: """获取所有正在进行的复制任务""" try: response = self.session.post( f"{self.base_url}/api/fs/task/list", json={ "page": 1, "per_page": 0, "type": "copy", "status": ["pending", "running", "copying"] } ) response.raise_for_status() data = response.json() tasks = [] for task in data.get("data", {}).get("content", []): tasks.append(TaskInfo( name=task.get("name", ""), status=task.get("status", ""), progress=float(task.get("progress", 0)) )) return tasks except requests.exceptions.JSONDecodeError: logger.debug("任务列表为空") return [] except Exception as e: logger.debug(f"获取任务状态: {str(e)}") return []class AListSyncManager: def init(self, alist_api: AListAPI, source_path: str, dest_path: str): self.api = alist_api self.source_path = source_path.rstrip('/') self.dest_path = dest_path.rstrip('/')
def is_file_in_transfer(self, filename: str) -> bool: """检查文件是否正在传输中""" tasks = self.api.get_tasks() for task in tasks: if (task.name == filename and task.status in ["pending", "running", "copying"]): logger.info(f"文件 {filename} 正在传输中 (进度: {task.progress}%)") return True return False def sync_directory(self) -> None: try: logger.debug("开始检查目录...") # 获取源目录和目标目录的文件列表 source_files = self.api.list_files(self.source_path) dest_files = self.api.list_files(self.dest_path) # 找出需要添加和删除的文件 files_to_add = source_files - dest_files files_to_delete = dest_files - source_files # 处理需要添加的文件 if files_to_add: # 过滤掉正在传输的文件 files_to_copy = [f for f in files_to_add if not self.is_file_in_transfer(f)] if files_to_copy: logger.info(f"发现 {len(files_to_copy)} 个新文件需要同步: {files_to_copy}") self.api.copy(self.source_path, self.dest_path, files_to_copy) else: logger.debug("所有新文件都在传输中") # 处理需要删除的文件 if files_to_delete: logger.info(f"发现 {len(files_to_delete)} 个文件需要删除: {files_to_delete}") self.api.delete(self.dest_path, list(files_to_delete)) if not files_to_add and not files_to_delete: logger.debug("目录已同步,无需操作") except Exception as e: logger.error(f"同步过程中发生错误: {str(e)}") def start_sync(self, interval: int = 5): logger.info(f"开始监控目录: {self.source_path}") logger.info(f"目标同步目录: {self.dest_path}") try: while True: self.sync_directory() time.sleep(interval) except KeyboardInterrupt: logger.info("正在停止同步...") logger.info("同步已停止")if name == "main": config = { "source_path": "/InfiniCloud/html保存", "dest_path": "/本地存储/html存档", "alist_config": { "base_url": "http://localhost:5244", "username": "admin", "password": "password" }, "sync_interval": 5 }
alist_api = AListAPI( config["alist_config"]["base_url"], config["alist_config"]["username"], config["alist_config"]["password"] ) sync_manager = AListSyncManager( alist_api, config["source_path"], config["dest_path"] ) sync_manager.start_sync(config["sync_interval"])`Suggested solution / 实现思路
这个功能可以很方便的实现本地到云端,或者云端到本地,或者云端到另一个云端的智能同步,我觉得是很实用的一个功能,类似群晖的cloudsync,简直是轻nas神器,这样,比如我本地的文件,需要动态备份,就可以使用这个功能,或者云端的视频,需要下载到本地观看,也可以用nas执行这个操作,一旦云端网盘保存了新的分享,然后脚本就会将文件同步到本地,然后就能很方便的观看了,这个是非常实用的功能,我能力有限无法实现,希望开发者大佬们可以考虑实现一下这个功能
Additional context / 附件
No response
https://github.com/dr34m-cn/taosync