最近自研了一款数据引擎X,算力框架使用的Spark SQL,在开始研发引擎的时候,考虑到兼容,迁移成本,生态对接等多种因素,于是我兼容了Hive JDBC协议,也就是可以直接使用Hive JDBC Driver或者beeline工具即可连接上。

这样对于已经使用Hive的生态工具来说,只需要切换一个端口即可,同时在Spark SQL层面尽可能的兼容了Hive SQL的语法,因此对于Hive的迁移更加的低成本。

当然一个引擎重点并不是在接口设计上,而是在于定位和解决的问题,以及做了哪些操作处理,这部分未来再说,在引擎开发完成后,我使用类似Superset这类BI工具进行了测试,都没有问题,可流畅使用。

于是以为大多数的BI工具基本都可直接使用,但是后来有同学反馈HUE不能使用,于是我去看了下,发现HUE里面的connector是分为了hive/spark sql 2中不同的type,当时测试使用的spark sql的drive。

虽然不知道这两者driver具体的细节差别是什么,但是我在想可能是spark sql driver有一些特殊处理,引擎x是兼容hive的,那么按理说把hue里面的hive connector port也就是10000切换成x的端口应该就没有问题。

后来又反馈说hive/spark sql connector都无法连接上x,这时候我觉得事情可能没有那么简单,也就是hue的connector里面的定义并不是jdbc driver的标准方式。

看来不得不仔细去分析一下hue的内部源码了,于是我下载了hue的源码,去看下了,hue的spark sql connector连接的是spark thrift serve,hive connector连接的是hive server2。

首先从语法上来说,hue对spark sql和hive做了分开处理,例如show tables,hive返回只有一列tables_nam,而spark sql 的返回是三列,包含db,table_name,和type。

而我在开发X的时候因为是为了和hive兼容,所以我自己写了一个处理器把这部分给兼容掉了,这样的话无论hue的connector是怎么实现的,spark sql肯定是无法连接x的。

但是我的另一个困惑是为啥hive connector也无法访问,我去翻了一下hue的实现,基本发现这个坑非常大。

hue访问hive 并不是走的标准的jdbc接口,当然本质上来说hive server2也不算标准jdbc接口,hive server2是一套thrift接口,是属于短链接的独立请求,为了兼容jdb,所以开发了一个hive jdbc driver封装成了常用的jdbc接口。

但是hue并没有使用标准的jdbc访问方式,也就是:创建连接->创建statement->execute->close,这样的流程,而是直接去通过thrift的方式去挨个api调用,这样就会延伸出一个问题。

就是x兼容的是jdbc接口,会在创建connection的时候才会初始化用户信息,以及分配资源,对应到java代码就是:

Connection connection = DriverManager.getConnection(JDBC_URL, properties);

由于hue是直接访问thrift,就不会有创建connection这一步,直接就进入了statement,这就是导致x收到的请求不存在用户信息,自然就会报错,而hive server2是现有thrift 接口,再有jdbc的封装。

所以他的每一个api都是独立的,其次hive server2是和hadoop生态强耦合,也没有那么多的安全校验,所以hue在访问的时候没有什么大问题。

同时hive server2还存在一个bug,那就是在进行结果获取的时候,hasMoreRows字段设置有问题,导致所有使用hive server2的工具默认都不判断这个字段,而是根据返回内容是否为空来判断是否还有下一页,这样就会多一次请求。

这个问题在我开发x的时候也碰到了,hue里面也说的很明白:

# Note: An operation_handle is attached to a session. All operations that require operation_handle cannot recover if the session is
# closed. Passing the session is not required
def fetch_data(self, operation_handle, orientation=TFetchOrientation.FETCH_NEXT, max_rows=1000):
# Fetch until the result is empty dues to a HS2 bug instead of looking at hasMoreRows
results, schema = self.fetch_result(operation_handle, orientation, max_rows)
return HiveServerDataTable(results, schema, operation_handle, self.query_server)

里面的这句:

Fetch until the result is empty dues to a HS2 bug instead of looking at hasMoreRows

找到了是这个问题,于是我对x做了一个修改,就是如果请求直接达到执行api上,依旧通过拦截器检查身份信息,然后重新把请求重定向到open session上去,再把流程拉回正轨,这个改动对x来说基本上是一个很大的影响了。

修改后hue可以正常访问没有大问题,不过还是有一个小问题,那就是对于查询请求来说,返回的数据有2中类型:日志和查询结果。

在hive的接口里面都是通过fetch result方法去查询,通过参数FetchType来区分,为1的时候是请求log,0的时候请求数据内容,但是我调试的时候发现执行成功后hue只会发起查询日志的请求。

不会来请求数据,在请求数据的逻辑上hue的代码为:

def fetch_result(self, operation_handle, orientation=TFetchOrientation.FETCH_FIRST, max_rows=1000):
if operation_handle.hasResultSet:
fetch_req = TFetchResultsReq(operationHandle=operation_handle, orientation=orientation, maxRows=max_rows)
(res, session) = self.call(self._client.FetchResults, fetch_req)
else:
res = TFetchResultsResp(results=TRowSet(startRowOffset=0, rows=[], columns=[]))

if operation_handle.hasResultSet and TFetchOrientation.FETCH_FIRST: # Only fetch for the first call that should be with start_over
meta_req = TGetResultSetMetadataReq(operationHandle=operation_handle)
(schema, session) = self.call(self._client.GetResultSetMetadata, meta_req)
else:
schema = None

return res, schema

也就是如果operation_handle.hasResultSet是false,那么直接不会发起数据内容的查询,我猜测可能是这个问题,但是为什么这个值会是false是个需要继续定位的问题,我怀疑可能又是一个hive的bug引起的特殊方法。

像hive server2这种非标jdbc有些bug,造成周边生态软件有很多特殊case,进行很多特殊处理,要兼容起来挺麻烦的。

我看了下hue的源码,这个部分是因为如下代码:

response['status'] = 'running' if status.value in (
QueryHistory.STATE.running.value, QueryHistory.STATE.submitted.value
) else 'available'
if operation.hasResultSet is not None:
response['has_result_set'] = operation.hasResultSet # HIVE-12442 - With LLAP hasResultSet can change after get_operation_status

也就是在LLAP架构下的Hive,对于判断是否有结果不再是exectue的hasResultSet了,而是通过get_operation_status 单独去问hasResultSet,所以需要在这里单独再维护一个状态。

至此,Hue兼容总算搞定了。


扫码手机观看或分享: