ODD_spark <- spark_read_parquet(sc,
path = "hdfs:///dataset/MiDAS_v4/odd.parquet",
memory = FALSE)
ODD_premier <- ODD_spark %>%
group_by(id_midas) %>%
window_order(id_midas, KDPOD) %>%
mutate(date_premier_droit = first(KDPOD)) %>%
ungroup() %>%
distinct(id_midas, KROD3, date_premier_droit) %>%
head(5)Fonctions spécifiques
La majorité des commandes dplyr fonctionnent sur un spark_data_frame avec le package sparklyr. Les divergences principales sont les suivantes :
| Fonctionnalité | tidyverse | sparklyr |
|---|---|---|
import d’un fichier .parquet |
read_parquet |
spark_read_parquet() |
| tri d’un tableau | arrange() |
window_order() ou sdf_sort() |
| opérations sur les dates | lubridate |
fonctions Hive |
| empiler des tableaux | bind_rows() |
sdf_bind_rows() |
| nombre de lignes d’un tableau | nrow() |
sdf_nrow() |
| faire pivoter un tableau | tidyr |
sdf_pivot() |
export d’un spark_data_frame |
spark_write_parquet() |
Tableau
Tri dans un groupe pour effectuer un calcul séquentiel
Tri pour une sortie :
sdf_sort(),arrange()ne fonctionne pasConcaténer les lignes (ou les colonnes
sdf_bind_cols())ODD_1 <- ODD_spark %>% filter(KDPOD <= as.Date("2017-12-31")) %>% mutate(groupe = "temoins") ODD_2 <- ODD_spark %>% filter(KDPOD >= as.Date("2021-12-31")) %>% mutate(groupe = "traites") ODD_evaluation <- sdf_bind_rows(ODD_1, ODD_2)Dédoublonner une table
droits_dans_PJC <- PJC_spark %>% sdf_distinct(id_midas, KROD3) print(head(droits_dans_PJC, 5)) PJC_dedoublonnee <- PJC_spark %>% sdf_drop_duplicates() print(head(PJC_dedoublonnee, 5))Pivot : les fonctions du packag
tidyrne fonctionnent pas sur données sparkODD_sjr_moyen <- ODD_spark %>% mutate(groupe = ifelse(KDPOD <= as.Date("2020-12-31"), "controles", "traites")) %>% sdf_pivot(groupe ~ KCRGC, fun.aggregate = list(KQCSJP = "mean") )
Statistiques
Résumé statistique :
sdf_describe(),summary()ne fonctionne pas.Dimension :
sdf_dim, la fonctionnrow()ne fonctionne pas.Quantiles approximatifs : le calcul des quantiles sur données distirbuées renvoie une approximation car toutes les données ne peuvent pas être rappatriées sur la même machine physique du fait de la volumétrie,
sdf_quantile()