domingo, 13 de octubre de 2019

Extraer multiples direcciones IP de una linea de log

Una de los procesadores más usuales para usar es el ExtractText que nos permite extraer atributos del contenido por medio de expresiones regulares. En este ejemplo, vamos a analizar como hacer para obtener de una línea de Log que normalmente tienen varias direcciones IP, como extraerlas en un atributo con diferentes valores.
En nuestro ejemplo, tenemos una línea de log del estilo, donde aparecen tres direcciones IP: 10.180.1.3, 172.22.199.165 y 187.189.106.238.


La configuración que vamos a realizar con ExtractText, debe tener las siguiente configuración, solo he copiado los valores que debemos poner y cambiar a los que trae por defecto el procesador:


 El resultado que nos arrojará será el siguiente desde el punto de vista de atributos

La expresión regular, tiene la particularidad de un marcador inicial, final y el grupo de extracción:

Marcador inicio: (?<=)
Regex: ((?:[0-9]{1,3}\.){3}[0-9]{1,3})
Marcador final: (?=)
 
 

sábado, 12 de octubre de 2019

¿Donde encontrar a NiFi en un nodo con Flow Management y Cloudera Manager?

NiFi está instalado en forma distribuida con Cloudera Manager en los nodos donde este corre. Estos son los directorios más importantes:
  • /run/cloudera-scm-agent/process: Aquí están todos los archivos de configuración relacionados con la instancia que actualmente se encuentra corriendo bajo la administración de Cloudera Manager.
  • /var/lib/nifi: Aquí encontramos todos los repositorios por default de NiFi como content_repository, database_repository, flowfile_repository, provenance_repository y también el archivo de flujos flow.xml.gz.
  • /var/log/nifi: Aquí se encuentran los archivos de logs como nifi-app.log, nifi-bootstrap.log y nifi-user.log

Change data Capture con Apache NiFi y SQLServer 2017 SP1


¿Cómo hacer para que NiFi arranque sin correr los flujos en automático?

Para hacer que NiFi arranque con los flujos en modo parado, hay que cambiar una propiedad en el archivo de configuración nifi.properties. Esta propiedad es nifi.flowcontroller.autoResumeState=false. Por defecto, la instalación viene con esta propiedad habilitada. En Cloudera Manager esta propiedad no viene expuesta por defecto, al menos, hasta la versión 1.0.1.0 de Cloudera Flow Management. Para hacer el cambio, sencillamente, agregamos la propiedad y se replica en todos los nodos del clúster. Luego del cambio, hay que reiniciar el clúster.
En Cloudera Manager podemos agregar la propiedad en Advanced > Nifi Node Advanced Configuration Snippet (Safety Valve). Se agrega el Key/Value y luego grabamos la configuración.




miércoles, 9 de octubre de 2019

¿Cómo consumir datos de una tabla CDC en SQL Server?

La consulta que ejecutamos una vez que tenemos el CDC en SQL Server habilitado es:


Esta consulta retorna columna en BLOB/Binario, lo cual, es difícil para convertir con NiFi.



La mejor opción encontrada, fue convertir directamente esa columnas, y toda la consulta ponerla a través de una vista, lo cual, facilita toda la conversión y NiFi hace el tratamiento como cualquier tabla.
Ejemplo de vista



¿Como consumir datos de una base de datos en NiFi?

Una vez que tengamos configurado un Servicio de Control, tenemos varios procesadores que nos permiten correr consultas contra una base de datos. Un caso típico es tener una consulta, y luego, solo consumir los registros que hayan cambiado o que sean nuevos. Para este caso, tenemos que tener en cuenta que debe existir un campo que nos ayude a discriminar esa lectura incremental, ya sea un timestamp, id o cualquier otro que permita hacer un ejercicio incremental de la información. Este procesador es el QueryDatabaseTable. Miremos un ejemplo:

 

Una vez configurado, su salida va a ser un registro AVRO con todos los rows dentro. Para hacer un tratamiento de los AVRO, hay muchos procesadores, uno muy usado, es el SplitAVRO, que nos permite tener un AVRO que contenga solo un Row, y por lo general, luego lo podemos convertir a JSON para trabajarlo en un formato de intercambio basado en Texto.


¿Como conectarnos a un SQLServer desde NiFi?

