alist icon indicating copy to clipboard operation
alist copied to clipboard

尊敬的开发大佬们,我试图用ai写了一个利用alistAPI进行不同盘之间文件同步的脚本 但是失败了

Open zhr520v opened this issue 11 months ago • 21 comments

Please make sure of the following things

  • [X] I have read the documentation.
  • [X] I'm sure there are no duplicate issues or discussions.
  • [X] I'm sure this feature is not implemented.
  • [X] I'm sure it's a reasonable and popular requirement.

Description of the feature / 需求描述

`import time import requests import logging from typing import List, Set, Dict from dataclasses import dataclass

logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(), logging.FileHandler('alist_sync.log') ] ) logger = logging.getLogger(name)

@dataclass class TaskInfo: name: str status: str progress: float

class AListAPI: def init(self, base_url: str, username: str, password: str): self.base_url = base_url.rstrip('/') self.username = username self.password = password self.token = None self.session = requests.Session() self._login()

def _login(self) -> None:
    try:
        response = self.session.post(
            f"{self.base_url}/api/auth/login",
            json={
                "username": self.username,
                "password": self.password
            }
        )
        response.raise_for_status()
        self.token = response.json()["data"]["token"]
        self.session.headers.update({
            "Authorization": self.token,
            "Content-Type": "application/json"
        })
        logger.info("成功登录 AList")
    except Exception as e:
        logger.error(f"登录失败: {str(e)}")
        raise

def list_files(self, path: str) -> Set[str]:
    """获取目录下的所有文件名,强制刷新缓存"""
    try:
        response = self.session.post(
            f"{self.base_url}/api/fs/list",
            json={
                "path": path,
                "refresh": True,
                "password": "",
                "page": 1,
                "per_page": 0
            }
        )
        response.raise_for_status()
        files = {item['name'] for item in response.json()["data"]["content"]}
        return files
    except Exception as e:
        logger.error(f"获取目录列表失败 {path}: {str(e)}")
        return set()

def copy(self, src_dir: str, dst_dir: str, names: List[str], batch_size: int = 10) -> bool:
    """分批复制文件"""
    if not names:
        return True

    success = True
    for i in range(0, len(names), batch_size):
        batch = names[i:i + batch_size]
        try:
            response = self.session.post(
                f"{self.base_url}/api/fs/copy",
                json={
                    "src_dir": src_dir,
                    "dst_dir": dst_dir,
                    "names": batch
                }
            )
            response.raise_for_status()
            logger.info(f"成功发起复制任务 批次 {i//batch_size + 1}: {batch}")
        except Exception as e:
            logger.error(f"发起复制任务失败 批次 {i//batch_size + 1}: {str(e)}")
            success = False

    return success

def delete(self, path: str, names: List[str]) -> bool:
    """删除文件"""
    try:
        response = self.session.post(
            f"{self.base_url}/api/fs/remove",
            json={
                "dir": path,
                "names": names
            }
        )
        response.raise_for_status()
        logger.info(f"成功删除文件: {names}")
        return True
    except Exception as e:
        logger.error(f"删除文件失败 {names}: {str(e)}")
        return False

def get_tasks(self) -> List[TaskInfo]:
    """获取所有正在进行的复制任务"""
    try:
        response = self.session.post(
            f"{self.base_url}/api/fs/task/list",
            json={
                "page": 1,
                "per_page": 0,
                "type": "copy",
                "status": ["pending", "running", "copying"]
            }
        )
        response.raise_for_status()
        data = response.json()
        
        tasks = []
        for task in data.get("data", {}).get("content", []):
            tasks.append(TaskInfo(
                name=task.get("name", ""),
                status=task.get("status", ""),
                progress=float(task.get("progress", 0))
            ))
        return tasks
    except requests.exceptions.JSONDecodeError:
        logger.debug("任务列表为空")
        return []
    except Exception as e:
        logger.debug(f"获取任务状态: {str(e)}")
        return []

class AListSyncManager: def init(self, alist_api: AListAPI, source_path: str, dest_path: str): self.api = alist_api self.source_path = source_path.rstrip('/') self.dest_path = dest_path.rstrip('/')

def is_file_in_transfer(self, filename: str) -> bool:
    """检查文件是否正在传输中"""
    tasks = self.api.get_tasks()
    for task in tasks:
        if (task.name == filename and 
            task.status in ["pending", "running", "copying"]):
            logger.info(f"文件 {filename} 正在传输中 (进度: {task.progress}%)")
            return True
    return False

def sync_directory(self) -> None:
    try:
        logger.debug("开始检查目录...")
        
        # 获取源目录和目标目录的文件列表
        source_files = self.api.list_files(self.source_path)
        dest_files = self.api.list_files(self.dest_path)
        
        # 找出需要添加和删除的文件
        files_to_add = source_files - dest_files
        files_to_delete = dest_files - source_files

        # 处理需要添加的文件
        if files_to_add:
            # 过滤掉正在传输的文件
            files_to_copy = [f for f in files_to_add 
                           if not self.is_file_in_transfer(f)]
            
            if files_to_copy:
                logger.info(f"发现 {len(files_to_copy)} 个新文件需要同步: {files_to_copy}")
                self.api.copy(self.source_path, self.dest_path, files_to_copy)
            else:
                logger.debug("所有新文件都在传输中")

        # 处理需要删除的文件
        if files_to_delete:
            logger.info(f"发现 {len(files_to_delete)} 个文件需要删除: {files_to_delete}")
            self.api.delete(self.dest_path, list(files_to_delete))

        if not files_to_add and not files_to_delete:
            logger.debug("目录已同步,无需操作")

    except Exception as e:
        logger.error(f"同步过程中发生错误: {str(e)}")

def start_sync(self, interval: int = 5):
    logger.info(f"开始监控目录: {self.source_path}")
    logger.info(f"目标同步目录: {self.dest_path}")
    
    try:
        while True:
            self.sync_directory()
            time.sleep(interval)
    except KeyboardInterrupt:
        logger.info("正在停止同步...")
        logger.info("同步已停止")

if name == "main": config = { "source_path": "/InfiniCloud/html保存", "dest_path": "/本地存储/html存档", "alist_config": { "base_url": "http://localhost:5244", "username": "admin", "password": "password" }, "sync_interval": 5 }

alist_api = AListAPI(
    config["alist_config"]["base_url"],
    config["alist_config"]["username"],
    config["alist_config"]["password"]
)

sync_manager = AListSyncManager(
    alist_api,
    config["source_path"],
    config["dest_path"]
)

sync_manager.start_sync(config["sync_interval"])`

Suggested solution / 实现思路

这个功能可以很方便的实现本地到云端,或者云端到本地,或者云端到另一个云端的智能同步,我觉得是很实用的一个功能,类似群晖的cloudsync,简直是轻nas神器,这样,比如我本地的文件,需要动态备份,就可以使用这个功能,或者云端的视频,需要下载到本地观看,也可以用nas执行这个操作,一旦云端网盘保存了新的分享,然后脚本就会将文件同步到本地,然后就能很方便的观看了,这个是非常实用的功能,我能力有限无法实现,希望开发者大佬们可以考虑实现一下这个功能

Additional context / 附件

No response

zhr520v avatar Jan 07 '25 09:01 zhr520v

问问gpt为什么失败了试试?

ypq123456789 avatar Jan 07 '25 15:01 ypq123456789

问问gpt为什么失败了试试? GPT只会一直错

zhr520v avatar Jan 08 '25 01:01 zhr520v

直接用rclone不就行了

pongfcnkl avatar Jan 08 '25 02:01 pongfcnkl

直接用rclone不就行了

alist好像没有很方便动态检测文件变动的方法,用alistAPI操作应该会稳定点吧,至少不会重复执行传输任务

zhr520v avatar Jan 08 '25 02:01 zhr520v

直接用rclone不就行了

alist好像没有很方便动态检测文件变动的方法,用alistAPI操作应该会稳定点吧,至少不会重复执行传输任务

api也一样要走网盘接口的,一般限制qps速率也很稳定的 https://rclone.org/docs/#tpslimit-float

pongfcnkl avatar Jan 08 '25 02:01 pongfcnkl

直接用rclone不就行了

alist好像没有很方便动态检测文件变动的方法,用alistAPI操作应该会稳定点吧,至少不会重复执行传输任务

api也一样要走网盘接口的,一般限制qps速率也很稳定的 https://rclone.org/docs/#tpslimit-float

还有一点就是,rclone是不支持百度 阿里 夸克等国内网盘的,即使使用rclone,还是要通过webdav来挂载alist的形式,来操作alist上挂载的网盘的。我不太懂其中是怎么转换的,不过感觉是转了几层,应该是不如之间用alist实现同步来的好吧

zhr520v avatar Jan 08 '25 03:01 zhr520v

直接用rclone不就行了

alist好像没有很方便动态检测文件变动的方法,用alistAPI操作应该会稳定点吧,至少不会重复执行传输任务

api也一样要走网盘接口的,一般限制qps速率也很稳定的 https://rclone.org/docs/#tpslimit-float

还有一点就是,rclone是不支持百度 阿里 夸克等国内网盘的,即使使用rclone,还是要通过webdav来挂载alist的形式,来操作alist上挂载的网盘的。我不太懂其中是怎么转换的,不过感觉是转了几层,应该是不如之间用alist实现同步来的好吧

你可以直接抄我的命令参数 ,吊打你自己不知道咋写问ai的同步代码

复制:rclone -P --timeout 0 --retries 5244 --transfers 1 --ignore-existing --tpslimit 0.5 copy alist:源路径 alist:目标路径 同步:rclone -P --timeout 0 --retries 5244 --transfers 1 --tpslimit 0.5 sync alist:源路径 alist:目标路径

pongfcnkl avatar Jan 08 '25 03:01 pongfcnkl

直接用rclone不就行了

alist好像没有很方便动态检测文件变动的方法,用alistAPI操作应该会稳定点吧,至少不会重复执行传输任务

api也一样要走网盘接口的,一般限制qps速率也很稳定的 https://rclone.org/docs/#tpslimit-float

还有一点就是,rclone是不支持百度 阿里 夸克等国内网盘的,即使使用rclone,还是要通过webdav来挂载alist的形式,来操作alist上挂载的网盘的。我不太懂其中是怎么转换的,不过感觉是转了几层,应该是不如之间用alist实现同步来的好吧

你可以直接抄我的命令参数 ,吊打你自己不知道咋写问ai的同步代码

复制:rclone -P --timeout 0 --retries 5244--transfers 1 --ignore-existing --tpslimit 0.5 copy alist:源路径 alist:目标路径 同步:rclone -P --timeout 0 --retries 5244--transfers 1 --tpslimit 0.5 sync alist:源路径 alist:目标路径

好的,谢谢,,怎么实现实时的文件同步呢,只能采用时间轮询的方式吗

zhr520v avatar Jan 08 '25 03:01 zhr520v

直接用rclone不就行了

alist好像没有很方便动态检测文件变动的方法,用alistAPI操作应该会稳定点吧,至少不会重复执行传输任务

api也一样要走网盘接口的,一般限制qps速率也很稳定的 https://rclone.org/docs/#tpslimit-float

还有一点就是,rclone是不支持百度 阿里 夸克等国内网盘的,即使使用rclone,还是要通过webdav来挂载alist的形式,来操作alist上挂载的网盘的。我不太懂其中是怎么转换的,不过感觉是转了几层,应该是不如之间用alist实现同步来的好吧

你可以直接抄我的命令参数 ,吊打你自己不知道咋写问ai的同步代码 复制:rclone -P --timeout 0 --retries 5244--transfers 1 --ignore-existing --tpslimit 0.5 copy alist:源路径 alist:目标路径 同步:rclone -P --timeout 0 --retries 5244--transfers 1 --tpslimit 0.5 sync alist:源路径 alist:目标路径

好的,谢谢,,怎么实现实时的文件同步呢,只能采用时间轮询的方式吗

你api不也是时间轮询,都一样的,可以另外写油猴脚本,只要转存就向服务器发送同步命令

pongfcnkl avatar Jan 08 '25 03:01 pongfcnkl

直接用rclone不就行了

alist好像没有很方便动态检测文件变动的方法,用alistAPI操作应该会稳定点吧,至少不会重复执行传输任务

api也一样要走网盘接口的,一般限制qps速率也很稳定的 https://rclone.org/docs/#tpslimit-float

还有一点就是,rclone是不支持百度 阿里 夸克等国内网盘的,即使使用rclone,还是要通过webdav来挂载alist的形式,来操作alist上挂载的网盘的。我不太懂其中是怎么转换的,不过感觉是转了几层,应该是不如之间用alist实现同步来的好吧

