# Elasticsearch Sink

!!! info

    This is a Community connector. Test it before using in production.

    To learn more about the differences between Core and Community connectors, see the Community and Core Connectors page.

This sink writes data to an Elasticsearch index, with a few options for how data is dumped.
## How To Install

To use the Elasticsearch sink, you need to install the required dependencies:
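Presumably this is the standard `pip` extras install; the `[elasticsearch]` extra name is an assumption, so verify it against the `quixstreams` package documentation:

```bash
# Assumed extras name; check the quixstreams docs if this fails
pip install quixstreams[elasticsearch]
```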
## How It Works

`ElasticsearchSink` publishes messages to Elasticsearch in batches.

You can customize how messages are handled and exported with `ElasticsearchSink`, but the most common (and default) approach is a 1:1 correspondence between the Kafka message key and the document `_id`, with field types automatically ("dynamically") inferred.

Messages are sent to Elasticsearch in the same order they are received from Kafka for each topic partition.
### Export Behavior

How data is dumped with `ElasticsearchSink` primarily depends on two parameters: the `document_id_setter` and the `mapping` (both have sensible defaults already defined).
### Specifying Index Field Types

To specify an index's field types, you generally pass an Elasticsearch mapping `dict`.

By default, a simple "dynamic" mapping is used, which determines a field's type based on the first value encountered. For example: if the first encountered instance of field `quantity` is `5`, then that field is assumed to be an `integer` field.
#### Using a Custom Mapping

You can override (or add to) the default behavior by passing your own mapping.

For example, here we specify that the field `host` should be a `keyword`, and otherwise keep the default behavior:
```python
custom_mapping = {
    "mappings": {
        "dynamic": "true",  # keeps default behavior
        "properties": {
            "host": {  # enforce type for `host` field
                "type": "keyword"
            }
        },
    },
}

elasticsearch_sink = ElasticsearchSink(
    # ... other args here ...
    mapping=custom_mapping,
)
```
To learn more, check out the Elasticsearch mappings docs.
### Document ID Setting

The default `document_id_setter` assumes the message key corresponds to an equivalently named document `_id`, so it attempts to match on that for updating. If no such `_id` exists (and `ElasticsearchSink` has `upsert=True`), the document will instead be created with that `_id`.
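As a sketch, enabling that upsert behavior might look like this; note that `upsert` is mentioned here but not in the configuration options list below, so treat the constructor argument as an assumption:

```python
# Assumption: upsert is accepted as a constructor keyword argument.
# With upsert enabled, documents with no matching _id are created
# instead of updated (url/index values are placeholders).
elasticsearch_sink = ElasticsearchSink(
    url="http://localhost:9200",
    index="my_index",
    upsert=True,
)
```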
#### Using a Custom `_id`

A custom `_id` can be used by simply providing your own `document_id_setter` to `ElasticsearchSink` (which should return a string or `None`). If no match is found, the document will instead be created with that `_id`.

If no `document_id_setter` or `_id` is specified, Elasticsearch will create a new document with a random unique `_id`.
For example, a setter that uses a field from the message value as the document `_id`:

```python
from quixstreams.sinks.community.elasticsearch import ElasticsearchSink
from quixstreams.sinks.base.item import SinkItem


def get_last_name(batch_item: SinkItem) -> str:
    return batch_item.value["name"]["last"]


sink = ElasticsearchSink(
    ...,  # other required arguments here
    document_id_setter=get_last_name,
)
```
### Include Message Metadata

You can include topic metadata (topic, partition, offset) and message metadata (key, headers, timestamp) using the flags `add_topic_metadata=True` and `add_message_metadata=True` for `ElasticsearchSink`.

They will be included as `__{field}` in the document.

Example document with `add_message_metadata=True`:
```python
{
    "field_x": "value_a",
    "field_y": "value_b",
    "__key": b"my_key",
    "__headers": {},
    "__timestamp": 1234567890,
}
```
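A minimal sketch of enabling both flags (the connection values are placeholders):

```python
elasticsearch_sink = ElasticsearchSink(
    url="http://localhost:9200",   # placeholder connection values
    index="my_index",
    add_topic_metadata=True,       # adds __topic, __partition, __offset
    add_message_metadata=True,     # adds __key, __headers, __timestamp
)
```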
## How To Use

Create an instance of `ElasticsearchSink` and pass it to the `StreamingDataFrame.sink()` method:
```python
from quixstreams import Application
from quixstreams.sinks.community.elasticsearch import ElasticsearchSink

app = Application(broker_address="localhost:9092")
topic = app.topic("topic-name")

# Message structured as:
# key: "CID_12345"
# value: {"name": {"first": "John", "last": "Doe"}, "age": 28, "city": "Los Angeles"}

# Configure the sink
elasticsearch_sink = ElasticsearchSink(
    url="http://localhost:9200",
    index="my_index",
)

sdf = app.dataframe(topic=topic)
sdf.sink(elasticsearch_sink)

# Resulting Elasticsearch document:
# {"_id": "CID_12345", "name": {"first": "John", "last": "Doe"}, "age": 28, "city": "Los Angeles"}

if __name__ == "__main__":
    app.run()
```
## Authenticating

To provide maximum flexibility for authentication, `ElasticsearchSink` forwards any unused `kwargs` directly to the underlying `Elasticsearch` client.

Check out the `Elasticsearch` connection documentation to learn which authentication method applies to your use case.
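For example, a sketch of API key authentication; `api_key` is a standard `elasticsearch-py` client parameter, and the values here are placeholders:

```python
# api_key is not an ElasticsearchSink option, so it is forwarded
# unchanged to the underlying Elasticsearch client
elasticsearch_sink = ElasticsearchSink(
    url="https://localhost:9200",
    index="my_index",
    api_key="<your-api-key>",
)
```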
## Configuration Options

### Required

- `url`: the Elasticsearch URL
- `index`: the Elasticsearch index name

### Optional

- `mapping`: a custom mapping; Default: dynamically maps all field types
- `document_id_setter`: how to select the document `_id`; Default: `_id` set to the Kafka message key
- `batch_size`: how large each chunk size is with bulk uploads; Default: 500
- `max_bulk_retries`: number of retry attempts for each bulk batch; Default: 3
- `ignore_bulk_upload_errors`: ignore any errors that occur when attempting an upload; Default: False
- `add_message_metadata`: include key, timestamp, and headers as `__{field}`; Default: False
- `add_topic_metadata`: include topic, partition, and offset as `__{field}`; Default: False

Additional keyword arguments are passed to the `Elasticsearch` client.
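To tie the options together, here is a sketch with each optional flag set to its documented default (the `url` and `index` values are placeholders):

```python
from quixstreams.sinks.community.elasticsearch import ElasticsearchSink

elasticsearch_sink = ElasticsearchSink(
    url="http://localhost:9200",      # required
    index="my_index",                 # required
    batch_size=500,                   # documented default
    max_bulk_retries=3,               # documented default
    ignore_bulk_upload_errors=False,  # documented default
    add_message_metadata=False,       # documented default
    add_topic_metadata=False,         # documented default
)
```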
## Testing Locally

You can test your application using a local Elasticsearch instance, based on the Elasticsearch "running locally" guide:

1. Execute in a terminal from a desired folder:
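    Elastic's local-dev guide provides a one-line `start-local` setup script, which is presumably the intended command here (an assumption; confirm against the linked guide):

    ```bash
    # Downloads and starts a local Elasticsearch + Kibana via Docker,
    # printing connection details (including an API key) on completion
    curl -fsSL https://elastic.co/start-local | sh
    ```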
2. Connect using the `api_key` printed in the output of step 1:
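    A sketch of the connection, with the key forwarded to the client as described in the Authenticating section (the URL and key values are placeholders):

    ```python
    elasticsearch_sink = ElasticsearchSink(
        url="http://localhost:9200",      # local default port (an assumption)
        index="my_index",
        api_key="<api-key-from-step-1>",  # forwarded to the Elasticsearch client
    )
    ```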
3. Optionally, try out the included Kibana frontend to see your uploaded data.