Définition UDF, qui prend un tableau d'objets dans Spark DataFrame?

Quand travaille S. Spark DataFrames Pour afficher des données dans des colonnes, des fonctions personnalisées sont requises. /UDFs/. UDFs Cela nécessite que les types d'arguments soient explicitement indiqués. Dans mon cas, j'ai besoin de manipuler une colonne constituée de tableaux d'objets et je ne sais pas quel type à utiliser. Voici un exemple:


import sqlContext.implicits._

// Start with some data. Each row /here, there's only one row/
// is a topic and a bunch of subjects
val data = sqlContext.read.json/sc.parallelize/Seq/
"""
|{
| "topic" : "pets",
| "subjects" : [
| {"type" : "cat", "score" : 10},
| {"type" : "dog", "score" : 1}
| ]
|}
"""///


Relativement facile d'utiliser l'intégré
org.apache.spark.sql.functions

Pour effectuer des transactions de données de base dans les colonnes


import org.apache.spark.sql.functions.size
data.select/$"topic", size/$"subjects"//.show

+-----+--------------+
|topic|size/subjects/|
+-----+--------------+
| pets| 2|
+-----+--------------+


Et en général facile à écrire custom UDFs Effectuer des opérations arbitraires


import org.apache.spark.sql.functions.udf
val enhance = udf { topic : String => topic.toUpperCase// }
data.select/enhance/$"topic"/, size/$"subjects"//.show

+----------+--------------+
|UDF/topic/|size/subjects/|
+----------+--------------+
| PETS| 2|
+----------+--------------+


Mais si je veux utiliser UDF Manipuler le tableau d'objets dans la colonne "subjects"? Quel type j'utilise pour l'argument dans UDF? Par exemple, si je veux remplacer la fonction de taille et ne pas utiliser la fonction fournie spark:


val my_size = udf { subjects: Array[Something] => subjects.size }
data.select/$"topic", my_size/$"subjects"//.show


Il est clair que
Array[Something]

Ne fonctionne pas ... Quel type que je dois utiliser !? Devrais-je refuser
Array[]

? Creuser autour me dit que
scala.collection.mutable.WrappedArray

Cela peut avoir une sorte d'attitude envers cela, mais il y a toujours un autre type que je dois fournir.
Invité:

Christian

Confirmation de:

Ce que vous cherchez est
Seq[o.a.s.sql.Row]

:


import org.apache.spark.sql.Row

val my_size = udf { subjects: Seq[Row] => subjects.size }


Explication

:

Vue actuelle
ArrayType

, Comme tu le sais déjà,
WrappedArray

, alors
Array

Cela ne fonctionnera pas et il est préférable de rester de la sécurité.

https://spark.apache.org/docs/ ... types
, Lan /externe/ Taper
StructType

est un
Row

. Malheureusement, cela signifie que l'accès aux champs individuels n'est pas sécurisé.

Entrées

:

Créer
struct

dans Spark < 2.3, fonction transmise dans
udf

, DOIT RETOURNER TYPE
Product

/
Tuple*

ou
case class

/, mais non
Row

. C'est parce que les options correspondantes
udf

https://spark.apache.org/docs/ ... tions$
Scala :

Définit la fermeture Scala de

n

Arguments comme fonction utilisateur /UDF/. Les types de données sont automatiquement affichés en fonction de la signature de clôture. Scala.


DANS Spark >= 2.3 Puis-je retourner
Row

directement
https://spark.apache.org/docs/ ... tions$@udf/f:AnyRef,dataType:org.apache.spark.sql.types.DataType/:org.apache.spark.sql.expressions.UserDefinedFunction
.


def udf/f: AnyRef, dataType: DataType/: UserDefinedFunction

Détermine la fonction utilisateur déterministe /UDF/ Avec l'aide de la fermeture Scala. Pour cette option, l'appelant doit spécifier le type de sortie et il n'y a pas de type de boîtier automatique.

Voir, par exemple,
https://coderoad.ru/50949384/
.

Pour répondre aux questions, connectez-vous ou registre