mais
mais copied to clipboard
[utils] utils em Python, R, Stata
Motivação
Hoje na limpeza de bases a equipe de dados faz uma série de passos automatizáveis, em Python e R. São passos facilmente abstraíveis para funções, que nos pouparão tempo e erros.
Proposta (a ser preenchida e expandida por tod@s)
Escrever módulo de Python e R para uso interno. Publicar em repositório novo.
Funções para:
- [ ] Checar se tabela ou partição existem no nosso storage.
Parâmetros:
Retorno:
- [x] Particionar tabela em estrutura de pastas e csvs
Parâmetros: arquivo csv ou DataFrame
Retorno: pasta em estrutura Hive com csvs, com as colunas de partição já deletadas
- [ ] Converter unidades de medida
Parâmetros: coluna num DataFrame, unidade de medida 'de' e 'para'
Retorno: DataFrame
- [ ] Retornar informações de diretórios
Parâmetros: entidade, chave primária
Retorno: DataFrame
- [ ] Retornar query/tabela com valores já substituídos pelo dicionário.
1. query = select * from rais.microdados WHERE sigla_uf = 'RJ'
2. select * from rais.dicionario
3. JOIN 1 e 2
df = read_sql_with_labels(query)
df = read_table_with_labels(dataset_id, table_id)
- Lidar com múltiplas tabelas puxadas na query.
- Lidar com queries que mudam nome de variável.
Esboço inicial para checar se tabela ou partição existem no nosso storage
.
import basedosdados as bd
def get_table_blobs(dataset_id, table_id, mode='staging', bucket_name="basedosdados"):
"""Get Storage blobs from a table given `dataset_id` and `table_id`, from from path `bucket_name`/`mode`/`dataset_id`/`table_id`
There are 5 modes:
* `raw` : should contain raw files from datasource
* `staging` : should contain pre-treated files ready to upload to BiqQuery
* `header`: should contain the header of the tables
* `auxiliary_files`: should contain auxiliary files from eache table
* `architecture`: should contain the architecture sheet of the tables
Args:
dataset_id (str): Dataset ID
table (str): Table ID
mode (str): Folder of which dataset to check [raw|staging|header|auxiliary_files|architecture]. Default to `staging`.
bucket_name (str): Storage bucket where data is, defaults to `basedosdados`
Returns:
list: List of Storage blobs.
"""
tb = bd.Table(dataset_id=dataset_id, table_id=table_id)
blobs = list(
tb.client["storage_staging"]
.bucket(bucket_name)
.list_blobs(prefix=f"{mode}/{tb.dataset_id}/{tb.table_id}/")
)
return [b.name for b in blobs]
Esboço inicial para particionar tabela em estrutura de pastas e csvs
.
Acho melhor só receber dataframe, pq as vezes o csv pode vir bugado, com separador não padronizado, ou encoding ...
import os
from pathlib import Path
import pandas as pd
def to_partitions(data, partition_columns, savepath):
"""Save data in to hive patitions schema, given a dataframe and a list of partition columns.
Args:
data (pandas.core.frame.DataFrame): Dataframe to be partitioned.
partition_columns (list): List of columns to be used as partitions.
savepath (str, pathlib.PosixPath): folder path to save the partitions
Exemple:
data = {
"ano": [2020, 2021, 2020, 2021, 2020, 2021, 2021,2025],
"mes": [1, 2, 3, 4, 5, 6, 6,9],
"sigla_uf": ["SP", "SP", "RJ", "RJ", "PR", "PR", "PR","PR"],
"dado": ["a", "b", "c", "d", "e", "f", "g",'h'],
}
to_partitions(
data=pd.DataFrame(data),
partition_columns=['ano','mes','sigla_uf'],
savepath='partitions/'
)
"""
if isinstance(data, (pd.core.frame.DataFrame)):
savepath = Path(savepath)
unique_combinations = (
data[partition_columns]
.drop_duplicates(subset=partition_columns)
.to_dict(orient="records")
)
for filter_combination in unique_combinations:
patitions_values = [
f"{partition}={value}"
for partition, value in filter_combination.items()
]
filter_save_path = Path(savepath / "/".join(patitions_values))
filter_save_path.mkdir(parents=True, exist_ok=True)
df_filter = data.loc[
data[filter_combination.keys()]
.isin(filter_combination.values())
.all(axis=1),
:,
]
df_filter = df_filter.drop(columns=partition_columns)
df_filter.to_csv(filter_save_path / "data.csv", index=False)
else:
raise (BaseException("Data need to be a pandas DataFrame"))
@rdahis acho que podemos fechar essa aqui né? Algumas dessas funções já estão em pipelines. Acho que a única que seria interessante ter uma issue ainda seria "Retornar query/tabela com valores já substituídos pelo dicionário" parece que seria bem útil pro usuário final
Você que manda, de acordo com o que for útil generalizar para a equipe de dados. Se usarem mais em pipelines, já fica lá. Mas sim, poder baixar dados com valores substituídos pelo dicionário ajudaria bastante o usuário.