Exercices big-dataAuteur : Julien Gauchet le 04/07/2017 (revision 1)

Cette page présente les exercices associés à la formation. Les fichiers servant de base aux exercices sont disponnibles ici : Exercices. La correction est présentée sur cette page. Libre à vous de la consulter ou non pendant ou après la formation.

1. Installation et configuration de cloudera quickstart

La machine virtuelle cloudera permet de travailler sur une installation standalone (un seul noeud qui est à la fois namenode et datanode). Pour travailler sur cette machine, nous allons utiliser virtual box.

  1. Télécharger (ou récupérer) la machnine virtuelle cloudera quickstart sur le site de cloudera
  2. Importer cette machine dans virtual box
  3. Créer un dossier partagé : une fois la machine importée et avant de la démarer, cliquer sur configuration et choisir le menu Dossiers partagés. Sélectionner un dossier de votre espace de travail et lui donner un nom. Par exemple Partage
  4. Démarrer la machine
  5. Attendre longtemps et m'écouter avancer sur ce cours si passionnant
  6. Configurer la langue du clavier : System > Preferences > Keyboard onglet Layout. Ajouter French et supprimer english
  7. Configurer le gestionnaire de fichiers pour qu'il n'ouvre pas une nouvelle fenêtre à chaque clic (comportement très énervant) : System > Preferences > File Management. Onglet Behavior : cocher la case Always open in browser windows.
  8. Configurer gedit pour ne pas qu'il génère de fichier de sauvegarde : fichiers parasites. Pour celà, ouvrir gedit, Edit > Preferences onglet editor et décocher Create a backup copy of file before saving. Dans l'onglet view, je vous recommande de cocher display line number et de décocher enable text wrapping
  9. Monter le dossier partagé. Par convention, les systèmes de fichiers sont montés dans le répertoire mnt.
    sudo mkdir -p /mnt/Partage
    sudo mount -t vboxsf -o uid=$(id -u) -o gid=$(id -g) Partage /mnt/Partage

Vote machine est prête à être utilisée

2. Initialisation de hdfs

Réaliser les opérations suivantes sur hdfs :

  • Lister les dossiers présents à la racine
  • Lister les fichiers présents sous /user/cloudera
  • Créer le dossier /user/cloudera/reception (le supprimer ainsi que son contenu avant de le faire)
  • Déposer dans le dossier /user/cloudera/reception les fichiers fournis pour les exercices
  • Donner les droits à tout le monde sur ces fichiers
# 1. Liste des fichiers à la racine
hadoop fs -ls /

# 2. Liste des fichiers sous le répertoire /user/cloudera
hadoop fs -ls /user/cloudera

# 3. Création du dossier /user/cloudera/reception
hadoop fs -rm -f -r /user/cloudera/reception
hadoop fs -mkdir /user/cloudera/reception

