Initiation à Spark avec R en mode cluster

Au programme

  1. MiDAS : une base de données volumineuse 📚

  2. Utiliser MiDAS avec R : un défi 💭

  3. Sparklyr : l’outil ergonomique de spark en R 👨‍💻

  4. Les bonnes pratiques sur une infrastructure partagée 🖥️

  5. Optimiser la mémoire : pourquoi et comment ⏳

  6. Pour aller plus loin 💡

Un rapide tour de table 💬

MiDAS : une base de données volumineuse 📚

Qu’est-ce que MiDAS ?

Une des bases les plus volumineuses du SSP

Les administrations dont les données sont comparables à MiDAS utilisent un cluster Spark : Insee, Drees, Acoss, UNEDIC, Cnaf

▶️Le cluster spark est une solution très efficiente pour traiter des données de cette ampleur.

Concrètement, qu’est-ce que MiDAS ?

Pourquoi Spark ?

La manipulation des données MiDAS en l’état implique de nombreuses opérations de jointures qui nécessitent une puissance de calcul et un temps certains.

Où est MiDAS sur la bulle ?

Disponible dans l’espace commun (= Documents publics) : C:\Users\Public\Documents\MiDAS_parquet\Vague X


Au format parquet :

  • compression efficace des données : taux de compression de 5 à 10 par rapport au format csv

  • orienté colonnes

  • chargement efficace en mémoire des données

  • stockage partitionné des données avec write_dataset()

  • traiter des données sur disque

  • indépendant du logiciel utilisé : R, python, spark…

La documentation en ligne


Documentation en ligne

  • Dictionnaire des données

  • Fiches présentant les concepts de l’indemnisation, du retour à l’emploi

  • Exemples d’implémentation en R

  • Conseils qualité des variables

Et vous, quels sont vos usages de MiDAS ? 👁️‍🗨️

Traiter MiDAS en R : un défi 👨‍💻

Une bulle CASD

Des ressources partagées entre tous les utilsateurs simultanés :

  • 256 Go de mémoire vive (ou RAM)
  • Un processeur (ou CPU) composé de 16 coeurs

Une bulle CASD

Le disque dur, aussi appelé Hard Disk Drive (HDD), est une solution de stockage permanente :

  • les données sont conservées même après l’arrêt de l’appareil

  • l’espace de stockage est volumineux

  • mais les opérations d’écriture et de lecture ne sont pas du tout instantannées

La mémoire vive, aussi appelée RAM, se distingue de la mémoire de stockage (disque) :

  • par sa rapidité, notamment pour fournir des données au processeur pour effectuer des calculs

  • par sa volatilité (toutes les données sont perdues si l’ordinateur n’est plus alimenté)

  • par l’accès direct aux informations qui y sont stockées, quasi instantanné.

Le processeur :

  • permet d’exécuter des tâches et des programmes : convertir un fichier, exécuter un logiciel

  • est composé d’un ou de plusieurs coeurs : un coeur ne peut exécuter qu’une seule tâche à la fois. Si le processeur contient plusieurs coeurs, il peut exécuter autant de tâches en parallèle qu’il a de coeurs

  • se caractérise aussi par sa fréquence : elle est globalement proportionnelle au nombre d’opérations qu’il est capable d’effetuer par seconde.

Traiter MiDAS en R : les limites

  1. Charger les données en mémoire vive
  path_fna <- "C:/Users/Public/Documents/MiDAS_parquet/Vague 4/FNA/"
  
  PJC <- read_parquet(paste0(path_fna, "pjc.parquet"), memory = TRUE)
  ODD <- read_parquet(paste0(path_fna, "odd.parquet"), memory = TRUE)
  1. Réaliser des opérations coûteuses en ressources
jointure <- PJC %>%
  rename(KROD1 = KROD3) %>%
  left_join(ODD, by = c("id_midas", "KROD1"))
  1. Le partage des ressources de la bulle

Chaque utilisateur peut mobiliser toutes les ressouces de la bulle.

Traitement léger versus coûteux

  • les jointures

  • les opérations en group_by()

  • les opérations de tri avec arrange()

  • distinct()

    ▶️ Exécution séquentielle sur un coeur du processeur + beaucoup de mémoire vive (données temporaires)

    ▶️ Erreur “out of memory”.

Pourquoi spark ?


Solution testée Avantage Destination d’usage
Package « data.table » Calculs parallélisés pour bases < RAM

Format « parquet » +

package « arrow »

Stockage moins lourd

Chargement efficient

pour bases < RAM
DuckDB Gestionnaire de BDD Pour des bases < 100 Go
Spark

Traitements distribués plus rapides

Traitement de données volumineuses

Pour des bases > 100 Go

Un gain de temps considérable


Calcul de la durée moyenne du premier contrat pour tous les individus MiDAS Retour à l’emploi salarié des indemnisables
Classique R 4 heures Crash
Duckdb 8 minutes 3 heures seul sur la bulle
Spark 1 minute 2 minutes

Mais alors, pourquoi le cluster ? 🤔

Une bonne allocation des ressources entre utilisateurs

Et vous, quels sont vos problématiques et vos solutions ? ⚠️

Comment on fait du spark cluster avec R version courte ? ⏲️

Où est Midas, 2ème édition

Le cluster a son propre explorateur de fichiers à mettre en favori dans son navigateur : https://midares-deb11-nn-01.midares.local:9870/

Et mes bases sur la bulle ?

Il est possible de charger des bases enregistrées n’importe où sur la bulle sur HDFS : depuis vos documents personnels, depuis l’espace commun…

Un cluster de calcul

Connexion

library(sparklyr)
library(dplyr)
library(dbplyr)

