Mostrando las entradas con la etiqueta NiFi. Mostrar todas las entradas
Mostrando las entradas con la etiqueta NiFi. Mostrar todas las entradas

viernes, 29 de abril de 2022

Dataflow Functions o NiFi como una función

Uno de los anuncios más interesantes en el mundo NiFi ha sido el de Cloudera teniendo NiFi como una función. Es decir, poder subir un flujo y que ese se ejecute bajo demanda, permitiéndole ejecutar flujos de NiFi en clústeres de Kubernetes (K8S).

Diagrama


Esta opción es más rentable, se escala de manera eficiente (hacia arriba y hacia abajo), y proporciona aislamiento de recursos. Esto último por lo general, puede ser un desafío en un clúster multitentant con "vecinos ruidosos" (entendido como aquellas cargas que se llevan todos los recursos cada vez que corren).

Funciones como servicio

Las Funciones como servicio (FaaS) es una categoría de servicios en la nube que ofrecen todos los principales proveedores nube (AWS Lambda, Azure Functions, Google Cloud Functions, etc.).

Estos servicios permiten a los clientes ejecutar microaplicaciones que se activan bajo eventos específicos sin la complejidad de construir y mantener la arquitectura asociada.

Por serverless se entiendeque los recursos se aprovisionan y mantienen unicamente cuando la aplicación procesa los datos. De esta manera, no se necesita ningún recurso en constante funcionamiento. Esta es la forma más rentable de ejecutar aplicaciones que solo necesitan ejecutarse después de algunos desencadenantes.

Las funciones como servicio también proporcionan un escalado prácticamente ilimitado y puede ser una muy buena opción para manejar casos de uso que se ejecutan bajo eventos. 

Dashboard

 

DataFlow Functions

DataFlow Functions habilita Apache NiFi como la primera interfaz de usuario sin código para crear y ejecutar funciones de manera muy eficiente. DataFlow Functions, con tecnología de Apache NiFi, es la opción más eficiente para ejecutar flujos controlados por eventos para una amplia gama de casos de uso comunes.

Algunos de los posibles casos de uso incluyen:

  • Procesamiento en tiempo real de archivos mientras estos aterrizan en un almacén de objetos
  • Integración de servicios de terceros y API para exponer microservicios
  • Procesar flujos de datos, para IoT, ciberseguridad, detección de fraude y más.
  • Integrar con backends móviles

 



domingo, 22 de marzo de 2020

¿Como optimizar el consumo de licenciamiento de Splunk con NiFi?

Una de los mejores casos de uso con NiFi es ahorrar dinero en licenciamiento con Splunk. Splunk es una plataforma de inteligencia operacional para datos de máquina, en criollo, nos permite visualizar que pasa con datos de eventos y logs de maquinas y sistemas. Pero su licenciamiento es basado en Gb/día, lo cual, es fácilmente superable con los crecimientos normales dentro de una organización que continuamente agrega equipos y software. ¿Pero como ayuda NiFi en este caso? La idea es simple, es dejar que NiFi colecte los datos de los sistemas, los filtre y envie realmente lo necesario a Splunk. Ahorrando cientos de bytes por envio de información que no es de valor. Es decir, si analizamos un Log normal de un servidor Apache, por ejemplo, posiblemente nos interese de cientos de lineas por segundo, apenas unas pocas. En el ejemplo de este artículo, lo hacemos con aquellas lineas que invocan al carrito de compras del sitio.

Arquitectura Conceptual

También está la opción de tomar el crudo de los archivos y hacer Archiving en otro sistema, como el lago de datos de Cloudera u otro en Nube.
Para instalar Splunk en un nodo de ejemplo podemos ejecutar los siguientes comandos: 

En los nodos de NiFi debemos instalar un simulador en Python que nos generan las líneas de ejemplo de Apache:

En NiFi vamos a llamar a este script de Python por medio de un ExecuteProcess. La salida de este nos dejará muchos flowfiles con muchas lineas de logs en su contenido. Por lo que posterior vamos a utilizar un SplitText para tener una linea de log de Apache por flowfile.


Esta es la configuración del ExecuteProcess que llama al código Python:


La salida del script nos devuelve 100 lineas de log.


Y utilizamos un SplitText con un Count = 1 para separarlo línea a línea.


Dentro del procesamiento, RouteOnContent nos permite hacer el filtrado, es decir, vamos a tomar solo las líneas que contienen una llamada al carrito, es decir, que contengan /apps/cart.
El flujo del ProcessGroup quedaría como muestra la siguiente imagen:


RouteContent vamos a configurar una salida llamada Cart que solo tome aquellas líneas que contengan la ruta del carrito de compras:

