Stream all the things! Like data into Postgres
Have you heard of the COPY command in PostgreSQL? I hadn't either until Brian Carlson pointed out his new-ish module, node-pg-copy-streams. It lets you use streams to quickly pipe large amounts of data straight into Postgres, all in a single transaction.
I needed it, along with transform streams, to reliably copy data from a proprietary file format straight into Postgres. Now, whether I'm reading the file from the local filesystem or over a slow internet connection, I can pipe the stream into my transform and then pipe the result straight into Postgres.
The basic process is to read a stream of data, transform it into something Postgres can make sense of, then pipe that into Postgres.
Using this method, my first iteration was able to process a 2MB file containing ~200k records (after parsing) in about 20 to 30 seconds.
Here's a simple, high-level example:
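```javascript
var fs = require('fs');
var pg = require('pg');
var copy = require('pg-copy-streams');
var someTransform = require('./some-transform');

// Placeholder: supply your own connection string
var DB_CONN_STRING = process.env.DATABASE_URL;

var sql = 'copy my_table ( col_a, col_b, col_c ) from stdin';

// pg.connect() was removed in pg 7; a Pool is the current equivalent
var pool = new pg.Pool({ connectionString: DB_CONN_STRING });

pool.connect( function( error, client, done ){
  if ( error ) throw error;

  var pgstream = client.query( copy.from( sql ) );

  // Keep the client checked out until the COPY finishes or fails
  // (older pg-copy-streams releases emitted `end` rather than `finish`)
  pgstream.on( 'error', done );
  pgstream.on( 'finish', done );

  // The transform prepares the data for text, csv, or binary format
  fs.createReadStream( './some-file.dat' )
    .pipe( someTransform.create() )
    .pipe( pgstream );
});
```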
This is a pretty barebones example and my own projects make magic happen with MoSQL and dirac.js, but you get the idea of what can be done.
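Plain .pipe() has one gotcha for this use case. It doesn't forward errors. If reading the file or running the transform fails, the destination is never closed, so the COPY and the pooled client can be left hanging. Node's stream.pipeline() (Node 10+) fixes this. It forwards the first error from any stream to a single callback and destroys every stream in the chain.

The sketch below uses two hypothetical stand-ins so it runs without a database and needs Node 12+ for Readable.from:
- collectInto, an in-memory writable, plays the role of pgstream.
- shout, a toy transform, plays the role of someTransform.

```javascript
var stream = require('stream');

// Stand-in for pgstream: a Writable that collects what it receives
// (hypothetical, so the sketch runs without a database)
function collectInto( sink ){
  return new stream.Writable({
    write: function( chunk, encoding, callback ){
      sink.push( chunk.toString() );
      callback();
    }
  });
}

// Stand-in for someTransform.create(): upper-cases whatever passes through
function shout(){
  return new stream.Transform({
    transform: function( chunk, encoding, callback ){
      callback( null, chunk.toString().toUpperCase() );
    }
  });
}

var received = [];

// Like chaining .pipe() calls, but an error in ANY stream reaches this
// callback, and every stream in the chain is destroyed
stream.pipeline(
  stream.Readable.from([ 'a\t1\n', 'b\t2\n' ]),
  shout(),
  collectInto( received ),
  function( error ){
    if ( error ) return console.error( 'copy failed:', error.message );
    console.log( received.join( '' ) );
  }
);
```

In the real example you could pass the file stream, someTransform.create(), and pgstream to pipeline, then release the client from its callback.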
How do I write my own transform stream?
Or, a better question: what is a transform stream, and when do you use one? Let's check out Node's docs:
A "transform" stream is a duplex stream where the output is causally connected in some way to the input, such as a zlib stream or a crypto stream.
God, that's terrible. I mean just awful. OK: a transform stream is a stream that is both writable and readable. Whatever you write into it comes out the readable side transformed into something else, so it can sit in the middle of a pipe chain.
You do it one chunk at a time by implementing your own _transform method on the custom stream class. Here's the basic skeleton of an extended Transform Stream:
```javascript
var util = require('util');
var Stream = require('stream');

function CustomTransform( options ){
  // Call the Transform "super" constructor to set up internal state
  Stream.Transform.call( this, options );
}

util.inherits( CustomTransform, Stream.Transform );

module.exports = CustomTransform;

// Factory method so consumers can skip `new`
module.exports.create = function( options ){
  return new CustomTransform( options );
};

// Called once per incoming chunk
CustomTransform.prototype._transform = function( chunk, encoding, callback ){
  // TODO: transform the chunk, then push the result to the readable side
  this.push( chunk );
  // Signal that this chunk is handled and we're ready for the next one
  callback();
};
```
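Current versions of Node also accept the transform function directly as a constructor option, so a small one-off transform doesn't need a class. Here's a minimal sketch of that form. The comma-to-tab conversion is just an illustrative placeholder and assumes ASCII input. It can work chunk by chunk without any buffering because it never needs to look at more than one byte at a time.

```javascript
var Transform = require('stream').Transform;

// The skeleton above, written with the simplified constructor:
// turns comma-separated input into tab-separated output
function createCommasToTabs(){
  return new Transform({
    transform: function( chunk, encoding, callback ){
      // callback( null, data ) is shorthand for this.push( data ); callback();
      callback( null, chunk.toString().replace( /,/g, '\t' ) );
    }
  });
}

var converted = '';
var commasToTabs = createCommasToTabs();
commasToTabs.on( 'data', function( data ){ converted += data; } );
commasToTabs.on( 'end', function(){
  console.log( JSON.stringify( converted ) ); // prints "55.3\t2014-03-13 00:00:00\n"
});
commasToTabs.write( '55.3,2014-03-13' );
commasToTabs.end( ' 00:00:00\n' );
```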
Our Input Stream: Weather and Audio Data
Suppose you're reading data from multiple sensors, each of which writes to a file every minute. You've got barometric pressure readings, sound pressure levels, and decibel (dB) levels, but you only want the dB values. Well, let's poke around and see what the file looks like:
```
2014-03-13 00:00:00
Barometric Pressure:
30.20,30.10,29.90,29.80,30.10...[120 times]
Sound Pressure Levels:
54.7,54.5,54.1,53.9,54.6,54.7...[120 times]
dB:
55.3,55.2,54.8,54.1,55.9,56.3...[120 times]

2014-03-13 00:01:00
Barometric Pressure:
29.20,29.10,29.90,30.80,30.20...[120 times]
Sound Pressure Levels:
53.6,53.4,53.1,53.9,53.6,54.1...[120 times]
dB:
54.2,54.5,54.8,54.1,55.1,54.9...[120 times]

2014-03-13 00:02:00
Barometric Pressure:
30.20,30.10,29.90,29.80,30.10...[120 times]
Sound Pressure Levels:
54.7,54.5,54.1,53.9,54.6,54.7...[120 times]
dB:
55.3,55.2,54.8,54.1,55.9,56.3...[120 times]

...
```
This file format is fairly obvious:
Each minute of data is separated by a blank line (two newlines)
The first line of each minute is the timestamp
Each reading starts with a label line (the header), followed by a line of values
There are 2 values per second, so 120 values per reading
Each value is separated by a comma
It's likely that the sensors would have a few failed writes, resulting in gaps in the file, so we'll need to account for that.
In the end, the data structure gleaned from our data file should look like this:
```
Decibel {
  double    value
  timestamp created_at
}
```
What we'll want to do is look for the next timestamp, parse it, find the decibel values, and for each value increment the timestamp by 500 milliseconds.
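To make the timestamp arithmetic concrete, here's a minimal sketch that turns one minute's timestamp line and dB line into records shaped like the Decibel structure above. It assumes the plain-text layout shown in the sample. It also treats the file's timestamps as UTC, so the output doesn't depend on the machine's time zone; that is an assumption, since the post doesn't say which zone the sensors use. The names decibelRows and formatTimestamp are hypothetical.

```javascript
// Zero-pad a number to the given width
function pad( n, width ){
  return String( n ).padStart( width, '0' );
}

// Format a Date as "YYYY-MM-DD HH:mm:ss.SSS" (UTC), which Postgres accepts as a timestamp
function formatTimestamp( date ){
  return date.getUTCFullYear() + '-' + pad( date.getUTCMonth() + 1, 2 ) + '-' +
    pad( date.getUTCDate(), 2 ) + ' ' + pad( date.getUTCHours(), 2 ) + ':' +
    pad( date.getUTCMinutes(), 2 ) + ':' + pad( date.getUTCSeconds(), 2 ) + '.' +
    pad( date.getUTCMilliseconds(), 3 );
}

// Turn "2014-03-13 00:00:00" plus "55.3,55.2,..." into [ { value, createdAt } ],
// with each value stamped 500 ms after the one before it
function decibelRows( timestampLine, dbLine ){
  // Assumption: timestamps are UTC; "T" and "Z" make Date parse them unambiguously
  var start = new Date( timestampLine.trim().replace( ' ', 'T' ) + 'Z' );
  if ( isNaN( start.getTime() ) ) throw new Error( 'Bad timestamp: ' + timestampLine );

  return dbLine.trim().split( ',' ).map( function( raw, i ){
    return {
      value: parseFloat( raw ),
      createdAt: formatTimestamp( new Date( start.getTime() + i * 500 ) )
    };
  });
}

console.log( decibelRows( '2014-03-13 00:00:00', '55.3,55.2,54.8' ) );
```

Computing each timestamp as start + i × 500 ms, rather than repeatedly adding to a running value, keeps every row tied to the minute's header.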
Our Output Stream: Postgres
You could use CSV, but that's a bit overkill for a stream, IMO. I use the default text format: rows are separated by newlines and columns by tabs. Column names and order are specified in the COPY query itself:
```sql
copy my_table ( col_a, col_b, col_c ) from stdin
```
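One detail of the text format is easy to forget. Per the PostgreSQL COPY documentation, a backslash, tab, newline, or carriage return inside a column value must be escaped, and NULL is written as \N by default. Numbers and timestamps like ours never hit this, but here's a minimal sketch of a row encoder for when they might. The names escapeCopyValue and toCopyRow are hypothetical helpers, not part of pg-copy-streams.

```javascript
// Escape one value for COPY ... FROM STDIN in the default text format
function escapeCopyValue( value ){
  if ( value === null || value === undefined ) return '\\N'; // default NULL marker
  return String( value )
    .replace( /\\/g, '\\\\' )  // backslash first, so later escapes aren't doubled
    .replace( /\t/g, '\\t' )   // column delimiter
    .replace( /\n/g, '\\n' )   // row delimiter
    .replace( /\r/g, '\\r' );  // carriage return
}

// Join values with tabs and terminate the row with a newline
function toCopyRow( values ){
  return values.map( escapeCopyValue ).join( '\t' ) + '\n';
}

console.log( JSON.stringify( toCopyRow([ 55.3, '2014-03-13 00:00:00.500', null ]) ) );
// prints "55.3\t2014-03-13 00:00:00.500\t\\N\n"
```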
Implementing the transform stream
Actually implementing the stream can be a little tricky, because we have to account for the following:
The source may be slow, and a chunk may not be immediately processable
The source may be indefinitely large, so it's not possible to store all data in-memory
What if the first chunk you receive contains only half of a timestamp? Then you should store that in your own internal buffer and wait until you get the next part.
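Here's a minimal sketch of that buffering idea on its own, assuming newline-delimited text. The transform:
- emits only complete lines;
- keeps any trailing partial line until the next chunk arrives;
- pushes whatever is left in _flush when the input ends.

LineSplitter is a hypothetical name.

```javascript
var Transform = require('stream').Transform;
var StringDecoder = require('string_decoder').StringDecoder;
var util = require('util');

function LineSplitter( options ){
  Transform.call( this, options );
  // Decodes bytes to text without breaking multi-byte characters across chunks
  this.decoder = new StringDecoder( 'utf8' );
  // Incomplete last line carried over to the next chunk
  this.remainder = '';
}

util.inherits( LineSplitter, Transform );

LineSplitter.prototype._transform = function( chunk, encoding, callback ){
  var lines = ( this.remainder + this.decoder.write( chunk ) ).split( '\n' );
  // The last element is '' if the chunk ended on a newline, otherwise a partial line
  this.remainder = lines.pop();
  for ( var i = 0; i < lines.length; i++ ) this.push( lines[ i ] + '\n' );
  callback();
};

// Runs once after the last chunk; without it, a final line lacking '\n' would be lost
LineSplitter.prototype._flush = function( callback ){
  var rest = this.remainder + this.decoder.end();
  if ( rest ) this.push( rest + '\n' );
  callback();
};

var out = [];
var splitter = new LineSplitter();
splitter.on( 'data', function( data ){ out.push( data.toString() ); } );
splitter.on( 'end', function(){ console.log( JSON.stringify( out.join( '' ) ) ); } );

// The timestamp is split across chunks, and the input ends without a newline
splitter.write( '2014-03-13 00:0' );
splitter.write( '0:00\ndB:\n55.3,55' );
splitter.end( '.2' );
```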
```javascript
var util = require('util');
var Stream = require('stream');

util.inherits( DecibelToPostgres, Stream.Transform );

module.exports = DecibelToPostgres;

// Generally good practice to expose a factory method
module.exports.create = function( options ){
  return new DecibelToPostgres( options );
};

function DecibelToPostgres( options ){
  var defaults = {
    // This is the timestamp format used in the source stream
    // (only its length is used, to know how many bytes to slice)
    fileTimestampFormat: 'YYYY-mm-dd hh:MM:ss'
  };

  options = options || {};

  for ( var key in defaults ){
    if ( !(key in options) ) options[ key ] = defaults[ key ];
  }

  // Be sure to call the Transform super to set up your instance properly
  Stream.Transform.call( this, options );

  // _transform reads this.options, so keep a reference to it
  this.options = options;

  // Streams are all about maintaining, containing, and hiding state
  // so the stream consumer does not have to worry about it. Since
  // data comes into the stream as indeterminate chunks, we need to
  // keep track of which phase of data processing we're in.

  // We're either searching the buffer for the next timestamp...
  this.lookingForTimestamp = true;

  // ...Or looking for the next decibel chunk
  this.lookingForDecibels = false;

  // Did we find decibels? Then we'll assume
  // that the next data that comes in will be decibel data
  this.decibelsFound = false;
}

DecibelToPostgres.prototype._transform = function( chunk, encoding, callback ){
  // If we previously had data that was too small
  // to process, add this to chunk
  if ( this.buffer ){
    chunk = Buffer.concat([ this.buffer, chunk ]);
    delete this.buffer;
  }

  // If chunk is too small to process in the current phase, save for later
  var tooSmall =
    ( this.lookingForTimestamp && DecibelToPostgres.indexOfTimestamp( chunk ) === -1 ) ||
    ( this.lookingForDecibels && DecibelToPostgres.indexOfDecibelChunk( chunk ) === -1 ) ||
    ( this.decibelsFound && DecibelToPostgres.indexOfComma( chunk ) === -1 );

  if ( tooSmall ){
    this.buffer = chunk;
    return callback();
  }

  if ( this.lookingForTimestamp ){
    // Slice the timestamp out of the buffer and parse it
    var start = DecibelToPostgres.indexOfTimestamp( chunk );
    var end = start + this.options.fileTimestampFormat.length;

    // "2014-03-13 00:00:00" -> "2014-03-13T00:00:00", parsed as local time
    this.currTimestamp = new Date( chunk.slice( start, end ).toString().replace( ' ', 'T' ) );

    this.lookingForTimestamp = false;
    this.lookingForDecibels = true;

    // Re-run with everything up to and including the timestamp taken out
    return this._transform( chunk.slice( end ), encoding, callback );
  }

  if ( this.lookingForDecibels ){
    this.lookingForDecibels = false;
    this.decibelsFound = true;

    // Re-run with the dB header taken out
    // (+ 4 because indexOf returns the start of the "dB:\n" header)
    return this._transform(
      chunk.slice( DecibelToPostgres.indexOfDecibelChunk( chunk ) + 4 ),
      encoding,
      callback
    );
  }

  if ( this.decibelsFound ){
    // Note: this byte-level scan assumes a value's bytes are never 10 or 44
    for ( var i = 0, l = chunk.length; i < l; i++ ){
      // New line: this minute's decibels are done, go find the next timestamp
      if ( chunk[ i ] === 10 ){ // 10 = \n
        this.decibelsFound = false;
        this.lookingForTimestamp = true;
        return this._transform( chunk.slice( i + 1 ), encoding, callback );
      }

      // Skip comma delimiters
      if ( chunk[ i ] === 44 ) continue; // 44 = ,

      // Only half of a 16-bit value has arrived; wait for the next chunk
      if ( i + 1 >= l ){
        this.buffer = chunk.slice( i );
        return callback();
      }

      // Push one row to the readable side: columns separated by \t, row ended by \n.
      // This is the content that will actually get read by consumers
      this.push( Buffer.from([
        // In the format I worked with, the number is stored as a
        // 16-bit Little Endian Integer multiplied by 10
        chunk.readUInt16LE( i ) / 10,
        DecibelToPostgres.formatTimestamp( this.currTimestamp )
      ].join('\t') + '\n') );

      this.currTimestamp = new Date( this.currTimestamp.getTime() + 500 );

      // Skip the second byte of the value we just read
      i++;
    }
  }

  return callback();
};

// Format a Date as "YYYY-MM-DD HH:mm:ss.SSS" (local time) for Postgres
DecibelToPostgres.formatTimestamp = function( date ){
  function pad( n, width ){ return String( n ).padStart( width || 2, '0' ); }
  return date.getFullYear() + '-' + pad( date.getMonth() + 1 ) + '-' + pad( date.getDate() ) +
    ' ' + pad( date.getHours() ) + ':' + pad( date.getMinutes() ) + ':' +
    pad( date.getSeconds() ) + '.' + pad( date.getMilliseconds(), 3 );
};

// For full source, see https://gist.github.com/jrf0110/9688719
DecibelToPostgres.indexOfTimestamp = function( data, offset ){ /* ... */ };

// For full source, see https://gist.github.com/jrf0110/9688719
DecibelToPostgres.indexOfDecibelChunk = function( data, offset ){ /* ... */ };

// For full source, see https://gist.github.com/jrf0110/9688719
DecibelToPostgres.indexOfComma = function( data, offset ){ /* ... */ };
```
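The helper functions above live in the gist, and the real format is binary. So here's a self-contained sketch of the same state machine (timestamp, then dB header, then values) for the plain-text sample shown earlier, with comma-separated decimal values and labels on their own lines. It works line by line instead of byte by byte and treats timestamps as UTC. It is an illustration of the approach, not the code from the gist. The name createDecibelTextToPostgres is hypothetical, and so is the choice to skip empty fields while still advancing the clock.

```javascript
var Transform = require('stream').Transform;
var StringDecoder = require('string_decoder').StringDecoder;

var TIMESTAMP = /^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$/;

function pad( n, width ){ return String( n ).padStart( width, '0' ); }

// "YYYY-MM-DD HH:mm:ss.SSS" in UTC, a format Postgres accepts for timestamp columns
function formatUTC( d ){
  return d.getUTCFullYear() + '-' + pad( d.getUTCMonth() + 1, 2 ) + '-' + pad( d.getUTCDate(), 2 ) +
    ' ' + pad( d.getUTCHours(), 2 ) + ':' + pad( d.getUTCMinutes(), 2 ) + ':' +
    pad( d.getUTCSeconds(), 2 ) + '.' + pad( d.getUTCMilliseconds(), 3 );
}

function createDecibelTextToPostgres(){
  var decoder = new StringDecoder( 'utf8' );
  var partial = '';          // incomplete trailing line from the previous chunk
  var state = 'timestamp';   // 'timestamp' -> 'header' -> 'values' -> 'timestamp' ...
  var current = null;        // Date assigned to the next value

  // Advance the state machine by one complete line
  function handleLine( line, push ){
    line = line.trim();
    if ( TIMESTAMP.test( line ) ){
      // A timestamp always starts a new minute, which also resyncs after a gap
      current = new Date( line.replace( ' ', 'T' ) + 'Z' );
      state = 'header';
    } else if ( state === 'header' && line === 'dB:' ){
      state = 'values';
    } else if ( state === 'values' ){
      line.split( ',' ).forEach( function( raw ){
        // Empty field = missing sample: no row, but the clock still advances
        if ( raw !== '' ) push( raw + '\t' + formatUTC( current ) + '\n' );
        current = new Date( current.getTime() + 500 );
      });
      state = 'timestamp';
    }
    // Anything else (other sensors' headers and values, blank lines) is ignored
  }

  return new Transform({
    transform: function( chunk, encoding, callback ){
      var lines = ( partial + decoder.write( chunk ) ).split( '\n' );
      partial = lines.pop();   // keep the incomplete last line for the next chunk
      var push = this.push.bind( this );
      lines.forEach( function( line ){ handleLine( line, push ); } );
      callback();
    },
    flush: function( callback ){
      var rest = partial + decoder.end();
      if ( rest ) handleLine( rest, this.push.bind( this ) );
      callback();
    }
  });
}

var rows = '';
var t = createDecibelTextToPostgres();
t.on( 'data', function( d ){ rows += d; } );
t.on( 'end', function(){ process.stdout.write( rows ); } );
// Chunk boundaries deliberately fall mid-timestamp and mid-value
t.write( '2014-03-13 00:0' );
t.write( '0:00\nBarometric Pressure:\n30.20,30.10\ndB:\n55.3,55' );
t.end( '.2\n\n2014-03-13 00:01:00\ndB:\n54.2\n' );
```

To load the output, you'd pipe it into pgstream as in the first example, with something like copy decibels ( value, created_at ) from stdin as the query (a made-up table matching the Decibel structure).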
Hopefully this gives you a decent idea of how to use transform streams and PG's copy. Now go out there and transform you some data!
Oh and thanks to Paul Fraze for some proof-reading!