conf <- spark_config()
conf["spark.driver.memory"] <- "20Go"
conf["spark.executor.memory"] <- "60Go"
conf["spark.executor.cores"] <- 4
conf["spark.executor.instances"] <- 2
cont["spark.yarn.queue"] <- "prod"
conf["spark.driver.maxResultSize"] <- 0

sc <- spark_connect(master = "yarn", config = conf)
library(sparklyr)
library(dplyr)
library(dbplyr)

conf <- spark_config()
conf["spark.driver.memory"] <- "20Go"
conf["spark.executor.memory"] <- "60Go"
conf["spark.executor.cores"] <- 4
conf["spark.executor.instances"] <- 3
cont["spark.yarn.queue"] <- "prod"
conf["spark.driver.maxResultSize"] <- 0

sc <- spark_connect(master = "yarn", config = conf)

Temps de connexion

Pour se connecter au cluster, il faut environ 5 minutes, à chaque connexion. Spark cluster n’est pas du tout adapté à des traitements légers (moins de 10 minutes).

Quizz : traitement nomal ou traitement très lourd

  • Appariement de 2 bases mensuelles de la CNAF entre elles (4 millions de lignes par base)
  • Repérage de la situation en emploi (MMO) d’un champ de bénéficiaires RSA un mois donné (2 millions de lignes)
  • Calcul de la durée d’inscription (FHS, table DE), de la durée d’indemnisation (FNA, table PJC) et du retour à l’emploi d’un champ de 2 millions de demandeurs d’emploi
  • Calcul de la durée d’indemnisation (FNA, table PJC) d’un champ de 20 millions de demandeurs d’emploi
  • Calcul du retour à l’emploi d’un champ de demandeur d’emploi en fin d’un mois donné (5 millions de DEFM)

Chargement des données en spark


### Depuis HDFS
mmo_17_df_spark <- spark_read_parquet(sc,
                                  path = "hdfs:///dataset/MiDAS_v4/mmo/mmo_2017.parquet",
                                  memory = FALSE)

### Passer un dataframe R en spark
mon_data_frame <- data.frame(c("Anna", "Paul"), c(15, 20))
mon_data_frame_spark <- copy_to(sc, "mon_data_frame")

▶️ chargement en mémoire vive couteux en temps : par défaut, memory = FALSE

Spark data frames

mmo_17_df_spark est un spark data frame (sdf) : il ne peut pas être ouvert comme un data frame R classique, il n’est pas dans la session R.

Comment voir ma table ?

Récupérer une partie de la table : pas plus de 500 lignes

une_partie_de_ma_table <- ma_table %>% 
  head(100) %>%
  collect()


une_partie_de_ma_table <- ma_table %>% 
  filter(id_midas %in% ma_liste_id_midas) %>%
  collect()

# une_partie_de_ma_table est ici un data.frame R classique que vous pouvez ouvrir!

Spark data frames

une_partie_de_ma_table est un data frame R : il peut pas être ouvert, il est dans la session R. Cela signifie qu’il se situe sur la bulle

Un cluster de calcul

Sparklyr, c’est comme dplyr


Ensuite, vous pouvez programmer avec dplyr !


mmo_17_df_spark <- mmo_17_df_spark %>%
  
  rename(debut_contrat = DebutCTT) %>%
  
  filter(debut_contrat >= as.Date("2017-01-01") & debut_contrat < as.Date("2017-02-01")) %>%
  
  mutate(mois_debut_contrat = substr(debut_contrat,6,7))

La lazy evaluation

Spark distingue deux types d’opérations :

  • les transformations : prennent en entrée un spark_data_frame et retournent un spark_data_frame, elles ne déclenchent aucun calcul

    Par exemple, le programme ci-dessous ne déclenche pas d’exécution :

mmo_17_df_spark_mois <- mmo_17_df_spark %>%
  rename(debut_contrat = DebutCTT) %>%
  filter(debut_contrat >= as.Date("2017-01-01") & debut_contrat < as.Date("2017-06-01")) %>%
  mutate(mois_debut_contrat = substr(debut_contrat,6,7))
  • les actions : forcent le calcul d’un résultat pour le récupérer et déclenchent l’exécution de toutes les transformations compilées jusqu’à l’appel de l’action.

    Par exemple, le programme ci-dessous déclenche le calcul de toute la cellule précédente :

nb_debut_contrat_fev_17 <- mmo_17_df_spark_mois %>%
  group_by(mois_debut_contrat) %>%
  summarise(nb_contrats = n()) %>%
  print()

La lazy evaluation : un gain de temps considérable


La gestion des erreurs

En réalité, lorsqu’on appuie sur le bouton run, il ne se passe pas “rien”. Le code est compilé par spark : les erreurs sont repérées avant même que le code soit exécuté !

Récupérer un résultat

Les principales actions sont :

  • print()

  • head() + collect()

  • ⚠️ collect() pour de petites tables : ne fonctionne pas sur des grosses tables

  • tbl_cache() (écrire un spark_data_frame en mémoire pour le réutiliser)

Le bouton stop

Il est recommandé de ne pas utiliser ce bouton en programmant en sparklyr : il rend la session spark inutilisable par la suite, il faut fermer RStudio et rouvrir ensuite.

Les erreurs spark

Les erreurs de programmation sont soulevées avant que les calculs commencent.

Des erreurs peuvent survenir pendant l’exécution du code, quelques minutes après l’appel d’une action par exemple.

Collecter des données trop volumineuses

Une source fréquente d’erreur pendant l’exécution est l’appel d’un collect() sur des données trop volumineuses pour être collectées. La première étape du débuggage consiste à limiter les collect().

Les erreurs sparklyr

