Querying Data in Batches with Apache NiFi

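For a single table, batched queries can be built with GenerateTableFetch followed by ExecuteSQL.

GenerateTableFetch works on one table at a time, though. When a query joins several tables and returns a large volume of data, you need to write a Groovy script yourself (in an ExecuteScript processor) to avoid running out of memory. The script splits the query into batches by ID range: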

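import groovy.sql.Sql

// Look up the DBCPConnectionPool controller service by its ID
def dbService = context.getControllerService("b6a4e87d-0195-1000-8a9c-ffeaefb15995")
if (!dbService) {
  log.error("DBCPConnectionPool controller service not found")
  return
}

def sql = new Sql(dbService.getConnection())
try {
  // Fetch both bounds of the ID range in a single query
  def bounds = sql.firstRow("select MIN(id) as min_id, MAX(id) as max_id from seim_ent_base_info")
  // Compare against null explicitly: Groovy truth would treat an ID of 0 as missing
  if (bounds.min_id == null || bounds.max_id == null) {
    log.error("seim_ent_base_info is empty, nothing to query")
    return
  }

  // long rather than int, so BIGINT IDs do not overflow
  long min = bounds.min_id as long
  long max = bounds.max_id as long
  long batchSize = 1000

  // i <= max (not i < max): otherwise the last ID is skipped when min == max
  // or when (max - min) is an exact multiple of batchSize
  for (long i = min; i <= max; i += batchSize) {
    long cur = Math.min(i + batchSize - 1, max)

    String query = """select
          ebi.uniscid,
          ebi.cst_id,
          ebi.org_no,
          ...(omitted here)
          from seim_ent_base_info ebi
          left join seim_ent_9v_portrait enp on enp.uniscid = ebi.uniscid
          left join seim_9v_final_info nfi on nfi.uniscid = ebi.uniscid
          left join seim_ent_buss_cust_total ebct on ebct.cst_id = ebi.cst_id
          left join seim_ent_legal_total elt on elt.txm = ebi.uniscid
          left join seim_sys_user usr on usr.user_username = ebct.manager_no
          left join seim_t1_dim_bank_pub pub on pub.bank_no = usr.user_organization_code
          ...(omitted here)
          where ebi.id between ${i} and ${cur}"""

    // One FlowFile per batch; the SQL travels in the sql.query attribute
    def flowFile = session.create()
    flowFile = session.putAttribute(flowFile, 'sql.query', query)
    session.transfer(flowFile, REL_SUCCESS)
  }
} finally {
  // Always release the pooled connection, including on the early return above
  sql.close()
}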
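The loop bound decides whether the final ID is covered. Suppose IDs run from 1 to 1001 and batchSize is 1000. A condition of `i < max` would then never query ID 1001, so the windows need an inclusive `i <= max`. Below is a minimal Java sketch of the window computation, which runs on the same JVM as the Groovy script. The class `IdBatches` and method `ranges` are hypothetical names for illustration, not part of NiFi. It assumes IDs are integers and that every batch covers `batchSize` consecutive ID values.

```java
import java.util.ArrayList;
import java.util.List;

public class IdBatches {
    /**
     * Hypothetical helper: splits the inclusive ID range [min, max] into
     * windows [start, end], each spanning at most batchSize IDs.
     */
    public static List<long[]> ranges(long min, long max, long batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<long[]> windows = new ArrayList<>();
        // Inclusive bound: covers min == max and the final ID when
        // (max - min) is an exact multiple of batchSize
        for (long i = min; i <= max; i += batchSize) {
            long end = Math.min(i + batchSize - 1, max);
            windows.add(new long[] {i, end});
        }
        return windows;
    }

    public static void main(String[] args) {
        // Print the WHERE clause each batch's query would end with
        for (long[] w : ranges(1, 1001, 1000)) {
            System.out.println("where ebi.id between " + w[0] + " and " + w[1]);
        }
    }
}
```

Running `main` prints two windows, `between 1 and 1000` and `between 1001 and 1001`. With `i < max` the second line would be missing. Gaps in the ID sequence only produce batches that return fewer (or zero) rows; they never cause rows to be skipped.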

The argument to context.getControllerService must be the ID of your DBCPConnectionPool controller service.

To carry the final SQL in an attribute, call session.putAttribute(flowFile, 'sql.query', query) as above. The downstream ExecuteSQL then reads it by setting its SQL Query property to ${sql.query}.

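[Alternative] Putting the SQL in the FlowFile content:

  def flowFile = session.create()
  // Write the SQL text into the FlowFile content as UTF-8
  // (OutputStreamCallback is org.apache.nifi.processor.io.OutputStreamCallback)
  flowFile = session.write(flowFile, { outputStream ->
    outputStream.write(query.getBytes(java.nio.charset.StandardCharsets.UTF_8))
  } as OutputStreamCallback)
  session.transfer(flowFile, REL_SUCCESS)

This works only if ExecuteSQL's SQL Query property is left empty. In that case ExecuteSQL executes the SQL contained in the incoming FlowFile's content, which is also how it consumes the queries produced by GenerateTableFetch. Setting the property to ${flowFile.content} does not work. Expression Language is evaluated against FlowFile attributes, not content, so flowFile.content is just the name of an attribute that does not exist, and no SQL comes through.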
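NiFi's Expression Language resolves `${...}` names from FlowFile attributes (and some non-FlowFile scopes such as system properties), never from the content. The toy Java model below shows what that means for the two property values discussed above. `${sql.query}` finds the attribute the script set, while `${flowFile.content}` is looked up as an attribute name that no FlowFile carries. The class `AttributeRefDemo` and method `resolve` are hypothetical. This is not NiFi's Expression Language implementation, and it ignores EL functions, nesting and escaping entirely.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class AttributeRefDemo {
    // Matches ${name}; the captured group is the attribute name
    private static final Pattern REF = Pattern.compile("\\$\\{([^}]+)\\}");

    /**
     * Toy stand-in for Expression Language (hypothetical): replaces each ${name}
     * with the value of the attribute called name, or "" when there is none.
     */
    public static String resolve(String template, Map<String, String> attributes) {
        Matcher m = REF.matcher(template);
        StringBuilder sb = new StringBuilder();
        while (m.find()) {
            String value = attributes.getOrDefault(m.group(1), "");
            m.appendReplacement(sb, Matcher.quoteReplacement(value));
        }
        m.appendTail(sb);
        return sb.toString();
    }

    public static void main(String[] args) {
        // Attributes of one FlowFile created by the script; the content is not part of this map
        Map<String, String> attributes = new HashMap<>();
        attributes.put("sql.query",
                "select ebi.uniscid from seim_ent_base_info ebi where ebi.id between 1 and 1000");

        System.out.println(resolve("${sql.query}", attributes));
        System.out.println("[" + resolve("${flowFile.content}", attributes) + "]");
    }
}
```

Running `main` prints the SQL string on the first line and `[]` on the second. In this toy an unknown name becomes an empty string, which is consistent with ${flowFile.content} producing no usable SQL.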