Apache Ignite + Apache Spark Data Frames: ensemble plus de plaisir

Bonjour, Habr! Je m'appelle Nikolai Izhikov, je travaille pour Sberbank Technologies dans l'équipe de développement de solutions Open Source. DerriÚre 15 ans de développement commercial en Java. Je suis un contributeur Apache Ignite et un contributeur Apache Kafka.

Sous le chat, vous trouverez une version vidĂ©o et texte de mon rapport sur Apache Ignite Meetup sur la façon d'utiliser Apache Ignite avec Apache Spark et les fonctionnalitĂ©s que nous avons mises en Ɠuvre pour cela.



Ce qu'Apache Spark peut faire


Qu'est-ce que Apache Spark? Il s'agit d'un produit qui vous permet d'effectuer rapidement des requĂȘtes de calcul et d'analyse distribuĂ©es. Fondamentalement, Apache Spark est Ă©crit en Scala.

Apache Spark possĂšde une API riche pour se connecter Ă  diffĂ©rents systĂšmes de stockage ou recevoir des donnĂ©es. L'une des caractĂ©ristiques du produit est un moteur de requĂȘte universel de type SQL pour les donnĂ©es reçues de diverses sources. Si vous avez plusieurs sources d'informations, que vous souhaitez les combiner et obtenir des rĂ©sultats, Apache Spark est ce dont vous avez besoin.

L'une des abstractions clĂ©s que Spark fournit est Data Frame, DataSet. En termes de base de donnĂ©es relationnelle, il s'agit d'une table, une source qui fournit des donnĂ©es de maniĂšre structurĂ©e. La structure, le type de chaque colonne, son nom, etc., est connu. Les trames de donnĂ©es peuvent ĂȘtre créées Ă  partir de diverses sources. Les exemples incluent les fichiers json, les bases de donnĂ©es relationnelles, divers systĂšmes hadoop et Apache Ignite.

Spark prend en charge les jointures dans les requĂȘtes SQL. Vous pouvez combiner des donnĂ©es provenant de diverses sources et obtenir des rĂ©sultats, effectuer des requĂȘtes analytiques. De plus, il existe une API pour enregistrer les donnĂ©es. Lorsque vous avez terminĂ© les requĂȘtes, effectuĂ© une Ă©tude, Spark offre la possibilitĂ© d'enregistrer les rĂ©sultats sur le rĂ©cepteur qui prend en charge cette fonctionnalitĂ© et, en consĂ©quence, de rĂ©soudre le problĂšme du traitement des donnĂ©es.

Quelles fonctionnalitĂ©s avons-nous mises en Ɠuvre pour intĂ©grer Apache Spark Ă  Apache Ignite


  1. Lecture des données des tables SQL Apache Ignite.
  2. Écriture de donnĂ©es dans des tables SQL Apache Ignite.
  3. IgniteCatalog dans IgniteSparkSession - la possibilité d'utiliser toutes les tables Ignite SQL existantes sans s'enregistrer «à la main».
  4. Optimisation SQL - la possibilité d'exécuter des instructions SQL dans Ignite.

Apache Spark peut lire les donnĂ©es des tables SQL Apache Ignite et les Ă©crire sous la forme d'une telle table. Tout DataFrame formĂ© dans Spark peut ĂȘtre enregistrĂ© en tant que table Apache Ignite SQL.

Apache Ignite vous permet d'utiliser toutes les tables SQL Ignite existantes dans Spark Session sans vous inscrire «à la main» - en utilisant IgniteCatalog dans l'extension SparkSession standard - IgniteSparkSession.

Ici, vous devez aller un peu plus loin dans l'appareil Spark. En termes de base de donnĂ©es rĂ©guliĂšre, un rĂ©pertoire est un endroit oĂč les mĂ©ta-informations sont stockĂ©es: quelles tables sont disponibles, quelles colonnes s'y trouvent, etc. Lorsqu'une demande arrive, les mĂ©ta-informations sont extraites du catalogue et le moteur SQL fait quelque chose avec les tables et les donnĂ©es. Par dĂ©faut, dans Spark, toutes les tables de lecture (peu importe, Ă  partir d'une base de donnĂ©es relationnelle, Ignite, Hadoop) doivent ĂȘtre enregistrĂ©es manuellement dans la session. Par consĂ©quent, vous avez la possibilitĂ© d'effectuer une requĂȘte SQL sur ces tables. Spark les dĂ©couvre.

Pour travailler avec les données que nous avons téléchargées sur Ignite, nous devons enregistrer les tables. Mais au lieu d'enregistrer chaque table avec nos mains, nous avons implémenté la possibilité d'accéder automatiquement à toutes les tables Ignite.

