viernes, 29 de abril de 2022

Dataflow Functions o NiFi como una función

Uno de los anuncios más interesantes en el mundo NiFi ha sido el de Cloudera teniendo NiFi como una función. Es decir, poder subir un flujo y que ese se ejecute bajo demanda, permitiéndole ejecutar flujos de NiFi en clústeres de Kubernetes (K8S).

Diagrama


Esta opción es más rentable, se escala de manera eficiente (hacia arriba y hacia abajo), y proporciona aislamiento de recursos. Esto último por lo general, puede ser un desafío en un clúster multitentant con "vecinos ruidosos" (entendido como aquellas cargas que se llevan todos los recursos cada vez que corren).

Funciones como servicio

Las Funciones como servicio (FaaS) es una categoría de servicios en la nube que ofrecen todos los principales proveedores nube (AWS Lambda, Azure Functions, Google Cloud Functions, etc.).

Estos servicios permiten a los clientes ejecutar microaplicaciones que se activan bajo eventos específicos sin la complejidad de construir y mantener la arquitectura asociada.

Por serverless se entiendeque los recursos se aprovisionan y mantienen unicamente cuando la aplicación procesa los datos. De esta manera, no se necesita ningún recurso en constante funcionamiento. Esta es la forma más rentable de ejecutar aplicaciones que solo necesitan ejecutarse después de algunos desencadenantes.

Las funciones como servicio también proporcionan un escalado prácticamente ilimitado y puede ser una muy buena opción para manejar casos de uso que se ejecutan bajo eventos. 

Dashboard

 

DataFlow Functions

DataFlow Functions habilita Apache NiFi como la primera interfaz de usuario sin código para crear y ejecutar funciones de manera muy eficiente. DataFlow Functions, con tecnología de Apache NiFi, es la opción más eficiente para ejecutar flujos controlados por eventos para una amplia gama de casos de uso comunes.

Algunos de los posibles casos de uso incluyen:

  • Procesamiento en tiempo real de archivos mientras estos aterrizan en un almacén de objetos
  • Integración de servicios de terceros y API para exponer microservicios
  • Procesar flujos de datos, para IoT, ciberseguridad, detección de fraude y más.
  • Integrar con backends móviles

 



jueves, 2 de abril de 2020

Agrandar el disco a una instancia en nube

Hacer esto en las nubes es muy sencillo, dependiendo el cloud, podemos encontrar el disco que nos quedó chico en espacio, modificarlo y luego grabar los cambios. Una vez confirmado, tenemos que volver a nuestro Linux, y hacer los cambios que ahora "fisicamente" tenemos disponible.
Para ello, una vez en Linux, con credenciales como root, hacemos la siguiente receta:

domingo, 22 de marzo de 2020

¿Como optimizar el consumo de licenciamiento de Splunk con NiFi?

Una de los mejores casos de uso con NiFi es ahorrar dinero en licenciamiento con Splunk. Splunk es una plataforma de inteligencia operacional para datos de máquina, en criollo, nos permite visualizar que pasa con datos de eventos y logs de maquinas y sistemas. Pero su licenciamiento es basado en Gb/día, lo cual, es fácilmente superable con los crecimientos normales dentro de una organización que continuamente agrega equipos y software. ¿Pero como ayuda NiFi en este caso? La idea es simple, es dejar que NiFi colecte los datos de los sistemas, los filtre y envie realmente lo necesario a Splunk. Ahorrando cientos de bytes por envio de información que no es de valor. Es decir, si analizamos un Log normal de un servidor Apache, por ejemplo, posiblemente nos interese de cientos de lineas por segundo, apenas unas pocas. En el ejemplo de este artículo, lo hacemos con aquellas lineas que invocan al carrito de compras del sitio.

Arquitectura Conceptual

También está la opción de tomar el crudo de los archivos y hacer Archiving en otro sistema, como el lago de datos de Cloudera u otro en Nube.
Para instalar Splunk en un nodo de ejemplo podemos ejecutar los siguientes comandos: 

En los nodos de NiFi debemos instalar un simulador en Python que nos generan las líneas de ejemplo de Apache:

En NiFi vamos a llamar a este script de Python por medio de un ExecuteProcess. La salida de este nos dejará muchos flowfiles con muchas lineas de logs en su contenido. Por lo que posterior vamos a utilizar un SplitText para tener una linea de log de Apache por flowfile.


Esta es la configuración del ExecuteProcess que llama al código Python:


La salida del script nos devuelve 100 lineas de log.


Y utilizamos un SplitText con un Count = 1 para separarlo línea a línea.