Les erreurs envoyées par spark sont “traduites” par sparklyr pour être affichées dans la console de R. Elles ne sont pas toujours très lisibles, ou très précises sur la nature de l’erreur/sa source.

… presque tout comme dplyr

La majorité des commandes dplyr fonctionnent sur un spark_data_frame avec le package sparklyr. Les divergences principales sont les suivantes :

Fonctionnalité tidyverse sparklyr
import d’un fichier .parquet read_parquet spark_read_parquet()
tri d’un tableau arrange() window_order() ou sdf_sort()
opérations sur les dates lubridate fonctions Hive
empiler des tableaux bind_rows() sdf_bind_rows()
nombre de lignes d’un tableau nrow() sdf_nrow()
faire pivoter un tableau tidyr sdf_pivot()
export d’un spark_data_frame spark_write_parquet()

Quelques fonctions R pas encore traduites

Les fonctions de lubridate()ne sont pas adaptées au spark_data_frames.

  • Convertir une chaîne de caractère de la forme AAAA-MM-DD en Date
    date_1 <- as.Date("2024-05-26")
  • Calculer une durée entre deux dates
    PJC_spark <- spark_read_parquet(sc,
                                    path = "hdfs:///dataset/MiDAS_v4/pjc.parquet",
                                    memory = FALSE)

    duree_pjc_df <- PJC_spark %>%
      rename(date_fin_pjc = as.Date(KDFPJ),
             date_deb_pjc = as.Date(KDDPJ)) %>%
      mutate(duree_pjc = datediff(date_fin_pjc, date_deb_pjc) + 1) %>%
      head(5)
  • Ajouter ou soustraire des jours ou des mois à une date
    duree_pjc_bis_df <- duree_pjc_df %>%
      mutate(duree_pjc_plus_5 = date_add(duree_pjc, int(5)),
             duree_pjc_moins_5 = date_sub(duree_pjc, int(5)),
             duree_pjc_plus_1_mois = add_months(duree_pjc, int(1))) %>%
      head(5)

Add_months

Si la date en entrée est le dernier jour d’un mois, la date retournée avec add_months(date_entree, int(1)) sera le dernier jour calendaire du mois suivant.

Format

Le int() est important car ces fonctions Hive n’accepte que les entiers pour l’ajout de jours : taper uniquement 5 est considéré comme un flottant dans R.

  • Tri dans un groupe pour effectuer un calcul séquentiel
    ODD_spark <- spark_read_parquet(sc,
                                    path = "hdfs:///dataset/MiDAS_v4/odd.parquet",
                                    memory = FALSE)

    ODD_premier <- ODD_spark %>%
      group_by(id_midas) %>%
      window_order(id_midas, KDPOD) %>%
      mutate(date_premier_droit = first(KDPOD)) %>%
      ungroup() %>%
      distinct(id_midas, KROD3, date_premier_droit) %>%
      head(5)
  • Tri pour une sortie : sdf_sort() , arrange() ne fonctionne pas

  • Concaténer les lignes (ou les colonnes sdf_bind_cols())

    ODD_1 <- ODD_spark %>%
      filter(KDPOD <= as.Date("2017-12-31")) %>%
      mutate(groupe = "temoins")
    
    ODD_2 <- ODD_spark %>%
      filter(KDPOD >= as.Date("2021-12-31")) %>%
      mutate(groupe = "traites")
    
    ODD_evaluation <- sdf_bind_rows(ODD_1, ODD_2)
  • Dédoublonner une table

    droits_dans_PJC <- PJC_spark %>%
      sdf_distinct(id_midas, KROD3)

    print(head(droits_dans_PJC, 5))

    PJC_dedoublonnee <- PJC_spark %>%
      sdf_drop_duplicates()

    print(head(PJC_dedoublonnee, 5))
  • Pivot : les fonctions du packag tidyr ne fonctionnent pas sur données spark
    ODD_sjr_moyen <- ODD_spark %>%
      mutate(groupe = ifelse(KDPOD <= as.Date("2020-12-31"), "controles", "traites")) %>%
      sdf_pivot(groupe ~ KCRGC,
        fun.aggregate = list(KQCSJP = "mean")
      )
  • Résumé statistique : sdf_describe() , summary()ne fonctionne pas.

  • Dimension : sdf_dim, la fonction nrow()ne fonctionne pas.

  • Quantiles approximatifs : le calcul des quantiles sur données distirbuées renvoie une approximation car toutes les données ne peuvent pas être rappatriées sur la même machine physique du fait de la volumétrie, sdf_quantile()

  • Echantillonnage aléatoire : sdf_random_split

Je veux voir ma table

  1. Vérifier le nombre de lignes sans collecter
ma_table %>% 
  sdf_nrow()
  1. Vérifier la présence de doublons
nb_doublons <- ma_table %>% 
  group_by(id_midas) %>%
  summarise(nb_ligne_ind = n()) %>%
  ungroup() %>%
  filter(nb_ligne_ind > 1) %>%
  sdf_nrow()
  1. Récupérer une partie de la table : pas plus de 500 lignes
une_partie_de_ma_table <- ma_table %>% 
  head(100) %>%
  collect()


une_partie_de_ma_table <- ma_table %>% 
  filter(id_midas %in% ma_liste_id_midas) %>%
  collect()

# une_partie_de_ma_table est ici un data.frame R classique que vous pouvez ouvrir!

Exporter des données sur disque

Sur la pause déjeuner par exemple 😉

Pourquoi ❓ Pour des données qui ne peuvent pas être collectées en mémoire vive

Export des spark data frames directement sous HDFS : à aucun moment on n’ouvre la table : on peut traiter des données beaucoup plus volumnieuses que la mémoire RAM !