Quelle est la fonctionnalitĂ© ici? Pour une raison inconnue, le rĂ©pertoire dans Spark est une API interne, c'est-Ă -dire un Ă©tranger ne peut pas venir crĂ©er sa propre implĂ©mentation de catalogue. Et, depuis que Spark est sorti de Hadoop, il ne prend en charge que Hive. Et vous devez enregistrer tout le reste avec vos mains. Les utilisateurs demandent souvent comment contourner ce problĂšme et effectuer immĂ©diatement des requĂȘtes SQL. J'ai implĂ©mentĂ© un rĂ©pertoire qui vous permet de parcourir et d'accĂ©der aux tables Ignite sans enregistrer ~ et sms ~, et j'ai initialement proposĂ© ce patch dans la communautĂ© Spark, auquel j'ai reçu une rĂ©ponse: un tel patch n'est pas intĂ©ressant pour certaines raisons internes. Et ils n'ont pas donnĂ© l'API interne.

DĂ©sormais, le catalogue Ignite est une fonctionnalitĂ© intĂ©ressante implĂ©mentĂ©e Ă  l'aide de l'API interne de Spark. Pour utiliser ce rĂ©pertoire, nous avons notre propre implĂ©mentation de la session, c'est la SparkSession habituelle, dans laquelle vous pouvez faire des requĂȘtes, traiter des donnĂ©es. Les diffĂ©rences sont que nous y avons intĂ©grĂ© ExternalCatalog pour travailler avec les tables Ignite, ainsi que IgniteOptimization, qui sera dĂ©crit ci-dessous.

Optimisation SQL - la possibilitĂ© d'exĂ©cuter des instructions SQL dans Ignite. Par dĂ©faut, lors de l'exĂ©cution d'une jointure, d'un regroupement, d'un calcul d'agrĂ©gation et d'autres requĂȘtes SQL complexes, Spark lit les donnĂ©es ligne par ligne. La seule chose que la source de donnĂ©es peut faire est de filtrer efficacement les lignes.

Si vous utilisez la jointure ou le regroupement, Spark extrait toutes les donnĂ©es de la table dans sa mĂ©moire vers le programme de travail, Ă  l'aide des filtres spĂ©cifiĂ©s, puis les regroupe ou effectue d'autres opĂ©rations SQL. Dans le cas d'Ignite, ce n'est pas optimal, car Ignite lui-mĂȘme a une architecture distribuĂ©e et a connaissance des donnĂ©es qui y sont stockĂ©es. Par consĂ©quent, Ignite lui-mĂȘme peut calculer efficacement les agrĂ©gats et effectuer le regroupement. De plus, il peut y avoir beaucoup de donnĂ©es, et pour les regrouper, vous devrez tout soustraire, augmenter toutes les donnĂ©es dans Spark, ce qui est assez cher.

Spark fournit une API avec laquelle vous pouvez modifier le plan initial de la requĂȘte SQL, effectuer une optimisation et transfĂ©rer la partie de la requĂȘte SQL qui peut y ĂȘtre exĂ©cutĂ©e dans Ignite. Cela sera efficace en termes de vitesse et de consommation de mĂ©moire, car nous ne l'utiliserons pas pour extraire des donnĂ©es qui seront immĂ©diatement regroupĂ©es.

Comment ça marche




Nous avons un cluster Ignite - c'est la moitiĂ© infĂ©rieure de l'image. Il n'y a pas de gardien de zoo, car il n'y a que cinq nƓuds. Il y a des ouvriers spark, Ă  l'intĂ©rieur de chaque ouvrier le nƓud client Ignite est levĂ©. GrĂące Ă  lui, nous pouvons faire une demande et lire les donnĂ©es, interagir avec le cluster. En outre, le nƓud client monte dans IgniteSparkSession pour que le rĂ©pertoire fonctionne.

Allumer la trame de données


Passons au code: comment lire les donnĂ©es d'une table SQL? Dans le cas de Spark, tout est assez simple et bon: nous disons que nous voulons calculer certaines donnĂ©es, indiquer le format - c'est une certaine constante. De plus, nous avons plusieurs options - le chemin d'accĂšs au fichier de configuration pour le nƓud client, qui dĂ©marre lors de la lecture des donnĂ©es. Nous indiquons la table que nous voulons lire et demandons Ă  Spark de charger. Nous obtenons les donnĂ©es et nous pouvons en faire ce que nous voulons.

spark.read .format(FORMAT_IGNITE) .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE) .option(OPTION_TABLE, "person") .load() 