你可以直接抄我的命令参数 ,吊打你自己不知道咋写问ai的同步代码 复制:rclone -P --timeout 0 --retries 5244--transfers 1 --ignore-existing --tpslimit 0.5 copy alist:源路径 alist:目标路径 同步:rclone -P --timeout 0 --retries 5244--transfers 1 --tpslimit 0.5 sync alist:源路径 alist:目标路径

好的,谢谢,,怎么实现实时的文件同步呢,只能采用时间轮询的方式吗

你api不也是时间轮询,都一样的,可以另外写油猴脚本,只要转存就向服务器发送同步命令

嗯,方案确实实现了需求,谢谢,不过还是希望开放团队能考虑加入这个功能,有图形化界面会好很多

zhr520v avatar Jan 08 '25 03:01 zhr520v

直接用rclone不就行了

alist好像没有很方便动态检测文件变动的方法,用alistAPI操作应该会稳定点吧,至少不会重复执行传输任务

api也一样要走网盘接口的,一般限制qps速率也很稳定的 https://rclone.org/docs/#tpslimit-float

还有一点就是,rclone是不支持百度 阿里 夸克等国内网盘的,即使使用rclone,还是要通过webdav来挂载alist的形式,来操作alist上挂载的网盘的。我不太懂其中是怎么转换的,不过感觉是转了几层,应该是不如之间用alist实现同步来的好吧

你可以直接抄我的命令参数 ,吊打你自己不知道咋写问ai的同步代码 复制:rclone -P --timeout 0 --retries 5244--transfers 1 --ignore-existing --tpslimit 0.5 copy alist:源路径 alist:目标路径 同步:rclone -P --timeout 0 --retries 5244--transfers 1 --tpslimit 0.5 sync alist:源路径 alist:目标路径

好的,谢谢,,怎么实现实时的文件同步呢,只能采用时间轮询的方式吗

你api不也是时间轮询,都一样的,可以另外写油猴脚本,只要转存就向服务器发送同步命令

嗯,方案确实实现了需求,谢谢,不过还是希望开放团队能考虑加入这个功能,有图形化界面会好很多

这年头要啥图形化,命令行多舒服

pongfcnkl avatar Jan 08 '25 03:01 pongfcnkl

直接用rclone不就行了

alist好像没有很方便动态检测文件变动的方法,用alistAPI操作应该会稳定点吧,至少不会重复执行传输任务

api也一样要走网盘接口的,一般限制qps速率也很稳定的 https://rclone.org/docs/#tpslimit-float

还有一点就是,rclone是不支持百度 阿里 夸克等国内网盘的,即使使用rclone,还是要通过webdav来挂载alist的形式,来操作alist上挂载的网盘的。我不太懂其中是怎么转换的,不过感觉是转了几层,应该是不如之间用alist实现同步来的好吧

你可以直接抄我的命令参数 ,吊打你自己不知道咋写问ai的同步代码 复制:rclone -P --timeout 0 --retries 5244--transfers 1 --ignore-existing --tpslimit 0.5 copy alist:源路径 alist:目标路径 同步:rclone -P --timeout 0 --retries 5244--transfers 1 --tpslimit 0.5 sync alist:源路径 alist:目标路径

好的,谢谢,,怎么实现实时的文件同步呢,只能采用时间轮询的方式吗

你api不也是时间轮询,都一样的,可以另外写油猴脚本,只要转存就向服务器发送同步命令

嗯,方案确实实现了需求,谢谢,不过还是希望开放团队能考虑加入这个功能,有图形化界面会好很多

这年头要啥图形化,命令行多舒服

哈哈,稍微优雅一点,{狗头}

zhr520v avatar Jan 08 '25 03:01 zhr520v

Please make sure of the following things

  • [x] I have read the 文档。[x] I'm sure there are no duplicate issues or discussions.[x] I'm sure this feature is not implemented.[x] I'm sure it's a reasonable 和 popular requirement.

Description of the feature / 需求描述

`import time import requests import logging from typing import List, Set, Dict from dataclasses import dataclass

logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(), logging.FileHandler('alist_sync.log') ] ) logger = logging.getLogger(name)

@DataClass class TaskInfo: name: str status: str progress: float

class AListAPI: def init(self, base_url: str, username: str, password: str): self.base_url = base_url.rstrip('/') self.username = username self.password = password self.token = None self.session = requests.Session() self._login()

def _login(self) -> None:
    try:
        response = self.session.post(
            f"{self.base_url}/api/auth/login",
            json={
                "username": self.username,
                "password": self.password
            }
        )
        response.raise_for_status()
        self.token = response.json()["data"]["token"]
        self.session.headers.update({
            "Authorization": self.token,
            "Content-Type": "application/json"
        })
        logger.info("成功登录 AList")
    except Exception as e:
        logger.error(f"登录失败: {str(e)}")
        raise

def list_files(self, path: str) -> Set[str]:
    """获取目录下的所有文件名,强制刷新缓存"""
    try:
        response = self.session.post(
            f"{self.base_url}/api/fs/list",
            json={
                "path": path,
                "refresh": True,
                "password": "",
                "page": 1,
                "per_page": 0
            }
        )
        response.raise_for_status()
        files = {item['name'] for item in response.json()["data"]["content"]}
        return files
    except Exception as e:
        logger.error(f"获取目录列表失败 {path}: {str(e)}")
        return set()

def copy(self, src_dir: str, dst_dir: str, names: List[str], batch_size: int = 10) -> bool:
    """分批复制文件"""
    if not names:
        return True

    success = True
    for i in range(0, len(names), batch_size):
        batch = names[i:i + batch_size]
        try:
            response = self.session.post(
                f"{self.base_url}/api/fs/copy",
                json={
                    "src_dir": src_dir,
                    "dst_dir": dst_dir,
                    "names": batch
                }
            )
            response.raise_for_status()
            logger.info(f"成功发起复制任务 批次 {i//batch_size + 1}: {batch}")
        except Exception as e:
            logger.error(f"发起复制任务失败 批次 {i//batch_size + 1}: {str(e)}")
            success = False

    return success

def delete(self, path: str, names: List[str]) -> bool:
    """删除文件"""
    try:
        response = self.session.post(
            f"{self.base_url}/api/fs/remove",
            json={
                "dir": path,
                "names": names
            }
        )
        response.raise_for_status()
        logger.info(f"成功删除文件: {names}")
        return True
    except Exception as e:
        logger.error(f"删除文件失败 {names}: {str(e)}")
        return False

def get_tasks(self) -> List[TaskInfo]:
    """获取所有正在进行的复制任务"""
    try:
        response = self.session.post(
            f"{self.base_url}/api/fs/task/list",
            json={
                "page": 1,
                "per_page": 0,
                "type": "copy",
                "status": ["pending", "running", "copying"]
            }
        )
        response.raise_for_status()
        data = response.json()
        
        tasks = []
        for task in data.get("data", {}).get("content", []):
            tasks.append(TaskInfo(
                name=task.get("name", ""),
                status=task.get("status", ""),
                progress=float(task.get("progress", 0))
            ))
        return tasks
    except requests.exceptions.JSONDecodeError:
        logger.debug("任务列表为空")
        return []
    except Exception as e:
        logger.debug(f"获取任务状态: {str(e)}")
        return []

class AListSyncManager: def init(self, alist_api: AListAPI, source_path: str, dest_path: str): self.api = alist_api self.source_path = source_path.rstrip('/') self.dest_path = dest_path.rstrip('/')

def is_file_in_transfer(self, filename: str) -> bool:
    """检查文件是否正在传输中"""
    tasks = self.api.get_tasks()
    for task in tasks:
        if (task.name == filename and 
            task.status in ["pending", "running", "copying"]):
            logger.info(f"文件 {filename} 正在传输中 (进度: {task.progress}%)")
            return True
    return False

def sync_directory(self) -> None:
    try:
        logger.debug("开始检查目录...")
        
        # 获取源目录和目标目录的文件列表
        source_files = self.api.list_files(self.source_path)
        dest_files = self.api.list_files(self.dest_path)
        
        # 找出需要添加和删除的文件
        files_to_add = source_files - dest_files
        files_to_delete = dest_files - source_files

        # 处理需要添加的文件
        if files_to_add:
            # 过滤掉正在传输的文件
            files_to_copy = [f for f in files_to_add 
                           if not self.is_file_in_transfer(f)]
            
            if files_to_copy:
                logger.info(f"发现 {len(files_to_copy)} 个新文件需要同步: {files_to_copy}")
                self.api.copy(self.source_path, self.dest_path, files_to_copy)
            else:
                logger.debug("所有新文件都在传输中")

        # 处理需要删除的文件
        if files_to_delete:
            logger.info(f"发现 {len(files_to_delete)} 个文件需要删除: {files_to_delete}")
            self.api.delete(self.dest_path, list(files_to_delete))

        if not files_to_add and not files_to_delete:
            logger.debug("目录已同步,无需操作")

    except Exception as e:
        logger.error(f"同步过程中发生错误: {str(e)}")

def start_sync(self, interval: int = 5):
    logger.info(f"开始监控目录: {self.source_path}")
    logger.info(f"目标同步目录: {self.dest_path}")
    
    try:
        while True:
            self.sync_directory()
            time.sleep(interval)
    except KeyboardInterrupt:
        logger.info("正在停止同步...")
        logger.info("同步已停止")

if name == "main": config = { "source_path": "/InfiniCloud/html保存", "dest_path": "/本地存储/html存档", "alist_config": { "base_url": "http://localhost:5244", "username": "admin", "password": "password" }, "sync_interval": 5 }

alist_api = AListAPI(
    config["alist_config"]["base_url"],
    config["alist_config"]["username"],
    config["alist_config"]["password"]
)

sync_manager = AListSyncManager(
    alist_api,
    config["source_path"],
    config["dest_path"]
)

sync_manager.start_sync(config["sync_interval"])`

