avril
2014
Les processeurs multi-coeurs sont de plus en plus courant, mais pas toujours exploité correctement car la majorité des applications restent sur une structure séquentielle des opérations.
Alors comment utiliser toute la puissance des processeurs multi-coeurs pour améliorer le traitement de flux de données ?
C’était une des réponses apportées par l’API Fork/Join de Java 7, mais son API nécessite d’importante modification du code afin de gérer les aspects lié à la parallélisation du traitement… ce qui amène également une certain lourdeur du code, malheureusement.
Ce n’est pas réellement compliqué en soit une fois qu’on a compris le principe, mais ce n’est pas non plus forcément évident ni simple à mettre en oeuvre pour autant.
D’ailleurs je ne pense que cette API doit être assez méconnu chez les développeurs Java, hormis de nom…
Heureusement l’API Stream de Java 8 vient apporter une nouvelle solution, en proposant une API simple permettant d’effectuer divers traitements sur les flux de données, que ce soit en séquentielle ou en parallèle !
Comparons donc les diverses solutions qui s’offre désormais à nous…
Pour mieux comprendre le principe, on va donc simuler un traitement « lourd » sur plusieurs éléments.
Pour cela on va imaginer une classe Data possédant une méthode calculateValue() qui simulera un traitement « long » en effectuant une pause d’environ une milliseconde via Thread.sleep()
.
Le code de cette classe ressemble à ceci :
private long value;
public Data() {
this.value = (int) (Math.random() * 500);
}
public long calculateValue() {
// On simule un traitement "long", avec un sleep d'1 ms
try {
Thread.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return this.value;
}
}
On va alors générer une liste de 1000 éléments Data, et notre objectif sera donc de calculer la somme des valeurs retournées par calculateValue() de chaque élément qui la compose, et cela le plus rapidement possible…
Itération séquentielle
On va commencer par une implémentation classique, via un simple boucle for :
long result = 0L;
for (Data data : dataList) {
result += data.calculateValue();
}
return result;
}
Le code est simplissime et immédiatement compréhensible par tout développeur Java, je ne m’y attarderais pas.
Stream séquentiel avec Java 8
Passons à Java 8 avec l’API des Streams :
return dataList.stream()
.mapToLong(Data::calculateValue)
.reduce(0L, (a, b) -> a + b);
}
La logique est inversé : on ne s’occupe pas de l’itération ou du traitement, mais simplement de définir les méthodes à utiliser pour convertir les données ou les collecter.
C’est sûrement moins compréhensible dans un premier temps pour le développeur Java, mais c’est sûrement une question d’habitude.
Elle se déroule en 3 étapes :
- La méthode stream() permet d’obtenir un Stream<Data> à partir de notre liste, ce qui nous permet d’utiliser cette nouvelle API.
- La méthode mapToLong() change le type de donnée du flux en long, via
LongStream
), en exécutant la méthode calculateValue() sur chaque élément. - Enfin la méthode reduce() effectue une opération d’addition sur tous les éléments, afin de générer le résultat que l’on désire.
Note : l’API Stream est bien plus complète que cela, pour plus de détail je vous renvoi vers sa javadoc : http://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html
Résultat des traitements séquentiels
Lorsque j’utilise ces méthodes pour parcourir une liste de 1000 éléments, je me retrouve logiquement avec des temps de 1000ms, soit 1ms par éléments. Pour être plus exact, sur ma machine j’obtiens des temps de réponse tournant aux alentour de 1300ms, sûrement lié au manque de précision de Thread.sleep()
.
Je rencontre également des différences allant de -32ms à +32ms pour l’une ou l’autre selon les exécutions, mais c’est insignifiant car là aussi c’est probablement dû au manque de précision de la capture du temps.
Il faudrait augmenter la quantité de données pour en être sûr, mais ce n’est pas la question.
On va plutôt revenir au sujet initial de ce billet : la parallélisation des traitements.
En effet les 8 coeurs de mon CPU sont complètement inutile dans ce cas précis, puisqu’on n’utilise qu’un seul d’entre eux.
Fork/Join de Java 7
L’API Fork/Join de Java 7 pose les bases d’un traitement parallèle.
Le principe de base est relativement simple :
- Si la tâche que l’on a à effectuer est « simple », on va l’effectuer directement.
- Sinon on la découpe en deux sous-tâches qui seront exécuté en parallèle.
La difficulté vient du fait de déterminer à partir de quel niveau on peut considérer que la tâche est simple.
Dans notre exemple on considèrera arbitrairement qu’une liste de 10 éléments est suffisamment simple pour être traiter directement.
Dans le cas inverse on découpera la liste en deux partie, afin de les traiter dans différents threads…
Pour cela on va implémenter un RecursiveTask
.
Comme c’est un peu plus complexe j’ai essayé de commenter le code le plus clairement possible :
private final List<Data> list;
public DataSumTask(List<Data> list) {
this.list = list;
}
@Override
protected Long compute() {
final int size = this.list.size();
// Si la liste comporte moins de 10 éléments,
// on considère que le traitement est "simple" :
if (size < 10) {
// On se contente alors d'effectuer le traitement itératif
// via l'implémentation séquentielle précédente
long result = 0L;
for (Data data : this.list) {
result += data.calculateValue();
}
return result;
}
// Sinon on découpe la liste en deux :
final int middle = size / 2;
final List<Data> list1 = this.list.subList(0, middle);
final List<Data> list2 = this.list.subList(middle, size);
// On crée une première tâche pour la première liste :
DataSumTask task1 = new DataSumTask(list1);
// que l'on exécute dans un autre thread :
task1.fork();
// On crée une seconde tâche pour la seconde liste :
DataSumTask task2 = new DataSumTask(list2);
// que l'on execute directement
// (sinon le thread courant n'aurait rien à faire)
long result2 = task2.compute();
// Une fois fini, on attend la fin de la première tâche :
long result1 = task1.join();
// Et on cumule le résultat des deux tâches :
return result1 + result2;
}
}
}
Notre implémentation se contentera alors de créer un DataSumTask qui s’occupera du traitement :
return new DataSumTask(dataList).compute();
}
Au final on se retrouve quand même avec un code bien plus complexe et très différent de la version originale utilisant l’itération classique (itération que l’on retrouve d’ailleurs intégralement dans le cas « simple » du traitement).
Tout le reste du code est lié au découpage des tâches et à la gestion des tâches, sans rapport avec notre code métier initial (sommer les valeurs retournée par calculateValue()).
Par contre du coté des performances on voit une nette différence, puisque j’obtiens un temps de traitement moyen de 190ms.
Le code est presque 8 fois plus rapide, ce qui est logique puisque désormais j’utilise bien les 8 coeurs de mon CPU…
Les performances sont bel et bien là… en dépit de la clarté du code.
Stream parallèle avec Java 8
Revenons à Java 8 et à son API de Stream.
La modification est ici nettement plus simple, puisqu’il nous suffit de 8 petits caractères pour transformer notre implémentation séquentielle en une implémentation parallèle : on va se contenter de remplacer l’appel de stream() par parallelStream() :
return dataList.parallelStream()
.mapToLong(Data::calculateValue)
.reduce(0L, (a, b) -> a + b);
}
Et c’est tout ! Notre code est désormais parallélisé !
En effet l’API de Stream gère parfaitement les deux implémentations, et il suffit de respecter quelques règles simples pour s’assurer un code parallélisable :
- Les opérations ne doivent pas interférer avec les données du Stream (par exemple ne pas modifier la source de données).
- Les opérations doivent être stateless, c’est à dire sans état ou objet externe qui pourrait varier selon l’exécution du traitement.
Il n’y a rien de tout cela dans notre code original, et l’on peut donc le paralléliser sans problème !
Bref coté lisibilité c’est que du bonheur (une fois qu’on a pris l’habitude de cette API Stream).
Et coté performance ? J’obtiens un temps moyen de 170ms.
On est donc bien dans le même ordre de grandeur que l’API Fork/Join, avec une différence constamment en faveur de l’API de Stream, sûrement mieux optimisé que mon DataSumTask.
Simple et efficace !
Vous pouvez télécharger le code source complet de cet exemple à cette adresse : http://adiguba.developpez.com/blog/parallel/Main.java
6 Commentaires + Ajouter un commentaire
Tutoriels
Discussions
- Définition exacte de @Override
- Recuperation du nom des parametres
- Classes, méthodes private
- Possibilité d'accéder au type générique en runtime
- Difference de performances Unix/Windows d'un programme?
- [ fuite ] memoire
- jre 1.5, tomcat 6.0 et multi processeurs
- [REFLEXION] Connaitre toutes les classes qui implémentent une interface
- L'apparition du mot-clé const est-il prévu dans une version à venir du JDK?
Oui en fait j’ai oublié un détail assez important sur les opérations de réduction : elles doivent être associatives.
C’est à dire que si on a 3 éléments, A op B op C est équivalent à (A op B) op C qui est équivalent à A op (B op C)…
Bref l’ordre ne doit pas modifier le résultat (justement en cas d’utilisation en parallèle).
Sinon c’est tout simplement que le calcul ne peut pas être paralléliser. Ce qui est le cas de (a,b)->a/b puisque tu as forcément besoin d’avoir le résultat précédent pour calculer le suivant…
A noter également que plusieurs des méthodes de Stream prennent en compte (ou pas) l’ordre des données dans la parallélisation
Par exemple findFirst() te renverra le premier élément correspondant aux critères, en respectant l’ordre des éléments même en parallèle.
Mais à l’inverse findAny() fera la même chose sans respecter l’ordre, ce qui pourrait aboutir à des résultats différents en parallèle, mais qui serait aussi plus performant (pas besoin de synchronisation).
a++
Parmi les choses qui trompent au premier regard c’est le fait que les expressions comme (a,b)->a+b, on a tendance à voir comme si le a et le b peuvent être tous les deux des éléments initiaux de stream avant l’entré en opération de réduction, alors qu’en fait par exemple pour le séquentiel on a toujours a c’est la somme de ce qui est calculé une étape précédente et puis ajouté à b à l’étape courante, donc a==0 au début et b ne fait que jouer le vrai élément qui itère sur les éléments initiaux.
Mais la question reste intéressante sur son comportement lors de la parallélisation car là il peut aller dans tous les sens c’est le cas quand j’ai fais:
int S=Stream.of(2,3,5).reduce(0, (a, b) -> {
out.print(« a= »+a+ », « );
out.print(« b= »+b+ », « );
return a+b;});
out.println(« S= »+S);
Pour un teste donné il m’affiche a=0, a=0, b=5, a=0, b=2, b=3, a=3, b=5, a=2, b=8, S=10
On voit que b a pris un moment donné la valeur 8, donc 5+3, or sans parallélisation on a seulement:
a=0, b=2, a=2, b=3, a=5, b=5, S=10
C’est ce qui me fait qu’au début là, j’ai pu calculé la sommation des carrés avec le (a, b) -> a+b*b), mais je n’arrive pas à paralléliser.
Au fait j’ai trouvé une astuce pour transformer certaines opérations non associatives à des décomposés pour donner une opération associative donc parallélisable :D.
C’est très simple j’ai pensé à mapper avant de réduire
Par exemple pour mon cas là de la sommation des carrés, j’ai pu paralléliser en faisant :
int S=Stream.of(7,4,9).parallel().map(c->c*c).reduce(0, (a, b) -> a+b); ce qui me donne S==146, exactement si je fais S=Stream.of(2,3,5).reduce(0, (a, b) -> a+b*b);
Même si cela ne marchera pas à tous les coups, car ça ne va jamais marcher pour le cas de (a,b)->a/b , vu qu’en fait c’est le caractère d’indépendance des sommes des carrés qui nous permis de mapper avant de réduire au cas précédant, mais ici …
On voit bien qu’avec cet API il va nous falloir un peu plus d’attention pour ne pas faire du n’importe quoi et surtout comment bien en tirer profit.
Oui en parallèle les données vont être coupées en plusieurs parties (selon le nombre de CPU disponible), et les données seront donc traité séparément « par petits bouts » avant d’être regroupées.
Par exemple pour la somme si tu as les valeurs (1, 2, 3, 4, 5, 6, 7, 8), on peut se retrouver avec 2 threads, l’un effectuant la somme de (1, 2, 3, 4) et l’autre la somme de (5, 6, 7, 8) par exemple.
A la fin, le résultat des deux threads (10, 26) est utilisé dans l’opération pour finaliser le résultat final : 36
C’est pour cela que l’opération de réduction doit être associative.
A noter que le reduce() peut être effectuer également via l’opération collect(), qui correspond grosso-modo à la même chose mais en plus complet (l’objet intermédiaire peut être de type différent)
a++
En fait je savais qu’il utilise des thread et il découpe les données en plusieurs pour calculer comme tu as dis par groupement et réunir le tout après, mais je n’arrivais pas à prouver à 100% le comportement exacte de l’expression lambda une fois découper le tout, ainsi j’ai dis qu’il peut commencer de tous les sens, mais en fait la règle reste la même.
Alors là si on se positionne sur l’opération de réduction, avec les deux paramètres dans notre exemple a et b, dans tous les cas il passe la valeur cumulé à a en commençant avec 0 et b itère sur les données.
Il fait la même chose chaque sous ensemble, et il fait la même chose l’ensemble résultat, donc ici donc ton exemple une fois encore trouver la nouvelle ensemble (10, 26), il fera forcement encore a=0, b=10, et puis a+b, et puis a=10 b=26, pour terminer avec a+b=36.
Là à présent j’ai levé toute ambiguïté. Ça reste qu’il faut être intelligent sur la manipulation de l’API, car on peut facilement se tomber sur la bonne manière d’écrire une expression qu’on veut, donc le fait de pouvoir toujours tester et démontrer manuellement le comportement du programme est très important, car faire un programme dont on ignore une partie de l’algorithme c’est un danger.
Surtout avec la programmation parallèle, il y a beaucoup de notions intéressantes à apprendre car des notions de base on les traite avec des boucles, des contions…, ici on a autre chose, là il va falloir apprendre les bonnes pratiques par exemple lorsque on a des conditions avant calcul (les inhibiteur et les ralentisseurs) déjà il faudra savoir comment bien mettre en pratique l’API au service de la programmation parallèle. Quel sont les avantages et les inconvénients.
Et pour notre cas d’opération associative alors moi ayant aussi fais des math, je sais que pour des expressions un peu complexe ce n’est pas toujours facile de démontrer qu’elle est associative et pourtant il peut l’être. Déjà dans notre cas aussi, on a juste parlé de simple opération, et il y a aussi les cas des données structurés de façon complexe(tableau, matrice, objet) on peut avoir a traité des opérations associatives avec et on retourne des données complexes, on peut avoir des opérations où si c’est à nous de définir l’opération par une fonction. Ça reste un champs à percer profondément.
Déjà ça reste un peu moins claire sur comment déterminer la complexité d’un algorithme ici. Y a-t-il une doc sur les règles de base du fonctionnement de Stream, par exemple sur une machine à 4 cœurs avec quel quantité de données il va les consommer tous, est que toujours, pourquoi selon les données il bosse avec deux thread au lieu de 4, je veux comprendre les règles de Fork/Join utilisé par Stream, quand est-ce que par exemple il dit que cette opération est simple?
Oui l’utilisation des Streams implique quand même quelques connaissances spécifiques, surtout si on veut utiliser la parallélisation.
Sinon par défaut l’API de Stream utilise un ForkJoinPool basé sur le nombre de CPU disponible sur la machine. Info que l’on peut récupérer via Runtime.getRuntime().availableProcessors().
Il y a moyen de changer cela en exécutant le stream dans un ForkJoinPool plus spécifique…
Sinon pour le reste je pense que le Spliterator a une part importante. C’est lui qui permet de découper (ou pas) le flux de données en deux, en plus de conserver des caractéristiques sur le flux de données (ordre, taille, etc.)
a++
Très bon billet, Félicitation!
J’ai juste une petite question: quand vous avez cité les règles à respecter pour que le code soit parallélisé, N’avez vous pas oublié de mentionner le problème sur les résultats erronés au cas où on fait de la parallélisation sur des opération non associatives en particulier sur les opération de réduction ou bien cela rentrera dans la modification des sources.
Je vais illustrer un peu les chose, d’abord le cas présenté (a,b)->a+b, c’est simple car (a+b)+c= a+(b+c), ainsi ça va se passer S=a+b puis S+c ou S=b+c, S+a, si on suppose qu’il a découper b et c à part pour cumuler puis ajouter l’élément restant. Ainsi on veut a+b+c+d quelque soit la manière dont il va découper les opérations pour paralléliser à la fin ça sera la somme.
Mais si on fait (a,b)->a/b car (a/b)/c est différent de a/(b/c), de la même manière si on fait (a,b)->a*a+b*b car là il va prendre toujours le carré de ce qui est cumulé pour l’ajouter au nouveau carré. Donc s’il fait a*a+b*b pour une étape donc pour passer à l’étape suivante il va faire (a*a+b*b)*(a*a+b*b)+c*c ce qui est totalement différent de a*a+(b*b+c*c)*(b*b+c*c)
Même si ce n’est pas la solution pour la somme des carré car en réalité pour faire la somme des carrés ce qui est sûr en mode séquentiel il faudrait faire:
reduce(0, (a, b) -> a + b*b); Car il cumule sur a et on ajoute le b au carré, j’ai bien testé ça c’est correcte.
Mais là se pose le problème si on parallélise on a des résultats complètement fausses
C’est bien d’exposer sur le Fork/join. En fait, j’ai lu un article comme quoi ils ont bien bossés pour Fork/Join sur JDK 8, les performances affichés par JDK 8 sont supérieurs à celle affichés par JDK7 :ccool: