fix(csharp/src/Drivers/Databricks): Correct DatabricksCompositeReader and StatusPoller to Stop/Dispose Appropriately #3217
Conversation
Can you confirm that, even without this fix, polling will stop after the statement is disposed? If not, we need a fix there as well.
@@ -247,6 +247,10 @@ private async Task FetchResultsAsync(CancellationToken cancellationToken)
    _downloadQueue.Add(EndOfResultsGuard.Instance, cancellationToken);
    _isCompleted = true;
From testing:
Small nit, but I think we need to avoid this here: it's possible that the download queue is full, in which case the exception handling would block indefinitely. Should I modify the exception handling below, or was there a reason why it is like this? (line 262) @jadewang-db
catch (Exception ex)
{
    try
    {
        _downloadQueue.Add(EndOfResultsGuard.Instance, CancellationToken.None);
    }
    catch
    {
        // swallow so the original exception is not masked
    }
}
Alternatively, we can create a new CancellationToken with a timeout for this attempt:
CancellationToken GetOperationStatusTimeoutToken = ApacheUtility.GetCancellationToken(_requestTimeoutSeconds, ApacheUtility.TimeUnit.Seconds);
Thanks, a cancellation token looks good.
Oh, just saw this comment; let me implement it.
Actually, it looks like TryAdd is better suited here.
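A minimal, self-contained sketch of the TryAdd approach, assuming `_downloadQueue` is a `BlockingCollection<T>` (the `EndOfResultsGuard` type here is a hypothetical stand-in for the driver's sentinel): `TryAdd` with a timeout returns `false` on a full queue instead of blocking the exception path forever.

```csharp
using System;
using System.Collections.Concurrent;

class DownloadQueueSketch
{
    // Hypothetical stand-in for the driver's end-of-results sentinel type.
    sealed class EndOfResultsGuard
    {
        public static readonly EndOfResultsGuard Instance = new EndOfResultsGuard();
        private EndOfResultsGuard() { }
    }

    static void Main()
    {
        // A bounded queue that is already full, as in the scenario above.
        var downloadQueue = new BlockingCollection<object>(boundedCapacity: 1);
        downloadQueue.Add(new object());

        // Add(...) would block indefinitely here; TryAdd with a timeout
        // gives up after 100 ms and reports failure instead.
        bool added = downloadQueue.TryAdd(EndOfResultsGuard.Instance,
                                          TimeSpan.FromMilliseconds(100));
        Console.WriteLine(added); // False: queue was full and the timeout elapsed
    }
}
```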
@@ -69,13 +72,18 @@ private async Task PollOperationStatus(CancellationToken cancellationToken)
    var operationHandle = _statement.OperationHandle;
    if (operationHandle == null) break;
We need to use a timeout token here instead of cancelling when the cancellation token is triggered. If an interrupt fires prematurely, the TCLI client may still have unsent or unconsumed results in its buffers, affecting subsequent calls with that client (which is any future call in the same Session).
Are you able to repro this? Should we do this for all the Thrift RPC calls in the driver?
I think it is because in THttpTransport (used by SparkHttpConnection -> DatabricksHttpConnection), a new Stream is created when the request is flushed. If cancellation happens before this, that stream doesn't get discarded:
https://github.com/apache/thrift/blob/master/lib/netstd/Thrift/Transport/Client/THttpTransport.cs#L281
Yes, I got some errors during testing. In the proxy logs, I remember seeing requests sent out with both GetOperationStatus and CloseOperationStatus (in the same request) while testing another PR.
I think we are safe in HiveServer2Statement, but we might need to adjust the CancellationToken usage in DatabricksReader, CloudFetchResultFetcher, and DatabricksCompositeReader.
Actually, I think this also depends a bit on how the CancellationToken could be used by PBI.
@CurtHagenlocher will mashup ever trigger cancellationTokens passed into IArrowStreamReader.ReadNextBatchAsync? Do we need to ensure that the connection still remains usable for subsequent statements?
At least for now, I think we can operate this way:
- If the user cancels the token passed into ReadNextBatchAsync, we should not break the client
- Dispose() should not break the client either
@CurtHagenlocher will mashup ever trigger cancellationTokens passed into IArrowStreamReader.ReadNextBatchAsync? Do we need to ensure that the connection still remains usable for subsequent statements?
This is currently unimplemented, but we'll need to implement it before GA for parity with the ODBC implementation. What is probably most important for cancellation is query execution, and unless we manage to push forward the proposed ADBC 1.1 API, currently the only way to cancel a running query is to call AdbcStatement.Cancel. There is currently no implementation of this method for any of the C#-implemented drivers :(.
From a Power BI perspective, the most important use of cancellation is for Direct Query because users can generate a lot of queries simply by clicking around in a visual and in-progress queries will need to be cancelled if their output is no longer needed. DQ output tends to be relatively small, so being able to cancel in the middle of reading the output is arguably less important than being able to cancel before the results start coming back.
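Tying the discussion together: the pattern being converged on is to observe the caller's token only *between* RPCs, while each in-flight Thrift request gets its own timeout-only token, so a user cancellation can never tear down a request mid-flight and corrupt the transport. A minimal sketch of that shape (the RPC helper, status strings, and the 30-second timeout are illustrative assumptions, not the driver's actual API):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class PollSketch
{
    // Hypothetical stand-in for a Thrift RPC such as client.GetOperationStatus(...).
    static async Task<string> GetOperationStatusAsync(CancellationToken token)
    {
        await Task.Delay(10, token);
        return "RUNNING";
    }

    static async Task PollAsync(CancellationToken callerToken)
    {
        while (true)
        {
            // Honor the caller's cancellation only between RPCs, so a cancel
            // never interrupts an in-flight Thrift request.
            callerToken.ThrowIfCancellationRequested();

            // The RPC itself gets a timeout-only token (assumed 30 s request
            // timeout), never the caller's token directly.
            using var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
            string status = await GetOperationStatusAsync(timeoutCts.Token);
            if (status != "RUNNING") break;

            // The poll interval may safely observe the caller's token.
            await Task.Delay(1000, callerToken);
        }
    }

    static async Task Main()
    {
        using var cts = new CancellationTokenSource();
        var pollTask = PollAsync(cts.Token);
        await Task.Delay(50);
        cts.Cancel();
        try { await pollTask; }
        catch (OperationCanceledException) { Console.WriteLine("cancelled between RPCs"); }
    }
}
```

The key design point is that cancellation becomes a bound on *when the next* request starts, not an interrupt of the current one, which keeps the underlying connection usable for subsequent statements in the same session.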
request.StartRowOffset = offset;

// Cancelling mid-request breaks the client; Dispose() should not break the underlying client
CancellationToken expiringToken = ApacheUtility.GetCancellationToken(DatabricksConstants.DefaultCloudFetchRequestTimeoutSeconds, ApacheUtility.TimeUnit.Seconds);
Should you respect the connection parameter DatabricksParameters.CloudFetchTimeoutMinutes instead of the default value?
Do you mean I shouldn't create a new constant here?
No, what I meant is that you should check the value of the connection parameter CloudFetchTimeoutMinutes (adbc.databricks.cloudfetch.timeout_minutes), which can be set by the client and the customer.
Oh, got it, that makes sense; it should be a configurable parameter. To be consistent with the rest of HiveServer2Statement, I'm just using the QueryTimeout parameter (which is what the other FetchResultsRequest calls use).
I have some changes in a follow-up PR that will make this change easier to do for DatabricksReader, so I'll leave this as a TODO.
CurtHagenlocher left a comment
Thanks! The linter error needs to be fixed and I made a few small low-priority suggestions.
Motivation
The following cases are not properly stopping or disposing the status poller:
In addition:
Fixes
The DatabricksOperationStatusPoller logic is now more appropriately managed by DatabricksCompositeReader (moved out of BaseDatabricksReader) to handle all cases where null results (indicating completion) are returned.
Disposing DatabricksCompositeReader now appropriately disposes both the activeReader and the statusPoller.
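The ownership rule this fix establishes can be sketched as follows. This is a minimal illustration, not the driver's actual code: `TrackingDisposable` is a hypothetical stand-in for the reader and poller, and the disposal ordering (poller first, then reader) is an assumption about the intended teardown.

```csharp
using System;

// Hypothetical stand-in that records when it is disposed.
sealed class TrackingDisposable : IDisposable
{
    private readonly string _name;
    public TrackingDisposable(string name) => _name = name;
    public void Dispose() => Console.WriteLine($"{_name} disposed");
}

// Sketch of the composite reader owning both resources.
sealed class CompositeReaderSketch : IDisposable
{
    private IDisposable _activeReader = new TrackingDisposable("activeReader");
    private IDisposable _statusPoller = new TrackingDisposable("statusPoller");

    public void Dispose()
    {
        // Stop polling first so no status RPC races with reader teardown,
        // then dispose the reader; nulling the fields makes Dispose idempotent.
        _statusPoller?.Dispose();
        _statusPoller = null;
        _activeReader?.Dispose();
        _activeReader = null;
    }
}

class Program
{
    static void Main()
    {
        var reader = new CompositeReaderSketch();
        reader.Dispose();
        reader.Dispose(); // second call is a no-op
    }
}
```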
TODO
Follow-up PR: when the statement is disposed, it should also dispose the reader (the poller is currently stopped when the OperationHandle is set to null, but this should also happen explicitly)
Need to add some unit tests (follow-up PR: #3243)