Dentro del procesamiento, RouteOnContent nos permite hacer el filtrado, es decir, vamos a tomar solo las líneas que contienen una llamada al carrito, es decir, que contengan /apps/cart.
El flujo del ProcessGroup quedaría como muestra la siguiente imagen:


RouteContent vamos a configurar una salida llamada Cart que solo tome aquellas líneas que contengan la ruta del carrito de compras:

Y por último, vamos a enviar la información a Splunk. En este caso, en Splunk configuramos el puerto 8080 como un puerto de entrada de datos.


Para más información: https://docs.splunk.com/Documentation/Splunk/latest/Data/Monitornetworkports
El máximo de bytes de salida por el Socket, debemos configurarlo como el estándart para un Linux de 64 bits, en este caso, 212992 bytes.

 Finalmente, el flujo final en NiFi nos va a quedar así:


 Y el resultado que vamos a visualizar en Splunk, será este:


sábado, 21 de marzo de 2020

Como extraer datos de Oracle con NiFi

La mejor práctica en NiFi indica que debemos generar para cada flujo con un caso de uso, un Process Group dedicado.
De manera que en el Lienzo principal solo veamos las cajas de los Process Group y quizás los puertos de entrada de ellos si tenemos ingesta por parte de MiniFi u otros Clústers de NiFi.
Una vez que creo mi Process Group y estoy dentro, cuando configuro Servicios de Control, su alcance es solo este Process Group. Es decir, todos los Processor que estan dentro de este grupo, pueden ver estos servicios, como por ejemplo, una conexión a una base de datos.
Para hacer esto, sobre el lienzo, llamo al menú contextual (botón secundario de pad o mouse) y aparece este menú con la opción de configurar.

 


Una vez en esta parte, vamos a configurar una conexión a una base de datos. En nuestro ejemplo, será contra una base de datos Oracle. 


Para ello, seleccionamos un DBCPConnectionPool.



En este componente, vamos a tener que considerar hacer algunos temas adicionales en forma manual en cada uno de los nodos del Cluster. Lo primero, es bajar desde el sitio de Oracle el Driver JDBC para la versión de base de datos requerida, y para el JDK que tengamos usando con NiFi, en mi caso, es la versión 8.
Por lo tanto, vamos a bajar e instalar en la misma locación con permisos de lectura para el usuario de NiFi en Linux el driver. En mi caso, he creado en cada equipo el path /opt/nifi/driver y le cambié al usuario NiFi como owner del directorio nifi.
Allí copié el driver ojdbc8.jar.



Luego, como punto importante, es configurar la clase del driver que debe ejecutarse para la conexión, esto es fácil de encontrar en cada documentación de cada driver que usemos. En nuestro caso con Oracle es oracle.jdbc.OracleDriver.
El URL usado para conectar a la base de datos fue: 

jdbc:oracle:thin:@<URL_SERVER_DB>:1521:<ORACLE_SID>

Por último debemos agregar el usuario y password de la base de datos para poder concretar la conexión a dicha base de datos.
Una vez terminado este paso, ya podemos sobre el lienzo de nuestro proyecto comenzar a delinear el flujo, en este caso, el que vamos a usar es el ExecuteSQL.

 

Este procesador, va a utilizar la conexión ya configurada, y luego vamos a completar el SQL que queremos configurar. Un detalle más que podemos configurar es poner que la cantidad Máxima de filas que devuelva por Flowfile en cada AVRO de salida, sea 1. De esta manera podemos tener solo un Row por FlowFile, en caso de que se requiera procesar fila a file. En otro caso, podemos poner muchos más y hacer un procesamiento masivo por cada flowfile. Esto es muy dependiente de nuestro caso de uso y también la experiencia de desarrollar flujos sobre NiFi.

 

Un detalle es que cada procesor en NiFi por defecto ejecuta en todos los nodos del cluster. Para las conexiones a Base de datos, API y otras plataformas, se recomienda como mejor práctica que solo un nodo sea el que acceda. Este nodo es el denominado "Primario". Este nodo es seleccionado mediante Zookeeper y puede cambiar en cualquier instante, ya sea por temas de tráfico o porque el nodo no responde. Es por ello, que no sabemos que nodo se trata puntualmente. Una vez que cambiamos la ejecución de este procesador en solo el nodo primario, este hará la conexión y luego, debe "distribuir" los datos al resto de los nodos.



Esta distribución de los datos al resto de los nodos, sea hace balanceando la carga en la cola de salida del procesador hacia el siguiente procesor en caso de éxito.

 

La selección que debemos hacer, es en la estrategia de balanceo en "Load Balance Strategy", debemos seleccionar "Round Robin" que es una distribución básica a cada nodo del resto del clúster.

