
Query OR condition between multiple fields

Justinianus2001 opened this issue 1 year ago • 5 comments

I have a table called 'Client' as shown below:

from sqlalchemy import String
from sqlalchemy.orm import Mapped, mapped_column

# `Base` is the project's declarative base (given `init=False`, presumably a
# MappedAsDataclass base).
class Client(Base):
    __tablename__ = "client"

    id: Mapped[int] = mapped_column("id", autoincrement=True, nullable=False, unique=True, primary_key=True, init=False)

    name: Mapped[str] = mapped_column(String(50))
    contact: Mapped[str] = mapped_column(String(50))
    phone: Mapped[str] = mapped_column(String(20))
    email: Mapped[str] = mapped_column(String(50))

I want to perform a search across all fields in this table using the parameter 'keyword', for example:

name__ilike = f'%{keyword}%' OR contact__ilike = f'%{keyword}%' OR phone__ilike = f'%{keyword}%' OR email__ilike = f'%{keyword}%'

In my research of the documentation, I only found a way to query an OR condition within a single field, such as price__or={'lt': 5, 'gt': 20}. Please help me find a solution. Thank you very much!
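
For reference, the intended query corresponds to combining the ilike conditions with or_ in plain SQLAlchemy. A minimal sketch (assuming an AsyncSession named 'db' and the 'Client' model above):

from sqlalchemy import or_, select

async def search_clients(db, keyword: str):
    # Case-insensitive LIKE on every text column, combined with OR
    pattern = f"%{keyword}%"
    stmt = select(Client).where(
        or_(
            Client.name.ilike(pattern),
            Client.contact.ilike(pattern),
            Client.phone.ilike(pattern),
            Client.email.ilike(pattern),
        )
    )
    result = await db.execute(stmt)
    return result.scalars().all()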

Justinianus2001 avatar Jul 03 '24 09:07 Justinianus2001

Sadly, this particular case is not currently supported. The best you can do is inherit from FastCRUD and add your custom method (in that case, feel free to open a PR with that solution included). If you're also using our router, the EndpointCreator must be overridden too. We want to simplify this approach soon.

JakNowy avatar Jul 04 '24 09:07 JakNowy

I inherited FastCRUD and added a search prototype (OR conditions between multiple columns) on top of the normal (AND) filters. It works by passing a "search_columns" list and a "search_query" as keyword arguments to 'get_multi_joined'. This only searches through columns of the main model.

I also extended the JoinConfig class with "search_columns"; when search columns are specified there, the new '_parse_or_filters' function automatically searches those columns on the joined models as well.

Maybe this example can be generalized beyond search (ILIKE) to other OR scenarios. Anyway, this works really well for my automatically generated CRUD endpoint that feeds a DataTables JS front-end table with sorting and searching functionality. I will do some more testing and might add some more features. Maybe later I could do a PR for a more generic OR approach. Let me know if you have any feedback. :-)


# Create a FastCRUD instance for the ApiKey model
fast_crud = FastCRUDWithORFilters(ApiKey)

kwargs = {
    "db": db,
    "schema_to_select": ApiKeyRead,
    "search_columns": ["name", "secret_key"],
    "search_query": global_search_value,  # Variable from the front-end search field
    "joins_config": [
        JoinConfig(
            model=User,
            join_on=ApiKey.user_id == User.id,
            join_prefix=None,  # e.g. "user_"
            schema_to_select=UserReadName,
            join_type="left",
            alias=None,
            filters=None,
            relationship_type=None,
            search_columns=["first_name", "last_name"],
        )
    ],
}

result = await fast_crud.get_multi_joined(**kwargs)
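
The FastCRUDWithORFilters implementation: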
from fastcrud import FastCRUD

from typing import Any, Optional, TypeVar, Union

from pydantic import BaseModel, ValidationError
from sqlalchemy import ColumnElement
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm.util import AliasedClass
from sqlalchemy.sql.selectable import Select
from sqlalchemy import select, or_

from fastcrud.crud.helper import (
    _auto_detect_join_condition,
    _nest_join_data,
    _nest_multi_join_data,
    _handle_null_primary_key_multi_join,
    _extract_matching_columns_from_schema,
    JoinConfig as JoinConfigBase,
)

# Extending JoinConfig with search_columns per joined model
class JoinConfig(JoinConfigBase):
    search_columns: Optional[list[str]] = None


ModelType = TypeVar("ModelType", bound=DeclarativeBase)
CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel)
UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel)
UpdateSchemaInternalType = TypeVar("UpdateSchemaInternalType", bound=BaseModel)
DeleteSchemaType = TypeVar("DeleteSchemaType", bound=BaseModel)


class FastCRUDWithORFilters(FastCRUD):
    async def get_multi_joined(
        self,
        db: AsyncSession,
        schema_to_select: Optional[type[BaseModel]] = None,
        join_model: Optional[type[ModelType]] = None,
        join_on: Optional[Any] = None,
        join_prefix: Optional[str] = None,
        join_schema_to_select: Optional[type[BaseModel]] = None,
        join_type: str = "left",
        alias: Optional[AliasedClass[Any]] = None,
        join_filters: Optional[dict] = None,
        nest_joins: bool = False,
        offset: int = 0,
        limit: Optional[int] = 100,
        sort_columns: Optional[Union[str, list[str]]] = None,
        sort_orders: Optional[Union[str, list[str]]] = None,
        return_as_model: bool = False,
        joins_config: Optional[list[JoinConfig]] = None,
        return_total_count: bool = True,
        relationship_type: Optional[str] = None,
        ## Added search_query and search_columns (for main model)
        search_query: Optional[str] = None,
        search_columns: Optional[list[str]] = None,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """
        Fetch multiple records with a join on another model, allowing for pagination, optional sorting, and model conversion.
        For filtering details see:
        https://igorbenav.github.io/fastcrud/advanced/crud/#advanced-filters

        Args:
            db: The SQLAlchemy async session.
            schema_to_select: Pydantic schema for selecting specific columns from the primary model. Required if `return_as_model` is True.
            join_model: The model to join with.
            join_on: SQLAlchemy Join object for specifying the ON clause of the join. If None, the join condition is auto-detected based on foreign keys.
            join_prefix: Optional prefix to be added to all columns of the joined model. If None, no prefix is added.
            join_schema_to_select: Pydantic schema for selecting specific columns from the joined model.
            join_type: Specifies the type of join operation to perform. Can be "left" for a left outer join or "inner" for an inner join.
            alias: An instance of `AliasedClass` for the join model, useful for self-joins or multiple joins on the same model. Result of `aliased(join_model)`.
            join_filters: Filters applied to the joined model, specified as a dictionary mapping column names to their expected values.
            nest_joins: If True, nested data structures will be returned where joined model data are nested under the join_prefix as a dictionary.
            offset: The offset (number of records to skip) for pagination.
            limit: Maximum number of records to fetch in one call. Use `None` for "no limit", fetching all matching rows. Note that in order to use `limit=None`, you'll have to provide a custom endpoint to facilitate it, which you should only do if you really seriously want to allow the user to get all the data at once.
            sort_columns: A single column name or a list of column names on which to apply sorting.
            sort_orders: A single sort order ('asc' or 'desc') or a list of sort orders corresponding to the columns in sort_columns. If not provided, defaults to 'asc' for each column.
            return_as_model: If True, converts the fetched data to Pydantic models based on schema_to_select. Defaults to False.
            joins_config: List of JoinConfig instances for specifying multiple joins. Each instance defines a model to join with, join condition, optional prefix for column names, schema for selecting specific columns, and join type.
            return_total_count: If True, also returns the total count of rows with the selected filters. Useful for pagination.
            relationship_type: Specifies the relationship type, such as 'one-to-one' or 'one-to-many'. Used to determine how to nest the joined data. If None, uses one-to-one.
            search_query: Optional search string; rows matching it (case-insensitive) in any of the search columns are returned.
            search_columns: List of column names on the primary model to apply the search query to. Joined models use `JoinConfig.search_columns`.
            **kwargs: Filters to apply to the primary query, including advanced comparison operators for refined searching.

        Returns:
            A dictionary containing the fetched rows under 'data' key and total count under 'total_count'.

        Raises:
            ValueError: If limit or offset is negative, or if schema_to_select is required but not provided or invalid.
                        Also if both 'joins_config' and any of the single join parameters are provided or none of 'joins_config' and 'join_model' is provided.


        """
        if joins_config and (
            join_model or join_prefix or join_on or join_schema_to_select or alias or relationship_type
        ):
            raise ValueError("Cannot use both single join parameters and joins_config simultaneously.")
        elif not joins_config and not join_model:
            raise ValueError("You need one of join_model or joins_config.")

        if (limit is not None and limit < 0) or offset < 0:
            raise ValueError("Limit and offset must be non-negative.")

        if relationship_type is None:
            relationship_type = "one-to-one"

        primary_select = _extract_matching_columns_from_schema(model=self.model, schema=schema_to_select)
        stmt: Select = select(*primary_select)

        join_definitions = joins_config if joins_config else []
        if join_model:
            join_definitions.append(
                JoinConfig(
                    model=join_model,
                    join_on=join_on or _auto_detect_join_condition(self.model, join_model),
                    join_prefix=join_prefix,
                    schema_to_select=join_schema_to_select,
                    join_type=join_type,
                    alias=alias,
                    filters=join_filters,
                    relationship_type=relationship_type,
                )
            )

        stmt = self._prepare_and_apply_joins(stmt=stmt, joins_config=join_definitions, use_temporary_prefix=nest_joins)

        primary_filters = self._parse_filters(**kwargs)
        if primary_filters:
            stmt = stmt.filter(*primary_filters)

        ### Custom search over multiple columns and multiple models
        if search_query and (search_columns or any(join.search_columns for join in join_definitions)):
            or_conditions = self._parse_or_filters(search_query, search_columns, join_definitions)
            if or_conditions:
                stmt = stmt.filter(or_(*or_conditions))

        if sort_columns:
            stmt = self._apply_sorting(stmt, sort_columns, sort_orders)

        if offset:
            stmt = stmt.offset(offset)
        if limit is not None:
            stmt = stmt.limit(limit)

        result = await db.execute(stmt)
        data: list[Union[dict, BaseModel]] = []

        for row in result.mappings().all():
            row_dict = dict(row)

            if nest_joins:
                row_dict = _nest_join_data(
                    data=row_dict,
                    join_definitions=join_definitions,
                )

            if return_as_model:
                if schema_to_select is None:
                    raise ValueError("schema_to_select must be provided when return_as_model is True.")
                try:
                    model_instance = schema_to_select(**row_dict)
                    data.append(model_instance)
                except ValidationError as e:
                    raise ValueError(f"Data validation error for schema {schema_to_select.__name__}: {e}")
            else:
                data.append(row_dict)

        if nest_joins and any(join.relationship_type == "one-to-many" for join in join_definitions):
            nested_data = _nest_multi_join_data(
                base_primary_key=self._primary_keys[0].name,
                data=data,
                joins_config=join_definitions,
                return_as_model=return_as_model,
                schema_to_select=schema_to_select if return_as_model else None,
                nested_schema_to_select={
                    (join.join_prefix.rstrip("_") if join.join_prefix else join.model.__name__): join.schema_to_select
                    for join in join_definitions
                    if join.schema_to_select
                },
            )
        else:
            nested_data = _handle_null_primary_key_multi_join(data, join_definitions)

        response: dict[str, Any] = {"data": nested_data}

        if return_total_count:
            total_count: int = await self.count(db=db, joins_config=joins_config, **kwargs)
            response["total_count"] = total_count

        return response

    # Custom parse function for the search functionality
    def _parse_or_filters(
        self, search_query: str, main_search_columns: Optional[list[str]], join_definitions: list[JoinConfig]
    ) -> list[ColumnElement]:
        or_conditions = []

        # Handle main model columns (may be None when only joined-model columns are searched)
        for column_name in main_search_columns or []:
            column = getattr(self.model, column_name, None)
            if column is not None:
                or_conditions.append(column.ilike(f"%{search_query}%"))

        # Handle joined model columns
        for join in join_definitions:
            if join.search_columns:
                model = join.alias or join.model
                for column_name in join.search_columns:
                    column = getattr(model, column_name, None)
                    if column is not None:
                        if join.join_prefix:
                            column = column.label(f"{join.join_prefix}{column_name}")
                        or_conditions.append(column.ilike(f"%{search_query}%"))

        return or_conditions

mick-net avatar Jul 16 '24 08:07 mick-net

Not a permanent solution, but just dumping this info for self-reference, and maybe as inspiration for next releases.

Besides the OR filter for a global search across the specified columns from my previous comment, I've extended get_multi and get_multi_joined to also support a custom OR filter that takes a list of (column, value) tuples to combine as OR conditions.

Maybe the search OR filters and this new custom_or_filter should be refactored into JoinConfig, but this works well for now.

In the example below I want to filter for rows that were either created by the current user OR have 'shared=True'.

# List of (column_name, value) tuples combined as OR filters
kwargs["custom_or_filter"] = [("created_by_id", user_id), ("shared", True)]

result = await fast_crud.get_multi_joined(**kwargs)
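
The extended implementation (adds custom_or_filter to both get_multi_joined and get_multi):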
from fastcrud import FastCRUD

from typing import Any, Optional, TypeVar, Union

from pydantic import BaseModel, ValidationError
from sqlalchemy import ColumnElement
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm.util import AliasedClass
from sqlalchemy.sql.selectable import Select
from sqlalchemy import select, or_, func

from fastcrud.crud.helper import (
    _auto_detect_join_condition,
    _nest_join_data,
    _nest_multi_join_data,
    _handle_null_primary_key_multi_join,
    _extract_matching_columns_from_schema,
    JoinConfig as JoinConfigBase,
)


class JoinConfig(JoinConfigBase):
    search_columns: Optional[list[str]] = None


ModelType = TypeVar("ModelType", bound=DeclarativeBase)
CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel)
UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel)
UpdateSchemaInternalType = TypeVar("UpdateSchemaInternalType", bound=BaseModel)
DeleteSchemaType = TypeVar("DeleteSchemaType", bound=BaseModel)


class FastCRUDWithORFilters(FastCRUD):
    async def get_multi_joined(
        self,
        db: AsyncSession,
        schema_to_select: Optional[type[BaseModel]] = None,
        join_model: Optional[type[ModelType]] = None,
        join_on: Optional[Any] = None,
        join_prefix: Optional[str] = None,
        join_schema_to_select: Optional[type[BaseModel]] = None,
        join_type: str = "left",
        alias: Optional[AliasedClass[Any]] = None,
        join_filters: Optional[dict] = None,
        nest_joins: bool = False,
        offset: int = 0,
        limit: Optional[int] = 100,
        sort_columns: Optional[Union[str, list[str]]] = None,
        sort_orders: Optional[Union[str, list[str]]] = None,
        return_as_model: bool = False,
        joins_config: Optional[list[JoinConfig]] = None,
        return_total_count: bool = True,
        relationship_type: Optional[str] = None,
        search_query: Optional[str] = None,
        search_columns: Optional[list[str]] = None,
        custom_or_filter: Optional[list[tuple[str, Any]]] = None,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """
        Fetch multiple records with a join on another model, allowing for pagination, optional sorting, and model conversion.
        For filtering details see:
        https://igorbenav.github.io/fastcrud/advanced/crud/#advanced-filters

        Args:
            db: The SQLAlchemy async session.
            schema_to_select: Pydantic schema for selecting specific columns from the primary model. Required if `return_as_model` is True.
            join_model: The model to join with.
            join_on: SQLAlchemy Join object for specifying the ON clause of the join. If None, the join condition is auto-detected based on foreign keys.
            join_prefix: Optional prefix to be added to all columns of the joined model. If None, no prefix is added.
            join_schema_to_select: Pydantic schema for selecting specific columns from the joined model.
            join_type: Specifies the type of join operation to perform. Can be "left" for a left outer join or "inner" for an inner join.
            alias: An instance of `AliasedClass` for the join model, useful for self-joins or multiple joins on the same model. Result of `aliased(join_model)`.
            join_filters: Filters applied to the joined model, specified as a dictionary mapping column names to their expected values.
            nest_joins: If True, nested data structures will be returned where joined model data are nested under the join_prefix as a dictionary.
            offset: The offset (number of records to skip) for pagination.
            limit: Maximum number of records to fetch in one call. Use `None` for "no limit", fetching all matching rows. Note that in order to use `limit=None`, you'll have to provide a custom endpoint to facilitate it, which you should only do if you really seriously want to allow the user to get all the data at once.
            sort_columns: A single column name or a list of column names on which to apply sorting.
            sort_orders: A single sort order ('asc' or 'desc') or a list of sort orders corresponding to the columns in sort_columns. If not provided, defaults to 'asc' for each column.
            return_as_model: If True, converts the fetched data to Pydantic models based on schema_to_select. Defaults to False.
            joins_config: List of JoinConfig instances for specifying multiple joins. Each instance defines a model to join with, join condition, optional prefix for column names, schema for selecting specific columns, and join type.
            return_total_count: If True, also returns the total count of rows with the selected filters. Useful for pagination.
            relationship_type: Specifies the relationship type, such as 'one-to-one' or 'one-to-many'. Used to determine how to nest the joined data. If None, uses one-to-one.
            search_query: Optional search string; rows matching it (case-insensitive) in any of the search columns are returned.
            search_columns: List of column names on the primary model to apply the search query to. Joined models use `JoinConfig.search_columns`.
            custom_or_filter: List of (column_name, value) tuples combined with OR and applied to the primary model.
            **kwargs: Filters to apply to the primary query, including advanced comparison operators for refined searching.

        Returns:
            A dictionary containing the fetched rows under 'data' key and total count under 'total_count'.

        Raises:
            ValueError: If limit or offset is negative, or if schema_to_select is required but not provided or invalid.
                        Also if both 'joins_config' and any of the single join parameters are provided or none of 'joins_config' and 'join_model' is provided.


        """
        if joins_config and (
            join_model or join_prefix or join_on or join_schema_to_select or alias or relationship_type
        ):
            raise ValueError("Cannot use both single join parameters and joins_config simultaneously.")
        elif not joins_config and not join_model:
            raise ValueError("You need one of join_model or joins_config.")

        if (limit is not None and limit < 0) or offset < 0:
            raise ValueError("Limit and offset must be non-negative.")

        if relationship_type is None:
            relationship_type = "one-to-one"

        primary_select = _extract_matching_columns_from_schema(model=self.model, schema=schema_to_select)
        stmt: Select = select(*primary_select)

        join_definitions = joins_config if joins_config else []
        if join_model:
            join_definitions.append(
                JoinConfig(
                    model=join_model,
                    join_on=join_on or _auto_detect_join_condition(self.model, join_model),
                    join_prefix=join_prefix,
                    schema_to_select=join_schema_to_select,
                    join_type=join_type,
                    alias=alias,
                    filters=join_filters,
                    relationship_type=relationship_type,
                )
            )

        stmt = self._prepare_and_apply_joins(stmt=stmt, joins_config=join_definitions, use_temporary_prefix=nest_joins)

        primary_filters = self._parse_filters(**kwargs)
        if primary_filters:
            stmt = stmt.filter(*primary_filters)

        # Apply custom OR filter
        if custom_or_filter:
            or_conditions = [getattr(self.model, col) == val for col, val in custom_or_filter]
            stmt = stmt.filter(or_(*or_conditions))

        # Custom search over multiple columns
        if search_query and (search_columns or any(join.search_columns for join in join_definitions)):
            or_conditions = self._parse_or_filters(search_query, search_columns, join_definitions)
            if or_conditions:
                stmt = stmt.filter(or_(*or_conditions))

        if sort_columns:
            stmt = self._apply_sorting(stmt, sort_columns, sort_orders)

        if offset:
            stmt = stmt.offset(offset)
        if limit is not None:
            stmt = stmt.limit(limit)

        result = await db.execute(stmt)
        data: list[Union[dict, BaseModel]] = []

        for row in result.mappings().all():
            row_dict = dict(row)

            if nest_joins:
                row_dict = _nest_join_data(
                    data=row_dict,
                    join_definitions=join_definitions,
                )

            if return_as_model:
                if schema_to_select is None:
                    raise ValueError("schema_to_select must be provided when return_as_model is True.")
                try:
                    model_instance = schema_to_select(**row_dict)
                    data.append(model_instance)
                except ValidationError as e:
                    raise ValueError(f"Data validation error for schema {schema_to_select.__name__}: {e}")
            else:
                data.append(row_dict)

        if nest_joins and any(join.relationship_type == "one-to-many" for join in join_definitions):
            nested_data = _nest_multi_join_data(
                base_primary_key=self._primary_keys[0].name,
                data=data,
                joins_config=join_definitions,
                return_as_model=return_as_model,
                schema_to_select=schema_to_select if return_as_model else None,
                nested_schema_to_select={
                    (join.join_prefix.rstrip("_") if join.join_prefix else join.model.__name__): join.schema_to_select
                    for join in join_definitions
                    if join.schema_to_select
                },
            )
        else:
            nested_data = _handle_null_primary_key_multi_join(data, join_definitions)

        response: dict[str, Any] = {"data": nested_data}

        if return_total_count:
            total_count: int = await self.count(db=db, joins_config=joins_config, **kwargs)
            response["total_count"] = total_count

        return response

    async def get_multi(
        self,
        db: AsyncSession,
        offset: int = 0,
        limit: Optional[int] = 100,
        schema_to_select: Optional[type[BaseModel]] = None,
        sort_columns: Optional[Union[str, list[str]]] = None,
        sort_orders: Optional[Union[str, list[str]]] = None,
        return_as_model: bool = False,
        return_total_count: bool = True,
        search_query: Optional[str] = None,
        search_columns: Optional[list[str]] = None,
        custom_or_filter: Optional[list[tuple[str, Any]]] = None,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """
        Fetches multiple records based on filters, supporting sorting, pagination, and search functionality.
        For filtering details see:
        https://igorbenav.github.io/fastcrud/advanced/crud/#advanced-filters

        Args:
            db: The database session to use for the operation.
            offset: Starting index for records to fetch, useful for pagination.
            limit: Maximum number of records to fetch in one call. Use `None` for "no limit", fetching all matching rows.
            schema_to_select: Optional Pydantic schema for selecting specific columns. Required if `return_as_model` is True.
            sort_columns: Column names to sort the results by.
            sort_orders: Corresponding sort orders ('asc', 'desc') for each column in sort_columns.
            return_as_model: If True, returns data as instances of the specified Pydantic model.
            return_total_count: If True, also returns the total count of rows with the selected filters. Useful for pagination.
            search_query: Optional search query string to filter results across specified columns.
            search_columns: List of column names to apply the search query to.
            custom_or_filter: List of (column_name, value) tuples combined with OR and applied to the model.
            **kwargs: Filters to apply to the query, including advanced comparison operators for more detailed querying.

        Returns:
            A dictionary containing 'data' with fetched records and 'total_count' indicating the total number of records matching the filters.

        Raises:
            ValueError: If limit or offset is negative, or if schema_to_select is required but not provided or invalid.
        """
        if (limit is not None and limit < 0) or offset < 0:
            raise ValueError("Limit and offset must be non-negative.")

        stmt = await self.select(
            schema_to_select=schema_to_select,
            sort_columns=sort_columns,
            sort_orders=sort_orders,
            **kwargs,
        )

        # Apply custom OR filter
        if custom_or_filter:
            or_conditions = [getattr(self.model, col) == val for col, val in custom_or_filter]
            stmt = stmt.filter(or_(*or_conditions))

        # Apply search functionality
        if search_query and search_columns:
            search_conditions = [
                getattr(self.model, column).ilike(f"%{search_query}%")
                for column in search_columns
                if hasattr(self.model, column)
            ]
            if search_conditions:
                stmt = stmt.filter(or_(*search_conditions))

        if offset:
            stmt = stmt.offset(offset)
        if limit is not None:
            stmt = stmt.limit(limit)

        result = await db.execute(stmt)
        data = [dict(row) for row in result.mappings()]

        response: dict[str, Any] = {"data": data}

        if return_total_count:
            # For accurate count with search, we need to apply the same filters
            count_stmt = select(func.count()).select_from(self.model)
            if kwargs:
                count_stmt = count_stmt.filter(*self._parse_filters(**kwargs))
            if custom_or_filter:
                count_stmt = count_stmt.filter(or_(*or_conditions))
            if search_query and search_columns:
                count_stmt = count_stmt.filter(or_(*search_conditions))
            total_count = await db.scalar(count_stmt)
            response["total_count"] = total_count

        if return_as_model:
            if not schema_to_select:
                raise ValueError("schema_to_select must be provided when return_as_model is True.")
            try:
                model_data = [schema_to_select(**row) for row in data]
                response["data"] = model_data
            except ValidationError as e:
                raise ValueError(f"Data validation error for schema {schema_to_select.__name__}: {e}")

        return response

    # Custom parse function for the search functionality
    def _parse_or_filters(
        self, search_query: str, main_search_columns: Optional[list[str]], join_definitions: list[JoinConfig]
    ) -> list[ColumnElement]:
        or_conditions = []

        # Handle main model columns (may be None when only joined-model columns are searched)
        for column_name in main_search_columns or []:
            column = getattr(self.model, column_name, None)
            if column is not None:
                or_conditions.append(column.ilike(f"%{search_query}%"))

        # Handle joined model columns
        for join in join_definitions:
            if join.search_columns:
                model = join.alias or join.model
                for column_name in join.search_columns:
                    column = getattr(model, column_name, None)
                    if column is not None:
                        if join.join_prefix:
                            column = column.label(f"{join.join_prefix}{column_name}")
                        or_conditions.append(column.ilike(f"%{search_query}%"))

        return or_conditions
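
For completeness, a get_multi call using the same additions might look like this (a sketch; 'Item', 'ItemRead', and the column names are placeholders):

crud_items = FastCRUDWithORFilters(Item)

result = await crud_items.get_multi(
    db=db,
    schema_to_select=ItemRead,
    search_query="foo",  # global search term
    search_columns=["name", "description"],  # columns searched with ILIKE
    custom_or_filter=[("created_by_id", user_id), ("shared", True)],  # row-level OR filter
)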

mick-net avatar Sep 12 '24 14:09 mick-net

Meh, I was lazy - basically did the following:

# Imports added for completeness; `ModelType` is the same TypeVar (bound to
# DeclarativeBase) defined in the earlier snippets in this thread.
from typing import Optional, Union

from sqlalchemy import ColumnElement, or_
from sqlalchemy.orm.util import AliasedClass

from fastcrud import FastCRUD


class FastCrudWithOrFilters(FastCRUD):
    def _parse_filters(
        self, model: Optional[Union[type[ModelType], AliasedClass]] = None, **kwargs
    ) -> list[ColumnElement]:
        model = model or self.model
        kwargs_rest = {}
        or_filters = []
        for key, value in kwargs.items():
            if key.startswith("__or"):
                or_filters_parsed = super()._parse_filters(model, **value)
                or_filters.append(or_(*or_filters_parsed))
            else:
                kwargs_rest[key] = value

        return super()._parse_filters(model, **kwargs_rest) + or_filters

Which allowed me to execute the call like this:

# Dict of column names and values combined as OR filters
kwargs["__or_whatever"] = { "created_by_id": user_id, "shared": True }

result = await fast_crud.get_multi_joined(**kwargs)

And more "or" filters can be added to form an AND(expr, expr, .. OR(expr, expr, ...) , OR(), ...).

Not pretty - but works.

I can send a PR for this if this makes sense.

yoadsn avatar Oct 06 '24 17:10 yoadsn

Thank you @yoadsn, it works well for my issue. I hope this will be included in an official release.

Justinianus2001 avatar Oct 15 '24 07:10 Justinianus2001

Fixed in #218

igorbenav avatar May 17 '25 14:05 igorbenav