Suggested solution / 实现思路

这个功能可以很方便的实现本地到云端,或者云端到本地,或者云端到另一个云端的智能同步,我觉得是很实用的一个功能,类似群晖的cloudsync,简直是轻nas神器,这样,比如我本地的文件,需要动态备份,就可以使用这个功能,或者云端的视频,需要下载到本地观看,也可以用nas执行这个操作,一旦云端网盘保存了新的分享,然后脚本就会将文件同步到本地,然后就能很方便的观看了,这个是非常实用的功能,我能力有限无法实现,希望开发者大佬们可以考虑实现一下这个功能

Additional context / 附件

No response

看一下这个项目alist-sync

kuke2733 avatar Jan 19 '25 08:01 kuke2733

如果只是简单的调用复制,可以试一下我之前写的简单demo https://github.com/SunRain/AListCopyEnhanced/tree/master

SunRain avatar Feb 24 '25 15:02 SunRain

直接用rclone不就行了

使用 WebDAV并不好用

unsiao avatar Apr 05 '25 02:04 unsiao

直接用rclone不就行了

使用 WebDAV并不好用

不是直接传,是通过webdav的复制功能进行复制

pongfcnkl avatar Apr 05 '25 02:04 pongfcnkl

直接用rclone不就行了

使用 WebDAV并不好用

不是直接传,是通过webdav的复制功能进行复制

您好 你并没有了解ALIST API的功能性 还请您了解一下API和 WebDAV的区别在做回答 https://alist.nn.ci/zh/guide/api/ 楼主说的同步问题还请转至另一个项目 https://github.com/dr34m-cn/taosync

unsiao avatar Apr 05 '25 02:04 unsiao

直接用rclone不就行了

使用 WebDAV并不好用

不是直接传,是通过webdav的复制功能进行复制

您好 你并没有了解ALIST API的功能性 还请您了解一下API和 WebDAV的区别在做回答 https://alist.nn.ci/zh/guide/api/ 楼主说的同步问题还请转至另一个项目 https://github.com/dr34m-cn/taosync

目前正在使用tao-sync这个项目 还不错,能实现同步需求,不过现在发现短板出现在alist这边了 ,在同步大量文件时,比如几千个,taosync会一次性通过alist的复制api发给alist 这时候alist运行一段时间后就自己奔溃了 必须手动重启alist主程序才能访问,重试了很多次,包括在不同平台机器上试了都有这个问题,可以确定的是不是机器性能问题导致的,,这个问题出现在alist上挂载的云to云 的复制任务 ,,云to本地貌似没这个问题

zhr520v avatar Apr 05 '25 02:04 zhr520v

直接用rclone不就行了

使用 WebDAV并不好用

不是直接传,是通过webdav的复制功能进行复制

您好 你并没有了解ALIST API的功能性 还请您了解一下API和 WebDAV的区别在做回答 https://alist.nn.ci/zh/guide/api/ 楼主说的同步问题还请转至另一个项目 https://github.com/dr34m-cn/taosync