La salida de los datos, que vamos a observar va a ser en formato AVRO.
Este formato es un JSON comprimido y con el esquema de lectura del JSON como parte de la carga en los metadatos. Por ejemplo, en el siguiente ejemplo, podemos ver la definición que trae.  



Si cambiamos la vista en NiFi de "Original" a "Formated", nos presentará el dato formateado con la lectura del esquema.
 

¿Como mover datos de tablas con NiFi?

En este post, vamos a realizar el movimiento de datos entre dos tablas de una base de datos Oracle. Pueden estar en la misma instancia o en diferentes, pero la idea es mostrar como hacerlo con NiFi.
Nuestras tablas de ejemplo. Como puede observarse, la tabla V1, tiene un campo menos que la tabla V2. En nuestro ejemplo, vamos a hacerlo, cuando el campo acepta nulos, y cuando el campo, como en este caso, not acepta nulos.

 


 Datos de ejemplo en la tabla V1:

 


¿Como hacer para insertar si la tabla destino tiene mas o menos campos, pero estos aceptan Nulos?

Este caso es el más simple, donde simplemente configuramos el PutDatabaseRecord para que ignore si la tabla destino no tiene todas las columnas, con lo cual, el valor quedará en nulo.



El AVROReader en este caso es simple, dado que la definición del esquema de los datos viene en el mismo AVRO, pero si tuvieramos un caso donde queremos ahorrar en el footprint de los metadatos del AVRO, podemos usar Schema Registry. Este es un repositorio central de esquemas, con versionamiento y que nos ayuda a tener centralizado el esquema de los datos.


 

El resultado final del flujo será algo parecido a esto, donde tenemos un ExecuteSQL que toma los datos de la tabla fuente, y el resultado es una serie de flowfiles en formato AVRO. Estos AVRO los tomamos con el PutDatabaseRecord y los interpretamos con su esquema embebido en el mismo AVRO, y llevado como INSERT a la tabla destino. Como configuramos que ignore si no tiene todas los campos necesarios, entonces, insertará los que tiene y los demás los ignora. Si la tabla acepta NULLs, esto resultará en un INSERT exitoso.



¿Como hacer para insertar si la tabla destino tiene mas o menos campos, pero estos no aceptan Nulos?


En este caso, debemos ejecutar una variante que es la conversión del AVRO. Por ejemplo, en nuestro es un campo EMAIL que no acepta nulos, por lo que vamos a agregar este processor en el medio del ExecuteSQL y PutDatabaseRecord para transformar el esquema y el de salida sea el que use para insertar en la tabla destino.

Error que marca al intentar Insertar

Agregamos un procesador de tipo ConvertAVROSchema

 
  
Este procesador nos pide un esquema de entrada y otro de salida. También podemos agregar reglas dinámicas que nos permiten traducir desde el esquema fuente al destino por ejemplo, nombre de campos que cambian.
 


Nuestro esquema de salida quedaría marcado por una propiedad de "Default" para el campo EMAIL. En nuestro caso pusimos que tuviera el valor "NOT SET".

{
   "type":"record",
   "name":"NiFi_ExecuteSQL_Record",
   "namespace":"any.data",
   "fields":[
      {
         "name":"EMPLOYEE_ID",
         "type":[
            "null",
            {
               "type":"bytes",
               "logicalType":"decimal",
               "precision":6,
               "scale":0
            }
         ]
      },
      {
         "name":"FIRST_NAME",
         "type":[
            "null",
            "string"
         ]
      },
      {
         "name":"LAST_NAME",
         "type":[
            "null",
            "string"
         ]
      },
      {
         "name":"SALARY",
         "type":[
            "null",
            {
               "type":"bytes",
               "logicalType":"decimal",
               "precision":8,
               "scale":2
            }
         ]
      },
      {
         "name":"EMAIL",
         "type":[
            "string"
         ],
         "default":"NOT SET"
      }
   ]
}




Finalmente el flujo estaría de esta manera:



Y el dato de salida hacia la tabla tendría este formato:


Y la tabla finalmente con los datos se vería así:

 


viernes, 20 de marzo de 2020

¿Cómo habilitar que se muestren los productores de datos en Cloudera SMM?

SMM (Cloudera Streams Messaging Manager) es una solución para monitorear y operar una plataforma de Apache Kafka. Esta consola es parte del paquete de Cloudera para Streaming Processing.


Si uno instala Kafka sin modificar las opciones por defecto al producto y luego instala SMM, y lo comienza a usar va a notar rapidamente que SMM no tiene datos de quienes son los productores que están poniendo datos en los tópicos de Kafka.



Esto se debe a que se debe habiltar la colección de estadísticas de Kafka para los productores. Esto lo debemos habilitar desde Cloudera Manager, dentro de la configuración de Kafka.




