FATE icon indicating copy to clipboard operation
FATE copied to clipboard

FATE2.0读取外部数据源

Open FancyXun opened this issue 1 year ago • 8 comments

请教个问题,目前官方给的例子都是基于内置的数据进行训练预测的,在实际生产中,我们会从数据库,比如Mysql读取数据,请问fate2.0支持从外部读取数据吗?我看有个table bind,但不知道具体用法,这个怎么把外部的数据库信息给到fate呢

FancyXun avatar Jul 17 '24 09:07 FancyXun

目前FATE端不支持直接从mysql中读取数据,原因之一是不同的计算引擎支持的存储数据格式不一样,比如spark支持文件跟hdfs,eggroll支持的是自己的格式(底层是lmdb)。我们在之前的版本中有采取转换的思路,这个版本可能还没有适配? @zhihuiwan

sagewe avatar Jul 18 '24 02:07 sagewe

理想情况下如果底层的引擎直接支持是最简单的,只需要在https://github.com/FederatedAI/FATE/blob/master/python/fate/arch/computing/backends/eggroll/_csession.py#L64-L90 中插入新的uri支持

sagewe avatar Jul 18 '24 02:07 sagewe

目前FATE端不支持直接从mysql中读取数据,原因之一是不同的计算引擎支持的存储数据格式不一样,比如spark支持文件跟hdfs,eggroll支持的是自己的格式(底层是lmdb)。我们在之前的版本中有采取转换的思路,这个版本可能还没有适配? @zhihuiwan

感谢回复,我看现在官方默认upload 数据的时候,里面配置对应的数据路径(csv文件),但是这个路径必须存在在fate flow里面,这样Job里面会出现tranfromer这些,我理解就是你说的转成imdb格式。

FancyXun avatar Jul 18 '24 03:07 FancyXun

我理解原始数据最终都是要转成fate能读取的数据格式lmdb,现在都是直接从内置的fate flow 里面读取csv文件进行转换,我的述求就是如何读取外部的数据库,进行转换也可。我理解直接1.x版本是可以 table bind 一个外部数据源,比如mysql这样。@sagewe

FancyXun avatar Jul 18 '24 03:07 FancyXun

我理解原始数据最终都是要转成fate能读取的数据格式lmdb,现在都是直接从内置的fate flow 里面读取csv文件进行转换,我的述求就是如何读取外部的数据库,进行转换也可。我理解直接1.x版本是可以 table bind 一个外部数据源,比如mysql这样。@sagewe

是的,这个后续版本会有支持,是我们推进容器化支持的一部分

sagewe avatar Jul 18 '24 03:07 sagewe

理想情况下如果底层的引擎直接支持是最简单的,只需要在https://github.com/FederatedAI/FATE/blob/master/python/fate/arch/computing/backends/eggroll/_csession.py#L64-L90 中插入新的uri支持

因为近期有这个需求,可能等不到你们升级了,那如果我想要支持,是不是按照你说的说法得在这里修改成本最小,读取外部数据源?

FancyXun avatar Jul 18 '24 06:07 FancyXun

理想情况下如果底层的引擎直接支持是最简单的,只需要在https://github.com/FederatedAI/FATE/blob/master/python/fate/arch/computing/backends/eggroll/_csession.py#L64-L90 中插入新的uri支持

因为近期有这个需求,可能等不到你们升级了,那如果我想要支持,是不是按照你说的说法得在这里修改成本最小,读取外部数据源?

从这里改可能更简单: https://github.com/FederatedAI/FATE/blob/0e36edc936394331dbefa868eec236808fadbd62/python/fate/components/core/component_desc/artifacts/data/_table.py#L46-L53

class TableReader(_ArtifactTypeReader):
    def read(self):
        self.artifact.consumed()
        if self.artifact.uri.scheme == "mysql":
            from sqlalchemy import create_engine
            import copy

            database, table = self.artifact.uri.path_splits()
            database_uri = copy.deepcopy(self.artifact.uri)
            database_uri.path.replace(f"/{table}", "")
            engine = create_engine(database_uri.to_string(), echo=True)
            with engine.connect() as con:

                rs = con.execute(f'SELECT * FROM {table}')
                def get_data():
                    for row in rs:
                        # TODO: process row
                        yield ...

                table = self.ctx.computing.parallelize(
                    data=get_data(),
                    partition=16,
                )
                table.schema = self.artifact.metadata.metadata.get("schema", {})
                return table

        return self.ctx.computing.load(
            uri=self.artifact.uri,
            schema=self.artifact.metadata.metadata.get("schema", {}),
            options=self.artifact.metadata.metadata.get("options", None),
        )

你可能会碰到的问题:

  1. flow是否能传递mysql uri进来?这个需要 @zhihuiwan 来给你相应的指导
  2. 读mysql是单线程的,数据量大了可能有点慢

在这个位置实现的利弊

  • 好处是对引擎透明
  • 缺点是如果底层引擎有更好的实现无法发挥,但是可以根据未来需要通过简单的接口重构克服

sagewe avatar Jul 19 '24 02:07 sagewe

理想情况下如果底层的引擎直接支持是最简单的,只需要在https://github.com/FederatedAI/FATE/blob/master/python/fate/arch/computing/backends/eggroll/_csession.py#L64-L90 中插入新的uri支持

因为近期有这个需求,可能等不到你们升级了,那如果我想要支持,是不是按照你说的说法得在这里修改成本最小,读取外部数据源?

从这里改可能更简单:

https://github.com/FederatedAI/FATE/blob/0e36edc936394331dbefa868eec236808fadbd62/python/fate/components/core/component_desc/artifacts/data/_table.py#L46-L53

class TableReader(_ArtifactTypeReader):
    def read(self):
        self.artifact.consumed()
        if self.artifact.uri.scheme == "mysql":
            from sqlalchemy import create_engine
            import copy

            database, table = self.artifact.uri.path_splits()
            database_uri = copy.deepcopy(self.artifact.uri)
            database_uri.path.replace(f"/{table}", "")
            engine = create_engine(database_uri.to_string(), echo=True)
            with engine.connect() as con:

                rs = con.execute(f'SELECT * FROM {table}')
                def get_data():
                    for row in rs:
                        # TODO: process row
                        yield ...

                table = self.ctx.computing.parallelize(
                    data=get_data(),
                    partition=16,
                )
                table.schema = self.artifact.metadata.metadata.get("schema", {})
                return table

        return self.ctx.computing.load(
            uri=self.artifact.uri,
            schema=self.artifact.metadata.metadata.get("schema", {}),
            options=self.artifact.metadata.metadata.get("options", None),
        )

你可能会碰到的问题:

  1. flow是否能传递mysql uri进来?这个需要 @zhihuiwan 来给你相应的指导
  2. 读mysql是单线程的,数据量大了可能有点慢

在这个位置实现的利弊

  • 好处是对引擎透明
  • 缺点是如果底层引擎有更好的实现无法发挥,但是可以根据未来需要通过简单的接口重构克服

非常感谢你提供的思路,这个我后续可以看看如何实现,目前使用了一个比较简单快捷的办法https://github.com/FederatedAI/FATE-Flow/pull/574

FancyXun avatar Jul 22 '24 08:07 FancyXun

This issue has been marked as stale because it has been open for 365 days with no activity. If this issue is still relevant or if there is new information, please feel free to update or reopen it.

github-actions[bot] avatar Aug 01 '25 03:08 github-actions[bot]

This issue was closed because it has been inactive for 1 days since being marked as stale. If this issue is still relevant or if there is new information, please feel free to update or reopen it.

github-actions[bot] avatar Aug 03 '25 03:08 github-actions[bot]