Y por último, vamos a enviar la información a Splunk. En este caso, en Splunk configuramos el puerto 8080 como un puerto de entrada de datos.


Para más información: https://docs.splunk.com/Documentation/Splunk/latest/Data/Monitornetworkports
El máximo de bytes de salida por el Socket, debemos configurarlo como el estándart para un Linux de 64 bits, en este caso, 212992 bytes.

 Finalmente, el flujo final en NiFi nos va a quedar así:


 Y el resultado que vamos a visualizar en Splunk, será este:


sábado, 21 de marzo de 2020

¿Como mover datos de tablas con NiFi?

En este post, vamos a realizar el movimiento de datos entre dos tablas de una base de datos Oracle. Pueden estar en la misma instancia o en diferentes, pero la idea es mostrar como hacerlo con NiFi.
Nuestras tablas de ejemplo. Como puede observarse, la tabla V1, tiene un campo menos que la tabla V2. En nuestro ejemplo, vamos a hacerlo, cuando el campo acepta nulos, y cuando el campo, como en este caso, not acepta nulos.

 


 Datos de ejemplo en la tabla V1:

 


¿Como hacer para insertar si la tabla destino tiene mas o menos campos, pero estos aceptan Nulos?

Este caso es el más simple, donde simplemente configuramos el PutDatabaseRecord para que ignore si la tabla destino no tiene todas las columnas, con lo cual, el valor quedará en nulo.



El AVROReader en este caso es simple, dado que la definición del esquema de los datos viene en el mismo AVRO, pero si tuvieramos un caso donde queremos ahorrar en el footprint de los metadatos del AVRO, podemos usar Schema Registry. Este es un repositorio central de esquemas, con versionamiento y que nos ayuda a tener centralizado el esquema de los datos.


 

El resultado final del flujo será algo parecido a esto, donde tenemos un ExecuteSQL que toma los datos de la tabla fuente, y el resultado es una serie de flowfiles en formato AVRO. Estos AVRO los tomamos con el PutDatabaseRecord y los interpretamos con su esquema embebido en el mismo AVRO, y llevado como INSERT a la tabla destino. Como configuramos que ignore si no tiene todas los campos necesarios, entonces, insertará los que tiene y los demás los ignora. Si la tabla acepta NULLs, esto resultará en un INSERT exitoso.



¿Como hacer para insertar si la tabla destino tiene mas o menos campos, pero estos no aceptan Nulos?


En este caso, debemos ejecutar una variante que es la conversión del AVRO. Por ejemplo, en nuestro es un campo EMAIL que no acepta nulos, por lo que vamos a agregar este processor en el medio del ExecuteSQL y PutDatabaseRecord para transformar el esquema y el de salida sea el que use para insertar en la tabla destino.

Error que marca al intentar Insertar

Agregamos un procesador de tipo ConvertAVROSchema

 
  
Este procesador nos pide un esquema de entrada y otro de salida. También podemos agregar reglas dinámicas que nos permiten traducir desde el esquema fuente al destino por ejemplo, nombre de campos que cambian.
 


Nuestro esquema de salida quedaría marcado por una propiedad de "Default" para el campo EMAIL. En nuestro caso pusimos que tuviera el valor "NOT SET".

{
   "type":"record",
   "name":"NiFi_ExecuteSQL_Record",
   "namespace":"any.data",
   "fields":[
      {
         "name":"EMPLOYEE_ID",
         "type":[
            "null",
            {
               "type":"bytes",
               "logicalType":"decimal",
               "precision":6,
               "scale":0
            }
         ]
      },
      {
         "name":"FIRST_NAME",
         "type":[
            "null",
            "string"
         ]
      },
      {
         "name":"LAST_NAME",
         "type":[
            "null",
            "string"
         ]
      },
      {
         "name":"SALARY",
         "type":[
            "null",
            {
               "type":"bytes",
               "logicalType":"decimal",
               "precision":8,
               "scale":2
            }
         ]
      },
      {
         "name":"EMAIL",
         "type":[
            "string"
         ],
         "default":"NOT SET"
      }
   ]
}




Finalmente el flujo estaría de esta manera:



Y el dato de salida hacia la tabla tendría este formato:


Y la tabla finalmente con los datos se vería así:

 


jueves, 20 de febrero de 2020

¿Como se agreganda la memoria de Java en NiFi?

Por defecto, el seteo que trae es apena de 512 mb de RAM. En Cluster pequeños usamos al menos 3 equipos con 16GB de RAM. Por lo que, expandir este uso es necesario.
Esto se puede hacer desde Cloudera Manager o desde Ambari, dependiendo del sabor de Clúster de Cloudera que tengamos instalado NiFi o Cloudera Flow Management (nombre comercial de NiFi en Cloudera).


