mongo-arrow icon indicating copy to clipboard operation
mongo-arrow copied to clipboard

Schema questions for pymongoarrow when converting from pymongo

Open covatic-john opened this issue 1 year ago • 3 comments

we have 3 mongo collections which hold a permission object all 3 slightly different in structure.

"permissions": [
                    {
                        "activity": "never"
                    },
                    {
                        "pushNotifications": "always",
                        "location": "foreground"
                    }
                ],
                "permissions": {
                    "geolocation": "prompt"
                },
                "permissions": [
                    {
                        "activity": "never"
                    },
                    {
                        "location": "foreground"
                    },
                    {
                        "pushNotifications": "always"
                    }
                ],

in pymongo I could just project the object and then convert the pandas column into string df[c] = df[c].astype(pd.StringDtype())

then using Fastparquet as the engine write to parquet with the output like this

[{'location': 'notRequested'}, {'activity': 'never'}, {'pushNotifications': 'never'}, {'backgroundAuthStatus': 'permitted'}, {'att': 'denied'}, {'isPrecise': 'notRequested'}, {'adPersonalisation': 'true'}]

I am having issues when converting to use pymongoarrow. if I set the schema object as "permissions": pa.list_(pa.string()), then I get null/None, I have tried using ps.struct but then get empty values for the items that are missing in the structure.

currently my project in my query is

'permissions': {
                    '$map': {
                        'input': '$os.permissions',
                        'as': 'permission',
                        'in': {
                            '$function': {
                                'body': 'function(perm) { return JSON.stringify(perm); }',
                                'args': [
                                    '$$permission'
                                ],
                                'lang': 'js'
                            }
                        }
                    }
                },

with a schema element of "permissions": pa.list_(pa.string()), but then need to convert the column with df['permissions'] = df['permissions'].apply(list).astype(str).str.replace("'", "").str.replace('"', "'")

there must be an easier way to deal with these json objects as string. ultimately these are ending up in Redshift so can be parsed in queries. Any help or suggestions for something I thought would be quite simple.

3 days messing with mongo data and converting a migration to pymongoarrow. the other collections have been a breeze and the memory consumption has come down and have a speed improvement.

John

covatic-john avatar Oct 02 '24 12:10 covatic-john

Thank you for the question! Tracking here: https://jira.mongodb.org/browse/ARROW-253

aclark4life avatar Oct 02 '24 14:10 aclark4life

in pymongo I could just project the object and then convert the pandas column into string df[c] = df[c].astype(pd.StringDtype()) then using Fastparquet as the engine write to parquet with the output like this [{'location': 'notRequested'}, {'activity': 'never'}, {'pushNotifications': 'never'}, {'backgroundAuthStatus': 'permitted'}, {'att': 'denied'}, {'isPrecise': 'notRequested'}, {'adPersonalisation': 'true'}]

@covatic-john Can you please explain in more detail how you are doing the transformation with PyMongo described above? Thank you

aclark4life avatar Oct 04 '24 18:10 aclark4life

morning,

my original pymongo query is

    query = [
        {
            "$match": {
                "_id": {
                    "$gte": ObjectId.from_datetime(start_date),
                    "$lt": ObjectId.from_datetime(start_date + timedelta(hours=12)),
                },
                "client_id": ObjectId(client_id),
            }
        },
        {"$addFields": {"domain": {"$arrayElemAt": [{"$split": ["$data.href", "/"]}, 2]}}},
        {
            "$project": {
                "_id": 0,
                "analytics_id": "$_id",
                "framework_id": 1,
                "client_id": 1,
                "domain": "$domain",
                "primary_directory": "$secondary",
                "app_version": "$originator.app_version",
                "report_date": "$data.timestamp.ts",
                "collation_date": "$data.timestamp.ts",
                "os": "$inferred.os",
                "acorn_code": "$data.home.acorn_code",
                "lives_in": "$data.home.lives_in",
                "permissions": "$data.permissions",
               ....
            }
        },
    ]
    analytics_list = list(db.browser_device_data.aggregate(query, batchSize=5000, allowDiskUse=True))
    return analytics_list

the before writing to parquet file I just converted the dataframe columns to the correct types I wanted in the parquet before writing out.

def convert_df_types(df):
    """Convert DataFrame columns to the appropriate data types."""
    column_names = list(df.columns.values)
    columns_not_string = [
        "collation_date",
        "framework_key",
        "client_key",
        "brand_key",
        "home_fallback",
        "acorn_code",
        "sei_version",
        "memory",
        "screen_width",
        "screen_height",
        "total_consumption_count",
        "total_consumption_duration",
    ]
    # Create sets of a,b
    setA = set(column_names)
    setB = set(columns_not_string)
    # Get new set with elements that are only in a, but not in b
    onlyInA = setA.difference(setB)

    if "collation_date" in df.columns:
        df["collation_date"] = pd.to_datetime(df["collation_date"], errors="coerce", utc=True)
    if "client_key" in df.columns:
        df["client_key"] = df["client_key"].astype("Int64")
    if "brand_key" in df.columns:
        df["brand_key"] = df["brand_key"].astype("Int64")
    if "home_fallback" in df.columns:
        df["home_fallback"] = df["home_fallback"].astype("Int64")
    if "acorn_code" in df.columns:
        df["acorn_code"] = df["acorn_code"].astype("Int64")
    if "sei_version" in df.columns:
        df["sei_version"] = df["sei_version"].astype("Int64")
    if "memory" in df.columns:
        df["memory"] = df["memory"].astype("Float64")
    if "screen_width" in df.columns:
        df["screen_width"] = df["screen_width"].astype("Float64")
    if "screen_height" in df.columns:
        df["screen_height"] = df["screen_height"].astype("Float64")
    if "total_consumption_count" in df.columns:
        df["total_consumption_count"] = df["total_consumption_count"].astype("Int64")
    if "total_consumption_duration" in df.columns:
        df["total_consumption_duration"] = df["total_consumption_duration"].astype("Int64")
    for c in onlyInA:
        df[c] = df[c].astype(pd.StringDtype())

    return df

The parquet is then uploaded to s3 and then crawled by glue. This was legacy code not written by myself but nothing complicated. The only issue I see with my original statement is the schema I was trying which was "permissions": pa.list_(pa.string()),

this should have been

"permissions": pa.string(),

but I played around lots and final have

                "permissions": {
                    "$function": {
                        "body": "function(permissions) { return JSON.stringify(permissions); }",
                        "args": ["$os.permissions"],
                        "lang": "js"
                    }
                },

using a schema of "permissions":pa.string(),

I could not seem to get any data from

analytics_table = collection.aggregate_arrow_all(query, schema=schema, allowDiskUse=True)

without using the js functions

covatic-john avatar Oct 07 '24 06:10 covatic-john