AprĂšs avoir gĂ©nĂ©rĂ© les donnĂ©es - Ă©ventuellement depuis Ignite, Ă  partir de n'importe quelle source - nous pouvons tout aussi facilement tout sauvegarder en spĂ©cifiant le format et le tableau correspondant. Nous demandons Ă  Spark d'Ă©crire, nous spĂ©cifions un format. Dans la configuration, nous prescrivons Ă  quel cluster se connecter. SpĂ©cifiez la table dans laquelle nous voulons enregistrer. De plus, nous pouvons prescrire des options d'utilitaire - spĂ©cifiez la clĂ© primaire que nous crĂ©ons sur ce tableau. Si les donnĂ©es bouleversent simplement sans crĂ©er de table, ce paramĂštre n'est pas nĂ©cessaire. À la fin, cliquez sur Enregistrer et les donnĂ©es sont Ă©crites.

 tbl.write. format(FORMAT_IGNITE). option(OPTION_CONFIG_FILE, CFG_PATH). option(OPTION_TABLE, tableName). option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, pk). save 

Voyons maintenant comment tout cela fonctionne.


LoadDataExample.scala

Cette application évidente démontrera d'abord les capacités d'enregistrement. Par exemple, j'ai choisi les données sur les matchs de football, les statistiques téléchargées à partir d'une ressource bien connue. Il contient des informations sur les tournois: ligues, matches, joueurs, équipes, attributs des joueurs, attributs des équipes - données qui décrivent les matchs de football dans les ligues des pays européens (Angleterre, France, Espagne, etc.).

Je veux les télécharger sur Ignite. Nous créons une session Spark, spécifions l'adresse de l'assistant et appelons le chargement de ces tables en passant des paramÚtres. L'exemple est en Scala, pas en Java, car Scala est moins verbeux et donc meilleur par exemple.

Nous transfĂ©rons le nom du fichier, le lisons, indiquons qu'il est multiligne, il s'agit d'un fichier json standard. Ensuite, nous Ă©crivons dans Ignite. La structure de notre fichier n'est nulle part Ă  dĂ©crire - Spark lui-mĂȘme dĂ©termine quelles donnĂ©es nous avons et quelle est leur structure. Si tout se passe bien, une table est créée dans laquelle se trouvent tous les champs nĂ©cessaires des types de donnĂ©es requis. C'est ainsi que nous pouvons tout charger dans Ignite.

Lorsque les donnĂ©es sont chargĂ©es, nous pouvons les voir dans Ignite et les utiliser immĂ©diatement. À titre d'exemple simple, une requĂȘte qui vous permet de savoir quelle Ă©quipe a jouĂ© le plus de matchs. Nous avons deux colonnes: hometeam et awayteam, hĂŽtes et invitĂ©s. Nous sĂ©lectionnons, groupons, comptons, additionnons et joignons les donnĂ©es de la commande - pour entrer le nom de la commande. Ta-dam - et les donnĂ©es de json-chiks que nous avons obtenues dans Ignite. Nous voyons Paris Saint-Germain, Toulouse - nous avons beaucoup de donnĂ©es sur les Ă©quipes françaises.



Nous rĂ©sumons. Nous avons maintenant tĂ©lĂ©chargĂ© des donnĂ©es depuis la source, le fichier json, vers Ignite, et assez rapidement. Peut-ĂȘtre, du point de vue des mĂ©gadonnĂ©es, ce n'est pas trop grand, mais dĂ©cent pour un ordinateur local. Le schĂ©ma de la table est extrait du fichier json dans sa forme d'origine. La table a Ă©tĂ© créée, les noms des colonnes ont Ă©tĂ© copiĂ©s Ă  partir du fichier source, la clĂ© primaire a Ă©tĂ© créée. L'ID est partout et la clĂ© primaire est l'ID. Ces donnĂ©es sont entrĂ©es dans Ignite, nous pouvons les utiliser.

IgniteSparkSession et IgniteCatalog


Voyons comment cela fonctionne.


CatalogExample.scala

D'une maniÚre assez simple, vous pouvez accéder à toutes vos données et les interroger. Dans le dernier exemple, nous avons démarré la session spark standard. Et il n'y avait aucune spécificité Ignite là-bas - sauf que vous devez mettre un pot avec la bonne source de données - un travail complÚtement standard via l'API publique. Mais, si vous souhaitez accéder automatiquement aux tables Ignite, vous pouvez utiliser notre extension. La différence est qu'au lieu de SparkSession, nous écrivons IgniteSparkSession.