ma_table_spark <- MMO %>%
  
  right_join(mon_champ_individuel, by = c("id_midas")) %>%
  
  mutate(fin_ctt_bis = ifelse(is.na(FinCTT), as.Date("2023-12-31"), FinCTT)) %>%
  
  mutate(duree_ctt = DATEDIFF(FinCTT, DebutCTT) + 1)

spark_write_parquet(ma_table_spark, "hdfs:///resultats/ma_table.parquet")

Possibilité de récupérer ce fichier sur la bulle MiDARES = en local.

Exports simultanés

HDFS supporte les exports simultanés, mais le temp d’export est plus long lorsque le NameNode est requêté par plusieurs personnes simultanément

Si on souhaite la récupérer en local

Les exports sur HDFS

Lorsqu’on exporte une table depuis notre session R vers HDFS, celle-ci est automatiquement partitionnée, comme le reste des données.

Ainsi, cette table sera stockée en plusieurs morceaux sous HDFS et répliquée.

Il est possible de maîtriser le nombre de partitions avec la commande sdf_coalesce(partitions = 1) du package sparklyr.

Avec sdf_coalesce(partitions = 1), on n’aura qu’un seul fichier à télécharger depuis HDFS.

Avec sdf_coalesce(partitions = 200), on aura 200 morceaux de notre fichier à télécharger à la main (pas possible de faire tout sélectionner sous HDFS !).

L’idéal est d’adapter le nombre de partitions à la taille d’un bloc : un bloc mesure 128 MB.

ma_table <- data.frame(c("Anne", "Paul"), c(25,30))

ma_table_spark <- copy_to(sc, ma_table) %>%
  sdf_coalesce(partitions = 1)

spark_write_parquet(ma_table_spark, "hdfs:///resultats/ma_table.parquet")

Fermer sa session

Il faut impérativement fermer sa session spark après une session de travail. Deux moyens pour ça :

  • fermer R Studio

  • si on ne ferme pas RStudio, utiliser la fonction spark_disconnect_all() dans son code

Si on souhaite lancer un code le soir en partant, on n’oublie pas le spark_disconnect_all() à la fin du code.

Partage des ressources

Les ressources réservés par un utilisateur ne sont libérées pour les autres que lorsqu’il se déconnecte. Ne pas se déconnecter, c’est bloquer les ressources. Si j’ai réservé deux ordinateurs du cluster sur 15, personne d’autres ne peut les réserver tant que je n’ai pas déconnecter ma session spark.

Nous fermerons les sessions ouvertes trop longtemps (départ de congés sans déconnexion) si des utilisateurs présents en ont besoin : risque de perte du travail non enregistré.

Télécharger des données en local

Et ensuite ?

Spark est un outil de traitement de données volumineuses. Il n’est pas toujours adapté :

  • pour de petites tables : il ne va pas engendrer de gain de temps, voire augmenter le temps

  • pour faire de l’économétrie poussée : tous les packages R ne sont pas traduits en spark

  • pour ouvrir sa table : on perd les avantages de spark si on collecte toute la table en mémoire RAM

Conseils :

  1. Créer sa table d’étude en appariant les tables de MiDAS avec le cluster spark

  2. L’exporter sous HDFS

  3. La télécharger en local

  4. La charger en R classique pour faire de l’économétrie

Quizz : spark ou pas spark ?

  • faire des statistiques descriptives sur une unique table de 1 million d’individus et 30 variables déjà créée
  • créer une table de 5 millions demandeurs d’emploi avec leur situation au regard de l’emploi (MMO), leur durée d’inscription (DE du FHS)
  • apparier 4 tables mensuelles de la CNAF pour repérer la liste des id_midas bénéficiaires de minima sociaux 4 mois donnés (4 millions chaque mois)
  • faire de l’économétrie sur une unique table déjà créée

Econométrie et Machine Learning avec Spark

Il existe des outils pour faire de l’économétrie avec spark, la librairie Apache Spark MLlib. Elle relève d’une utilisation plus avancée de spark que nous ne traitons pas ici. Elle ne contient pas autant de modèles que le CRAN R pour la recherche en économétrie.

Il vous est conseillé de créer une unique table d’étude puis de la traiter en R classique pour l’économétrie.

Les bonnes pratiques 🤝

Mode local : schéma

Mode local : inadapté et mauvaise pratique


Spark et le mode local :

  • un seul ordinateur alors que spark est fait pour plusieurs ordinateurs distincts

  • beaucoup moins de ressources disponibles sur la bulle que sur le cluster

  • mauvaise gestion de l’allocation des ressources entre utilisateurs : pas faite pour plusieurs utilisateurs

  • ralentissements considérables et bugs : bloque les autres utilisateurs

    ▶️spark n’est adapté que pour le cluster de calcul, la bulle pour faire du R sans spark sur des données peu volumineuses

Inefficient de prendre beaucoup de ressources

Les ordinateurs du cluster ont besoin de s’envoyer des données par le réseau : c’est la partie la plus lente d’un programme spark !

Si j’augmente les ressources : par exemple, je réserve 3 ordinateurs du cluster plutôt que 2

  1. Effet puissance de calcul : plus de ressources pour faire les calculs = réduction du temps de calcul

  2. Effet augmentation des échanges réseau (shuffles) : augmentation du temps de calcul

  3. Gêne des autre utilisateurs

Ne pas collecter

Collecter, c’est quoi ?

Collecter c’est utiliser l’instruction collect(). Elle permet de rapatrier l’ensemble des résultats du cluster vers la bulle et la session R de l’utilisateur en format R, par exemple des data.frames.

Collect() :

  1. est une action : elle déclencher tous les calculs

  2. implique des échanges réseau très importants : entre ordinateurs du cluster et du cluster vers la bulle : c’est extrêmement long, moins efficient que l’enregistrement sur disque directement depuis spark

  3. rappatrie les résultats (une table) dans la mémoire vive de R, qui est sur la bulle : si le résultat est volumineux, cela bloque les autres utilisateurs

Recommandations :

  • Ne pas collecter des tables de plus de 15 Go

  • Utiliser les autres méthodes proposées pour ne pas bloquer les utilisateurs qui ont besoin de R en mode classique

  • Ne pas changer les configurations

Fermer sa session




Pour ne pas bloquer les collègues 👨‍💻

Yarn

Yarn permet de consulter la réservation des ressources par les utilisateurs.

On peut y accéder en copiant le lien suivant dans Google chrome sur la bulle (mettre en favori) : midares-deb11-nn-01.midares.local:8088/cluster

Vérifier que notre session est fermée et qu’on ne prend pas trop de ressources : yarn

Mutualiser les expériences

  • Aide au passage d’un code sur le cluster

  • Programmer entre collègues

  • Contributions à la documentation MiDAS : section fiches, à l’aide de pull requests sur github

Optimiser le code : non ! Mais optimiser la mémoire…

Comment fonctionne spark ?

  • Apache Spark : librairie open source développée dans le langage scala

    val TopHorrorsIGN2022 = Seq(
      (9, "Pearl"),
      (6, "The Sadness"),
      (6, "Offseason"),
      (7, "Hatching"),
      (8, "x")
    ).toDF("IMDB Rating", "IGN Movie Picks")
    
    import org.apache.spark.sql.functions.col
    
    val cols = List(col("IGN Movie Picks"), col("AVC Movie Picks"))
    
    val query = TopHorrorsIGN2022(
      "IGN Movie Picks"
    ) === TopHorrorsTheAVClub2022("AVC Movie Picks")
    
    val outerJoin = TopHorrorsIGN2022
      .join(TopHorrorsTheAVClub2022, query, "outer")
      .select(cols: _*)
    
    outerJoin.show()
  • scala adapté pour maîtriser toutes les fonctionnalités de spark et optimiser au maximum les traitements en spark

  • spark est compatible avec les langages scala, R, python, java, et peut interpréter des commandes SQL.

Le driver en sparklyr

  • Le programme R est traduit en scala grâce au package sparklyr

  • Le driver évalue le programme, il lit le code scala mais n’exécute rien du tout

  • S’il remarque une erreur, l’erreur est envoyée directement à l’utilisateur en session R avant l’exécution du programme : c’est la force de la lazy evaluation.

Pas besoin d’optimiser son code !

source : documentation CASD disponible à Documentation Data Science

Catalyst optimise le code pour nous

Le driver contient un programme nommé Catalyst qui optimise le code scala automatiquement.

Spark optimise automatiquement les programmes soumis :

  1. Compilation des transformations pour soulever les éventuelles erreurs

  2. Intégration dans un plan d’exécution contenant les étapes nécessaires pour parvenir au résultat demandé par le programme

  3. Optimisation du plan logique par le module Catalyst (driver Spark)

Catalyst optimise le code pour nous

Catalyst optimise le code pour nous

Par exemple si j’écris le programme :

non_optimal <- table_1 %>%   
    mutate(duree_contrat = DATEDIFF(fin_contrat, debut_contrat)) %>%   
    filter(debut_contrat >= as.Date("2023-01-01"))


Catalyst réécrit :


optimal <- table_1 %>%   
    filter(debut_contrat >= as.Date("2023-01-01")) %>%   
    mutate(duree_contrat = DATEDIFF(fin_contrat, debut_contrat))

Cette optimisation est réalisée sur toutes les transformations compilée avant qu’une action déclenche l’exécution.

Catalyst optimise le code pour nous : laissons-le travailler !

Déclencher le moins d’actions possibles dans son programme permet de tirer pleinement parti de Catalyst et de gagner un temps certain.

Pour profiter des avantages de spark, la manière de programmer recommandée est différente de celle prédominante en R classique. On évite quoi ?

On évite :

  • de mettre des collect()sur chaque table intermédiaire

  • de collect() une table entière

  • de print() à chaque étape

Sinon Catalyst n’a pas assez de code pour optimiser !

Catalyst optimise le code pour nous : laissons-le travailler !

non_optimal <- table_1 %>% 
    collect() %>%
    mutate(duree_contrat = DATEDIFF(fin_contrat, debut_contrat)) %>%   
    filter(debut_contrat >= as.Date("2023-01-01"))

versus

optimal <- table_1 %>%
    mutate(duree_contrat = DATEDIFF(fin_contrat, debut_contrat)) %>%   
    filter(debut_contrat >= as.Date("2023-01-01")) %>%
    head(5) %>% 
    collect()

La longueur du plan logique

Fournir un plan logique très long sans déclencher d’action peut créer une erreur en spark : spark “refuse” d’optimiser un plan si long.

La bonne pratique consiste à “cacher” des résultats intermédiaires, pour déclencher l’exécution régulièrement et conserver les résultats en mémoire, tout en nettoyant la mémoire des résultat intermédiaires précédents :

table_1 <- mon_champ %>%
  left_join(table_2) %>%
  mutate(variable_1 = indicatrice_1 + indicatrice_2,
         regroupement_variable_2 = case_when(variable_2 %in% c(1,2,3) ~ "A",
                                             variable_2 %in% c(5,8,9) ~ "B",
                                             TRUE ~ "C")) %>%
  left_join(table_3) %>%
  sdf_register("table_1")

tbl_cache(sc, "table_1")

table_4 <- table_1 %>%
  left_join(table_5) %>%
  mutate(variable_y = ifelse(variable_x > 50, 1, 0)) %>%
  sdf_register("table_4")

tbl_cache(sc, "table_4")

tbl_uncache(sc, "table_1")

Le rôle du cluster manager

Le cluster manager distribue les traitements physiques aux ordinateurs du cluster :

  • il connaît le meilleur plan physique fourni par Catalyst ;

  • il connaît les ressources disponibles et occupées par toutes les machines du cluster ;

  • il affecte les ressources disponibles à la session spark.

Le rôle du worker

Le worker effectue le morceau de programme qu’on lui affecte :

  • il ne connaît que les tâches qu’on lui a affectées ;

  • il peut communiquer avec le driver en réseau pour renvoyer un résultat ;

  • il peut communiquer avec les autres workers en réseau pour partager des données ou des résultats intermédiaires : c’est un shuffle.

Calcul distribué et récupération des résultats

Le réseau

  • Les workers communiquent avec le driver de la bulle MiDARES en réseau

  • Les workers communiquent entre eux en réseau pour s’échanger des données

  • Le réseau est un mode de communication lent

Traitement MAP distribué

Traitement REDUCE distribué

Le stockage distribué avec HDFS

Calcul distribué, calcul vectoriel


Les opérations les plus coûteuses en spark sont :

  • les opérations par groupe de lignes, qui impliquent des shuffles, ou échanges de données entre workers via le réseau

  • les opérations d’écriture sur disque avec spark_write_parquet()

  • les opérations 🛑non vectorisées🛑, qui entraînent des shuffles lourds et inutiles : boucles for, jointures volumineuses…


Les données MiDAS sont structurées de manière proche d’une base de données relationnelles : leur traitement nécessite des jointures. Une partie des données sont mensuelles : cette structure peut inciter à programmer en boucle for sur le mois, ce qui est long et inefficient.

Calcul distribué, calcul vectoriel : boucles for


Cas d’usage : je veux repérer si un groupe d’individus est au RSA un mois, deux mois, trois mois etc. après la sortie de l’assurance-chômage

J’utilise :

  • le FNA, dont j’extraie une table individu avec le mois de sortie de l’assurance-chômage, table sorties

  • les tables mensuelles de la CNAF cnaf_prestations_mois_m

Calcul distribué, calcul vectoriel : boucles for

Lance 12 X tous les mois de sortie jobs spark, beaucoup de shuffles

library(dplyr)
library(sparklyr)

res_all <- NULL

for (mois in mois_sortie_vec) { # 1ère boucle sur les mois de sorties
  # Sous-ensemble des sortants de ce mois
  sorties_mois <- sorties %>%
    filter(mois_sortie == !!mois) %>%
    select(id, mois_sortie)

  for (h in 1:12) { # 2ème boucle sur les 12 mois d'horizon
    
    mois_cible <- as.Date(mois) + months(h)
    nom_tbl <- paste0("cnaf_prestations_", format(mois_cible, "%Y_%m"))
    table_mois_cnaf <- spark_read_parquet(paste0(chemin_table, nom_tbl))

    perception_RSA_mois_h <- sorties_mois %>%
      mutate(mois_h = sql(paste0("add_months(mois_sortie, ", h, ")"))) %>%
      inner_join(table_mois_cnaf, by = c("id" = "id", "mois_h" = "mois")) %>%
      mutate(perception_RSA = ifelse(RSAVERS == "C" & MTRSAVER > 0, 1, 0)) %>%
      transmute(id, mois_sortie, h = !!h, perception_RSA)

    # empiler et mettre en cache le résultat
    res_all <- if (is.null(res_all)) perception_RSA_mois_h else sdf_bind_rows(res_all, perception_RSA_mois_h) %>% sdf_register(paste0("temp", mois, h))
  tbl_cache(sc, paste0("temp", mois, h))
  }
}

Lance 12 jobs et une action à chaque tour

for (mois in liste_mois) {
  
  nom_tbl <- paste0("cnaf_prestations_", format(mois, "%Y_%m"))
  table_mois_cnaf <- spark_read_parquet(paste0(chemin_table, nom_tbl))

  # Jointure globale, calcul de l'horizon h 
  perception_RSA_mois <- table_mois_cnaf %>%
    inner_join(sorties, by = "id") %>%
    mutate(h = sql("cast(months_between(mois, mois_sortie) as int)")) %>%
    filter(h >= 1, h <= 12) %>%
    select(id, mois_sortie, h, prest)

  # Cache pour enregistrer le résultat en mémoire
  perception_RSA_mois_cache <- perception_RSA_mois %>% sdf_register(paste0("temp", mois))
  tbl_cache(sc, paste0("temp", mois))

  res_stack <- if (is.null(res_stack)) joined_cached else sdf_bind_rows(res_stack, joined_cached)
}

Meilleure solution : un seul plan logique que Catalyst peut entièrement optimsier, en limitant les shuffles, beaucoup plus rapide

# Charger la première table CNAF
nom_1 <- paste0("cnaf_prestations_", format(as.Date("2023-01-01"), "%Y_%m"))
cnaf_total <- spark_read_parquet(paste0(chemin_table, nom_1))

# Empiler les autres
for (mois in liste_mois) {
  
  nom_tbl <- paste0("cnaf_prestations_", format(mois, "%Y_%m"))
  table_mois_cnaf <- spark_read_parquet(paste0(chemin_table, nom_tbl))
  
  cnaf_total <- sdf_bind_rows(cnaf_total, table_mois_cnaf) %>%
    sdf_register("cnaf_total")
  tbl_cache(sc, "cnaf_total")
}


# Joindre UNE fois avec la table des sorties (broadcast si petit)
sorties_b <- sdf_broadcast(sorties %>%
  select(id, mois_sortie)
)

perception_RSA_horizon <- cnaf_total %>%
  inner_join(sorties_b, by = "id") %>%
  mutate(
    h = sql("cast(months_between(mois, mois_sortie) as int)")
  ) %>%
  filter(h >= 1, h <= 12) %>%
  select(id, mois_sortie, h, prest)

Calcul distribué, calcul vectoriel : jointures

Une jointure implique pour spark de rappatrier les lignes avec les mêmes valeurs de clef sur le même worker : les jointures engendrent des shuffles.


Lorsque l’on joint une grosse table avec une petite table, Spark peut optimiser le calcul en utilisant un broadcast join : la petite table est diffusée en entier sur chaque worker, ce qui évite un shuffle massif.

res <- grosse_table %>%
  inner_join(sdf_broadcast(petite_table), by = "clef")


👉 Ici petite_table est diffusée (broadcast) sur tous les workers : chaque partition de grosse_table fait alors le join localement, sans transfert réseau coûteux.

Calcul distribué, calcul vectoriel : jointures

Broadcast automatique : par défaut, Spark choisit automatiquement le broadcast join si la table à diffuser fait moins de 10 MB (paramètre configurable).

Au-delà, il utilise un shuffle join classique (plus lent).

Astuce right_join : dans sparklyr, la position de la table peut influencer le plan choisi par Spark :

petite_table %>% left_join(grosse_table) → Spark n’essaie pas forcément de diffuser la petite.

grosse_table %>% right_join(petite_table) → Spark choisit plus volontiers un broadcast join.

👉 Bonne pratique : forcer avec sdf_broadcast() quand on sait que la table est petite, plutôt que de compter sur ce comportement implicite.

Calcule distribué, calcul vectoriel

Opération logique Solution non vectorielle Solution vectorielle
situation à m + 1, 2… Boucle for sur les mois utilisation de lag() et lead(), ou auto jointure sur les dates
Répéter une fonction user defined functions (UDF) éviter les UDF
opération groupe de lignes group_by %>% mutate %>% distinct group_by %>% summarise
joindre petite table avec grosse table petite_table %>% left_join(grosse_table) broadcast join
joindre deux grosses tables boucle for en divisant les tables en petits morceaux unique jointure et partition par la clef de jointure
construire un panel cylindré boucle for cross_join() de deux tables

La mémoire du driver

L’utilisation de la mémoire du driver

Cette configuration permet de collecter des statistiques descriptives et de petites tables sans gêner les autres utilisateurs.

conf <- spark_config()
conf["spark.driver.memory"] <- "20Go"
conf["spark.executor.memory"] <- "80Go"
conf["spark.executor.cores"] <- 5
conf["spark.executor.instances"] <- 2
cont["spark.yarn.queue"] <- "prod"
conf["spark.driver.maxResultSize"] <- 0

sc <- spark_connect(master = "yarn", config = conf)

Bonne pratique de partage des ressources

Le driver est dans la bulle Midares, qui a vocation à être réduite suite à la généralisation du cluster.

  • La bulle Midares a besoin de RAM pour fonctionner, 100% des ressources ne sont donc pas disponibles pour sparklyr.

  • Pour permettre le travail simultané fluide de 10 utilisateurs, la mémoire allouée au driver recommandée pour chaque utilisateur est de maximum 20 Go.

  • Il existe des alternatives pour ne pas collecter des résultats trop volumineux dans le driver.

Programmer sans collecter

La programmation en spark doit être adaptée aux contraintes de volumétrie des données : test de chaque étape, puis ne forcer le calcul qu’à la fin pour que Catalyst optimise l’ensemble du programme

La principale différence avec la programmation en R classique est que la visualisation de tables complètes volumineuses n’est pas toujours possible et n’est pas recommandée :

  • goulets d’étranglement même avec spark, car toutes les données sont rapatriées vers le driver puis vers la session R : erreurs Out of Memory

  • longue : échange entre tous les noeuds impliqués dans le calcul et le driver, puis un échange driver-session R en réseau = lent ;

  • beaucoup moins efficace que l’export direct en parquet du résultat (qui fonctionne toujours) : charger ensuite sa table finale en data frame R classique pour effectuer l’étude.

S’il est nécessaire de collecter, il faut prévoir beaucoup de RAM pour le driver avec le paramètre spark.driver.memory, ce qui empêche les autres utilisateurs de travailler.

Programmer sans collecter

Les résultats qu’il est recommandé de récupérer en mémoire vive en session R sont de la forme suivante :

  • une table filtrée avec les variables nécessaires à l’étude uniquement : sous MiDAS, toutes les jointures, les calculs de variable et les filtres peuvent être effectués de manière efficiente sous la forme de spark_data_frame, sans jamais collecter les données MiDAS ;

  • des statistiques descriptives synthétiques ;

  • les premières lignes de la table pour vérifier que le programme retourne bien le résultat attendu ;

  • une table agrégée pour un graphique par exemple, à l’aide de la fonction summarise().

Programmer sans collecter

Je sais que la création de ma table donne le résultat souhaitée (car j’ai regardé ce dont elle a l’air avvec head()), maintenant je vais l’appeler une dizaine de fois pour collecter uniquement des statistiques descriptives.

Que se passe-t-il à chaque fois que je collecte une statistique descriptive ?

La création de la table va être exécutée à nouveau : très long ?

Comment faire ?

La création de la table est exécutée une seule fois, le résultat est conservé en mémoire vive

