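As we know, a single table can be read in batches with GenerateTableFetch followed by ExecuteSQL.
For a large query that joins several tables, however, the batching has to be written by hand in a Groovy script so that the result set does not exhaust memory. The script below splits the id range of the main table into fixed-size windows and emits one query per window: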
import groovy.sql.Sql

// Look up the DBCPConnectionPool controller service by its ID
def dbService = context.getControllerService("b6a4e87d-0195-1000-8a9c-ffeaefb15995")
if (!dbService) {
    log.error("No database connection pool found")
    return
}

def sql = new Sql(dbService.getConnection())
try {
    // Fetch both id bounds in one round trip
    def bounds = sql.firstRow("select MIN(id) as min_id, MAX(id) as max_id from seim_ent_base_info")
    // Compare with null explicitly: an id of 0 is falsy in Groovy
    if (bounds.min_id == null || bounds.max_id == null) {
        log.error("No id found in seim_ent_base_info")
        return
    }
    long min = bounds.min_id as long
    long max = bounds.max_id as long
    long batchSize = 1000
    // Use i <= max so that a final window starting exactly at max is not skipped
    for (long i = min; i <= max; i += batchSize) {
        long cur = Math.min(i + batchSize - 1, max)
        String query = """select
ebi.uniscid,
ebi.cst_id,
ebi.org_no,
...(omitted here)
from seim_ent_base_info ebi
left join seim_ent_9v_portrait enp on enp.uniscid = ebi.uniscid
left join seim_9v_final_info nfi on nfi.uniscid = ebi.uniscid
left join seim_ent_buss_cust_total ebct on ebct.cst_id = ebi.cst_id
left join seim_ent_legal_total elt on elt.txm = ebi.uniscid
left join seim_sys_user usr on usr.user_username = ebct.manager_no
left join seim_t1_dim_bank_pub pub on pub.bank_no = usr.user_organization_code
...(omitted here)
where ebi.id between ${i} and ${cur}"""
        // One FlowFile per window, carrying the SQL in the sql.query attribute
        def flowFile = session.create()
        flowFile = session.putAttribute(flowFile, 'sql.query', query)
        session.transfer(flowFile, REL_SUCCESS)
    }
} finally {
    // Return the connection to the pool on every exit path, including early returns
    sql.close()
}
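Here, the argument passed to context.getControllerService is the ID of the DBCPConnectionPool controller service.
To carry the final SQL in an attribute, call session.putAttribute(flowFile, 'sql.query', query). The downstream ExecuteSQL then picks it up by setting its SQL Query property to ${sql.query}.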
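The window arithmetic of the script can be checked on its own, without NiFi or a database. The following is a minimal sketch under stated assumptions: batchRanges and whereClause are hypothetical helpers introduced only for illustration, and only the WHERE clause is rendered because the select list and joins are elided in the original.

```groovy
// Hypothetical helper (not part of the original script): split the inclusive
// id range [min, max] into consecutive inclusive [start, end] windows that
// each cover at most batchSize ids.
List batchRanges(long min, long max, long batchSize) {
    if (batchSize <= 0) {
        throw new IllegalArgumentException("batchSize must be positive")
    }
    def ranges = []
    // i <= max (rather than i < max) keeps a window that starts exactly at max
    for (long i = min; i <= max; i += batchSize) {
        long cur = Math.min(i + batchSize - 1, max)
        ranges << [i, cur]
    }
    return ranges
}

// Hypothetical helper: render only the WHERE clause for one window.
String whereClause(List range) {
    return "where ebi.id between ${range[0]} and ${range[1]}"
}

// Example: ids 1..2001 in windows of 1000
batchRanges(1, 2001, 1000).each { println whereClause(it) }
```

With min = 1, max = 2001, and a batch size of 1000, this produces three windows (1 to 1000, 1001 to 2000, and 2001 to 2001). A loop condition of i < max would stop before the third window and silently drop id 2001.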
[Does not work] Putting the SQL in the FlowFile content and referencing it as ${flowFile.content}:
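import org.apache.nifi.processor.io.OutputStreamCallback
import java.nio.charset.StandardCharsets
def flowFile = session.create()
// Write the SQL text into the FlowFile content as UTF-8 bytes
flowFile = session.write(flowFile, { outputStream -> outputStream.write(query.getBytes(StandardCharsets.UTF_8)) } as OutputStreamCallback)
session.transfer(flowFile, REL_SUCCESS)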
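Expression Language evaluates FlowFile attributes, not FlowFile content, so ${flowFile.content} in ExecuteSQL's SQL Query property yields no value. ExecuteSQL reads the query from the FlowFile content only when that property is left empty, which is why the attribute approach above is used here.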
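The ${...} syntax appears in two different roles in this note, which is easy to confuse: inside a double-quoted Groovy string it is resolved by Groovy when the script runs, while ${sql.query} in a processor property is resolved later by NiFi against FlowFile attributes. A minimal sketch of the Groovy side, with variable names chosen only for illustration:

```groovy
long i = 1
long cur = 1000

// Double quotes: Groovy substitutes ${i} and ${cur} when the string is built,
// which is how the script bakes the id window into each query.
String resolvedByGroovy = "where ebi.id between ${i} and ${cur}"

// Single quotes: Groovy does no interpolation, so the text ${sql.query}
// survives literally, in the form NiFi Expression Language would evaluate.
String leftForNiFi = '${sql.query}'

println resolvedByGroovy
println leftForNiFi
```

If an Expression Language reference ever has to be written inside a Groovy script, single quotes (or an escaped \$ in double quotes) keep Groovy from trying to resolve it as a script variable.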