目前正在使用tao-sync这个项目 还不错,能实现同步需求,不过现在发现短板出现在alist这边了 ,在同步大量文件时,比如几千个,taosync会一次性通过alist的复制api发给alist 这时候alist运行一段时间后就自己奔溃了 必须手动重启alist主程序才能访问,重试了很多次,包括在不同平台机器上试了都有这个问题,可以确定的是不是机器性能问题导致的,,这个问题出现在alist上挂载的云to云 的复制任务 ,,云to本地貌似没这个问题

Image service/alist/alistClient.py

请修改这部分代码!

unsiao avatar Apr 05 '25 03:04 unsiao

直接用rclone不就行了

使用 WebDAV并不好用

不是直接传,是通过webdav的复制功能进行复制

您好 你并没有了解ALIST API的功能性 还请您了解一下API和 WebDAV的区别在做回答 https://alist.nn.ci/zh/guide/api/ 楼主说的同步问题还请转至另一个项目 https://github.com/dr34m-cn/taosync

目前正在使用tao-sync这个项目 还不错,能实现同步需求,不过现在发现短板出现在alist这边了 ,在同步大量文件时,比如几千个,taosync会一次性通过alist的复制api发给alist 这时候alist运行一段时间后就自己奔溃了 必须手动重启alist主程序才能访问,重试了很多次,包括在不同平台机器上试了都有这个问题,可以确定的是不是机器性能问题导致的,,这个问题出现在alist上挂载的云to云 的复制任务 ,,云to本地貌似没这个问题

Image服务/alist/alistClient.py

请修改这部分代码!

不太懂开发编译,这个要等大佬们完善了

zhr520v avatar Apr 05 '25 03:04 zhr520v

Please make sure of the following things

  • [x] I have read the documentation.[x] I'm sure there are no duplicate issues or discussions.[x] I'm sure this feature is not implemented.[x] I'm sure it's a reasonable and popular requirement.

Description of the feature / 需求描述

