Asynchronous Operations

In Development
This feature is actively being developed. Breaking changes should be expected. Please contact us before using this feature.

Introduction

Synqly Connector APIs are synchronous by default, meaning operations are executed immediately when called and Provider data is returned in the response to the call. This model is very simple to work with and suitable for many types of operations.

Some operations, however, do not fit well into the synchronous model, for example when large amounts of data are returned or when a Provider takes a very long time to respond.

Such operations can instead be executed asynchronously. This means that Synqly schedules the operation but does not return its result in the same call. Instead, the result is sent to a pre-configured output Sink.

Asynchronous Operations support:

  • filters and large response limits, automatically performing multiple requests as necessary
  • performing long-running operations (up to 1 hour)
  • automatic continuation/retry on failure where possible
  • polling operation status, or pushing status events to an Organization webhook
  • posting the operation results to a pre-configured output Sink of your choice

Example

Note: You must configure an output Sink before scheduling an asynchronous operation. For more information, see Synqly Integrations.

In this example, using our Go SDK, we'll create a Splunk SIEM integration and then schedule an asynchronous operation to query some data. This example assumes an output Sink has been configured.

Creating an Integration

Asynchronous operations, just like their synchronous counterparts, are called using an Integration token.

In this example we'll use the long-lived Integration token returned when creating the integration, but it is of course also possible to use the shorter-lived tokens generated via the Create Integration Token API.

The following code creates a Splunk SIEM integration:

integrationName := "Async Operations Example"
integration, err := fixture.MgmtClient.Integrations.Create(ctx, accountId, &mgmt.CreateIntegrationRequest{
        Fullname: &integrationName,
        ProviderConfig: &mgmt.ProviderConfig{
                 SiemSplunk: &mgmt.SiemSplunk{
                        HecUrl:                  hecUrl,
                        HecCredential:           &mgmt.SplunkHecToken{Token: &mgmt.TokenCredential{Secret: hecToken}},
                        SkipTlsVerify:           true,
                        SourceType:              engine.String("ocsf"),
                        SearchServiceCredential: &mgmt.SplunkSearchCredential{Token: &mgmt.TokenCredential{Secret: restToken}},
                        SearchServiceUrl:        engine.String(restUrl),
                },
        },
})
require.NoError(t, err)

Scheduling an operation

The main difference between the synchronous and asynchronous execution models is that when scheduling an asynchronous operation you don't call the Connector API directly. Rather, you refer to the operation by its name when scheduling it to run asynchronously.

The name of the operation can be found by navigating to the documentation of the API you'd like to call and extracting the last segment of the URL for that documentation page. For example, the URL for the Query Events operation documentation is /api-reference/connectors/siem/siem_query_events. The last segment of this URL, and also the name of the operation, is siem_query_events.

In the following code we use the result of creating the integration above – specifically the Integration access token – to schedule the Query Events operation:

client := engineClient.NewClient(
        engineClientOption.WithBaseURL(fixture.Env.EngineAddress),
        engineClientOption.WithToken(integration.Result.Token.Access.Secret),
)

// Schedules the siem_query_events operation. The result will be posted to the pre-configured output Sink.
qlimit := 20
operation, err := client.Operations.Create(ctx, &engine.CreateOperationRequest{
        Operation: "siem_query_events",
        Input:     &engine.OperationInput{Limit: &qlimit},
})
require.NoError(t, err)
require.NotEmpty(t, operation.Result.Operation.Id)

Checking operation state

Once the operation has been scheduled, we can poll for its status at any time:

// Prints the current status of the scheduled operation
operation, err = client.Operations.Get(ctx, operation.Result.Operation.Id)
require.NoError(t, err)
fmt.Println("Operation status", operation.Result.Status)

While polling is useful for continuously checking in on an operation, it can sometimes be easier to be notified of updates instead. To do this, we can set up an Organization Webhook to notify an endpoint when operations are completed. In this example, the URL at svrURL will be notified as operations are completed:

// setup organization webhook
webhookName := "test-webhook-1"
webhookRsp, err := fixture.MgmtClient.OrganizationWebhooks.Create(ctx,
        &mgmt.CreateOrganizationWebhookRequest{
                Name:    &webhookName,
                Filters: []mgmt.WebhookFilter{mgmt.WebhookFilterOperationComplete},
                Url:     svrURL,
                Secret:  secret,
        })
require.NoError(t, err)
assert.NotEqual(t, "", webhookRsp.Result.Id)

The data sent to this endpoint will identify which operation was completed.

Resources

API reference