如何编写一个PG的FDW

之前写了那么多学习FDW的记录,最终还是需要通过写一个FDW来验证学习的成果。本次暂时实现的是查询的FDW。修改方面的还待后续继续深入。
根据之前的学习记录:学习PostgreSQL的FDW(#1),我们知道即使要实现一个最简单的FDW,也要实现其中7个回调函数:GetForeignRelSizeGetForeignPathsGetForeignPlanBeginForeignScanIterateForeignScanReScanForeignScanEndForeignScan。而他们的作用,之前的文章已详细解释过,这里就不再详述了。

control文件

因为FDW实质是也是PG的一个扩展,因此也是需要有control文件的

1
2
3
4
comment = 'sample fdw' # FDW 的描述
default_version = '0.0.1' # FDW 的版本号
module_pathname = '$libdir/sample_fdw' # fdw的路径
relocatable = true

sql文件

同上,之前也说过需要实现fdw_handler函数和选择性实现fdw_validator函数。因此需要在sql文件中定义这些函数

1
2
3
4
5
6
7
8
9
10
11
12
13
CREATE FUNCTION sample_fdw_handler()
RETURNS fdw_handler
AS 'MODULE_PATHNAME'
LANGUAGE C STRICT;

CREATE FUNCTION sample_fdw_validator(text[], oid)
RETURNS void
AS 'MODULE_PATHNAME'
LANGUAGE C STRICT;

CREATE FOREIGN DATA WRAPPER sample_fdw
HANDLER sample_fdw_handler
VALIDATOR sample_fdw_validator;

在C文件中实现fdw_handler函数和选择性实现fdw_validator函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Datum
sample_fdw_handler(PG_FUNCTION_ARGS)
{
FdwRoutine *fdwroutine = makeNode(FdwRoutine);

/* these are required */
fdwroutine->GetForeignRelSize = sampleGetForeignRelSize; /* S U D */
fdwroutine->GetForeignPaths = sampleGetForeignPaths; /* S U D */
fdwroutine->GetForeignPlan = sampleGetForeignPlan; /* S U D */
fdwroutine->BeginForeignScan = sampleBeginForeignScan; /* S U D */
fdwroutine->IterateForeignScan = sampleIterateForeignScan; /* S */
fdwroutine->ReScanForeignScan = sampleReScanForeignScan; /* S */
fdwroutine->EndForeignScan = sampleEndForeignScan; /* S U D */

PG_RETURN_POINTER(fdwroutine);
}

Datum
infocycle_fdw_validator(PG_FUNCTION_ARGS)
{
Datum optionArray = PG_GETARG_DATUM(0);
Oid optionContextId = PG_GETARG_OID(1);
List *optionList = untransformRelOptions(optionArray);
ListCell *optionCell = NULL;

// 其中做一些校验FDW的配置是否正确,比如options是否符合要求
PG_RETURN_VOID();
}

7个回调函数

GetForeignRelSize(PlannerInfo root, RelOptInfo baserel, Oid foreigntableid)

该函数提供外部表对于计算访问代价所需的基础数据。需要更新baserel的rows字段,即是告诉PG,此外部表的估算行数。也可以选择性更新baserel的width字段,即行宽。
实现此函数,只需根据自己要实现的外部表的数据源来更新baserel->rows即可。不要是完全准确的值。

GetForeignPaths(PlannerInfo root, RelOptInfo baserel, Oid foreigntableid)

该函数用于生成对目标外部表的访问路径,必须至少提供以后访问路径。可以使用PG提供的函数create_foreignscan_path来生成,我们只需把这个函数的所需的参数传进去即可,可以生成多个路径,并使用add_path将这些路径加入到PG的访问路径列表中。create_foreignscan_path函数声明如下:

1
2
3
4
5
6
7
8
ForeignPath *
create_foreignscan_path(PlannerInfo *root, RelOptInfo *rel,
PathTarget *target,
double rows, Cost startup_cost, Cost total_cost,
List *pathkeys,
Relids required_outer,
Path *fdw_outerpath,
List *fdw_private)

root和rel分别是GetForeignPaths的入参root和baserel,target可以设置成NULL使用默认的pathtarget。rows是GetForeignRelSize中计算的外部表的行数,可以从baserel->rows获取。startup_cost是这个路径的起始代价,total_cost是这个路径的总代价,如果有多条路径,需要根据每一条访问路径计算其代价。pathkeys如提供此参数,即代表此路径是预排序好的结果。fdw_outerpath暂未知用途,fdw_private是此fdw的私有信息,这种信息被用来标识想要使用的指定扫描方法。

GetForeignPlan(PlannerInfo root, RelOptInfo baserel,Oid foreigntableid, ForeignPath best_path, List tlist, List scan_clauses, Plan outer_plan)

此函数用于生成访问目标外部表的ForeignScan计划节点。可以使用PG提供的函数make_foreignscan来生成。make_foreignscan函数声明如下:

1
2
3
4
5
6
7
8
9
ForeignScan *
make_foreignscan(List *qptlist,
List *qpqual,
Index scanrelid,
List *fdw_exprs,
List *fdw_private,
List *fdw_scan_tlist,
List *fdw_recheck_quals,
Plan *outer_plan)

qptlist查询目标列表,直接传入GetForeignPlan的入参tlist;qpqual是查询语句,可以使用PG提供的函数extract_actual_clauses对scan_clauses进行提取;scanrelid如果是单表,即是baserel->relid,如果baserel是Join relation 或者 upper relation 设置scanrelid为0;fdw_exprs额外的表达式,没有可传NIL;fdw_private是FDW的私有信息;可提供过执行器调用的回调函数中使用;fdw_scan_tlist暂未知具体用途,可传NIL;fdw_recheck_quals暂未知具体用途,可传NIL;outer_plan暂未知具体用途,可传NULL;其中fdw_exprs,fdw_scan_tlist,fdw_recheck_quals,outer_plan主要作用于Join relation 或者 upper relation。

BeginForeignScan(ForeignScanState *node, int eflags)

获取执行ForeignScan算子所需的信息,并将它们组织并保存在ForeignScanState中,比如说外部数据库的连接,或者是打开文件的句柄等资源信息都可以保存在ForeignScanState中,供IterateForeignScan使用。不需要在IterateForeignScan里面重复申请。

IterateForeignScan(ForeignScanState *node)

读取外部数据源的一行数据,并将它组织为PG中的TupleTableSlot。这个函数是最关键的函数,因为最终SQL执行后返回的数据都是通过这个函数来组织的,因此一行数据是如何获取到的,我们可以根据自己的需求灵活控制。基本的实现逻辑如下:

1
2
3
4
5
6
7
8
9
10
TupleTableSlot      *tupleSlot = node->ss.ss_ScanTupleSlot; //从node中获取元组槽
Datum *columnValues = tupleSlot->tts_values;//获取元组槽中的values数组
bool *columnNulls = tupleSlot->tts_isnull;//获取元组槽中的isnull数组
ExecClearTuple(tupleSlot);//执行此函数对元组槽进行一些清理工作,并标记此元组槽是空的
if(/* 判断是否能获取到新的一行数据 */){
// 对columnValues和columnNulls数组进行赋值
ExecStoreVirtualTuple(tupleSlot);//标记此元组槽是有效的
}

return tupleSlot;//最后返回元组槽

其中对columnValues和columnNulls数组进行赋值必须按照创建表时,字段的顺序来进行赋值,否则数据将会错乱或者返回结果失败。由于PG的数据是用PG自己定义的Datum类型存储数据的,因此我们在对columnValues数组赋值前,必须要先将数据转成Datum类型再进行赋值,而这一转换,其实可以直接使用PG已实现的一套转换函数:PointerGetDatumCStringGetDatumInt32GetDatumUInt32GetDatumCharGetDatumBoolGetDatumObjectIdGetDatum等等。

ReScanForeignScan(ForeignScanState *node)

将外部数据源的读取位置重置回最初的起始位置,比如说将文件的游标重置回起始位置,或者是迭代器之类的重置回起始位置。

EndForeignScan(ForeignScanState *node)

释放整个ForeignScan算子执行过程中占用的外部资源或FDW中的资源,即把在BeginForeignScan里申请的资源进行释放。

其他

在实现以上回调函数,难免会用到一些PG已经实现的函数或者数据结构,只有我们在源代码中引入PG相应的头文件即可,因为在编译安装FDW的时候,需要知道pg_config的,因此会自动编译进我们的so文件中。