ApacheNiFi处理器
Apache NiFi 技术学习
处理器
ExecuteSQL
编写SQL语句执行查询,查询结果将被转换为 Avro 格式。
关键属性
- 数据库连接池服务
- 预查询SQL:用分号分隔的查询列表,在执行主SQL查询之前执行。如果没有错误,这些查询的执行结果不会被输出。
- 查询SQL:要执行的SQL。设置了此属性,则使用此SQL;不设置,则使用流中的SQL。
- 查询后的查询SQL:用分号分隔的SQL查询列表,在执行主SQL查询后执行。如果没有错误,这些查询的执行结果不会被输出。
- 最长等待时间:运行 SQL 选择查询所允许的最长时间,零表示没有限制。
- 规范化表/列名称:是否将表名、列名中可能存在的Avro格式不兼容的字符进行转换。
- 压缩格式:写入 Avro 文件时使用的压缩类型。
- 默认小数精度:默认十进制精度。
- 默认小数位数:默认十进制
- 每个流文件的最大行数:如果指定的值为零,则在单个流文件中返回所有行。
- 输出批量大小:当设置为零时,会话将在处理完所有结果集行并准备好将输出流文件传输到下游关系时提交。
- 设置自动提交:启用或禁用数据库连接的自动提交功能。
ConvertAvroToJSON
将 Binary Avro 记录转换为 JSON 对象。
关键属性
- JSON 容器选项:作为单个对象序列(即将每个对象写入新行),或作为对象数组(数组)。
- 包装单条记录:确定是否应将空记录或单个记录的结果输出包装在由“JSON 容器选项”指定的容器数组中
ConvertJSONToSQL
Json快速转化SQL处理器,将 JSON 格式的 FlowFile 转换为 UPDATE、INSERT 或 DELETE SQL 语句。
关键属性
- JDBC 连接池:指定用于将 JSON 消息转换为 SQL 语句的 JDBC 连接池。
- SQL语句类型:指定要生成的 SQL 语句的类型
- 表名:语句执行的表的名称
- 翻译字段名称:如果为true,处理器将JSON字段名转换为指定表的适当列名。如果为false,则JSON字段名必须与列名完全匹配,否则不会更新列名
- 不匹配字段后的行为:忽略、执行失败
- 更新主键:以逗号分隔的列名列表,用于唯一标识数据库中 UPDATE 语句的行。如果为 UPDATE语句 且未设置此属性,则使用表的主键。在这种情况下,如果不存在主键,如果 Unmatched Column Behavior 设置为 FAIL,则转换为 SQL 将失败。如果语句类型为 INSERT,则忽略这个属性
- 引用列标识符:启用此选项将导致所有列名都被引用,允许您使用保留字作为表中的列名。
- SQL 参数属性前缀:要附加到传出流文件属性的字符串,例如
.args.1.value,其中 替换为指定值 - 表Schema缓存大小:指定应该缓存多少表Schema
PutSQL
作为SQL DDL语句(INSERT,UPDATE或DELETE)执行 FlowFile的内容。
关键属性
- JDBC 连接池:指定用于将 JSON 消息转换为 SQL 语句的 JDBC 连接池。
- SQL语句类型:要执行的 SQL 语句。该语句可以是空的、常量值或使用表达式语言从属性构建的。如果指定了此属性,则无论传入流文件的内容如何,都将使用它。如果此属性为空,则传入流文件的内容应包含有效的 SQL 语句,由处理器发出到数据库。
- 支持碎片事物:如果为 true,则当此处理器使用 FlowFile 时,处理器将首先检查该 FlowFile 的 fragment.identifier 和 fragment.count 属性。如果 fragment.count 值大于 1,处理器将不会处理具有该 fragment.identifier 的任何 FlowFile,直到所有可用;那时,它将按照 FlowFiles 的 fragment.index 属性指定的顺序将所有带有该 fragment.identifier 的 FlowFiles 作为单个事务处理。这提供了这些 SQL 语句的原子性。一旦该事务的任何语句在执行时抛出异常,该事务将被回滚。当事务回滚发生时,这些流文件都不会被路由到“成功”。如果失败回滚属性设置为 true,则这些 FlowFiles 将保留在输入关系中。当失败回滚属性设置为false,如果这些FlowFiles中的任何一个将被路由到’retry’,所有这些FlowFiles都将被路由到’retry’。否则,它们将被路由到’failure’。如果此值为 false,则这些属性将被忽略,并且更新将相互独立。
- 数据库会话自动提交:在正在使用的数据库连接上设置的自动提交模式。如果设置为 false,则操作将被显式提交或回滚(分别基于成功或失败),如果设置为 true,则驱动程序/数据库处理提交/回滚。
- 事务超时:如果支持碎片事物属性设置为 true,则指定等待特定 fragment.identifier 属性的所有 FlowFiles 到达多长时间,然后才将具有该标识符的所有 FlowFiles 传输到“失败”关系
- 批量大小:在单个事务中放入数据库的流文件的数量
- 失败回滚:指定如何处理错误。默认情况下 (false),如果在处理 FlowFile 时发生错误,FlowFile 将根据错误类型路由到“失败”或“重试”关系,处理器可以继续处理下一个 FlowFile。相反,您可能希望回滚当前处理的流文件并立即停止进一步处理。在这种情况下,您可以通过启用此“失败时回滚”属性来实现。如果启用,失败的 FlowFiles 将保留在输入关系中,不会对其进行惩罚并重复处理,直到成功处理或通过其他方式删除。设置足够的“Yield Duration”以避免过于频繁地重试非常重要。
QueryDatabaseTable
生成 SQL 选择查询,或使用提供的语句,并执行它以获取指定最大值列中的值大于先前看到的最大值的所有行。 查询结果将被转换为 Avro 格式。不允许传入连接。
关键属性
- 数据库连接池服务
- 数据库类型:数据库的类型/风格,用于生成特定于数据库的代码。在许多情况下,Generic 类型就足够了,但某些数据库(例如 Oracle)需要自定义 SQL 子句。
- 表名
- 返回的列:在查询中使用的以逗号分隔的列名列表。如果您的数据库需要对名称进行特殊处理(例如引用),则每个名称都应包括此类处理。如果未提供列名,则将返回指定表中的所有列。
- 附加 WHERE 子句:在 WHERE 条件中添加的自定义子句。
- 自定义查询:自定义 SQL 查询。此查询将被包装为子查询,而不是从其他属性构建 SQL 查询。查询必须没有 ORDER BY 语句。
- 最大值列:以逗号分隔的列名列表。处理器将跟踪自处理器开始运行以来返回的每一列的最大值。使用多列意味着列列表的顺序,并且每一列的值预计比前列的值增加得更慢。因此,使用多列意味着列的层次结构,这通常用于分区表。该处理器可用于仅检索自上次检索以来已添加/更新的那些行。注意,一些 JDBC 类型如 bit/boolean 不利于保持最大值,所以这些类型的列不应该在这个属性中列出,并且在处理过程中会导致错误。如果未提供列,则将考虑表中的所有行,这可能会对性能产生影响。
- 初始加载策略:当处理器第一次启动(或它的状态已被清除)时,如何处理数据库表中的现有行。如果还配置了任何“initial.maxvalue.*”动态属性,则该属性将被忽略。
- 最长等待时间:运行 SQL 选择查询所允许的最长时间,零表示没有限制。小于 1 秒的最大时间将等于零。
- 获取大小:一次从结果集中获取的结果行数。如果指定的值为零,则忽略提示。
- 每个流文件的最大行数:将包含在单个 FlowFile 中的最大结果行数。这将允许您将非常大的结果集分解为多个 FlowFile。如果指定的值为零,则所有行都在单个 FlowFile 中返回。
- 输出批量大小:在提交进程会话之前要排队的输出流文件的数量。
- 规范化表/列名称:是否将列名中的非 Avro 兼容字符更改为 Avro 兼容字符。
- 事务隔离级别
PutDatabaseRecord
PutDatabaseRecord 处理器使用指定的 RecordReader 从传入流文件中输入(可能是多个)记录。
关键属性
- 记录读取器:和上游的记录写入器格式一样
- 数据库类型:数据库的类型/风格,用于生成特定于数据库的代码。在许多情况下,Generic 类型就足够了,但某些数据库(例如 Oracle)需要自定义 SQL 子句。
- SQL语句类型:增、删、改
- 语句类型记录路径:指定一个 RecordPath 来评估每个 Record 以确定语句类型。RecordPath 应该等同于 INSERT、UPDATE、UPSERT 或 DELETE。仅当[Statement Type] 属性的值为“使用记录路径”
- 数据记录路径:如果指定,此属性表示将针对每个传入记录评估的 RecordPath,并且从评估 RecordPath 得到的记录将被发送到数据库,而不是发送整个传入记录。如果未指定,则整个传入记录将发布到数据库。
- 数据库连接池服务
- 表名
- 翻译字段名称:如果为true,处理器将JSON字段名转换为指定表的适当列名。如果为false,则JSON字段名必须与列名完全匹配,否则不会更新列名
- 不匹配字段后的行为:忽略、执行失败
- 不匹配列名后的行为:忽略、警告、执行失败
- 最大等待时间:运行 SQL 语句允许的最长时间,零表示没有限制。小于 1 秒的最大时间将等于零。
- 失败回滚:指定如何处理错误。默认情况下 (false),如果在处理 FlowFile 时发生错误,FlowFile 将根据错误类型路由到“失败”或“重试”关系,处理器可以继续处理下一个 FlowFile。相反,您可能希望回滚当前处理的流文件并立即停止进一步处理。在这种情况下,您可以通过启用此“失败时回滚”属性来实现。如果启用,失败的 FlowFiles 将保留在输入关系中,不会对其进行惩罚并重复处理,直到成功处理或通过其他方式删除。设置足够的“Yield Duration”以避免过于频繁地重试非常重要。
ConvertAvroToParquet
将 Avro 记录转换为 Parquet 文件格式。
关键属性
- 压缩类型:正在写入的文件的压缩类型。
- 行组大小:Parquet 编写器使用的行组大小。
- 页面大小:Parquet writer 使用的页面大小。
- 字典页面大小:Parquet 编写器使用的字典页面大小。
- 最大填充尺寸:用于将行组与底层文件系统中的块对齐的最大填充量。
- 启用字典编码
- 启用验证
- 写入版本
PutParquet
使用提供的 Record Reader 从传入的 FlowFile 读取记录,并将这些记录写入 Parquet 文件。
关键属性
- Hadoop 配置资源:包含 Hadoop 文件系统配置的文件或逗号分隔的文件列表。如果没有这个,Hadoop 将在类路径中搜索“core-site.xml”和“hdfs-site.xml”文件,或者将恢复为默认配置。
- 其他类路径资源:将添加到类路径并用于加载本机文件或目录的路径的列表。指定目录时,该目录中的所有文件都将添加到类路径中,但不包括子目录。
- 记录读取器
- 目录:应写入文件的父目录。如果它不存在,自动创建。
- 压缩类型
- 覆盖文件
- 删除 CRC 文件:在成功写入 Parquet 文件后是否应删除相应的 CRC 文件
ConvertCharacterSet
将NiFi数据流的FlowFile的内容从一种字符集转换成另外一种字符集
属性
- 输入的字符集
- 输出的字符集
JVM支持的字符集
1 | public static void main(String[] args) { |
常用字符集
- GB2312
- GBK
- US-ASCII
- UTF-8
ConvertExcelToCSVProcessor
使用 Microsoft Excel 文档并将每个工作表转换为 csv。 来自传入 Excel 文档的每个工作表都将生成一个新的流文件,该流文件将从该处理器输出。 每个输出流文件的内容将被格式化为 csv 文件,其中 excel 工作表中的每一行都作为 csv 文件中的换行符输出。
关键属性
- 要提取的工作表:应从 Excel 文档中提取的 Excel 文档工作表名称的逗号分隔列表。如果此属性留空,则将从 Excel 文档中提取所有工作表。名称列表不区分大小写。
- 要跳过的行数:要开始处理的第一行的行号。使用它可以跳过工作表顶部不属于数据集的数据行。
- 要跳过的列:要跳过的列号的逗号分隔列表。使用列号而不是字母名称。使用它可以跳过工作表中您不想作为记录的一部分提取的任何列。
- 格式化单元格值:应该使用 Excel 中应用的格式将单元格值写入 CSV,还是应该将它们打印为原始值。
- CSV 格式
- 值分隔符:用于分隔 CSV 记录中的值/字段的字符。
- 包括标题行:指定是否应将 CSV 列名写为第一行。
- 引用字符:用于引用值的字符,以便不必使用转义字符。如果属性已通过表达式语言指定,但表达式在运行时被评估为无效的引号字符,则将跳过它并使用默认的引号字符。
- 转义字符:用于转义对 CSV 解析器具有特定含义的字符的字符。如果属性已通过表达式语言指定,但表达式在运行时被评估为无效的转义字符,则将跳过它并使用默认的转义字符。将其设置为空字符串意味着不应使用转义字符。
- 评论标记:用于表示注释开始的字符。任何以此注释开头的行都将被忽略。
- 空字符串:指定一个字符串,如果在 CSV 中作为值存在,则应将其视为空字段,而不是使用文字值。仅当[CSV 格式] 属性的值为“自定义格式”
- 截取字段:是否应从字段的开头和结尾删除空格仅当**[CSV 格式] 属性的值为“自定义格式”**
- 记录分隔符:指定用于分隔 CSV 记录的字符仅当**[CSV 格式] 属性的值为“自定义格式”**
- 包括尾随分隔符:如果为 true,则将在写入的每个 CSV 记录中添加一个尾随分隔符。如果为 false,则将省略尾随分隔符。
FlattenJson
使用户能够获取嵌套的 JSON 文档并将其展平为简单的键/值对文档。
关键属性
- 分隔符:用于连接键的分隔符。必须是 JSON 合法字符。
- 扁平化模式:默认保留数组
- 忽略保留字符:如果为true,键中的保留字符将被忽略
- 返回类型
- 字符集
- 完美输出json:格式化json
JoltTransformJSON
java开放语言工具组 (java open language toolkit),应用JOLT来转换JSON内容
关键属性
- Jolt转为DSL:指定应与提供的规范一起使用的变换
- 自定义转换类名称
- 自定义模块目录
- Jolt规范
- 转换缓存大小:控制我们在内存中缓存的转换数量,以避免每次都编译转换。
- 完美输出
高级
- Jolt规范
- JSON输入
- JSON输出
ReplaceText
根据处理器属性配置的正则表达式对FlowFile的内容进行匹配,如果匹配成功将会将匹配成功的字段替换为配置属性中的字段
关键属性
搜索值:通过正则表达式搜索需要替换的值
替换的值:替换后的值。支持对正则表达式捕获组使用“正则表达式替换”反向引用,但引用正则表达式中不存在的捕获组的反向引用将被视为文字值。 反向引用也可以使用表达式语言引用,如
$1,$2等。必须包含单引号,因为这些变量不是“标准”属性名称字符集
最大缓冲区大小
替换策略:
- Prepend:前置。在流文件的开头或每行的开头插入替换值(取决于评估模式)。对于“逐行”评估模式,该值将附加到每一行。同样,对于“First-Line”、“Last-Line”、“Except-Last-Line”和“Except-First-Line”评估模式,该值将单独添加到页眉、页脚、除页眉和除了页脚之外的所有行。对于“整个文本”评估模式,该值将附加到整个文本。
- Append:附加。在流文件的末尾或每行的末尾插入替换值(取决于评估模式)。对于“逐行”评估模式,该值将附加到每一行。同样,对于“First-Line”、“Last-Line”、“Except-Last-Line”和“Except-First-Line”评估模式,该值将附加到单独的页眉、单独的页脚、除页眉和除了页脚之外的所有行。对于“整个文本”评估模式,该值将附加到整个文本。
- Regex Replace:正则替换。将搜索值解释为正则表达式并将所有匹配项替换为替换值。替换值可以通过使用美元符号后跟捕获组编号来引用搜索值中使用的捕获组,例如 $1 或 $2。如果搜索值设置为 .*,则所有内容都将被替换,甚至无需评估正则表达式。
- Literal Replace:文字替换。搜索搜索值的所有实例并将匹配项替换为替换值。
- Always Replace:总是替换。总是替换流文件的整行或全部内容(取决于
属性的值)并且不费心搜索任何值。选择此策略时,将忽略 属性。 - Substitute Variables:替换变量。使用 FlowFile 属性替换变量引用(以 ${var} 形式指定)以通过变量名称查找替换值。选择此策略时,将忽略
和 属性。
评估模式:分别对每一行运行“替换策略”(逐行)或将整个文件缓冲到内存中(整个文本)并针对它运行。
逐行评估模式:对流文件中的所有行单独(逐行)运行“替换策略”,仅第一行(标题),仅最后一行(页脚),除第一行(标题)或除最后一行 (页脚)。
ReplaceTextWithMapping
通过评估正则表达式并用映射文件中提供的一些替代值替换与正则表达式匹配的内容来更新流文件的内容。
关键属性
- 正则表达式:通过正则表达式搜索需要替换的内容
- 匹配的组:提供的正则表达式的匹配组的编号,用映射文件中的相应值替换(如果存在)。
- 匹配的文件:包含映射的文件的名称(包括完整路径)。
- 映射文件刷新间隔:检查映射文件更新的轮询间隔(以秒为单位)。 默认值为 60 秒。
- 字符集
- 最大缓冲大小
TransformXml
将提供的XSLT文件转换为XML文件。
关键属性
- XSLT文件名:提供 XSLT 文件的名称(包括完整路径)以应用于 FlowFile XML 内容。
- XSLT 查找:用于存储 XSLT 定义的控制器查找。
- XSLT 查找key:用于从 XSLT 查找控制器检索 XSLT 定义的key。使用 XSLT 控制器属性时必须设置此属性。
- 缩进
- 安全处理:是否缓解各种与 XML 相关的攻击,例如 XXE(XML 外部实体)攻击。
- 缓存大小:要缓存的最大样式表数。零禁用缓存。
- 上次访问后缓存生存时间:上次访问后样式表在缓存中保留多长时间。
AttributesToCSV
根据输入的流文件属性生成CSV文件。生成的 CSV 可以写入新生成的属性名为“CSVAttributes”的属性,也可以作为内容写入流文件。如果属性值包含逗号、换行符或双引号,则属性值将使用双引号进行转义。属性值中的任何双引号字符都使用另一个双引号进行转义。
关键属性
- 属性列表:要包含在生成的 CSV 中的属性的逗号分隔列表。如果此值留空,则将包括所有现有属性。此属性列表区分大小写并支持包含逗号的属性名称。如果未找到列表中指定的属性,它将根据“空值”属性以空字符串或 null 发送到生成的 CSV。如果在此列表中指定了核心属性并且“包含核心属性”属性为 false,则将包含核心属性。
- 属性正则表达式:将根据流文件属性评估以选择匹配属性的正则表达式。此属性可以与属性列表属性结合使用。最终输出将包含在 ATTRIBUTE_LIST 和 ATTRIBUTE_REGEX 中找到的匹配项组合。
- 目标:包含流文件属性、流文件内容。控制 CSV 值是作为新的流文件属性“CSVData”写入还是写入流文件内容。
- 包括核心属性:确定每个流文件中包含的 FlowFile org.apache.nifi.flowfile.attributes.CoreAttributes 是否应包含在生成的最终 CSV 值中。核心属性将添加到 CSVData 和 CSVSchema 字符串的末尾。属性列表属性会覆盖此设置。
- 空值:如果为true,则结果 CSV 中的不存在或空属性将为“null”。如果为 false,则会在 CSV 中放置一个空字符串。
- 包含Schema:如果为 true,则架构(属性名称)也将转换为 CSV 字符串,该字符串将应用于名为“CSVSchema”的新属性或应用于内容中的第一行,具体取决于 DESTINATION 属性设置。
AttributesToJSON
根据输入的流文件生成JSON。得到的JSON既可以写入新属性 JSONAttributes,也可以作为内容写入流文件。
关键属性
- 属性列表:要包含在生成的 JSON 中的属性的逗号分隔列表。如果此值留空,则将包括所有现有属性。此属性列表区分大小写并支持包含逗号的属性名称。如果未找到列表中指定的属性,它将根据“空值”属性以空字符串或 null 发送到生成的 JSON 。如果在此列表中指定了核心属性并且“包含核心属性”属性为 false,则将包含核心属性。
- 属性正则表达式:将根据流文件属性评估以选择匹配属性的正则表达式。此属性可以与属性列表属性结合使用。最终输出将包含在 ATTRIBUTE_LIST 和 ATTRIBUTE_REGEX 中找到的匹配项组合。
- 目标:包含流文件属性、流文件内容。控制 JSON值是作为新的流文件属性“JSONAttributes”写入还是写入流文件内容。
- 包括核心属性:确定每个流文件中包含的 FlowFile org.apache.nifi.flowfile.attributes.CoreAttributes 是否应包含在生成的最终 JSON 值中。
- 空值:如果为true,则结果 JSON中的不存在或空属性将为“null”。如果为 false,则会在 JSON中放置一个空字符串。
CountText
计算传入文本的各种指标。请求的结果将被记录为属性。生成的流文件不会修改其内容。
关键属性
- 计数行数:如果启用,将计算传入文本中存在的行数。
- 计数非空行数:如果启用,将计算传入文本中包含非空白字符的行数。
- 计数字数:如果启用,将计算传入文本中存在的单词数(由空格限制的字母数字字符组)。除非 ‘Split Words on Symbols’ 为真,否则通用逻辑分隔符 [_-.] 不会绑定单词。
- 计数字符数:如果启用,将计算传入文本中存在的字符数(包括空格和符号,但不包括换行符和回车符)。
- 通过符号拆分后的词:如果启用,字数统计将识别由常用逻辑分隔符 [ _ - 分隔的字符串。] 作为独立词(例如,符号上的拆分词 = 4 个词)。
- 字符编码
- 实时调用:如果为 true,则计数器将立即更新,而不考虑 ProcessSession 是提交还是回滚;否则,仅当提交 ProcessSession 时计数器才会增加。
数据库操作示例
记录数据库处理器操作示例
字符集转换
Excel转CSV
扁平化json示例
原数据
1 | [{"aa":"bbbb","ccc":{"ddd":"eeee"}},{"ff":"aaa"}] |
normal模式
1 | {"[0].aa":"bbbb","[0].ccc.ddd":"eeee","[1].ff":"aaa"} |
keep arrays模式
1 | [{"aa":"bbbb","ccc.ddd":"eeee"},{"ff":"aaa"}] |
dot notation模式
1 | {".0.aa":"bbbb",".0.ccc.ddd":"eeee",".1.ff":"aaa"} |
keep primitive arrays模式
1 | {"[0].aa":"bbbb","[0].ccc.ddd":"eeee","[1].ff":"aaa"} |
Jolt转化JSON示例
传入JSON数据
1 | { |
Jolt规范
1 | [ |
输出JSON
1 | { |
替换文本示例
使用映射替换文本示例
生成的流文件内容
1 | id,name,star |
增加属性(可选)
记录读取器:CSVReader
记录写入器:CSVReaderSetWriter
属性名称:/level
属性值:star
替换映射处理器的正则表达式
1 | [0-9](?!,) |
映射文件内容
1 | 1 1S |
替换结果
1 | id,name,star,level |
属性生成CSV数据
属性生成JSON数据
统计文本示例
流文件文本
1 | -----BEGIN OPENSSH PRIVATE KEY----- |
统计配置
输出结果属性
服务
Database Connection Pooling Service
数据库连接池服务
关键属性
数据库连接URL:jdbc:mysql://localhost:3306/nifi
数据库驱动名称:com.mysql.cj.jdbc.Driver
数据库驱动位置:D:\nifi-1.17.0\lib\mysql-connector-java-8.0.20.jar
数据库用户名:root
密码:root
最长等待时间:默认500毫秒,-1 无限期等待。
最大总连接数:默认8,为负数表示无限制。
验证查询:用于在返回连接之前验证连接的验证查询。当连接无效时,它会被丢弃并返回新的有效连接。
最小空闲连接数:在不创建额外连接的情况下,池中可以保持空闲的最小连接数。设置为零允许没有空闲连接。
最大空闲连接数:池中可以保持空闲而不释放额外连接的最大连接数。设置为任何负值允许无限的空闲连接。
最长连接时间:以毫秒为单位,零或更小的值意味着连接具有无限的生命周期。
Record Reader
记录读取器,指定用于解析传入数据和确定数据架构的控制器服务。
选择兼容的控制器服务,所选范围如下:
- AvroReader
- XMLReader
- JsonPathReader
- GrokReader
- SyslogReader
- Syslog5424Reader
- CSVReader
- ReaderLookup
- ScriptedReader
- WindowsEventLogReader
- CEFReader
- JsonTreeReader
- ParquetReader