Incremental Results Loading

An overview of the incremental results feature and how to use it via API

For longer-running queries, ChaosSearch provides an incremental results loading feature that returns intermediate results while the search runs, giving users earlier feedback.

For example, this behavior is visible when a user runs a longer Discover query within the ChaosSearch console interface. As the search runs, results stream into the histogram chart in periodic updates, and a progress bar shows the status of the index segments being scanned and processed. These visual cues indicate the Discover scan size and whether the search is still running, until it completes.

The following image shows a Discover search running in the ChaosSearch console with incremental results loading.

For smaller searches when only a few segments are scanned, users typically do not see incremental results; all the results are returned to the histogram in one response.

Incremental Results Mechanism

The incremental results feature is implemented using the /kibana/internal/_msearch endpoint. The cookie incrementalQuerying=true must be sent with requests to this endpoint to enable incremental results support.

When enabled, the system issues periodic /kibana/internal/_msearch requests while the search runs. While a smaller scale search might have only one _msearch call, longer searches could have many _msearch calls, until the search is finished.

In the second and subsequent _msearch responses (up to, but not including, the last one), the response payload includes a streaming_result_id field whose value is the query's unique UUID, indicating that the message contains a portion of the results for this query (for example, "streaming_result_id": "d75b665c-a51d-4fff-9247-dd40f6f4d49b" at the end of the "body" field).

The last _msearch response (when the query is complete) does not include the streaming_result_id field in its payload.
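The client-side pattern can be sketched as a simple polling loop (the fetch callable and the response values below are illustrative stand-ins, not part of the ChaosSearch API): re-issue the request, carrying forward the streaming_result_id, until a response arrives without that field.

```python
def poll_until_complete(fetch):
    """fetch(streaming_result_id) -> parsed response body (dict).

    Re-issues the request until the body no longer carries a
    streaming_result_id, which signals the final, complete result.
    """
    streaming_result_id = None
    while True:
        body = fetch(streaming_result_id)
        streaming_result_id = body.get("streaming_result_id")
        if streaming_result_id is None:
            return body  # final response: no streaming id present


# Simulated server: two partial responses, then the final one.
_responses = iter([
    {"streaming_result_id": "example-id", "responses": [{"hits": {"total": 10}}]},
    {"streaming_result_id": "example-id", "responses": [{"hits": {"total": 25}}]},
    {"responses": [{"hits": {"total": 42}}]},
])
final = poll_until_complete(lambda sid: next(_responses))
print(final["responses"][0]["hits"]["total"])
```

The sample scripts later in this article implement the same pattern recursively, with the streaming_result_id passed back to the server as a cookie.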

Incremental Loading in ElasticSearch APIs

When using programmatic ElasticSearch APIs, you can call the /kibana/internal/_msearch endpoint and send the incrementalQuerying=true cookie with the API call to run in incremental results mode.

Sample Python Script for Incremental Load

The following Python scripts run a sample search that matches all records in my-sample-view and uses the incremental results loading feature. The results (hits) are written to a sample log file, incremental-api.log.

There are two versions of the script: the first authenticates with a ChaosSearch API key, and the second authenticates with a JWT. The API key version follows:

import argparse
import json
import logging
import requests
from requests_aws4auth import AWS4Auth


es_url, host, access_key, secret_key, account, region = "", "", "", "", "", ""
es_payload = {"searches": [{
 "header": {"index": "my-sample-view", "preference": 1722883631615},
 "body": {
  "query": {
    "bool": {
      "must": [],
      "filter": [
        {
          "match_all": {}
        },
        {
          "range": {
            "timestamp": {
              "gte": "2018-08-01T00:00:00.000Z",
              "lte": "2024-08-06T00:00:00.000Z",
              "format": "strict_date_optional_time"
            }
          }
        }
      ],
      "should": [],
      "must_not": []
    }
  }
 }
}]}


def parse_args():
    parser = argparse.ArgumentParser(description='Concurrency')
    parser.add_argument('--access_key', required=True, help='The API access key')
    parser.add_argument('--secret_key', required=True, help='The API secret key')
    parser.add_argument('--region', required=True, help='The AWS region')
    parser.add_argument('--host', required=True, help='The hostname')
    parser.add_argument('--account', required=True, help='The CS account id')
    return parser.parse_args()


def make_es_request(streaming_result_id=None):
    awsauth = AWS4Auth(access_key, secret_key, region, 's3')
    es_headers = {
                    "Accept": "*/*",
                    "Content-Type": "application/json",
                 }

    cookies = {
                "incrementalQuerying": "true",
                "chaossumo_route_token": account,
                "streaming_result_id": streaming_result_id
              }
    logging.info(f"Posting request {es_url}")
    response = requests.request("POST", es_url, auth=awsauth, json=es_payload, headers=es_headers, cookies=cookies, verify=True, allow_redirects=True)
    logging.info(f"Response code: {response.status_code}")
    logging.info(f"Response text: {response.text}")
    es_response_json = json.loads(response.text)
    logging.info(f"Response json: {es_response_json}")
    if response.status_code != 200:
        logging.info("Request failed %s", es_response_json)
        raise Exception("non-200 es response code")
    else:
        logging.info("Successful request")
        logging.info(es_response_json)

        if "streaming_result_id" not in es_response_json["body"]:
            logging.info("Query complete")
            hits = es_response_json["body"]["responses"][0]["hits"]["hits"]
            for hit in hits:
                logging.info(f"hit: {hit}")
            logging.info(f"hit count: {len(hits)}")
            return es_response_json
        else:
            total = es_response_json["body"]["responses"][0]["hits"]["total"]
            logging.info(f"Incremental response received: {total} hits")
            return make_es_request(es_response_json["body"]["streaming_result_id"])


def main():
    logging.info("Main: Start")
    args = parse_args()
    global host
    global access_key
    global secret_key
    global account
    global region
    global es_url
    host = args.host
    account = args.account
    access_key = args.access_key
    secret_key = args.secret_key
    region = args.region
    es_url = "https://" + host + "/kibana/internal/_msearch"
    resp_text = make_es_request()
    logging.info(f"Response: {resp_text}")
    logging.info("Main: End")


if __name__ == "__main__":
    log_format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=log_format, level=logging.INFO, filename='incremental-api.log')
    main()

The following version of the script uses a JWT for authentication:

import argparse
import json
import logging
import requests
true = True  # use Python's built-in boolean (avoids the non-standard lowercase_booleans dependency)


view = "my-sample-view"
bucket = ""
token_url = ""
token_headers = {
    "Accept": "application/json",
    "Content-Type": "application/json",
    "x-amz-chaossumo-route-token": "login"
}
token_payload = {}
es_url = ""
es_payload = {"searches": [{
 "header": {"index": "my-sample-view", "preference": 1722883631615},
 "body": {
  "query": {
    "bool": {
      "must": [],
      "filter": [
        {
          "match_all": {}
        },
        {
          "range": {
            "timestamp": {
              "gte": "2024-08-01T00:00:00.000Z",
              "lte": "2024-08-06T00:00:00.000Z",
              "format": "strict_date_optional_time"
            }
          }
        }
      ],
      "should": [],
      "must_not": []
    }
  }
 }
}]}

def parse_args():
    parser = argparse.ArgumentParser(description='Concurrency')
    parser.add_argument('--cs_user', required=True, help='The CS user id')
    parser.add_argument('--cs_pwd', required=True, help='The CS user password')
    parser.add_argument('--host', required=True, help='The hostname')
    parser.add_argument('--account', required=True, help='The CS account id')
    return parser.parse_args()


def get_token():
    response = requests.request("POST", token_url, json=token_payload, headers=token_headers, verify=True)
    logging.info("Token request response %s", response)
    if response.status_code != 200:
        logging.info("Request failed to retrieve token")
        raise Exception("non-200 token response code")
    else:
        logging.info("Successfully retrieved token")
    return json.loads(response.text)['Token']


def make_es_request(streaming_result_id=None):
    token = get_token()
    es_headers = {
                    "Accept": "*/*",
                    "Content-Type": "application/json",
                 }

    cookies = {
                "incrementalQuerying": "true",
                "chaossumo_route_token": account,
                "chaossumo_session_token": token,
                "streaming_result_id": streaming_result_id
              }
    logging.info(f"Posting request {es_url}")
    response = requests.request("POST", es_url, json=es_payload, headers=es_headers, cookies=cookies, verify=True, allow_redirects=True)
    logging.info(f"Response code: {response.status_code}")
    logging.info(f"Response text: {response.text}")
    es_response_json = json.loads(response.text)
    logging.info(f"Response json: {es_response_json}")
    if response.status_code != 200:
        logging.info("Request failed %s", es_response_json)
        raise Exception("non-200 es response code")
    else:
        logging.info("Successful request")
        logging.info(es_response_json)

        if "streaming_result_id" not in es_response_json["body"]:
            logging.info("Query complete")
            hits = es_response_json["body"]["responses"][0]["hits"]["hits"]
            for hit in hits:
                logging.info(f"hit: {hit}")
            logging.info(f"hit count: {len(hits)}")
            return es_response_json
        else:
            total = es_response_json["body"]["responses"][0]["hits"]["total"]
            logging.info(f"Incremental response received: {total} hits")
            return make_es_request(es_response_json["body"]["streaming_result_id"])


def main():
    logging.info("Main: Start")
    args = parse_args()
    global host
    global token_url
    global token_payload
    global account
    global es_url
    host = args.host
    account = args.account
    token_url = "https://" + host + "/User/login"
    token_payload = {"Username": args.cs_user, "Password": args.cs_pwd,
                     "ParentUid": account, "Type": "service", "UniqueSession": true}
    es_url = "https://" + host + "/kibana/internal/_msearch"
    resp_text = make_es_request()
    logging.info(f"Response: {resp_text}")
    logging.info("Main: End")


if __name__ == "__main__":
    log_format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=log_format, level=logging.INFO, filename='incremental-api.log')
    main()


In the sample Python scripts, update the my-sample-view name and the _msearch query parameters as applicable for your search. (The preference value is required but not currently used; specify any integer value.)
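For instance, the view name and time-range filter in the sample payload can be adjusted in place before the script runs (the my-other-view name and the dates here are placeholders, not real views):

```python
# Trimmed copy of the sample _msearch payload used in the scripts above.
es_payload = {"searches": [{
    "header": {"index": "my-sample-view", "preference": 1},  # any integer preference works
    "body": {"query": {"bool": {"filter": [
        {"match_all": {}},
        {"range": {"timestamp": {
            "gte": "2024-08-01T00:00:00.000Z",
            "lte": "2024-08-06T00:00:00.000Z",
            "format": "strict_date_optional_time"}}}
    ]}}}
}]}

# Point the search at a different view and widen the time window.
search = es_payload["searches"][0]
search["header"]["index"] = "my-other-view"
search["body"]["query"]["bool"]["filter"][1]["range"]["timestamp"]["gte"] = \
    "2024-01-01T00:00:00.000Z"
```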

Run the API key version of the script with a python command similar to the following:

python incremental-api.py --access_key <cs_api_key> --secret_key <secret> --region <region> --host <cs_host> --account <cs_external_uuid>

Run the JWT script with a python command similar to the following:

python incremental-api.py --cs_user <cs_user> --cs_pwd <cs_password> --host <cs_host> --account <cs_external_uuid>

The script might require a few minutes to complete.

📘 This feature is available for embedded analytics customers on worker-based pricing.

Contact your ChaosSearch Customer Success representative for more information about this feature.