Estos valores son establecidos en el archivo de bootstrap de Nifi.

Para modificarlo desde Ambari:
Ambari UI --> Nifi --> Configs --> Advanced --> "Advanced nifi-bootstrap-env" --> "Template for bootstrap.conf"

Para modificarlo desde Cloudera Manager:
 Cloudera Manager --> Nifi --> Configuration --> Advanced --> "NiFi Node Advanced Configuration Snippet (Safety Valve) for staging/bootstrap.conf.xml"

¿Cómo establecer los valores?
Hay que tener en cuenta como hacerlo, tenemos dos valores, el de la memoria inicial, y el máximo de memoria.
# JVM memory settings
java.arg.2=-Xms{{nifi_initial_mem}}
java.arg.3=-Xmx{{nifi_max_mem}}
Para nuestro ejemplo de 16GB, podemos usar como mínimo 512MB y como máximo 12GB de RAM.
# JVM memory settings
java.arg.2=-Xms512MB
java.arg.3=-Xmx12G 

En Cloudera Manager, quedaría así:


Requiere que se reinicie el clúster de Nifi / CFM para tomar los nuevos cambios.



martes, 5 de noviembre de 2019

¿Como monitorear NiFi con Graphite?

Si bien NiFi o a través de Cloudera Manager tienes la oportunidad de monitorear el servicio, no siempre es suficiente en una organización. Una forma fácil de hacerlo, es a través del uso de Graphite y la parte visual puede ser a través de Grafana.

¿Cómo instalar Graphite en Linux?
La instalación de esta herramienta es bastante sencillo. Abajo una receta simple para instalar sobre CentOS 7.5 sobre el puerto 80.


Habilitando el envio de estadisticas desde NiFi a Graphite

Lo primero que tenemos que agregar, es un servicio de GraphiteMetricReporterService en Controller Settings. Para entrar en Controller Settings tenemos que ir al menu de la hamburguesa en NiFi (Derecha arriba del menu).


Una vez alli, con el botón del signo +, agregamos nuestro servicio de GraphiteMetricReporterService

Para configurar este servicio, necesitamos el nombre del host donde estan corriendo Graphite y Carbon. El port default de Carbon, que es el proceso que escucha por las métricas para ser enviadas a Graphite, corre en el puerto 2003. Otra configuración importante, puede ser el prefijo que queremos dentro de Graphite. Las metricas se arman en formato de arbol, como una URL en Internet, separada por puntos. Entonces, este prefijo es la raíz de nuestro envio. En nuestro caso lo mantenemos simple, como NiFi.


Una vez que tenemos el controlador del servicio, ahora necesitamos configurar en el tab del Report Task. 


En nuestro caso, vamos a agregar la tarea de MetricReportingTask para enviar todas las estadísticas de la JVM (Java Virtual Machine) y las estadisticas de la Instancia de NiFi. 


Luego de configurar estos pasos, solo es habilitar los servicios y task para empezar a enviar las estadisticas. En nuestro Graphite, deberiamos ver algo parecido a esto en la consola.


 

domingo, 13 de octubre de 2019

Extraer multiples direcciones IP de una linea de log

Una de los procesadores más usuales para usar es el ExtractText que nos permite extraer atributos del contenido por medio de expresiones regulares. En este ejemplo, vamos a analizar como hacer para obtener de una línea de Log que normalmente tienen varias direcciones IP, como extraerlas en un atributo con diferentes valores.
En nuestro ejemplo, tenemos una línea de log del estilo, donde aparecen tres direcciones IP: 10.180.1.3, 172.22.199.165 y 187.189.106.238.


La configuración que vamos a realizar con ExtractText, debe tener las siguiente configuración, solo he copiado los valores que debemos poner y cambiar a los que trae por defecto el procesador:


 El resultado que nos arrojará será el siguiente desde el punto de vista de atributos

La expresión regular, tiene la particularidad de un marcador inicial, final y el grupo de extracción:

Marcador inicio: (?<=)
Regex: ((?:[0-9]{1,3}\.){3}[0-9]{1,3})
Marcador final: (?=)
 
 

sábado, 12 de octubre de 2019

¿Donde encontrar a NiFi en un nodo con Flow Management y Cloudera Manager?

NiFi está instalado en forma distribuida con Cloudera Manager en los nodos donde este corre. Estos son los directorios más importantes:
  • /run/cloudera-scm-agent/process: Aquí están todos los archivos de configuración relacionados con la instancia que actualmente se encuentra corriendo bajo la administración de Cloudera Manager.
  • /var/lib/nifi: Aquí encontramos todos los repositorios por default de NiFi como content_repository, database_repository, flowfile_repository, provenance_repository y también el archivo de flujos flow.xml.gz.
  • /var/log/nifi: Aquí se encuentran los archivos de logs como nifi-app.log, nifi-bootstrap.log y nifi-user.log

