Real-Time Indexing - Logstash & Bulk API
Index and search data in real-time with the Elasticsearch Bulk API
The ChaosSearch platform supports a real-time setup in which selected data sources are indexed and made searchable in real time. This guide walks through the configuration and requirements for setting up the real-time ingest pipeline.
Create Native Object Group
Native Object Groups are created just like normal Object Groups, except that no bucket is attached to them and only a subset of the normal options is available.
- Select the Native Object Groups bucket at the top of the S3 list window
- Click Create Native Group
- Input the name of the native group
- Click Create Native Group
Start/Stop Native Object Group
For ChaosSearch to accept any Bulk requests, the Native Object Group must be started. Starting and stopping work exactly as they do for normal Object Groups: they are controlled through the same endpoint and have the same features.
- Select the new native object group
- To start indexing, click the Start Indexing button
You'll now see the group content page where you can pause indexing, update the retention policy and allocate compute in the target active index field.
Elasticsearch Bulk Requests
Once the Native Object Group has been started, ChaosSearch accepts normal Elasticsearch Bulk requests, as described below. Requests follow the Elasticsearch Bulk API documentation but must include the AWS V4 authentication headers.
Method: POST
Headers: Must be consistent with normal API requests (e.g. AWS V4 signing and related headers)
URI: /elastic/_bulk
Body: As described in the Elasticsearch documentation. Only the "index" and "create" actions are supported.
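The body format described above can be sketched as follows. This is a minimal illustration, assuming the newline-delimited JSON layout of the Elasticsearch Bulk API (one action line followed by one document line, with a trailing newline). The helper name build_bulk_body and the sample documents are hypothetical and not part of any ChaosSearch or Elasticsearch library, and the sketch only builds the body; it does not sign or send the request.

```python
import json

# The only two actions this guide lists as supported.
SUPPORTED_ACTIONS = {"index", "create"}


def build_bulk_body(index_name, docs, action="index"):
    """Build a newline-delimited Bulk body for the given documents.

    index_name is the name of the started Native Object Group.
    """
    if action not in SUPPORTED_ACTIONS:
        raise ValueError("unsupported bulk action: " + action)
    lines = []
    for doc in docs:
        # Action/metadata line, then the document source line.
        lines.append(json.dumps({action: {"_index": index_name}}))
        lines.append(json.dumps(doc))
    # The Bulk API expects the body to end with a newline.
    return "\n".join(lines) + "\n"


# Hypothetical sample documents, for illustration only.
sample_docs = [
    {"level": "info", "message": "service started"},
    {"level": "error", "message": "disk full"},
]
body = build_bulk_body("cs-real-time-logs", sample_docs)
print(body)
```

The resulting string would be sent as the body of a signed POST to /elastic/_bulk.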
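Once the request has been sent, the indexed data appears in the Refinery as normal, and users can create Views on it. The following Python script feeds a local file to the Bulk endpoint using the elasticsearch and requests_aws4auth packages.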
import time
import gzip
from json import JSONDecoder, JSONDecodeError

from elasticsearch import Elasticsearch, RequestsHttpConnection
from elasticsearch import helpers
from requests_aws4auth import AWS4Auth

AWS_ACCESS_ID = "CHAOSSEARCH-API-KEY"  # fill in with your ChaosSearch access key
AWS_SECRET_KEY = "CHAOSSEARCH-API-KEY"  # fill in with your ChaosSearch secret key
SUBDOMAIN = "poc-trial"  # fill in
OBJECT_GROUP = "cs-real-time-logs"
FILENAME = "file_name"  # download from s3
IS_GZIPPED = False
# The higher the better without timing out; this is upload-speed dependent
# (it should be able to be bumped to at least 10000).
CHUNK_SIZE = 1000

# Sign every request with AWS V4 headers.
awsauth = AWS4Auth(AWS_ACCESS_ID, AWS_SECRET_KEY, "us-east-1", "s3")

es = Elasticsearch(
    hosts=[{"host": SUBDOMAIN + ".chaossearch.io", "port": 443, "url_prefix": "/elastic", "use_ssl": True}],
    http_auth=awsauth,
    connection_class=RequestsHttpConnection,
    verify_certs=True,
)


def process_blob(full):
    """Yield each JSON object from a string of concatenated or newline-delimited JSON."""
    decoder = JSONDecoder()
    pos = 0
    while True:
        # raw_decode does not skip leading whitespace, so step past it first.
        while pos < len(full) and full[pos].isspace():
            pos += 1
        try:
            obj, pos = decoder.raw_decode(full, pos)
            yield obj
        except JSONDecodeError:
            # End of input or malformed data: stop reading.
            return


def actions(filename, object_group, is_gzip):
    """Yield one bulk action per JSON object found in the file."""
    if is_gzip:
        with gzip.open(filename) as f:
            blobs = process_blob(f.read().decode("utf-8"))
    else:
        with open(filename) as f:
            blobs = process_blob(f.read())
    for blob in blobs:
        yield {
            # The bulk helpers read "_op_type" (a plain "op_type" key is ignored,
            # which leaves the default "index"); "create" is the other supported action.
            "_op_type": "index",
            "_index": object_group,
            "_id": "1",
            "_source": blob,
        }


print("Feeding start")
start = time.time()
errors = []
failed = 0
success = 0
# raise_on_error=False reports failed items through the loop instead of raising.
for ok, item in helpers.parallel_bulk(
    es,
    actions(FILENAME, OBJECT_GROUP, IS_GZIPPED),
    chunk_size=CHUNK_SIZE,
    max_chunk_bytes=10000000,
    raise_on_error=False,
):
    if not ok:
        errors.append(item)
        failed += 1
    else:
        success += 1
end = time.time()
print("Feeding complete")
print("errors: " + str(errors))
print("failures: " + str(failed) + ", successes: " + str(success))
print("took " + str(end - start))
Allow about 30 seconds after the first Bulk request completes for the Index Dataset to appear. After this initial delay, data from new requests is available for querying in under 20 seconds.
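A client that queries right after its first Bulk request may want to wait for the data to become visible rather than fail immediately. The sketch below shows one way to do that with a generic polling helper. The name wait_until, its parameters, and the default timeout and interval are assumptions chosen for illustration; the check callable is whatever query the client would use to detect that the data is available, and is not a ChaosSearch API.

```python
import time


def wait_until(check, timeout=60.0, interval=5.0, sleep=time.sleep, clock=time.monotonic):
    """Call check() repeatedly until it returns True or the timeout elapses.

    Returns True if check() succeeded, False if the timeout was reached.
    sleep and clock are injectable so the helper can be exercised without waiting.
    """
    deadline = clock() + timeout
    while True:
        if check():
            return True
        if clock() >= deadline:
            return False
        sleep(interval)


# Demonstration with a stand-in check that succeeds on its third call.
attempts = {"count": 0}


def fake_check():
    attempts["count"] += 1
    return attempts["count"] >= 3


found = wait_until(fake_check, timeout=10.0, interval=0.0)
print(found, attempts["count"])
```

In real use, check would wrap a search or count request against the Native Object Group, and the timeout would be set somewhat above the roughly 30 second initial delay.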
Elasticsearch Logstash Integration
To connect an existing Logstash pipeline to ChaosSearch, set up the Amazon ES output plugin and configure it to point at ChaosSearch.
input {
  s3 {
    access_key_id => "AMAZON_ACCESS_KEY_ID"
    secret_access_key => "AMAZON_SECRET_ACCESS_KEY"
    bucket => "S3-Bucket-Name"
    prefix => "prefix-filter"
  }
}
output {
  amazon_es {
    hosts => ["demo.chaossearch.io"]
    path => "elastic"
    aws_access_key_id => "CHAOSSEARCH_ACCESS_KEY_ID"
    aws_secret_access_key => "CHAOSSEARCH_SECRET_ACCESS_KEY"
    index => "name-of-ObjectGroup"
  }
}
You can ignore the warnings that Logstash logs on startup. It tries to create an index template, but ChaosSearch doesn't require one.