Summary: A discussion of why and how you would need to backfill incrementally when using log replication with a PostgreSQL input. Follow these instructions carefully.
When you choose Log Replication for your PostgreSQL database input, Alooma begins importing events based on the WAL (write-ahead log) from the moment your input connects, including all new events. But what about all the data that was already there in your database prior to when you created the input?
While creating the input, if you select Log Replication, you can mark the checkbox that says to dump and load an initial snapshot of your tables into your target data warehouse. This works as a query dump (
select * from). Because that could potentially be a very large amount of data, and loading that in one big chunk could put strain on your database and take a long time to complete, that option is best used only for small tables.
For larger PostgreSQL databases, Alooma suggests an approach where you create an incremental replication type input just to handle the backfill. This "dump" input will load the data incrementally, leveraging the Code Engine to append necessary log replication metadata to each event. This method ensures a paced incremental influx of events and uses fewer resources from the source database.
You'll need to be familiar with the Alooma interface (creating inputs) as well as the Code Engine.
Create your log replication PostgreSQL input but do NOT mark the initial snapshot checkbox. In our example below, we use
Add the following code to the Code Engine:
from datetime import datetime
Next, create the following function and call it from the beginning of your main transform function:
def add_log_replication_metadata__postgres(event): event['_metadata']['log_file'] = 0 event['_metadata']['consolidation'] = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + 'Z' event['_metadata']['log_position'] = 0 event['_metadata']['deleted'] = False return event
Here's what your transform might look like:
def transform(event): if event['_metadata']['input_label'] == 'prod-mypg': add_log_replication_metadata__postgres(event)
This is an advanced procedure and you may want to contact Alooma Support to validate your Code Engine logic before running.
Ensure there is logic in place in your transform to rename the event to make sure it passes through any further processing logic just like events from the original Log Replication input:
if event['_metadata']['input_label'] == <ORIGINAL_INPUT_LABEL> + '_dump': event['_metadata']['input_label'].replace('_dump', '')
<ORIGINAL_INPUT_LABEL>with the name of your PostgreSQL Log Replication input from Step 1.
Create the "dump" input (Click Add new input on the Plumbing page in Alooma and remember to specify incremental replication) to import the dump data. Be sure to use a naming schema that works with the logic in Step 4 so the data is imported correctly.
So for our example, the "dump" input would be named
Add tables to the new "dump" input. It's a good practice to start with a single table to verify/validate the Code Engine code.
Once the import (dump) is complete, remember to pause the "dump" input to ensure that no unnecessary credit overages are accumulated. Also keep an eye on the Restream Queue to make sure events aren't being queued.
That's it! Your historical data should begin importing. Any changes to your data (insert, update, delete) will be pulled in via your Log Replication input.
Check the input state for "received_events" under each table in the request via a URL like this:
"received_events":119032 indicates that 119,032 events have been received.
You can create this REST request by adding the
rest/inputs/ endpoint into the existing URL for an input:
Where the <Input_ID> is the text between
/live/ in the example URL above. Replace
/rest/inputs and remove
/live/ from the end of the URL.
You can monitor received events via the API endpoint for the "dump" input you just created, as long as the number of events is increasing, the import is progressing.