> ## Documentation Index
> Fetch the complete documentation index at: https://sequinstream.com/docs/llms.txt
> Use this file to discover all available pages before exploring further.

# How to stream Postgres to Elasticsearch

> Build real‑time search indexes with Postgres change data capture (CDC) and Elasticsearch. Learn to keep your search indices in sync with your database.

This guide shows you how to capture Postgres changes with Sequin and stream them to an Elasticsearch index.

With Postgres data flowing into Elasticsearch you can power
[full‑text search, vector retrieval, and analytics](https://www.elastic.co/docs/solutions/search) without manual re‑indexing.
By the end of this guide you'll have an Elasticsearch index that stays continuously in sync with your database.

<Tip>
  This is the how-to guide for streaming Postgres to Elasticsearch. See the [quickstart](/quickstart/elasticsearch) for a step-by-step walkthrough or the [reference](/reference/sinks/elasticsearch) for details on all configuration options.
</Tip>

## Prerequisites

If you self‑host Sequin:

1. [Install Sequin](/running-sequin)
2. [Connect a Postgres database](/connect-postgres)

If you use **Sequin Cloud**:

1. [Sequin installed](/running-sequin)
2. [Connect a Postgres database](/connect-postgres)

## Basic setup

### Prepare an Elasticsearch cluster

Sequin converts Postgres rows to JSON documents and sends them to the Elasticsearch [Bulk API](https://www.elastic.co/docs/solutions/search/elasticsearch-basics-quickstart#bulk-import).
You need a reachable cluster:

* Local development ⇒ run Elasticsearch in Docker:

  ```bash theme={null}
  curl -fsSL https://elastic.co/start-local | sh
  ```

  The script starts Elasticsearch + Kibana and prints connection details:

  ```text theme={null}
  🔌 Elasticsearch API endpoint: http://localhost:9200
  🔑 API key: <api-key>
  ```

  Use the **API key** (not the user/password) when configuring the sink.

* Production ⇒ Elastic Cloud or self‑managed cluster.

<Tip>
  If your Elasticsearch instance is running on your laptop but Sequin runs in the cloud, connect them with the Sequin CLI's
  [tunnel command](/reference/cli/overview#tunnel-sequin-cloud-only).
</Tip>

### Map Postgres tables to Elasticsearch indices

Elasticsearch stores documents in indices. As a rule of thumb create **one sink per Postgres table → one index**. That keeps the mapping consistent and simplifies queries.

Advanced scenarios – sharding, multi‑tenancy, or routing documents to multiple indices – can be handled with
[filters](/reference/filters) and soon with Sequin **Routing functions**.

### Create an Elasticsearch sink

Navigate to **Sinks → Create sink → Elasticsearch** and follow the steps below.

<Steps>
  <Step title="Source configuration">
    Under **Source** pick the table or schema you want to index in Elasticsearch.

    Optionally:

    * Select which actions (`insert`, `update`, `delete`) you care about.
    * Add [filters](/reference/filters) such as `in_stock = true` to index only a subset of rows.
  </Step>

  <Step title="Backfill existing rows">
    Enable **Initial backfill** if you want Sequin to load existing rows into Elasticsearch before streaming live changes.
  </Step>

  <Step title="Transform rows to documents">
    Your transform must emit JSON compatible with the index mapping.
    Sequin automatically builds the document `_id` from the table's primary key. [Let us know](https://github.com/sequinstream/sequin/issues/new) if you need to use a different `_id` field.

    Typical transform:

    ```elixir theme={null}
    def transform(_action, record, _changes, _metadata) do
      %{
        name: record["name"],
        description: record["description"],
        price: record["price"],
        in_stock: record["in_stock"]
      }
    end
    ```

    For dynamic schemas you can just return `record` unchanged.
    See the [Elasticsearch ingest best‑practices](https://www.elastic.co/docs/solutions/search/ingest-for-search) for mapping guidance.
  </Step>

  <Step title="Delivery settings">
    In the **Elasticsearch** card enter:

    * Endpoint URL: `http://host.docker.internal:9200` (or your cluster URL)
    * Index name: `products`
    * Authentication type: `api_key`
    * Authentication value: `<api-key>`

    Keep the default **Batch size** unless you have special throughput needs. Sequin supports up to 10,000 docs per batch.
  </Step>

  <Step title="Create the sink">
    Name the sink (e.g. `products-elasticsearch`) and click **Create sink**. Sequin queues a backfill (if selected) and then starts streaming live changes.
  </Step>
</Steps>

## Verify & debug

1. In Sequin's web console watch the **Messages** count increase.
2. Query Elasticsearch:

   ```bash theme={null}
   curl -X GET "http://localhost:9200/products/_search?pretty" \
     -H "Authorization: ApiKey <api-key>" \
     -H "Content-Type: application/json" \
     -d '{"query": {"match_all": {}}}'
   ```
3. If documents are missing:
   * Check the **Messages** tab for failed deliveries.
   * Inspect the error returned by Elasticsearch; mapping conflicts and authentication issues are common.

## Next steps

Ready to go further? Explore:

* [Deploy Sequin to production](/how-to/deploy-to-production)
* [Filters](/reference/filters)
* [Transforms](/reference/transforms)
