Indexing large file / Mongo collection using API & Node

My Node-fu isn’t the strongest, so I suspect my issue has to do with my code specifically. But since I had a hard time finding updated examples across the web, I figured I’d ask my question here in case it helps other ‘mostly-front-end-dev’ types.

My Q: Does Algolia have a recommended Node approach to uploading large datasets?

I’ve tried three methods with my 3.5m-record dataset, and while they work when testing with a few thousand records, they error out when attempting the full upload.

Some background info:

  1. Running DEBUG=algoliasearch* node upload.js tells me it’s flooding the API with requests.
  2. I’ve tried cranking --max-old-space-size up to 8gb and still no luck.
  3. The best results so far were running method #3 via Google Cloud Functions, pulling from a Mongo instance on Google Cloud Compute (uploaded 30k files, then errored out).

I’m guessing that tells us it’s a network/latency/concurrency thing?

Any and all feedback and guidance most welcome!


My end goal is to put the upload script on Google Cloud Functions, which has a 2GB memory limit.

My core sync code:

    index.addObjects(batch, function gotTaskID(error, content) {
        if (error) throw new Error(error);
        let recordsCount = batch.length;
        index.waitTask(content.taskID, function contentIndexed() {
            console.log('Indexed ' + recordsCount + ' records in ' + content.taskID);
        });
    });

Approach #1 - An updated version of @Bobylito’s script that uses Streams
Result: Locks up local 16GB RAM machine - no errors or timeouts

var algoliasearch = require('algoliasearch');
var stream = require( 'stream' );
var JSONStream = require('JSONStream');
var fs = require('fs');
var transform = require('stream-transform');
var Batch = require( 'batch-stream' );

getStream()
  .pipe(transform(toAlgoliaRecord))
  .pipe(new Batch({ size: 1000 }))
  .pipe(algoliaSaveStream());

function getStream() {
  var jsonData = 'source.json',
      stream = fs.createReadStream(jsonData, { encoding: 'utf8' }),
      parser = JSONStream.parse('*');
  return stream.pipe(parser);
}

function toAlgoliaRecord( data, cb ) {
  delete data._id;
  var record = data
  cb( null, record );
}

function algoliaSaveStream() {
  var appId = '<appid>';
  var apiKey = '<apikey>';
  var client = algoliasearch(appId, apiKey);
  var index = client.initIndex( 'indexname' );

  var streamToAlgolia = new stream.Stream()
  streamToAlgolia.writable = true;
  streamToAlgolia.write = function (data) {
    index.addObjects( data, function( err, content ) {
      console.log('Sending batch to Algolia');
      if (err) throw new Error(err);
      let recordsCount = data.length;
      index.waitTask(content.taskID, function() {
        console.log('Indexed ' + recordsCount + ' records in taskID: ' + content.taskID);
      });
    });
    return true;
  }
  streamToAlgolia.end = function (data) {
  }

  return streamToAlgolia;
}

Approach #2 - Promises (source)
Result - buffer.js: 503 throw new Error('"toString()" failed');

const Promise = require('bluebird');
const _ = require('lodash');
const data = require('./source');

const algoliasearch = require('algoliasearch');
const appId = '<appid>';
const apiKey = '<apikey>';
const client = algoliasearch(appId, apiKey);
const index = client.initIndex( 'indexname' );

const chunkSize = 10000;
const maxConcurrency = 1;

const chunks = _.map(data, (chunk, index) => {
  return index % chunkSize === 0 ? data.slice(index, index + chunkSize) : null; 
})
.filter(chunk => {
  return chunk; 
});

console.time('Batch indexing!');
Promise.resolve(chunks).map((chunk, index, length) => {
  console.log('CHUNK | INDEX | LENGTH\n');
  //console.log(chunk, index, length);
  console.log(chunk, index, length);
  console.log('\n================================\n')
  return chunk;
}, { concurrency: maxConcurrency })
.then((data) => {
  data = data[0];
  console.timeEnd('Batch indexing!');
  index.addObjects(data, function(err, content) {
    if (err) throw new Error(err);
    let recordsCount = data.length;
    index.waitTask(content.taskID, function() {
      console.log('Indexed ' + recordsCount + ' records in ' + content.taskID);
    });
  });
})
.catch(e => {
  console.log(e);
});

Approach #3 - pull directly from Mongo (preferred)
Result (local) - Javascript heap out of memory
Result (Google Cloud Functions) - TypeError: Callback is not a function

const algoliasearch = require('algoliasearch');

// Algolia
const appID = '<appid>';
const apiKey = '<apikey>';
const indexName = '<indexname>';
const client = algoliasearch(appID, apiKey);
const index = client.initIndex(indexName);

// MongoDB
const Db = require('mongodb').Db;
const Server = require('mongodb').Server;
const db = new Db('database_name', new Server('127.0.0.1', '27017'));

  // Open a db connection
  db.open(function(e) {
      if (e) throw new Error(e);

      if (result) {
        // Get the collection
        db.collection('collection_name', function (error, collection) {
          if (error) throw new Error(error);

          // Get the collection count
          collection.count()
            .then(function(count) {
              const collectionCount = count;
              let processedCount = 0;
              let batch = [];

              // Iterate over the whole collection using a cursor
              collection.find().forEach(function(doc) {
                // Remove unnecessary fields
                delete doc._id;
                
                // Add doc to batch array
                batch.push(doc);
                ++processedCount;

                // Send documents by batch of 5000 to Algolia
                if (batch.length >= 5000) {
                  sendToAlgolia(batch);
                  batch = [];
                }

                // Send remaining documents
                if (processedCount === collectionCount) {
                  sendToAlgolia(batch);
                  console.log('Algolia sync complete \n');
                  db.close();
                }
              });
            });
        });
  });

  function sendToAlgolia(batch) {
    index.addObjects(batch, function gotTaskID(error, content) {
      if (error) throw new Error(error);
      let recordsCount = batch.length;
      index.waitTask(content.taskID, function contentIndexed() {
        console.log('Indexed ' + recordsCount + ' records in ' + content.taskID);
      });
    });
  }

Hi @chad, if you’re able to use MongoDB for this task then that’s a good approach. The overall program flow should be simple:

while (hasMoreResults from mongodb) {
  saveResultsToAlgolia(arrayOfObjects);
}

Is your code complete? I can see if (result), but result isn’t defined anywhere.

Also, MongoDB has ways to check whether there are more documents (hasNext), plus batch/limit options (http://mongodb.github.io/node-mongodb-native/2.2/api/Cursor.html#batchSize).

What may be happening is that your local database delivers results SO FAST that the number of 5000-record batches queued up in memory for Algolia grows until it no longer fits.

Basically you have a big INPUT bandwidth (localhost) while the output is very thin (uploading to Algolia). At some point you end up with, say, 30 * 5000 objects waiting to be saved in Algolia; your upload bandwidth can’t keep up, so all 30 * 5000 of those objects sit in memory at once.

To solve this you should wait for each batch to be sent to Algolia before asking MongoDB for more data. If MongoDB honors returned promises, then what you can try is:

if (batch.length >= 5000) {
  return sendToAlgolia(batch).then(() => {batch = []});
}

By using return …, MongoDB should wait for the objects to be sent to Algolia before asking for more data. If that’s not the case, let us know.

function sendToAlgolia(batch) {
  return index.addObjects(batch);
}

There’s no need to waitTask, and error handling should be done at the topmost level of your program, like this:

connect()
.then(() => {
  return collection.find().forEach(() => {
    return saveToAlgolia();
  });
})
.catch(err => {
  throw err;
});

This is pseudo code, but what you have to keep in mind is to always return promiseCall() instead of calling it without return.

That way you can handle failures at the root level instead of at every level.
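
To illustrate the difference, here’s a contrived example with made-up saveBatchBad / saveBatchGood helpers around your existing index:

// made-up example: fire-and-forget, the caller gets undefined back,
// so an error from addObjects never reaches a top-level .catch()
function saveBatchBad(batch) {
  index.addObjects(batch);
}

// made-up example: the promise is returned, so the caller can chain
// on it and a single .catch() at the root sees any failure
function saveBatchGood(batch) {
  return index.addObjects(batch);
}

saveBatchGood(batch)
  .then(() => console.log('batch saved'))
  .catch(err => { throw err; });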

If you need further assistance, can you create a GitHub repository with your full code in it? Thanks.

Thanks so much for the insightful response @vvo . Sincerely. You just taught a man to fish :facepunch:

Due to an impending deadline I had to upload my dataset manually for now, but I’ll be sure to test my updated script fully next month when operations reset and report back.

In case anyone else is looking to sync with Algolia via Google Cloud Functions, the code is here and the dataset (in Mongo/bson format) can be downloaded here (warning: 1GB+ file).

And my apologies for the orphaned if (result) statement in the original code… for simplicity’s sake I stripped out the function export required by Google Cloud Functions.

Pasting the full Google Cloud Function below for posterity’s sake:

const algoliasearch = require('algoliasearch');
const secrets = require('./secrets');

// Algolia
const appID = secrets.algolia.appID;
const apiKey = secrets.algolia.apiKey;
const indexName = secrets.algolia.indexName;
const client = algoliasearch(appID, apiKey);
const index = client.initIndex(indexName);

// MongoDB
const Db = require('mongodb').Db;
const Server = require('mongodb').Server;
const db = new Db('irs', new Server(secrets.gce.host, '27017'));

exports.syncWithAlgolia = function syncWithAlgolia(req, res) {
  // Open a db connection
  db.open(function(oErr) {
    if (oErr) throw new Error(oErr);

    db.admin().authenticate(secrets.gce.user, secrets.gce.password, function(aErr, result) {
      if (result) {
        // Get the collection
        db.collection('grants', function(gErr, collection) {
          if (gErr) throw new Error(gErr);

          // Get the collection count
          collection.count()
            .then(function(count) {
              const collectionCount = count;
              let processedCount = 0;
              let batch = [];

              // Iterate over the whole collection using a cursor
              return collection.find().forEach(function loopThroughCollection(doc) {
                // Remove unnecessary fields
                delete doc._id;
                
                // Add doc to batch array
                batch.push(doc);
                ++processedCount;

                // Send documents by batch of 5000 to Algolia
                if (batch.length >= 5000) {
                  return sendToAlgolia(batch).then(function sendBatchToAlgolia() {
                    batch = [];
                  });
                }

                // Send remaining documents
                if (processedCount === collectionCount) {
                  return sendToAlgolia(batch).then(function sendFinalBatchToAlgolia() {
                    res.send('Algolia sync complete \n');
                    db.close();
                  });
                }
                return false;
              });
            })
            .catch(function(cErr) {
              throw new Error(cErr);
            });
        });
      }
    });
  });

  function sendToAlgolia(batch) {
    return index.addObjects(batch);
  }
};

Great! Don’t forget to put a return on that call too, so it returns the Promise.

Good catch! Edited my script above.

I was hoping the Node Mongo driver’s batchSize method was the answer (thanks for the suggestion!), but it appears it doesn’t actually give you the documents in batches.
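
As far as I can tell, batchSize is just a hint for how many documents the driver fetches from the server per round trip; the cursor still hands them back one at a time. A quick sketch of what I mean, reusing the collection from my script:

// batchSize only tunes the driver's fetch size per server round trip;
// the application still receives a single document per next() call
const cursor = collection.find().batchSize(1000);
cursor.next().then(function(doc) {
  // doc is one document, not an array of 1000
});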

I found a way to stall the Mongo reads so as not to overflow memory (this post was incredibly helpful), but it essentially turns the script into a synchronous one.

I’d still love to find a way to do this with native promises, so any help is most welcome.
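
Roughly the shape I’m picturing, for reference. This is an untested sketch with a made-up drainCursor helper, reusing the collection, index and db objects from my scripts and assuming the 2.2 driver’s promise-returning cursor.hasNext() and cursor.next():

const cursor = collection.find().batchSize(1000);

// made-up helper: pull documents one at a time, send a batch to Algolia,
// and only ask Mongo for more once that batch has been accepted
function drainCursor(batch) {
  return cursor.hasNext().then(function(hasMore) {
    if (!hasMore) {
      // flush whatever is left, then we're done
      return batch.length > 0 ? index.addObjects(batch) : null;
    }
    return cursor.next().then(function(doc) {
      delete doc._id;
      batch.push(doc);
      if (batch.length >= 1000) {
        // wait for Algolia before pulling more documents from Mongo
        return index.addObjects(batch).then(function() {
          return drainCursor([]);
        });
      }
      return drainCursor(batch);
    });
  });
}

drainCursor([]).then(function() {
  console.log('Algolia sync complete');
  return db.close();
});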

Sharing my working script below in case others find it useful. Be forewarned though: while reliable, it’s quite slow. It took over 10 minutes to sync 70k records, and it’s currently only halfway through a 3.5m-record set after two hours.

const algoliasearch = require('algoliasearch');
const secrets = require('./secrets');

// Algolia
const appID = secrets.algolia.appID;
const apiKey = secrets.algolia.apiKey;
const indexName = secrets.algolia.indexName;
const client = algoliasearch(appID, apiKey);
const index = client.initIndex(indexName);

// Mongo
const MongoClient = require('mongodb').MongoClient;
const f = require('util').format;
const user = encodeURIComponent(secrets.gce.user);
const password = encodeURIComponent(secrets.gce.password);
const host = encodeURIComponent(secrets.gce.host);
const database = encodeURIComponent(secrets.gce.database);
const authSource = encodeURIComponent(secrets.gce.authDatabase);
const url = f('mongodb://%s:%s@%s:27017/%s?authSource=%s',
  user, password, host, database, authSource);

exports.syncWithAlgolia = function syncWithAlgolia(req, res) {
  // Open a db connection
  MongoClient.connect(url)
    .then(db => {
      const query = {};
      const batchSize = 1000;
    
      const collection = db.collection('algolia');
      const cursor = collection.find(query);
      let currentBatch = [];
      
      // Start processing
      return cursor.next(process);
  
      function process(err, doc) {
        // Check if more docs exist
let hasMore = doc !== null;
    
        if (doc === null) {
          if (currentBatch.length > 0) {
            processBatch(currentBatch)
              .then(function() {
                res.send('Algolia sync complete \n');
                return db.close();
              });
          }
          console.log('Finished processing documents');
          return;
        } else {
          setTimeout(function() {
            currentBatch.push(doc);
            if (currentBatch.length % batchSize === 0) {
              processBatch(currentBatch)
                .then(function() {
                  currentBatch = [];
                  return cursor.next(process);
                });
            } else if (hasMore) {
              cursor.next(process);
            } else {
              return;
            }
          });
        }
      }
    })
    .catch(function(cErr) {
      throw new Error(cErr);
    });

  function processBatch(batch) {
    console.log('Batch sent to Algolia');
    return index.addObjects(batch);
  }
};

Hi @chad, thanks for getting back here. I guess now your issue is that maybe MongoDB is too slow at giving you results?

You could trace that by doing:
console.time('mongodb');
// mongodb get
console.timeEnd('mongodb');

Do the same around the Algolia save and compare; then you will see what’s slow.
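
For example, adapting your process() callback (just a sketch, reusing your cursor, currentBatch and processBatch):

console.time('mongodb');
cursor.next(function(err, doc) {
  console.timeEnd('mongodb'); // how long MongoDB took to hand over a document

  currentBatch.push(doc);

  console.time('algolia');
  processBatch(currentBatch).then(function() {
    console.timeEnd('algolia'); // how long the Algolia round trip took
  });
});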

Appreciate the quick response as always @vvo!

I verified that Mongo continues to flood the script with results, so no speed issues there. The slowness comes from the changes that essentially pause Mongo until Algolia has finished indexing the previous batch.

My test dataset has an average record size of 1.27 KB, which results in a roughly 5-6 second round trip (on average) to Algolia with batches of 1,000.
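
At that rate the full 3.5m records works out to roughly 3,500 round trips, so several hours of wall-clock time even when everything goes smoothly.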

I’ve got a working script, so I suppose we can mark this thread as solved and consider it a feature request.

I’d love to see an example Node script that can sync X million records to Algolia.
@Bobylito’s script was a great source of inspiration, but I could never get it to work with my 3m dataset, and at three years old, I’d imagine there are all sorts of newer ways to go about it.

Thanks again :raised_hands:
