> ## Documentation Index
> Fetch the complete documentation index at: https://sequinstream.com/docs/llms.txt
> Use this file to discover all available pages before exploring further.

# Elasticsearch sink

> Stream Postgres changes directly to Elasticsearch with Sequin's Elasticsearch sink.

The **Elasticsearch sink** writes change data into an Elasticsearch index via the Elasticsearch REST API.

<Tip>
  This is the reference for the Elasticsearch sink. See the [quickstart](/quickstart/elasticsearch) for a step-by-step walkthrough or the [how-to guide](/how-to/stream-postgres-to-elasticsearch) for an explanation of how to use the Elasticsearch sink.
</Tip>

## Configuration

* **Endpoint URL**

  Base URL of your Elasticsearch cluster (for example, `https://your-es-server:9200`).

* **Index name**

  Name of the Elasticsearch index to write to. The index **must exist** before Sequin can stream data.

* **Authentication type**

  One of `api_key`, `basic`, or `bearer`.

* **Authentication value**

  Credential corresponding to the chosen authentication type.

* **Batch size** *(optional)*

  Maximum number of documents per bulk request (default `100`, maximum `10 000`).

### Authentication header formats

| Auth type | Example header                             |
| --------- | ------------------------------------------ |
| `api_key` | `Authorization: ApiKey <encoded-key>`      |
| `basic`   | `Authorization: Basic <username:password>` |
| `bearer`  | `Authorization: Bearer <token>`            |

Sequin constructs the header automatically—supply only the value shown in angle brackets.

## Transform requirements

Your [transform](/reference/transforms) must return a JSON document compatible with the target index's [mapping](https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping.html).

Sequin sets the Elasticsearch document `_id` to the concatenation of the primary‑key column values of the Postgres row. [Let us know](https://github.com/sequinstream/sequin/issues/new) if you need to use a different `_id` field.

Because the sink always uses the `index` bulk operation, every payload must contain the **full** document. Partial updates are not supported.

### Example transforms

Index the full document:

```elixir theme={null}
def transform(_action, record, _changes, _metadata) do
  record
end
```

Index certain fields, renaming from Postgres column names to Elasticsearch field names:

```elixir theme={null}
def transform(_action, record, _changes, _metadata) do
  %{
    id: record["product_id"],
    name: record["product_name"],
    description: record["product_description"]
  }
end
```

## Bulk import behaviour

For each batch Sequin posts a request to the Elasticsearch [Bulk API](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html):

`POST /{index_name}/_bulk`

| Change action              | Bulk operation | Behaviour                                               |
| -------------------------- | -------------- | ------------------------------------------------------- |
| `INSERT`, `UPDATE`, `READ` | `index`        | Creates the document if missing; replaces it otherwise. |
| `DELETE`                   | `delete`       | Removes the document by `_id`.                          |

All Bulk API payloads are newline‑delimited JSON (NDJSON) and end with a newline as required by Elasticsearch.

## API endpoints used by Sequin

* `POST /{index_name}/_bulk` – indexing, replacing, and deleting documents (always used, even for a single document).
* `POST /{index_name}/_search` – invoked by **Test Connection** in the console to verify connectivity.

No other Elasticsearch endpoints are called.

## Error handling

Typical errors surfaced in the Sequin console include:

* Connection or TLS failures.
* `404` – index not found.
* `401` / `403` – authentication or authorisation failure.
* Mapping conflicts (field type mismatches).
* Malformed bulk payload (usually due to transform output).

Errors from Elasticsearch are shown verbatim in the **Messages** tab. For bulk requests Sequin maps individual item failures to each corresponding document in the batch.

## Limits

* **Batch size** is limited to `10,000` documents by Sequin. Elasticsearch's default `http.max_content_length` (100 MB) may require smaller batches.
* One sink targets one index; multiple indices require multiple sinks.
* Only `index` and `delete` bulk operations are issued. Other operations (`update`, `create`, etc.) are not used.

## Routing

The Elasticsearch sink supports dynamic routing of the `index_name` with [routing functions](/reference/routing).

Example routing function:

```elixir theme={null}
def route(action, record, changes, metadata) do
  %{
    index_name: metadata.table_name
  }
end
```

When not using a routing function, messages will be indexed into the statically configured index.
