CLI for Indexing data from Postgres to Elasticsearch

Avi Aryan · Nov 15, 2017


[Image: abc import diagram]

Elasticsearch is fantastic for indexing and filtering data. But hey, your data lives in a Postgres database in production. How do you copy all of that data from Postgres to Elasticsearch? Even better, how do you keep the two data stores in sync? Is it even possible?

I am going to answer these questions in this blog post. To start off, yes, it is indeed possible. We at appbase.io have built an awesome CLI tool called ABC that lets you do this with a single command.

abc import --src_type=postgres --src_uri=<uri> <elasticsearch_uri>

THAT's it. Seriously, this is all you need to sync a Postgres database to an Elasticsearch index. Here's a video showing the process.

The Steps

The first step is to install ABC if you have not done so already. Go to the GitHub releases page for ABC and download the most recent version. It's a single no-dependency binary, so put it anywhere you like. We recommend putting it inside a PATH directory so that it can be accessed from anywhere in the terminal.
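For example, on a Linux machine the setup might look something like the following. The archive name here is only illustrative; use whatever file you downloaded from the releases page, and pick any directory on your PATH.

unzip abc-linux-x64.zip
chmod +x abc
sudo mv abc /usr/local/bin/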

Ensure that ABC is working by running the following command.

abc version
[Image: abc version output]

Now, let's take a Postgres database and sync it to an Elasticsearch index hosted on appbase.io.

First, we are going to create a database called ‘users’.

CREATE USER test_user with password 'test_pass';
CREATE DATABASE users with OWNER test_user;
\c users;

Next we are going to create a table called ‘users’ inside the database ‘users’.

CREATE TABLE users (
email varchar(500) NOT NULL,
name varchar(100) NOT NULL,
bio text,
PRIMARY KEY(email)
);

After that we will add some sample data to it.

insert into users values ('foo@bar.com', 'foo', 'bar');
insert into users values ('zak@barker.com', 'zak', 'ceo');

The table now looks as follows:

[Image: users table]

The Postgres test source is now complete. Its URL is:

postgresql://test_user:test_pass@127.0.0.1:5432/users
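If you want to sanity-check this connection string before handing it to ABC, you can pass it straight to psql. This step is optional and only verifies that the URL and credentials work.

psql "postgresql://test_user:test_pass@127.0.0.1:5432/users" -c '\dt'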

Next, we are going to create the sink Elasticsearch index. We go to appbase.io and create a new app called abcpostgrestest. The complete URL to this index looks like the following.

https://USER:PASS@scalr.api.appbase.io/abcpostgrestest
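Since appbase.io exposes the standard Elasticsearch API, you can verify that the index is reachable with a plain curl request (substitute your actual credentials); it should return the index metadata.

curl "https://USER:PASS@scalr.api.appbase.io/abcpostgrestest"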

So now we have both the source and the sink ready. It's time for some ABC magic. Using this source and sink, we can build the import command, which looks as follows.

abc import --src_type=postgres --src_uri="postgresql://test_user:test_pass@127.0.0.1:5432/users" "https://USER:PASS@scalr.api.appbase.io/abcpostgrestest"

Once you run this command, it should finish after a short while with no errors. If you now visit the appbase.io dashboard, you can see that the data has been transferred to the target Elasticsearch index.
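If you prefer the terminal over the dashboard, you can also confirm that the documents arrived by hitting the standard Elasticsearch search API on the index (again with your credentials substituted).

curl "https://USER:PASS@scalr.api.appbase.io/abcpostgrestest/_search?pretty"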

[Image: appbase.io data browser]

Voila, everything works. The data has been transferred to Elasticsearch with just a single command. Next, we will see how to make abc listen for changes in the Postgres database.

Indexing realtime data changes from Postgres

If you are using Postgres as your production database system, chances are that your data is constantly changing. How do you keep the Elasticsearch index in sync with all those changes?

abc has a nifty tail mode that synchronises the Postgres database to an Elasticsearch index in realtime. It uses the replication slot feature of Postgres to do this.

It can be enabled by passing a --tail switch.

abc import --tail --src_type=postgres --src_uri="postgresql://test_user:test_pass@127.0.0.1:5432/users" "https://USER:PASS@scalr.api.appbase.io/abcpostgrestest"

Now run the import command again with the tail option. You might see an error like the following.

must be superuser or replication role to use replication slots

Don't panic. This happens because replication is not set up on your Postgres database, and it is required for the tailing feature. You will now have to make the database user a superuser. (Read up on ALTER ROLE.)

psql

After running the above command, you should be inside the psql shell. Now run the following command (where ‘test_user’ is your database user name).

psql> ALTER ROLE test_user WITH SUPERUSER;
[Image: alter role command output]

Once that is done, you need to enable replication slots. Open postgresql.conf in a text editor and change wal_level and max_replication_slots as follows. You will have to restart your Postgres server afterwards. This step enables the replication slot feature.

wal_level=logical
max_replication_slots=1
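If you are not sure where postgresql.conf lives, Postgres itself can tell you. The restart command depends on how Postgres was installed; a systemd based setup is shown here as one common case.

psql -c 'SHOW config_file;'
sudo systemctl restart postgresql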

After this, you will actually create a replication slot. Go back into the psql shell, connect to the source database, and run the following commands.

\c users;
select * from pg_create_logical_replication_slot('users_replication_slot', 'test_decoding');
SELECT * FROM pg_replication_slots;

You should now see the users database listed in the replication slot row. This means that the replication slot is properly set up.

[Image: replication slots list]

Now let’s try the tail again.

abc import --tail --src_type=postgres --src_uri="postgresql://test_user:test_pass@127.0.0.1:5432/users" --replication_slot="users_replication_slot" "https://USER:PASS@scalr.api.appbase.io/abcpostgrestest"

Run the import again. It should work this time. 😄

Also notice that the import process no longer exits when the initial sync is finished. It keeps running and listens for changes. Let's add a new entry to the database and see if it shows up on the appbase.io dashboard.

\c users;
insert into users values ('tony@stark.com', 'tony stark', 'iron man');

And yes, it works. You should see a new entry in the app dashboard when you hit reload.

[Image: data updated in appbase]

Try making some more changes and all of them will be reflected on the appbase.io hosted Elasticsearch cluster. 😎
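As a quick terminal-side check, you can also watch the document count grow as you make changes, using the standard Elasticsearch count API.

curl "https://USER:PASS@scalr.api.appbase.io/abcpostgrestest/_count?pretty"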

Transforming Data before Indexing into Elasticsearch

There are times when you don't want the data to go as-is from the source to the sink. You might want to change the target type name (for example, users to accounts), remove certain fields (e.g. bio), or create new fields. For all this, we have the transforms feature in ABC. It can be used via the transform_file parameter.

abc import --src_type=postgres --src_uri="postgresql://test_user:test_pass@127.0.0.1:5432/users" --transform_file="transform_file.js" "https://USER:PASS@scalr.api.appbase.io/abcpostgrestest"

The transform_file parameter takes the path to the transform file. That file in turn contains the JavaScript transforms that should be applied to the pipeline. Let's say the contents of transform_file.js are as follows.

t.Source("source", source, "/.*/")       // read all namespaces from the Postgres source
.Transform(omit({"fields":["bio"]}))     // drop the bio field from every document
.Save("sink", sink, "/.*/")              // write the result to the Elasticsearch sink

In the above transform, you can see that we are going to omit the bio field from the data transfer. Now when we run the new import command, we should have the following result.

[Image: data with bio field omitted]

As you can see, the bio field was omitted when the data reached the sink. More documentation on the transform file can be found on GitHub. It supports lots of built-in functions like omit, and even supports running custom JavaScript code as a transform. It's a good idea to explore its documentation.

Summary

This article is part of the article series "How to index data from <X> into Elasticsearch using ABC". If you found this interesting, consider giving this article a 👏🏻 so that other readers can find it on Medium too. 🤓

Further Reading

ABC’s README is a good place to start if you want to learn more about the application. You can also have a look at the Postgres adaptor docs. Furthermore, you may ⭐️ the repo on GitHub and watch it to stay tuned for updates.