Change data Capture con Apache NiFi y SQLServer 2017 SP1


¿Cómo hacer para que NiFi arranque sin correr los flujos en automático?

Para hacer que NiFi arranque con los flujos en modo parado, hay que cambiar una propiedad en el archivo de configuración nifi.properties. Esta propiedad es nifi.flowcontroller.autoResumeState=false. Por defecto, la instalación viene con esta propiedad habilitada. En Cloudera Manager esta propiedad no viene expuesta por defecto, al menos, hasta la versión 1.0.1.0 de Cloudera Flow Management. Para hacer el cambio, sencillamente, agregamos la propiedad y se replica en todos los nodos del clúster. Luego del cambio, hay que reiniciar el clúster.
En Cloudera Manager podemos agregar la propiedad en Advanced > Nifi Node Advanced Configuration Snippet (Safety Valve). Se agrega el Key/Value y luego grabamos la configuración.




jueves, 25 de julio de 2019

¿Como sacar un valor puntual de un XML?

En NiFi tenemos el procesador EvaluateXPath, el cual, nos permite extraer valor usando el formato de XPath. XPath es un "lenguaje" que permite a través de expresiones simples procesar elementos dentro de un archivo XML. Es un concepto muy parecido a las expresiones regulares. XPath permite buscar y seleccionar teniendo en cuenta la estructura jerárquica del XML con el uso del estándar XSLT.
Una forma de testear rápidamente una expresion XPath y nuestro XML es usando un Tester, en Internet hay varios, por ejemplo: https://codebeautify.org/Xpath-Tester.
Supongamos que tenemos este XML.

La expresión:

string(/CATALOG/CD/TITLE)

Nos retornará Empire Burlesque. 
Esto funciona sin problemas en Nifi. Podemos configurar como destino de nuestras extracciones a los atributos del Flowfile.
Esto es en un caso simple de XML, pero los XML de la vida real, normalmente tiene encabezados con XSLT, Namespaces, y demas detalles y complejidades. Lo cual, puede que no funcione tan fácilmente porque va a necesitar del XSLT o del namespace.
Por ejemplo, si tenemos un XML complejo como el de la imagen siguiente:

Sobre este ejemplo, si quisieramos extraer el valor del beginTime dentro del tag de measCollect, tecnimcamente deberiamos hacer

string(/measCollectFile/fileHeader/measCollect/@beginTime)
 
Pero quizás no funcione, porque tenemos un namespace en este XML, por lo que, el truco aqui, es buscar el tag que necesitamos sin tener en cuenta el camino hacia el elemento.
Para ello, podemos hacer:

string(//*[local-name()='measCollect']/@beginTime)

Marcamos que no queremos validar el DTD del XML, y usamos como retorno de la expresion un String o Number, segun sepamos.

 
En el caso de la imagen, estamos buscando un elemento dentro del XML que se llama measCollect, sin importar en que lugar este dentro del XML, y nos vamos a quedar con el valor de un atributo de ese tag llamado beginTime y luego con un segundo, llamado endTime.
Si luego inspeccionamos los elementos de salida, deberiamos ver nuestras extracciones como atributos del FlowFile.





viernes, 12 de julio de 2019

¿Cómo crear un CSV de prueba y usarlo con NiFi?

Muchas veces requerimos hacer pruebas rápidas, y por lo tanto, generar datos lo más reales posibles para tener una prueba lo más real posible. Esto lo podemos lograr con Python gracias al paquete Faker. Pero aún asi, nos falta generar un archivo CSV. Existe un proyecto en Github que une ambos mundos y nos permite generar un CSV con datos de prueba muy reales a partir del paquete Faker.

¿Cómo lo realizamos?

En alguno de los nodos de NiFi ejecutamos en una terminal los siguientes comandos para instalar Python3.6, crear un directorio en uno de los nodos de CFM (Cloudera Flow Management), el Apache NiFi de Cloudera, para luego generar el bash que vamos a utilizar desde NiFi para generar CSV en automático.

Ahora, desde NiFi, vamos a crear un flujo que llame a nuestro script, en este caso, /opt/csvgen/csvgen.sh

Lo primero que vamos a requerir es un procesador de tipo ExecuteProcess. La idea es que su salida simplemente lo envie a un puerto de salida para futuro procesamiento.

Ahora, en el procesor, necesitamos establecer la llamada en Command a /opt/csvgen/csvgen.sh, y en el directorio de trabajo (working directory) en el directorio del script en /opt/csvgen



















 

 Referencias

  1. https://github.com/pereorga/csvfaker
  2. https://pypi.org/project/Faker/