Mi aprendizaje al interactuar con los clientes de Databricks
En Databricks, ayudo a grandes organizaciones de ventas a implementar y escalar canales de datos y aprendizaje automático. Estos son los 8 consejos de iluminación más importantes que he aprendido en el campo.
A lo largo de esta publicación, asumimos un conocimiento práctico general de Spark y su estructura, pero esta publicación debe ser accesible para todos los niveles.
¡Entremos!
Rápido, veamos qué hace la chispa…
Spark es un motor de procesamiento de big data. Toma python/java/scala/R/SQL y convierte ese código en un conjunto de transformación altamente optimizado.
En el nivel más bajo, las chispas crean tareas, que son: transformaciones paralelas en particiones de datos. Luego, estas tareas se distribuyen desde el nodo controlador a los nodos trabajadores, que son responsables de utilizar sus núcleos de CPU para completar las transformaciones. Al distribuir tareas entre múltiples trabajadores potenciales, Spark nos permite escalar horizontalmente y así admitir canales de datos complejos que serían imposibles en una sola máquina.
Ok, espero que no sea toda información nueva. De todos modos, en los siguientes apartados bajaremos un poco el ritmo. Estos consejos deberían ayudar tanto a los principiantes como a los intermedios.
Spark es complicado. Para ayudarlo a usted y potencialmente a otros a comprender su estructura, usemos una analogía interesante tomada de la teoría de clasificación: Spark es una tienda.
Cuando piensas en el componente de generación distribuida de una bujía, hay tres componentes principales….
- Divisiones de datos: subconjuntos de nuestros conjuntos de datos. En nuestra tienda están una comida
- Funciones de chispa: Las transformaciones de bajo nivel se realizan en una partición de datos. En nuestra tienda están clientes.
- núcleos: parte de sus procesos ejecutándose en paralelo. En nuestra tienda están cajeros.
¡Eso es todo!
Ahora, usemos estos conceptos para hablar sobre algunos conceptos básicos de Spark.
Como se muestra en la Figura 3, nuestros cajeros (principales) solo pueden atender a un cliente (servicio) a la vez. Además, algunos clientes tienen ventas múltiples (número de líneas de distribución), como lo muestra el primer cliente en el cuadro 2. A partir de estas simples observaciones…
- Cuantos más cajeros (núcleo), más clientes (trabajos) podrá procesar en paralelo. Esta es la escala horizontal/vertical.
- Si no tiene suficientes clientes (servicios) para llenar sus arcas, tendrá que pagar el acuerdo. Esto depende del tamaño de la unidad, del tamaño del grupo y del tamaño de la división.
- Si el cliente (servicio) tiene una cantidad muy diferente (números de la línea de distribución), verás un uso desigual de tu dinero. Esto es datos de forma.
- Cuanto mejores sean sus fondos (raíces), más rápido podrá atender a un solo cliente (servicio). Esto depende de la actualización de su proceso.
- etc.
Dado que la analogía proviene de la teoría de secuencias, un campo directamente relacionado con la computación distribuida, ¡es muy poderosa!
Utilice esta analogía para cometer errores, comunicarse y desarrollar chispas.
El error más común entre los novatos en flash es la percepción de una evaluación perezosa.
La evaluación diferida significa que no se realizan conversiones de datos hasta que se almacena una matriz en la memoria. Ejemplos de métodos que incluyen una colección incluyen, entre otros:
- .collect(): recupera el DataFrame como una lista de Python.
- .show(): imprimir primero
n
filas de su DataFrame. - .count(): obtiene el número de filas en su DataFrame.
- .first(): obtiene la primera fila de su DataFrame.
El método de error de cálculo más común es la explotación. .count()
a lo largo de un programa. Cada vez que llames a una combinación, se volverán a calcular todas las conversiones anteriores, por lo que si tienes 5 llamadas .count()
su programa se ejecutará asintóticamente aproximadamente 5 veces.
¡Spark tiene una calificación perezosa! Las tuberías deben tener un flujo único desde el(los) origen(es) hasta el(los) destino(s).
Un problema sorprendentemente común que surge cuando se trabaja con grandes organizaciones es que pierden de vista el panorama general y, por lo tanto, optimizan los procesos de manera imperfecta.
Así es como se deben optimizar las canalizaciones para la mayoría de los casos de uso…
- Preguntar si necesitamos hacer el proyecto. En pocas palabras, piense en lo que realmente obtiene al optimizar una canalización. Si espera mejorar el tiempo de actividad en un 20% y el proceso cuesta $100 por ejecución, ¿debería invertir su costoso salario de ingeniero de datos para ahorrar $20 por ejecución? Tal vez. Tal vez no.
- Busque la fruta madura en el código. Después de aceptar realizar el proyecto, verifique si el código tiene errores obvios. Algunos ejemplos son el mal uso de la evaluación diferida, las transiciones innecesarias y el orden incorrecto de las transiciones.
- Obtenga trabajo bajo SLA usando contabilidad. Después de comprobar que el código funciona correctamente, simplemente elimine la cuenta problemática para 1) cumplir con el SLA y 2) recopilar estadísticas de la interfaz de usuario de Spark.
- detener Si alimenta su cuenta con regularidad y el costo no es prohibitivo, realice algunas mejoras contables de último momento y luego deténgase. Tu tiempo es valioso. No lo desperdicie ahorrando dólares cuando puede ganar miles de dólares en otros lugares.
- Sumérgete profundamente. Finalmente, si realmente necesita profundizar porque el costo es inaceptable, entonces levante los pies y haga los datos, el código y los cálculos.
La belleza de este marco es que los pasos 1 a 4 requieren sólo un destello de conocimiento y son muy rápidos de ejecutar; A veces puedes recopilar información sobre los pasos 1 a 4 durante una llamada telefónica de 30 minutos. El marco también confirma que pararemos pronto. suficientemente bueno. Finalmente, si el paso 5 es necesario, podemos decírselo a aquellas personas del equipo que son más fuertes en la chispa.
Al encontrar todas las formas de evitar optimizar excesivamente una canalización, ahorra valiosas horas de desarrollador.
Las pérdidas de disco son la razón más común por la que los trabajos flash se ejecutan lentamente.
Es un concepto muy simple. Spark está diseñado para utilizar procesamiento en memoria. Si se queda sin memoria, Spark intentará escribir datos adicionales en el disco para evitar que su procesador falle. Esto se llama hernia de disco.
La escritura y lectura desde el disco es lenta, por lo que se debe evitar. Si desea aprender cómo identificar y reducir fugas, siga este tutorial. Sin embargo, algunos métodos muy comunes y simples para reducir la muda son…
- Maneje menos datos por tarea, lo que se puede lograr cambiando el recuento de particiones mediante spark.shuffle.partitions o configuración.
- Aumente la cantidad de RAM básica en su cuenta.
Si quieres que tu negocio funcione de forma óptima, evita los derrames.
Ya sea que use Scala, Java, Python, SQL o R, Spark siempre usará las mismas transformaciones bajo el capó. Por lo tanto, utilice el lenguaje adecuado para su trabajo.
¡SQL es el menos «lenguaje» para muchas operaciones entre todos los lenguajes Spark compatibles! Más específicamente:
- Si está agregando o cambiando una columna, use selectExpr o expr, especialmente en comparación con las cadenas f de Python.
- Si necesita SQL complejo, cree vistas temporales y luego spark.sql().
Aquí hay dos ejemplos rápidos…
# Column rename and cast with SQL
df = df.selectExpr((f"{c}::int as {c}_abc" for c in df.columns))# Column rename and cast with native spark
for c in df.columns:
df = df.withColumn(f"{c}_abc", F.col(c).cast("int")).drop(c)
# Window functions with SQL
df.withColumn("running_total", expr(
"sum(value) over (order by id rows between unbounded preceding and current row)"
))# Window functions with native spark
windowSpec = Window.orderBy("id").rowsBetween(Window.unboundedPreceding, Window.currentRow)
df_with_running_total_native = df.withColumn("running_total", F.sum("value").over(windowSpec))
Utilice SQL.
¿Necesita leer un montón de archivos de datos almacenados en un directorio complejo? Si es así, utilice las potentes opciones de lectura de Spark.
La primera vez que encontré este problema, reescribí os.walk para que funcionara con mi proveedor de nube donde se almacenaban los datos. Con mucho orgullo le mostré este método a mi compañero de proyecto, quien simplemente dijo: «déjame compartir mi pantalla» y comenzó a presentarme los filtros de globo.
# Read all parquet files in the directory (and subdirectories)
df = spark.read.load(
"examples/src/main/resources/dir1",
format="parquet",
pathGlobFilter="*.parquet"
)
Cuando apliqué el filtro global que se muestra arriba en lugar de mi os.walk predeterminado, la operación de recuperación fue 10 veces más rápida.
Spark tiene parámetros poderosos. Compruebe si la funcionalidad que desea está disponible antes de crear implementaciones personalizadas.
Los bucles casi siempre perjudican el rendimiento del rayo. Este es el por qué…
Spark tiene dos fases principales: planificación y ejecución. En la fase de planificación, una chispa crea un gráfico acíclico dirigido (DAG) que muestra cómo se realizarán las transformaciones especificadas. La etapa de planificación es relativamente costosa y a veces puede tardar unos segundos, por lo que conviene realizarla lo antes posible.
Analicemos un caso de uso en el que necesita iterar a través de múltiples DataFrames, realizar transformaciones costosas y luego agregarlas a una tabla.
En primer lugar, casi todos los casos de uso reutilizables, especialmente las UDF, las funciones de ventana y las uniones de Pandas, tienen soporte nativo. Pero, si realmente necesita un bucle, así es como puede invocar una fase de plan y así obtener todas las transformaciones en un DAG.
import functools
from pyspark.sql import DataFramepaths = get_file_paths()
# BAD: For loop
for path in paths:
df = spark.read.load(path)
df = fancy_transformations(df)
df.write.mode("append").saveAsTable("xyz")
# GOOD: functools.reduce
lazily_evaluated_reads = (spark.read.load(path) for path in paths)
lazily_evaluted_transforms = (fancy_transformations(df) for df in lazily_evaluated_reads)
unioned_df = functools.reduce(DataFrame.union, lazily_evaluted_transforms)
unioned_df.write.mode("append").saveAsTable("xyz")
La primera solución utiliza un bucle for para iterar sobre las rutas, realizar transformaciones sofisticadas y luego agregarlas a nuestra tabla delta de interés. En el segundo, almacenamos una lista de DataFrames evaluados de forma diferida, les aplicamos transformaciones, luego los reducimos en una unidad, implementamos un único plan de chispa y escribimos.
De hecho, podemos ver la diferencia en la arquitectura en el backend a través de la interfaz de usuario de Spark…
En la Figura 5, el DAG en el lado izquierdo correspondiente a la onda for tendrá 10 fases. Sin embargo, DAG está en el lado derecho. functools.reduce
tendrá una sola fase y por lo tanto podrá procesarse más fácilmente en paralelo.
Por el simple uso de leer 400 tablas delta únicas y luego agregarlas a una tabla delta, este método fue 6 veces más rápido que una declaración for.
Sea creativo para crear una chispa DAG.
No se trata de exageraciones.
Spark es un software bien establecido y, por tanto, bien documentado. Los LLM, especialmente GPT-4, son realmente buenos para sintetizar información compleja en explicaciones digeribles y concisas. Desde el lanzamiento de GPT-4, no he realizado ningún proyecto flash complejo que dependiera en gran medida de GPT-4.
Sin embargo, la revelación (con suerte) obvia es que tenga cuidado con los LLM. Todo lo que envíe a un modelo de código cerrado podría ser datos de entrenamiento para la organización matriz; asegúrese de no enviar nada confidencial. Además, verifique que la salida de GPT sea legítima.
Cuando se utiliza correctamente, LLM transforma la chispa del aprendizaje y el desarrollo. Cuesta $20/mes.