DĂšs que vous crĂ©ez un objet IgniteSparkSession, vous voyez dans le rĂ©pertoire toutes les tables qui viennent d'ĂȘtre chargĂ©es dans Ignite. Vous pouvez voir leur diagramme et toutes les informations. Spark connaĂźt dĂ©jĂ  les tables qu'Ignite possĂšde et vous pouvez facilement obtenir toutes les donnĂ©es.



Igniteoptimization


Lorsque vous effectuez des requĂȘtes complexes dans Ignite Ă  l'aide de JOIN, Spark extrait d'abord les donnĂ©es, puis seulement JOIN les regroupe. Pour optimiser le processus, nous avons créé la fonction IgniteOptimization - elle optimise le plan de requĂȘte Spark et vous permet de transmettre les parties de la demande qui peuvent ĂȘtre exĂ©cutĂ©es dans Ignite dans Ignite. Nous montrons l'optimisation sur une demande spĂ©cifique.

 SQL Query: SELECT   city_id,   count(*) FROM   person p GROUP BY city_id HAVING count(*) > 1 

Nous satisfaisons la demande. Nous avons une table de personnes - certains employés, des gens. Chaque employé connaßt l'identifiant de la ville dans laquelle il vit. Nous voulons savoir combien de personnes vivent dans chaque ville. Nous filtrons - dans quelle ville vit plus d'une personne. Voici le plan initial que Spark construit:

 == Analyzed Logical Plan == city_id: bigint, count(1): bigint Project [city_id#19L, count(1)#52L] +- Filter (count(1)#54L > cast(1 as bigint))  +- Aggregate [city_id#19L], [city_id#19L, count(1) AS count(1)#52L, count(1) AS count(1)#54L] +- SubqueryAlias p    +- SubqueryAlias person       +- Relation[NAME#11,BIRTH_DATE#12,IS_RESIDENT#13,SALARY#14,PENSION#15,ACCOUNT#16,AGE#17,ID#18L,CITY_ID#19L]         IgniteSQLRelation[table=PERSON] 

La relation n'est qu'une table Ignite. Il n'y a pas de filtres - nous pompons simplement toutes les données de la table Person sur le réseau à partir du cluster. Ensuite, Spark agrÚge tout cela - conformément à la demande et renvoie le résultat de la demande.

Il est facile de voir que tous ces sous-arbres avec filtre et agrĂ©gation peuvent ĂȘtre exĂ©cutĂ©s dans Ignite. Cela sera beaucoup plus efficace que d'extraire toutes les donnĂ©es d'une table potentiellement grande dans Spark - c'est ce que fait notre fonction IgniteOptimization. AprĂšs avoir analysĂ© et optimisĂ© l'arbre, nous obtenons le plan suivant:

 == Optimized Logical Plan == Relation[CITY_ID#19L,COUNT(1)#52L]   IgniteSQLAccumulatorRelation(     columns=[CITY_ID, COUNT(1)], qry=SELECT CITY_ID, COUNT(1) FROM PERSON GROUP BY city_id HAVING count(1) > 1) 

En conséquence, nous n'obtenons qu'une seule relation, car nous avons optimisé tout l'arbre. Et à l'intérieur, vous pouvez déjà voir qu'Ignite enverra une demande suffisamment proche de la demande d'origine.

Supposons que nous joignions différentes sources de données: par exemple, nous avons un DataFrame d'Ignite, le second de json, le troisiÚme d'Ignite à nouveau et le quatriÚme d'une sorte de base de données relationnelle. Dans ce cas, seul le sous-arbre sera optimisé dans le plan. Nous optimisons ce que nous pouvons, le déposons dans Ignite et Spark fera le reste. Pour cette raison, nous obtenons un gain de vitesse.

Un autre exemple avec JOIN:

 SQL Query - SELECT jt1.id as id1, jt1.val1, jt2.id as id2, jt2.val2 FROM jt1 JOIN jt2 ON jt1.val1 = jt2.val2 

Nous avons deux tableaux. Nous restons unis par valeur et sélectionnons parmi eux tous - ID, valeurs. Spark propose un tel plan:

 == Analyzed Logical Plan == id1: bigint, val1: string, id2: bigint, val2: string Project [id#4L AS id1#84L, val1#3, id#6L AS id2#85L, val2#5] +- Join Inner, (val1#3 = val2#5) :- SubqueryAlias jt1 : +- Relation[VAL1#3,ID#4L] IgniteSQLRelation[table=JT1] +- SubqueryAlias jt2    +- Relation[VAL2#5,ID#6L] IgniteSQLRelation[table=JT2] 

Nous voyons qu'il va extraire toutes les donnĂ©es d'une table, toutes les donnĂ©es de la seconde, les joindre en lui et donner les rĂ©sultats. AprĂšs le traitement et l'optimisation, nous obtenons exactement la mĂȘme demande qui va Ă  Ignite, oĂč elle est exĂ©cutĂ©e relativement rapidement.

 == Optimized Logical Plan == Relation[ID#84L,VAL1#3,ID#85L,VAL2#5] IgniteSQLAccumulatorRelation(columns=[ID, VAL1, ID, VAL2], qry= SELECT JT1.ID AS id1, JT1.VAL1, JT2.ID AS id2, JT2.VAL2 FROM JT1 JOIN JT2 ON JT1.val1 = JT2.val2 WHERE JT1.val1 IS NOT NULL AND JT2.val2 IS NOT NULL) 

Je vais vous montrer un exemple.


OptimizationExample.scala

Nous crĂ©ons une session IgniteSpark dans laquelle toutes nos capacitĂ©s d'optimisation sont dĂ©jĂ  automatiquement incluses. Voici la demande: trouvez les joueurs avec la note la plus Ă©levĂ©e et affichez leurs noms. Dans la table des joueurs, leurs attributs et donnĂ©es. Nous nous joignons, filtrons les donnĂ©es indĂ©sirables et affichons les joueurs avec la note la plus Ă©levĂ©e. Voyons quel type de plan nous avons obtenu aprĂšs l'optimisation et montrons les rĂ©sultats de cette requĂȘte.



Nous commençons. Nous voyons des noms de famille familiers: Messi, Buffon, Ronaldo, etc. Soit dit en passant, certains pour une raison quelconque se rencontrent sous deux formes - Messi et Ronaldo. Les amateurs de football peuvent trouver Ă©trange que des joueurs inconnus apparaissent sur la liste. Ce sont des gardiens de but, des joueurs avec des caractĂ©ristiques assez Ă©levĂ©es - dans le contexte des autres joueurs. Maintenant, nous regardons le plan de requĂȘte qui a Ă©tĂ© exĂ©cutĂ©. Dans Spark, presque rien n'a Ă©tĂ© fait, c'est-Ă -dire que nous avons de nouveau envoyĂ© la demande entiĂšre Ă  Ignite.

Apache Ignite Development


Notre projet est un produit open source, nous sommes donc toujours satisfaits des correctifs et des commentaires des dĂ©veloppeurs. Votre aide, vos commentaires, vos correctifs sont les bienvenus. Nous les attendons. 90% de la communautĂ© Ignite est russophone. Par exemple, pour moi, jusqu'Ă  ce que je commence Ă  travailler sur Apache Ignite, la meilleure connaissance de l'anglais n'Ă©tait pas dissuasive. Cela ne vaut guĂšre la peine d'Ă©crire en russe sur une liste de dĂ©veloppeurs, mais mĂȘme si vous Ă©crivez quelque chose de mal, ils vous rĂ©pondront et vous aideront.

Que peut-on améliorer sur cette intégration? Comment puis-je vous aider si vous avez un tel désir? Liste ci-dessous. Les astérisques indiquent la complexité.


Pour tester l'optimisation, vous devez Ă©crire des tests avec des requĂȘtes complexes. Ci-dessus, j'ai montrĂ© quelques requĂȘtes Ă©videntes. Il est clair que si vous Ă©crivez beaucoup de groupements et beaucoup de jointures, alors quelque chose peut tomber. C'est une tĂąche trĂšs simple - venez le faire. Si nous trouvons des bogues basĂ©s sur les rĂ©sultats des tests, ils devront ĂȘtre corrigĂ©s. Ce sera plus difficile lĂ -bas.

Une autre tùche claire et intéressante est l'intégration de Spark avec un client léger. Il est initialement capable de spécifier certains ensembles d'adresses IP, et cela suffit pour rejoindre le cluster Ignite, ce qui est pratique en cas d'intégration avec un systÚme externe. Si vous souhaitez soudainement rejoindre la solution à ce problÚme, je vais personnellement vous aider.

Si vous souhaitez rejoindre la communauté Apache Ignite, voici quelques liens utiles:


Nous avons une liste de dĂ©veloppeurs rĂ©actifs, qui vous aidera. C'est encore loin d'ĂȘtre idĂ©al, mais en comparaison avec d'autres projets, il est vraiment vivant.

Si vous connaissez Java ou C ++, vous cherchez du travail et souhaitez développer l'Open Source (Apache Ignite, Apache Kafka, Tarantool, etc.) écrivez ici: join-open-source@sberbank.ru.

Source: https://habr.com/ru/post/fr427297/


All Articles