# 4. Dépot des fichiers
hadoop fs -copyFromLocal /home/cloudera/Desktop/formation/donnees/* /user/cloudera/reception

# 5. Droits sur les fichiers
hadoop fs -chmod 777 /user/cloudera/reception/'*'

3. Pig chargement simple

Mettre au format le fichier origines.csv fournit à l'aide de Pig. Le fichier en sortie sera un fichier csv délimité par des | qui contiendra les colonnes suivantes

  • Prénom
  • Genre
  • Identifiant Langue

Le fichier mis au format sera enregistré sous /user/cloudera/data/origines. Une fois le traitement pig terminé, supprimer le fichier _SUCCESS créé

hadoop fs -mkdir /user/cloudera/data
pig -x mapreduce /home/cloudera/Desktop/formation/pig-chargement1.pig
hadoop fs -ls /user/cloudera/data/origines
hadoop fs -rm -f /user/cloudera/data/origines/_SUCCESS

data = LOAD '/user/cloudera/reception/origines.csv' USING PigStorage(';') AS (prenom:chararray, genre:chararray, origine:chararray, frequence:double);
resultat = FOREACH data GENERATE prenom, genre, origine;
STORE resultat INTO '/user/cloudera/data/origines' USING PigStorage('|');

4. Pig chargement via une UDF

Mettre au format les fichiers prenoms_paris_2014.txt et prenoms_paris_2015.txt en lisant le fichier avec une udf de chargement pig et enregistrer le résultat sous la forme d'un fichier csv dont les champs sont séparés par des | et contenant les colonnes suivantes

  • prenom : en minuscule
  • nombre
  • genre : m ou f en minuscule

Pour cela, vous aurez besoin d'utiliser le classpath suivant Classpath pig. Vous penserez à tester votre code avec JUnit (uniquement la partie qui permet de découper la ligne en champs)

La solution de cet exercice est téléchargeable ici : Solution chargeur pig

hadoop fs -rm -r -f /user/cloudera/data/prenoms2014
pig -x mapreduce  -p "annee=2014" /home/cloudera/Desktop/formation/pig-chargement2.pig

hadoop fs -rm -r -f /user/cloudera/data/prenoms2015
pig -x mapreduce  -p "annee=2015" /home/cloudera/Desktop/formation/pig-chargement2.pig

hadoop fs -rm -f /user/cloudera/data/prenoms2015/_SUCCESS /user/cloudera/data/prenoms2014/_SUCCESS

REGISTER '/home/cloudera/Desktop/formation/udf-pig.jar';
data = LOAD '/user/cloudera/reception/prenoms_paris_$annee.txt' USING fr.udf.ChargementPrenoms();
STORE data INTO '/user/cloudera/data/prenoms$annee' USING PigStorage('|');

package fr.udf;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.log4j.Logger;
import org.apache.pig.LoadFunc;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;

public class ChargementPrenoms extends LoadFunc {

    private static final Logger LOG = Logger.getLogger(ChargementPrenoms.class);
    private RecordReader reader = null;

    @Override
    public InputFormat getInputFormat() throws IOException {
        InputFormat res = null;
        String inputFormatClassName = TextInputFormat.class.getName();
        try {
            res = (InputFormat) PigContext.resolveClassName(inputFormatClassName).newInstance();
        }
        catch (InstantiationException | IllegalAccessException e) {
            LOG.error(e.getMessage(), e);
        }
        return res;
    }

    @Override
    public Tuple getNext() throws IOException {
        Tuple res = null;
        try {
            if (!reader.nextKeyValue()) {
                reader.close();
                return null;
            }
            Text value = (Text) reader.getCurrentValue();
            String line = value.toString();
            List champs = genererChamps(line);
            if (champs != null) {
                res = TupleFactory.getInstance().newTuple(champs);
            }
        }
        catch (InterruptedException e) {
            throw new ExecException(e.getMessage(), 202, PigException.REMOTE_ENVIRONMENT, e);
        }
        return res;
    }

    protected List genererChamps(String line) {
        List champs = new ArrayList<>();
        champs.add(line.substring(0, 31).trim().toLowerCase());
        champs.add(line.substring(31, 36).trim());
        champs.add(line.substring(36, line.length()).trim().toLowerCase());
        return champs;
    }

    @Override
    public void prepareToRead(@SuppressWarnings("rawtypes") RecordReader recordReader, PigSplit arg1) throws IOException {
        reader = recordReader;
    }

    @Override
    public void setLocation(String location, Job job) throws IOException {
        FileInputFormat.setInputPaths(job, location);
    }
}

5. Création de tables

Créer deux tables permettant d'accéder de charger les fichiers prénoms origine et prenoms_paris

La table prenoms_paris est partitionnée par année, les champs sont délimités par des |, les fichiers sont enregistrés dans /user/cloudera/tables/prenoms_paris et les colonnes sont les suivantes :

  • prenom : string
  • effectif : int
  • sexe : string

La table origines_prenoms n'est pas partitionnée, les champs sont délimités par des | et les fichiers sont enregistrés dans /user/cloudera/tables/origines_prenoms. Il faudra ignorer la première lignes des fichiers chargés dans cette table. Les colonnes sont les suivantes:

  • prenom : string
  • sexe : string
  • origine : string

Une fois les tables créées, charger le contenu des fichiers dans les tables et notez que les fichiers du dossier data ont été supprimés.

DROP DATABASE IF EXISTS formation CASCADE;
CREATE DATABASE formation;

USE formation;

CREATE TABLE prenoms_paris (
 prenom string,
 effectif int,
 sexe string
) 
PARTITIONED BY (annee string) 
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' 
LOCATION '/user/cloudera/tables/prenoms_paris' 
tblproperties('skip.header.line.count'='0');

CREATE TABLE origines_prenoms (
 prenom string,
 sexe string,
 origine string
) 
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' 
LOCATION '/user/cloudera/tables/origines_prenoms' 
tblproperties('skip.header.line.count'='1');

LOAD DATA INPATH '/user/cloudera/data/origines' INTO TABLE origines_prenoms;
LOAD DATA INPATH '/user/cloudera/data/prenoms2014' INTO TABLE prenoms_paris PARTITION(annee=2014);
LOAD DATA INPATH '/user/cloudera/data/prenoms2015' INTO TABLE prenoms_paris PARTITION(annee=2015);

SELECT * FROM prenoms_paris LIMIT 10;
SELECT * FROM origines_prenoms LIMIT 10;

6. Quelques requête SQL

Sélectionner les 10 prénoms les plus donnés en 2015

SELECT * 
FROM prenoms_paris
WHERE annee='2015'
ORDER BY effectif DESC
LIMIT 10;

Sélectionner le nombre de prénoms présents en 2015 et qui n'existaient pas en 2014. Ce nombre est assez élevé étant donné que tous les prénoms de moins de 5 occurences n'ont pas été diffusés.

SELECT count(*)
FROM prenoms_paris p
WHERE annee='2015' 
AND NOT EXISTS (SELECT 1 FROM prenoms_paris q WHERE annee='2014' AND q.prenom=p.prenom)

Créer une requête SQL permettant de séléctionner les origines des prénoms données en 2015 en les classant par fréquence d'apparition. Limiter les résultats aux 10 premiers

SELECT o.origine, count(*) as c
FROM formation.prenoms_paris p 
INNER JOIN formation.origines_prenoms o 
ON p.prenom=o.prenom 
AND p.sexe=o.sexe
WHERE annee='2015'
GROUP BY o.origine
ORDER BY c DESC
LIMIT 10;
Résultat console :
+---------------------+------+
|     o.origine       |  c   |
+---------------------+------+
| french              | 148  |
| english             | 119  |
| arabic              | 50   |
| italian             | 25   |
| french, english     | 19   |
| english, biblical   | 12   |
| english, french     | 10   |
| russian             | 9    |
| spanish             | 9    |
|                     | 7    |
+---------------------+------+

Créer une requête SQL permettant de sélectionner les prénoms donnés à la fois à des filles et à des garcons en 2015 à paris ainsi que le nombre de prénoms

SELECT prenom, nombre FROM (
 SELECT prenom, SUM(effectif) as nombre, count(*) as nb FROM prenoms_paris
 WHERE annee='2015'
 GROUP BY prenom
)t
WHERE t.nb > 1;
Résultat console :
+-------------+---------+
|  prenom     | nombre  |
+---------  --+---------+
| adama       | 17      |
| alix        | 89      |
| andrea      | 41      |
| andréa      | 25      |
| ange        | 11      |
| aïssa       | 10      |
| camille     | 234     |
| charlie     | 35      |
| clarence    | 18      |
| eden        | 73      |
| elia        | 36      |
| elie        | 36      |
| lou         | 109     |
| louison     | 32      |
| maxime      | 162     |
| noa         | 46      |
| sacha       | 124     |
| sasha       | 56      |
| swann       | 22      |
+-----------+-----------+

7. UDF Hive

Créer une udf permettant de définir une indicatrice sur la variable origine de la table origines_prenoms

  • Si le prénom a une origine francaise au moins, l'indicatrice vaut 1
  • Dans les autres cas, elle vaut 0

Pour cela, vous aurez besoin d'utiliser le classpath suivant Classpath hive

A l'aide de cette UDF, créer une requête permettant de sélectionner les 50 premières origines de la table origines_prenoms ainsi que l'indicatrice définie par l'udf. Puis créer une requête permettant de compter le nombre de prénoms donnés à Paris en 2015 qui avaient une origine francaise au moins ainsi que le nombre de prénoms d'origine étrangère

package fr.hive.formation;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;

public class OrigineFrancaiseUDF extends GenericUDF {

    private StringObjectInspector origineInspector;

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        String res = "0";
        if (arguments[0] != null) {
            String value = origineInspector.getPrimitiveJavaObject(arguments[0].get());
            if (value != null && value.matches(".*french.*")) {
                res = "1";
            }
        }
        return res;
    }

    @Override
    public String getDisplayString(String[] arg0) {
        return "UDF de classement des origines en francaise : 1 ou étrangères : 0";
    }

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        if (arguments.length != 1) {
            throw new UDFArgumentLengthException("La fonction doit contenir un paramètre seulement");
        }
        ObjectInspector a = arguments[0];
        if (!(a instanceof StringObjectInspector)) {
            throw new UDFArgumentException("Le premier argument doit être un String");
        }
        origineInspector = (StringObjectInspector) a;
        return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
    }
}

-- Ajout du jar et création de la fonction
ADD JAR /home/cloudera/Desktop/formation/udf-hive.jar;
CREATE TEMPORARY FUNCTION francaise as 'fr.hive.formation.OrigineFrancaiseUDF';

-- Sélection de l'origine des 50 premiers prénoms de la table origines_prenoms
SELECT origine, francaise(origine) FROM origines_prenoms LIMIT 50;

-- Sélection du nombre de prénoms ayant au moins une origine francaise et du nombre de prénoms n'en ayant pas
SELECT 
 case when francaise(o.origine)=0 then 'Prénom origine étrangère' else 'Prénom origine francaise' end as origine, 
 count(*) as nombre
FROM formation.prenoms_paris p 
INNER JOIN formation.origines_prenoms o 
ON p.prenom=o.prenom 
AND p.sexe=o.sexe
WHERE annee='2015'
GROUP BY francaise(o.origine);