jueves, 20 de febrero de 2020

¿Como se agreganda la memoria de Java en NiFi?

Por defecto, el seteo que trae es apena de 512 mb de RAM. En Cluster pequeños usamos al menos 3 equipos con 16GB de RAM. Por lo que, expandir este uso es necesario.
Esto se puede hacer desde Cloudera Manager o desde Ambari, dependiendo del sabor de Clúster de Cloudera que tengamos instalado NiFi o Cloudera Flow Management (nombre comercial de NiFi en Cloudera).


Estos valores son establecidos en el archivo de bootstrap de Nifi.

Para modificarlo desde Ambari:
Ambari UI --> Nifi --> Configs --> Advanced --> "Advanced nifi-bootstrap-env" --> "Template for bootstrap.conf"

Para modificarlo desde Cloudera Manager:
 Cloudera Manager --> Nifi --> Configuration --> Advanced --> "NiFi Node Advanced Configuration Snippet (Safety Valve) for staging/bootstrap.conf.xml"

¿Cómo establecer los valores?
Hay que tener en cuenta como hacerlo, tenemos dos valores, el de la memoria inicial, y el máximo de memoria.
# JVM memory settings
java.arg.2=-Xms{{nifi_initial_mem}}
java.arg.3=-Xmx{{nifi_max_mem}}
Para nuestro ejemplo de 16GB, podemos usar como mínimo 512MB y como máximo 12GB de RAM.
# JVM memory settings
java.arg.2=-Xms512MB
java.arg.3=-Xmx12G 

En Cloudera Manager, quedaría así:


Requiere que se reinicie el clúster de Nifi / CFM para tomar los nuevos cambios.



domingo, 26 de enero de 2020

¿Como correr Spark en un cluster Hortonworks?

Spark tiene varias modalidad de correr distribuido, una de ellas es sobre YARN de Hadoop. Cuando lo corremos al script o usamos el Spark Shell, debemos tener en cuenta que nuestro usuario en Linux debe tener algunas variable de entorno establecidas, o de lo contrario, se van a generar muchos errores y advertencias extrañas. Estos son: JAVA_HOME, HADOOP_CONF_DIR y SPARK_HOME.

Para ello, podemos usar estas configuraciones en Hortonworks para configurarlas correctamente

export JAVA_HOME=$(grep 'java.home' /etc/ambari-server/conf/ambari.properties | sed -n -e 's/^.*java.home=//p')
export HADOOP_CONF_DIR=/etc/hadoop/conf/
export SPARK_HOME=/usr/hdp/current/spark2-client/

Luego, podemos llamar a nuestro Spark-Shell, como
spark-shell --master yarn --driver-memory 512m --executor-memory 512m 

El indicador, que todo correrá sobre YARN es esta línea:

Spark context available as 'sc' (master = yarn, app id = application_XXX)

martes, 5 de noviembre de 2019

¿Como monitorear NiFi con Graphite?

Si bien NiFi o a través de Cloudera Manager tienes la oportunidad de monitorear el servicio, no siempre es suficiente en una organización. Una forma fácil de hacerlo, es a través del uso de Graphite y la parte visual puede ser a través de Grafana.

¿Cómo instalar Graphite en Linux?
La instalación de esta herramienta es bastante sencillo. Abajo una receta simple para instalar sobre CentOS 7.5 sobre el puerto 80.


Habilitando el envio de estadisticas desde NiFi a Graphite

Lo primero que tenemos que agregar, es un servicio de GraphiteMetricReporterService en Controller Settings. Para entrar en Controller Settings tenemos que ir al menu de la hamburguesa en NiFi (Derecha arriba del menu).


Una vez alli, con el botón del signo +, agregamos nuestro servicio de GraphiteMetricReporterService

Para configurar este servicio, necesitamos el nombre del host donde estan corriendo Graphite y Carbon. El port default de Carbon, que es el proceso que escucha por las métricas para ser enviadas a Graphite, corre en el puerto 2003. Otra configuración importante, puede ser el prefijo que queremos dentro de Graphite. Las metricas se arman en formato de arbol, como una URL en Internet, separada por puntos. Entonces, este prefijo es la raíz de nuestro envio. En nuestro caso lo mantenemos simple, como NiFi.


Una vez que tenemos el controlador del servicio, ahora necesitamos configurar en el tab del Report Task. 


En nuestro caso, vamos a agregar la tarea de MetricReportingTask para enviar todas las estadísticas de la JVM (Java Virtual Machine) y las estadisticas de la Instancia de NiFi. 


Luego de configurar estos pasos, solo es habilitar los servicios y task para empezar a enviar las estadisticas. En nuestro Graphite, deberiamos ver algo parecido a esto en la consola.