`import time import requests import logging from typing import List, Set, Dict from dataclasses import dataclass

logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(), logging.FileHandler('alist_sync.log') ] ) logger = logging.getLogger(name)

@DataClass class TaskInfo: name: str status: str progress: float

class AListAPI: def init(self, base_url: str, username: str, password: str): self.base_url = base_url.rstrip('/') self.username = username self.password = password self.token = None self.session = requests.Session() self._login()

def _login(self) -> None:
    try:
        response = self.session.post(
            f"{self.base_url}/api/auth/login",
            json={
                "username": self.username,
                "password": self.password
            }
        )
        response.raise_for_status()
        self.token = response.json()["data"]["token"]
        self.session.headers.update({
            "Authorization": self.token,
            "Content-Type": "application/json"
        })
        logger.info("成功登录 AList")
    except Exception as e:
        logger.error(f"登录失败: {str(e)}")
        raise

def list_files(self, path: str) -> Set[str]:
    """获取目录下的所有文件名,强制刷新缓存"""
    try:
        response = self.session.post(
            f"{self.base_url}/api/fs/list",
            json={
                "path": path,
                "refresh": True,
                "password": "",
                "page": 1,
                "per_page": 0
            }
        )
        response.raise_for_status()
        files = {item['name'] for item in response.json()["data"]["content"]}
        return files
    except Exception as e:
        logger.error(f"获取目录列表失败 {path}: {str(e)}")
        return set()

def copy(self, src_dir: str, dst_dir: str, names: List[str], batch_size: int = 10) -> bool:
    """分批复制文件"""
    if not names:
        return True

    success = True
    for i in range(0, len(names), batch_size):
        batch = names[i:i + batch_size]
        try:
            response = self.session.post(
                f"{self.base_url}/api/fs/copy",
                json={
                    "src_dir": src_dir,
                    "dst_dir": dst_dir,
                    "names": batch
                }
            )
            response.raise_for_status()
            logger.info(f"成功发起复制任务 批次 {i//batch_size + 1}: {batch}")
        except Exception as e:
            logger.error(f"发起复制任务失败 批次 {i//batch_size + 1}: {str(e)}")
            success = False

    return success

def delete(self, path: str, names: List[str]) -> bool:
    """删除文件"""
    try:
        response = self.session.post(
            f"{self.base_url}/api/fs/remove",
            json={
                "dir": path,
                "names": names
            }
        )
        response.raise_for_status()
        logger.info(f"成功删除文件: {names}")
        return True
    except Exception as e:
        logger.error(f"删除文件失败 {names}: {str(e)}")
        return False

def get_tasks(self) -> List[TaskInfo]:
    """获取所有正在进行的复制任务"""
    try:
        response = self.session.post(
            f"{self.base_url}/api/fs/task/list",
            json={
                "page": 1,
                "per_page": 0,
                "type": "copy",
                "status": ["pending", "running", "copying"]
            }
        )
        response.raise_for_status()
        data = response.json()
        
        tasks = []
        for task in data.get("data", {}).get("content", []):
            tasks.append(TaskInfo(
                name=task.get("name", ""),
                status=task.get("status", ""),
                progress=float(task.get("progress", 0))
            ))
        return tasks
    except requests.exceptions.JSONDecodeError:
        logger.debug("任务列表为空")
        return []
    except Exception as e:
        logger.debug(f"获取任务状态: {str(e)}")
        return []

class AListSyncManager: def init(self, alist_api: AListAPI, source_path: str, dest_path: str): self.api = alist_api self.source_path = source_path.rstrip('/') self.dest_path = dest_path.rstrip('/')

def is_file_in_transfer(self, filename: str) -> bool:
    """检查文件是否正在传输中"""
    tasks = self.api.get_tasks()
    for task in tasks:
        if (task.name == filename and 
            task.status in ["pending", "running", "copying"]):
            logger.info(f"文件 {filename} 正在传输中 (进度: {task.progress}%)")
            return True
    return False

def sync_directory(self) -> None:
    try:
        logger.debug("开始检查目录...")
        
        # 获取源目录和目标目录的文件列表
        source_files = self.api.list_files(self.source_path)
        dest_files = self.api.list_files(self.dest_path)
        
        # 找出需要添加和删除的文件
        files_to_add = source_files - dest_files
        files_to_delete = dest_files - source_files

        # 处理需要添加的文件
        if files_to_add:
            # 过滤掉正在传输的文件
            files_to_copy = [f for f in files_to_add 
                           if not self.is_file_in_transfer(f)]
            
            if files_to_copy:
                logger.info(f"发现 {len(files_to_copy)} 个新文件需要同步: {files_to_copy}")
                self.api.copy(self.source_path, self.dest_path, files_to_copy)
            else:
                logger.debug("所有新文件都在传输中")

        # 处理需要删除的文件
        if files_to_delete:
            logger.info(f"发现 {len(files_to_delete)} 个文件需要删除: {files_to_delete}")
            self.api.delete(self.dest_path, list(files_to_delete))

        if not files_to_add and not files_to_delete:
            logger.debug("目录已同步,无需操作")

    except Exception as e:
        logger.error(f"同步过程中发生错误: {str(e)}")

def start_sync(self, interval: int = 5):
    logger.info(f"开始监控目录: {self.source_path}")
    logger.info(f"目标同步目录: {self.dest_path}")
    
    try:
        while True:
            self.sync_directory()
            time.sleep(interval)
    except KeyboardInterrupt:
        logger.info("正在停止同步...")
        logger.info("同步已停止")

if name == "main": config = { "source_path": "/InfiniCloud/html保存", "dest_path": "/本地存储/html存档", "alist_config": { "base_url": "http://localhost:5244", "username": "admin", "password": "password" }, "sync_interval": 5 }

alist_api = AListAPI(
    config["alist_config"]["base_url"],
    config["alist_config"]["username"],
    config["alist_config"]["password"]
)

sync_manager = AListSyncManager(
    alist_api,
    config["source_path"],
    config["dest_path"]
)

sync_manager.start_sync(config["sync_interval"])`

Suggested solution / 实现思路

这个功能可以很方便的实现本地到云端,或者云端到本地,或者云端到另一个云端的智能同步,我觉得是很实用的一个功能,类似群晖的cloudsync,简直是轻nas神器,这样,比如我本地的文件,需要动态备份,就可以使用这个功能,或者云端的视频,需要下载到本地观看,也可以用nas执行这个操作,一旦云端网盘保存了新的分享,然后脚本就会将文件同步到本地,然后就能很方便的观看了,这个是非常实用的功能,我能力有限无法实现,希望开发者大佬们可以考虑实现一下这个功能

Additional context / 附件

No response

https://github.com/dr34m-cn/taosync

unsiao1 avatar May 16 '25 09:05 unsiao1