En otro articulo anterior, escribíamos sobre como habilitar el CDC en SQL Server. Una vez que tenemos hecho esto, la idea es poder consumir esos cambios en NiFi. Para poder hacerlo, lo primero es poder configurar una conexión a la base de datos. Para ello, debemos configurar nuestros nodos de NiFi con la conectividad hasta la base de datos. Esto es mediante JDBC. Para ello, debemos bajar a cada nodo el JAR correspondiente a SQL Server. En nuestro caso, simplemente buscamos el JAR llamado sqljdbc4-2.0.jar. Una buena práctica, es instalar y correr NiFi con su propio usuario, por lo tanto, cuando bajemos este JAR a los nodos, ponerlo en un path con permisos de lectura y ejecución para el usuario de NiFi. 
Luego, debemos averiguar, dado que para cada base de datos es diferente, la clase de Java correspondiente al drive, en el caso de SQL Server es com.microsoft.sqlserver.jdbc.SQLServerDriver. 
Por último, pero no menos importante, la cadena de conexión de JDBC. En SQL Server es del tipo jdbc:sqlserver://<SERVER_ADDRESS>;databaseName=<DATABASE_NAME>;

Un ejemplo configurado y funcionando se pone a continuación


Una vez finalizada esta configuración, podemos habilitar el Servicio, y ya tendremos nuestro Pool de Conexión a SQL Server listo para usarse.
 

martes, 8 de octubre de 2019

Habiltando el CDC para SQL Server

Esta es una receta para habilitar el CDC (Change Data Capture) de SQLServer 2017 SP1 sobre Ubunt 16.04. Las versiones de SQLServer que traen esta funcionalidad son las versiones Standart y Enterprise. No las traen las versiones gratuitas, solo las pagas de SQLServer.
En nuestro ejemplo, vamos a realizar sobre una versión Trial de Microsoft SQL Server 2017 SP1.

Algunos puntos importantes dado que este módulo de SQLServer no es muy amigable con los errores, y la documentación que se encuentra suele ser un poco dispersa. Para que nos funcione el CDC tenemos que verificar estos puntos importantes:
  1. La tabla debe contar con una clave primary (Primary Key)
  2. El SQLAgent debe estar instalado.
  3. Debemos habilitar el CDC para la base de datos y para la tabla que queremos rastrear sus cambios


Y esto es todo, es bastante sencillo, siempre y cuando se respeten los puntos anteriores, de lo contrario, genera errores muy genéricos y poco perceptibles de donde está el problema.

jueves, 25 de julio de 2019

¿Como sacar un valor puntual de un XML?

En NiFi tenemos el procesador EvaluateXPath, el cual, nos permite extraer valor usando el formato de XPath. XPath es un "lenguaje" que permite a través de expresiones simples procesar elementos dentro de un archivo XML. Es un concepto muy parecido a las expresiones regulares. XPath permite buscar y seleccionar teniendo en cuenta la estructura jerárquica del XML con el uso del estándar XSLT.
Una forma de testear rápidamente una expresion XPath y nuestro XML es usando un Tester, en Internet hay varios, por ejemplo: https://codebeautify.org/Xpath-Tester.
Supongamos que tenemos este XML.

La expresión:

string(/CATALOG/CD/TITLE)

Nos retornará Empire Burlesque. 
Esto funciona sin problemas en Nifi. Podemos configurar como destino de nuestras extracciones a los atributos del Flowfile.
Esto es en un caso simple de XML, pero los XML de la vida real, normalmente tiene encabezados con XSLT, Namespaces, y demas detalles y complejidades. Lo cual, puede que no funcione tan fácilmente porque va a necesitar del XSLT o del namespace.
Por ejemplo, si tenemos un XML complejo como el de la imagen siguiente:

Sobre este ejemplo, si quisieramos extraer el valor del beginTime dentro del tag de measCollect, tecnimcamente deberiamos hacer

string(/measCollectFile/fileHeader/measCollect/@beginTime)
 
Pero quizás no funcione, porque tenemos un namespace en este XML, por lo que, el truco aqui, es buscar el tag que necesitamos sin tener en cuenta el camino hacia el elemento.
Para ello, podemos hacer:

string(//*[local-name()='measCollect']/@beginTime)

Marcamos que no queremos validar el DTD del XML, y usamos como retorno de la expresion un String o Number, segun sepamos.

 
En el caso de la imagen, estamos buscando un elemento dentro del XML que se llama measCollect, sin importar en que lugar este dentro del XML, y nos vamos a quedar con el valor de un atributo de ese tag llamado beginTime y luego con un segundo, llamado endTime.
Si luego inspeccionamos los elementos de salida, deberiamos ver nuestras extracciones como atributos del FlowFile.