ma_table_spark <- MMO_2017 %>%
  filter(DebutCTT > as.Date("2017-06-01")) %>%
  mutate(duree_CTT = DATEDIFF(FinCTT,DebutCTT) + 1) %>%
  sdf_register(name = "ma_table_spark")

tbl_cache("ma_table_spark")

Optimiser la mémoire : conclusion

Pour programmer en spark sans aucune erreur :

  1. Déclencher une action avec plusieurs transformations pour laisser Catalyst optimiser

  2. Ne pas collecter tout une table

  3. Persister ou cacher une table qu’on va appeler plusieurs fois pour ne collecter que des statistiques descriptives

  4. Ne pas persister trop de tables : occupe de la mémoire RAM

  5. Consulter le programme exemple sur la bulle CASD si besoin

Pour aller plus loin

Partitionnement

Le format .parquet (avec arrow) et le framework spark permettent de gérer le partitionnement des données.

Le partitionnement a un impact sur la manière dont les données sont organisées physiquement sur le système de fichiers.

Partitionnement


Partitions 2 5 1000
Colonne qui a servi au partitionnement 74,50% 46,30% 16,66%
Vers une autre colonne 89,51% 191,01% 556,99%
Select distinct(*) 136,79% 163,68% 1194,88%


spark_write_parquet(ma_table, "hdfs:///resultats/ma_table.parquet", partition_by = c("age","sex"))

Éviter le problème des exécuteurs inactifs

Supposons que le jeu de données ait 8 partitions, un exécuteur (avec seulement 1 core) ne peut exécuter qu’une tâche(task) à la fois, et une partition = une tâche.

Cas 1 : 6 exécuteurs, au dernier tour, il ne reste que 2 tâches, 4 exécuteurs seront inactifs. Cas 2 : 4 exécuteurs, 2*4, aucun exécuteur inactif.

# Get the number of partitions
num_partitions <- sdf_num_partitions(df)
print(num_partitions)

# Repartition the DataFrame to a specific number of partitions
df_repartitioned <- sdf_repartition(df, partitions = 10)
  • Le nombre de partitions doit être divisible par le nombre d’exécuteurs.
  • Le nombre de partitions doit être supérieur au nombre d’exécuteurs.

Éviter trop de partitions

La création de tâches entraîne des surcharges, qui doivent toujours être inférieures à 50 % du temps total d’exécution de la tâche.

  • # Repartition the DataFrame to a specific number of partitions
    df_repartitioned <- sdf_repartition(df, partitions = 10)
    
    # Repartition the DataFrame by a specific column, e.g., "commune_code".
    # The partition number will be the distinct value number
    df_repartitioned <- sdf_repartition(df, partition_by = "commune_code")
  • La répartition est une opération très coûteuse, utilisez-la judicieusement.

  • En général, la taille recommandée des partitions est d’environ 128 à 512 Mo.

Optimiser la configuration des exécuteurs

Configuration recommandée :

  • Un exécuteur devrait avoir entre 3 et 5 cores.
  • Pour chaque core, il faut réserver entre 4 et 8 Go de mémoire.

En mode cluster, chaque exécuteur fonctionne dans une JVM (la JVM nécessite une mémoire supplémentaire et du CPU pour exécuter le GC).

  • Évitez 1 core par exécuteur.
  • Évitez trop de cores dans un seul exécuteur, cela peut causer des problèmes de contention de threads ou la surcharge du garbage collector.

Tâches Maximales en parallèles


Parallélisation

Max_Parallel_Tasks = Number_of_Executors * Cores_per_Executor


Par exemple, une session Spark dispose de la configuration suivante :

conf["spark.executor.memory"] <- "32Go"
conf["spark.executor.cores"] <- 4
conf["spark.executor.instances"] <- 5

5 executor * 4 core = 20 tâches en parallèle. Pour un jeu de données de 200 partitions, il faut 10 tours pour terminer tous les calculs.


▶️Il n’existe pas de configuration universelle optimale pour tous, seulement la meilleure configuration pour vos tâches.

SparkUI

Spark UI permet de consulter le plan logique et physique du traitement demandé. Trois outils permettent d’optimiser les traitements :

Vérifier que le gc time est inférieur à 10% du temps pour exécuter la tâche ✅

Vérifier que la storage memory ne sature pas la mémoire ✅

Sparkhistory

  • Sparkhistory pour des traitements de sessions fermées

Le sparkhistory entraîne l’enregistrement de logs assez lourdes, il est donc désactivé par défaut. Pour l’activer sur un programme :

conf <- spark_config()
conf["spark.eventLog.enabled"] <- "true"
conf["spark.eventLog.dir"] <- "hdfs://midares-deb11-nn-01.midares.local:9000/spark-logs"
conf["appName"] <- "un_nom_de_traitement"

sc <- spark_connect(master = "yarn", config = conf)

Pyspark


Annexe

Le stockage distribué avec HDFS

Hadoop Distributed File System (HDFS)

  • stockage sur différentes machines : les différents ordinateurs workers du cluster

  • données divisées en blocs plus petits de taille fixe et répartis sur les machines : aucune table de MiDAS n’existe en entier sur le cluster

  • chaque bloc est répliqué trois fois : il existe trois fois les 10 premières lignes de la table FNA sur trois ordinateurs différents du cluster (résilience)

  • un NameNode supervise les métadonnées et gère la structure du système de fichiers : il sait où sont quels fichiers

  • les DataNodes stockent effectivement les blocs de données : les datanodes sont en fait les disques durs des workers du cluster, chaque ordinateur du cluster dispose d’un disque avec une partie des données MiDAS

  • le système HDFS est relié à la bulle Midares : possible de charger des données en clique-bouton de la bulle vers HDFS de manière très rapide et de télécharger des tables de HDFS pour les récupérer en local