mais icon indicating copy to clipboard operation
mais copied to clipboard

[utils] utils em Python, R, Stata

Open rdahis opened this issue 3 years ago • 4 comments

Motivação

Hoje na limpeza de bases a equipe de dados faz uma série de passos automatizáveis, em Python e R. São passos facilmente abstraíveis para funções, que nos pouparão tempo e erros.

Proposta (a ser preenchida e expandida por tod@s)

Escrever módulo de Python e R para uso interno. Publicar em repositório novo.

Funções para:

  • [ ] Checar se tabela ou partição existem no nosso storage.
Parâmetros: 
Retorno:
  • [x] Particionar tabela em estrutura de pastas e csvs
Parâmetros: arquivo csv ou DataFrame
Retorno: pasta em estrutura Hive com csvs, com as colunas de partição já deletadas
  • [ ] Converter unidades de medida
Parâmetros: coluna num DataFrame, unidade de medida 'de' e 'para'
Retorno: DataFrame
  • [ ] Retornar informações de diretórios
Parâmetros: entidade, chave primária
Retorno: DataFrame
  • [ ] Retornar query/tabela com valores já substituídos pelo dicionário.
1. query = select * from rais.microdados WHERE sigla_uf = 'RJ'
2. select * from rais.dicionario
3. JOIN 1 e 2

df = read_sql_with_labels(query)
df = read_table_with_labels(dataset_id, table_id)
  • Lidar com múltiplas tabelas puxadas na query.
  • Lidar com queries que mudam nome de variável.

rdahis avatar Oct 26 '21 18:10 rdahis

Esboço inicial para checar se tabela ou partição existem no nosso storage.

import basedosdados as bd

def get_table_blobs(dataset_id, table_id, mode='staging', bucket_name="basedosdados"):
    """Get Storage blobs from a table given `dataset_id` and `table_id`, from from path `bucket_name`/`mode`/`dataset_id`/`table_id`

        There are 5 modes:
        * `raw` : should contain raw files from datasource
        * `staging` : should contain pre-treated files ready to upload to BiqQuery
        * `header`: should contain the header of the tables
        * `auxiliary_files`: should contain auxiliary files from eache table
        * `architecture`: should contain the architecture sheet of the tables

    Args:
        dataset_id (str): Dataset ID
        table (str): Table ID
        mode (str):  Folder of which dataset to check [raw|staging|header|auxiliary_files|architecture]. Default to `staging`.
        bucket_name (str): Storage bucket where data is, defaults to `basedosdados`
    Returns:
        list: List of Storage blobs.
    """

    tb = bd.Table(dataset_id=dataset_id, table_id=table_id)
    blobs = list(
        tb.client["storage_staging"]
        .bucket(bucket_name)
        .list_blobs(prefix=f"{mode}/{tb.dataset_id}/{tb.table_id}/")
    )

    return [b.name for b in blobs]

d116626 avatar Oct 26 '21 18:10 d116626

Esboço inicial para particionar tabela em estrutura de pastas e csvs. Acho melhor só receber dataframe, pq as vezes o csv pode vir bugado, com separador não padronizado, ou encoding ...

import os
from pathlib import Path
import pandas as pd


def to_partitions(data, partition_columns, savepath):
    """Save data in to hive patitions schema, given a dataframe and a list of partition columns.
    Args:
        data (pandas.core.frame.DataFrame): Dataframe to be partitioned.
        partition_columns (list): List of columns to be used as partitions.
        savepath (str, pathlib.PosixPath): folder path to save the partitions

    Exemple:

        data = {
            "ano": [2020, 2021, 2020, 2021, 2020, 2021, 2021,2025],
            "mes": [1, 2, 3, 4, 5, 6, 6,9],
            "sigla_uf": ["SP", "SP", "RJ", "RJ", "PR", "PR", "PR","PR"],
            "dado": ["a", "b", "c", "d", "e", "f", "g",'h'],
        }

        to_partitions(
            data=pd.DataFrame(data),
            partition_columns=['ano','mes','sigla_uf'],
            savepath='partitions/'
        )
    """

    if isinstance(data, (pd.core.frame.DataFrame)):

        savepath = Path(savepath)

        unique_combinations = (
            data[partition_columns]
            .drop_duplicates(subset=partition_columns)
            .to_dict(orient="records")
        )

        for filter_combination in unique_combinations:
            patitions_values = [
                f"{partition}={value}"
                for partition, value in filter_combination.items()
            ]
            filter_save_path = Path(savepath / "/".join(patitions_values))
            filter_save_path.mkdir(parents=True, exist_ok=True)

            df_filter = data.loc[
                data[filter_combination.keys()]
                .isin(filter_combination.values())
                .all(axis=1),
                :,
            ]
            df_filter = df_filter.drop(columns=partition_columns)

            df_filter.to_csv(filter_save_path / "data.csv", index=False)

    else:
        raise (BaseException("Data need to be a pandas DataFrame"))

d116626 avatar Oct 26 '21 20:10 d116626

@rdahis acho que podemos fechar essa aqui né? Algumas dessas funções já estão em pipelines. Acho que a única que seria interessante ter uma issue ainda seria "Retornar query/tabela com valores já substituídos pelo dicionário" parece que seria bem útil pro usuário final

laura-l-amaral avatar Jan 16 '24 12:01 laura-l-amaral

Você que manda, de acordo com o que for útil generalizar para a equipe de dados. Se usarem mais em pipelines, já fica lá. Mas sim, poder baixar dados com valores substituídos pelo dicionário ajudaria bastante o usuário.

rdahis avatar Jan 16 '24 23:01 rdahis