Skip to main content
2 of 2
added 1914 characters in body
slm
  • 379.8k
  • 127
  • 793
  • 897

nodejs sub processes in nohup bash script lock up

I run scripts with nohup that contain a list of a few hundred to a few thousand nodejs commands. These nodejs sub processes sync data from mysql and salesforce to couchdb.

$ nohup ./mf-sync.staging-mfdb.sh 2>&1 > mf-sync.staging-mfdb.log &
$ mf-sync.staging-mfdb.sh

The script:

#!/bin/bash
# Runs mf-sync.js once per (--mfi, --source) pair. The commands are plain
# foreground invocations, so each one must exit before the next one starts.
echo "Starting..."
# `$$` expands to the PID of this script's shell.
echo "pid $$"
# One synchronization run per MFI/source pair.
node /opt/node/mix-sync/mf-sync.js --mfi=100017 --source=100982
node /opt/node/mix-sync/mf-sync.js --mfi=100026 --source=101619
node /opt/node/mix-sync/mf-sync.js --mfi=100027 --source=100982
node /opt/node/mix-sync/mf-sync.js --mfi=100036 --source=101619
node /opt/node/mix-sync/mf-sync.js --mfi=100063 --source=100982
node /opt/node/mix-sync/mf-sync.js --mfi=100075 --source=101160
etc....

In a terminal I observe the sub processes stalling:

[rgoya@host ~]$ ps -e | grep sync
   26 ?        00:00:00 async/mgr
   30 ?        00:03:34 sync_supers
 6333 ?        00:00:01 mf-sync
30097 ?        00:00:00 mf-sync.staging

[rgoya@host ~]$ ps -e | grep sync
   26 ?        00:00:00 async/mgr
   30 ?        00:03:34 sync_supers
 6333 ?        00:00:01 mf-sync
30097 ?        00:00:00 mf-sync.staging

[rgoya@host ~]$ kill 6333

[rgoya@host ~]$ ps -e | grep sync
   26 ?        00:00:00 async/mgr
   30 ?        00:03:34 sync_supers
 6423 ?        00:00:00 mf-sync
30097 ?        00:00:00 mf-sync.staging

[rgoya@host ~]$ ps -e | grep sync
   26 ?        00:00:00 async/mgr
   30 ?        00:03:34 sync_supers
 6449 ?        00:00:01 mf-sync
30097 ?        00:00:00 mf-sync.staging

NOTE: 30097 is the pid of the nohup process.

Examining the log before and after killing the subprocess, I see the next nodejs command is executed in sequence. I've tried running them with the --debug flag for verbose output, but I do not see anything unusual.

Additional notes

  • Nodejs has a memory limit of 1GB.

  • Couchdb defaults to 2048 max connections.

  • The contents of mf-sync.js.

      #!/usr/bin/env node
      process.title = 'mf-sync';
    
      var path = require('path')
      ,   fs = require('fs')
      ,   _ = require('underscore');
    
      // Parse command-line arguments
      // Every argument after the script path is turned into one `args` entry:
      // `--name=value` becomes `args.name = 'value'` and a bare `--name` flag
      // becomes `args.name = true`.
      var args = _.chain(process.argv).rest(2).map(function(arg) {
          var name = arg.replace('--', '');
          // Split on the FIRST `=` only, so a value that itself contains `=`
          // is kept whole instead of being truncated at the second `=`.
          var eq = name.indexOf('=');
          if (eq === -1) return [name, true];
          return [name.slice(0, eq), name.slice(eq + 1)];
      }).object().value();

      // Both identifiers are mandatory; abort before any connection is created
      if (!args.mfi) throw new Error('MFI ID not specified');
      if (!args.source) throw new Error('Source ID not specified');

      // Output when using `--debug` flag
      // Forwards all of its arguments to `console.info`; a no-op without `--debug`.
      var debug = function() { if (_.has(args, 'debug')) console.info.apply(this, arguments); };

      // Simulation mode
      // True when `--simulate` was passed (with or without a value).
      var simulate = _.has(args, 'simulate');

      require('util').log('Synchronization for ' + ['mfi', args.source, args.mfi].join('/') + ' started');
      simulate && console.warn('Simulation mode enabled. No changes will occurr.');
      debug(args);
    
      // MySQL: pick the `mfdb` entry of mysql.json for source 101027 and the
      // `gold` entry for every other source, then create (but do not yet open)
      // the connection.
      var my = require('mysql');
      var myConfig = require(path.join(__dirname, 'mysql.json'));
      var db = (args.source == '101027') ? 'mfdb' : 'gold';
      var mysql = my.createConnection(myConfig[db]);
      debug('MySQL', myConfig[db].database);

      // Salesforce: the whole salesforce.json object is handed to the connection
      var sf = require('node-salesforce');
      var sfConfig = require(path.join(__dirname, 'salesforce.json'));
      var salesforce = new sf.Connection(sfConfig);
      debug('Salesforce', sfConfig.username);

      // CouchDB: connect with the `mfdb` settings of couchdb.json and select
      // the database named there
      var cradle = require('cradle');
      var couchConfig = require(path.join(__dirname, 'couchdb.json'));
      var couch = new cradle.Connection(
          couchConfig.mfdb.host,
          couchConfig.mfdb.port,
          couchConfig.mfdb.options
      ).database(couchConfig.mfdb.name);
      debug('CouchDB', couchConfig.mfdb.name);
    
      // Register an extra Underscore.js helper used when cleaning up rows
      _.mixin({
          // Deletes, in place, every property of `obj` whose value is `null` or
          // a function, then returns that same (mutated) object.
          compactObject: function(obj) {
              var isDiscardable = function(value) {
                  return _.isNull(value) || _.isFunction(value);
              };
              _.each(obj, function(value, key) {
                  if (isDiscardable(value)) delete obj[key];
              });
              return obj;
          }
      });
    
      // Get MFI data from MySQL
      // -----------------------
      // Loads the master `mfi` row plus every period and dimension row for one
      // MFI and assembles them into period documents keyed by document id.
      //
      // mfi      - object carrying `source_id` and `mfi_id`; the columns of the
      //            master `mfi` row are merged into it with `_.defaults`.
      // callback - invoked as `callback(mfi, docs)` from the `mysql.end`
      //            callback; `docs` maps each `mfi-period/...` id to its document.
      //
      // Any query or disconnect error is rethrown from inside the driver callback.
      // NOTE(review): every query is issued up front while `tree` and `docs` are
      // only filled in by earlier callbacks, so this presumably relies on the
      // driver running queued queries in order — confirm against the mysql
      // driver documentation.
      var getMySQLData = function(mfi, callback) {
          mysql.connect();

          // Get master MFI metadata
          debug('Getting master MFI metadata from `mfi`.');
          mysql.query("SELECT * FROM mfi WHERE source_id = ? AND mfi_id = ?", [mfi.source_id, mfi.mfi_id], function(err, rows, fields) {
              if (err) throw new Error(err);
              // Only the first row is used, minus the `parse` / `_typeCast` keys
              _.defaults(mfi, _.chain(rows).first().omit(['parse', '_typeCast']).value());
          });

          // Define MFDB data tables
          // Keys are `<currency>/<adjusted>`; they become part of each document id
          var tables = {
              'usd/false': ['balance_sheet_usd', 'calculation_usd', 'income_statement_usd', 'infrastructure', 'portfolio_report_usd', 'products_and_clients', 'social_performance'],
              'usd/true': ['balance_sheet_adjusted_usd', 'calculation_adjusted_usd', 'income_statement_adjusted_usd', 'infrastructure_adjusted', 'portfolio_report_adjusted_usd', 'products_and_clients_adjusted', 'social_performance'],
              'local/false': ['balance_sheet', 'calculation', 'income_statement', 'infrastructure', 'portfolio_report', 'products_and_clients', 'social_performance'],
              'local/true': ['balance_sheet_adjusted', 'calculation_adjusted', 'income_statement_adjusted', 'infrastructure_adjusted', 'portfolio_report_adjusted', 'products_and_clients_adjusted', 'social_performance']
          };
          // Remove table name variance
          // Strips the first `_usd` and the first `_adjusted` from a table name
          var baseTable = _.memoize(function(table) {
              return table.replace('_usd', '').replace('_adjusted', '');
          });

          var docs = {};
          // Get all available MFDB data for the current `mfi_vid`
          debug('Getting all available MFDB data for the current `mfi_vid`.');
          _.each(_.keys(tables), function(key) {
              _.each(tables[key], function(table) {
                  debug('Querying', key, 'data from', table);
                  mysql.query("SELECT t.* FROM ?? t INNER JOIN mfi ON t.source_id = mfi.source_id AND t.mfi_id = mfi.mfi_id AND t.mfi_vid = mfi.mfi_vid WHERE t.source_id = ? AND t.mfi_id = ? ORDER BY t.fiscal_year ASC, t.period_type DESC, t.as_of_date ASC", [table, mfi.source_id, mfi.mfi_id], function(err, rows, fields) {
                      if (err) throw new Error(err);

                      // Create full document data
                      _.each(rows, function(row) {
                          // Create doc._id
                          var doc_id = ['mfi-period', mfi.source_id, mfi.mfi_id, key, row.fiscal_year, row.period_type, row.as_of_date.toISOString().substr(0, 10)].join('/');
                          debug('Processing', table, 'data for', doc_id);

                          // Initialize document
                          if (!docs[doc_id]) docs[doc_id] = {
                              _id: doc_id,
                              type: 'mfi-period',
                              currency: key.split('/')[0],
                              adjusted: key.split('/')[1] === 'true',
                              fiscal_year: row.fiscal_year,
                              period_type: row.period_type,
                              as_of_date: row.as_of_date
                          };
                          // The first row that carries a currency code sets it
                          if (!docs[doc_id].currency_code && row.currency_code) docs[doc_id].currency_code = row.currency_code;

                          // Extend MFDB data into document
                          debug('Adding', table, 'data to', doc_id);
                          row = _.chain(row).omit(['mfi_id', 'mfi_vid', 'source_id', 'period_type', 'as_of_date', 'fiscal_year', 'currency_code', 'currency_unit']).compactObject().value();
                          if (!_.isEmpty(row)) docs[doc_id][baseTable(table)] = row;
                      });
                  });
              });
          });

          // Get all scenario data to create dimension hierarchy
          var tree = {};
          mysql.query("SELECT * FROM scenarios", function(err, rows) {
              debug('Processing scenario data into hierarchical tree.');
              if (err) throw new Error(err);

              // Get all children scenarios for any given parent
              // Returns an object keyed by the `scenarios` name of every row whose
              // `parent` matches (sorted by `weight`), or null when there is none.
              var getChildren = function(parent) {
                  var children = _.chain(rows).where({parent: parent}).sortBy('weight').pluck('scenarios').object({}).tap(function(scenarios) {
                      // Remove used scenarios from master list to decrease stack size
                      _.each(_.keys(scenarios), function(scenario) {
                          rows = _.without(rows, _.findWhere(rows, {scenarios: scenario}));
                      });
                  }).value();
                  if (_.isEmpty(children)) return null;
                  return children;
              };

              // Recursively get dimension hierarchy
              var getTree = function(hierarchy) {
                  if (_.isEmpty(hierarchy)) return;
                  _.each(_.keys(hierarchy), function(p) {
                      hierarchy[p] = getChildren(p);
                      if (!_.isEmpty(hierarchy[p])) getTree(hierarchy[p]);
                  });
              };

              // Roots are the rows whose `parent` is the empty string
              tree = getChildren('');
              getTree(tree);
          });

          // Find path to nested object property
          // NOTE(review): `_.memoize` is given no hash function here, so results
          // are presumably cached per `needles` string only — confirm against the
          // Underscore.js documentation.
          var findPath = _.memoize(function(needles, haystack) {
              // Depth-first search for the key `needle`; returns the list of keys
              // leading to it, or a falsy value when it is not found.
              function constructPath(haystack, needle, path) {
                  // Only plain object nodes are searched (functions are rejected too)
                  if (!_.isObject(haystack) || typeof haystack !== 'object') return false;
                  for (var key in haystack) {
                      var value = haystack[key];
                      var currentPath = _.extend([], path);
                      currentPath.push(key);
                      if (key === needle) return currentPath;
                      var foundPath = constructPath(value, needle, currentPath);
                      if (foundPath) return foundPath;
                  }
              }
              // Handle comma-separated nested hierarchies
              return _.chain(needles.split(',')).map(function(needle) {
                  return constructPath(haystack, needle, []);
              }).flatten().compact().value();
          });
          // Assign value inside a nested object property
          // Walks `path` starting at `obj`, creating intermediate objects as needed.
          // A scalar found where a branch is required is kept under `.value`, and
          // the final value goes under `.value` when the target already is a branch.
          var deepAssign = function(obj, path, val) {
              var isBranch = function(node) {
                  return node !== null && typeof node === 'object';
              };
              // Indexed loop: `for...in` over an array yields string indices, and
              // the legacy `for (var i = 0 in path)` form is a SyntaxError in
              // strict mode.
              for (var i = 0; i < path.length; i++) {
                  var key = path[i];
                  if (i === path.length - 1) {
                      if (isBranch(obj[key])) obj[key].value = val;
                      else obj[key] = val;
                  } else if (!isBranch(obj[key])) {
                      // `== null` covers both a missing key and an explicit null
                      obj[key] = obj[key] == null ? {} : {value: obj[key]};
                  }
                  obj = obj[key];
              }
          };
          // Sanitize dimension names
          // Removes every `mix_`, `Dimension` and `Member` fragment and keeps only
          // the part after the first `:` when a name contains one.
          var sanitizeDimensions = _.memoize(function(dimensions) {
              return _.map(dimensions, function(dimension) {
                  dimension = dimension.replace(/mix_/g, '').replace(/Dimension/g, '').replace(/Member/g, '');
                  if (/:/.test(dimension)) return dimension.split(':')[1];
                  else return dimension;
              });
          });

          // Get dimension data for all available documents
          _.each(['usd', 'local'], function(currency) {
              var dimensions_table = currency === 'usd' ? 'dimensions_usd' : 'dimensions';
              debug('Querying', currency, 'data from', dimensions_table);
              mysql.query("SELECT d.fiscal_year, d.period_type, d.as_of_date, d.scenarios, d.line_item_value, t.db_table, t.db_field FROM ?? d INNER JOIN mfi ON d.source_id = mfi.source_id AND d.mfi_id = mfi.mfi_id AND d.mfi_vid = mfi.mfi_vid LEFT JOIN Taxonomy t ON d.element_id = t.Elementid WHERE d.line_item_value IS NOT NULL AND t.db_table IS NOT NULL AND t.db_field IS NOT NULL AND d.source_id = ? AND d.mfi_id = ?", [dimensions_table, mfi.source_id, mfi.mfi_id], function(err, rows, fields) {
                  debug('Processing all data from', dimensions_table);
                  if (err) throw new Error(err);
                  _.each(rows, function(row) {
                      var dimension_path = findPath(row.scenarios, tree);
                      if (_.isEmpty(dimension_path)) return console.warn('MISSING SCENARIO', row.scenarios);
                      // The same dimension value is written to both the adjusted
                      // and the unadjusted document, when those documents exist
                      _.each(['true', 'false'], function(adjusted) {
                          var doc_id = ['mfi-period', mfi.source_id, mfi.mfi_id, currency, adjusted, row.fiscal_year, row.period_type, row.as_of_date.toISOString().substr(0, 10)].join('/');
                          var path = sanitizeDimensions([row.db_table, row.db_field].concat(dimension_path));
                          docs[doc_id] && deepAssign(docs[doc_id], path, parseFloat(row.line_item_value));
                      });
                  });
              });
          });

          // Close the connection and hand the assembled data to the caller
          mysql.end(function(err) {
              debug('Disconnected from MySQL', db);
              if (err) throw new Error(err);
              callback(mfi, docs);
          });
      };
    
      // Get MFI metadata from Salesforce
      // --------------------------------
      // Logs into Salesforce, runs four queries and merges their results into
      // `mfi` and `docs`.
      //
      // mfi      - master MFI object; reads `mfi_id` and `organization_id`, gains
      //            the lower-cased Account fields, `mfi_name`, `sp_profile` and
      //            `networks`.
      // docs     - period documents; those with `period_type === 'ANN'` gain
      //            `annual_diamonds`.
      // callback - invoked once as `callback(mfi, docs)` after all four queries
      //            have answered; the Salesforce logout is started afterwards.
      //
      // Login, query and logout errors are rethrown from inside the callbacks, as
      // is a missing Account record ('MFI does not exist').
      var getSalesforceData = function(mfi, docs, callback) {
          // Escape a value for use inside a single-quoted SOQL string literal, so
          // that a quote or backslash in an identifier cannot break the query
          var soqlEscape = function(value) {
              return String(value).replace(/\\/g, '\\\\').replace(/'/g, "\\'");
          };

          // One count per query issued below
          var remaining = 4;
          var done = function(mfi, docs) {
              if (--remaining === 0) {
                  callback(mfi, docs);

                  // Logout from Salesforce
                  salesforce.logout(function(err) {
                      debug('Logged out from Salesforce');
                      if (err) throw new Error(err);
                  });
              }
          };

          // Login into Salesforce
          debug('Login into Salesforce');
          salesforce.login(sfConfig.username, sfConfig.password + sfConfig.security_token, function(err, userInfo) {
              if (err) throw new Error(err);

              // Both identifiers are read now, before any query has answered
              var mfiId = soqlEscape(mfi.mfi_id);
              var organizationId = soqlEscape(mfi.organization_id);

              // Get main MFI Metadata
              debug('Getting MFI metadata from Salesforce');
              salesforce.query("SELECT Id, Name, Record_ID__c, mix_Diamonds__c, Date_Established__c, mix_Region__c, Country__c, Operations_Comprised_by_MF__c, Regulated__c, Current_Legal_Status__c, Profit_Status__c FROM Account WHERE Record_ID__c = '" + mfiId + "'", function(err, result) {
                  if (err) throw new Error(err);
                  if (result.totalSize === 0) throw new Error('MFI does not exist');
                  var record = {};
                  _.chain(result.records).first().omit(['attributes', 'Id']).each(function(v, k) {
                      // Make attributes lowercase
                      record[k.toLowerCase()] = v;
                  });
                  _.extend(mfi, record);
                  mfi.mfi_name = mfi.name;
                  done(mfi, docs);
              });

              // Determine whether MFI contains Social Performance Profile data
              debug('Determining whether MFI contains SP Profile data.');
              salesforce.query("SELECT Id, Record_ID__c FROM Account WHERE Record_ID__c = '" + mfiId + "' AND Id IN (SELECT Organization__c FROM SP_Profile__c)", function(err, result) {
                  if (err) throw new Error(err);
                  mfi.sp_profile = !_.isEmpty(result.records);
                  done(mfi, docs);
              });

              // Get list of MFI Network Affiliations
              debug('Getting list of MFI Network Affiliations');
              salesforce.query("SELECT Source_Organization__r.Name FROM Partnership__c WHERE Relationship__c = 'Network Affiliation' AND Status__c = 'Current' AND Target_Organization__r.Id = '" + organizationId + "'", function(err, result) {
                  if (err) throw new Error(err);
                  mfi.networks = _.chain(result.records).pluck('Source_Organization__r').pluck('Name').value();
                  done(mfi, docs);
              });

              // Get annual diamonds
              debug('Getting annual diamonds.');
              salesforce.query("SELECT Period__c, Diamond_Score__c FROM Data_Campaign_Status__c WHERE Organization__c = '" + organizationId + "'", function(err, result) {
                  if (err) throw new Error(err);
                  // Group diamonds by year
                  var diamonds = _.chain(result.records).map(function(period) {
                      return _.chain(period).pick(['Period__c', 'Diamond_Score__c']).values().value();
                  }).object().value();
                  // Add diamonds to corresponding periods
                  _.chain(docs).filter(function(doc) { return doc.period_type === 'ANN'; }).each(function(doc) {
                      doc.annual_diamonds = diamonds[doc.fiscal_year];
                  });
                  done(mfi, docs);
              });
          });
      };
    
      // Calculate Peer Group data
      // -------------------------
      // Classifies every document into peer groups (age, intermediation, market,
      // outreach, scale, sustainability) and stores the result on the document
      // as `peer_groups` when at least one group could be determined.
      //
      // docs     - collection of period documents; modified in place.
      // callback - invoked synchronously as `callback(docs)` once all documents
      //            have been processed.
      var calculatePeerGroupData = function(docs, callback) {
          // Safely get data point value
          // A data point is either a plain value or an object carrying it under
          // `.value`. Returns undefined when the group or property is missing or
          // the entry is null.
          var getVal = function(obj, group, prop) {
              if (!_.has(obj, group) || !_.has(obj[group], prop)) return undefined;
              var entry = obj[group][prop];
              if (entry === null) return undefined;
              // Test for the key itself: a stored value of 0 is a real value and
              // must not fall through to the wrapper object
              if (typeof entry === 'object' && _.has(entry, 'value')) return entry.value;
              return entry;
          };

          _.each(docs, function(doc, id) {
              var peer_groups = {};

              // Age
              debug('Calculating peer group age for', doc._id);
              if (_.has(doc, 'date_established__c')) {
                  // Years between establishment and the reporting date
                  var age = Math.abs(Date.parse(doc.as_of_date) - Date.parse(doc.date_established__c)) / (86400000 * 365.242199);
                  if (age) {
                      if (age < 4) peer_groups['age'] = 'New';
                      else if (age <= 8) peer_groups['age'] = 'Young';
                      else if (age > 8) peer_groups['age'] = 'Mature';
                  }
              }

              // Intermediation
              debug('Calculating peer group intermediation for', doc._id);
              var deposits = getVal(doc, 'balance_sheet', 'deposits');
              var total_assets = getVal(doc, 'balance_sheet', 'total_assets');
              if (!_.isUndefined(deposits) && !_.isUndefined(total_assets) && total_assets > 0) {
                  var ratio = deposits / total_assets;
                  if (ratio === 0) peer_groups['intermediation'] = 'Non FI';
                  else if (ratio < 0.2) peer_groups['intermediation'] = 'Low FI';
                  else if (ratio >= 0.2) peer_groups['intermediation'] = 'High FI';
              }
              else if (total_assets === 0) {
                  peer_groups['intermediation'] = 'Non FI';
              }

              // Market
              debug('Calculating peer group market for', doc._id);
              var depth = getVal(doc, 'calculation', 'average_balance_borrower_per_capita') || getVal(doc, 'calculation', 'average_outstanding_balance_per_capita');
              var average_loan_size = getVal(doc, 'calculation', 'average_balance_borrower') || getVal(doc, 'calculation', 'average_outstanding_balance');
              if (!_.isUndefined(depth) || !_.isUndefined(average_loan_size)) {
                  if (depth < .2 || average_loan_size < 150) peer_groups['market'] = 'Low End';
                  else if ((depth >= .2) && (depth < 1.5)) peer_groups['market'] = 'Broad';
                  else if ((depth >= 1.5)  && (depth < 2.5)) peer_groups['market'] = 'High End';
                  else if ((depth >= 2.5)) peer_groups['market'] = 'Small Business';
              }

              // Outreach
              debug('Calculating peer group outreach for', doc._id);
              var total_borrowers = getVal(doc, 'products_and_clients', 'total_borrowers');
              if (total_borrowers < 10000) peer_groups['outreach'] = 'Small';
              else if (total_borrowers < 30000) peer_groups['outreach'] = 'Medium';
              else if (total_borrowers >= 30000) peer_groups['outreach'] = 'Large';

              // Scale
              debug('Calculating peer group scale for', doc._id);
              if (_.has(doc, 'mix_region__c')) {
                  var gross_loan_portfolio = getVal(doc, 'balance_sheet', 'gross_loan_portfolio');
                  if (gross_loan_portfolio < 2000000 || (gross_loan_portfolio < 4000000 && doc.mix_region__c == 'Latin America and The Caribbean')) peer_groups['scale'] = 'Small';
                  else if (gross_loan_portfolio < 8000000 || (gross_loan_portfolio < 15000000 && doc.mix_region__c == 'Latin America and The Caribbean')) peer_groups['scale'] = 'Medium';
                  // `>=` so that a portfolio of exactly 8,000,000 is classified too
                  else if (gross_loan_portfolio >= 8000000) peer_groups['scale'] = 'Large';
              }

              // Sustainability
              debug('Calculating peer group sustainability for', doc._id);
              var operational_self_sufficiency = getVal(doc, 'calculation', 'operational_self_sufficiency');
              if (!_.isUndefined(operational_self_sufficiency)) {
                  if (doc.adjusted) peer_groups['sustainability'] = operational_self_sufficiency < 1 ? 'Non-FSS' : 'FSS';
                  else peer_groups['sustainability'] = operational_self_sufficiency < 1 ? 'Non-OSS' : 'OSS';
              }

              if (!_.isEmpty(peer_groups)) docs[id].peer_groups = peer_groups;
          });

          callback(docs);
      };
    
      // Send data to CouchDB
      // --------------------
      // Writes the master MFI record and all period documents to CouchDB and
      // removes stored period documents that are no longer part of `docs`.
      //
      // docs     - array whose FIRST element is the master MFI record (it is
      //            shifted off the array); the rest are period documents.
      // callback - invoked with no arguments right after the period writes have
      //            been issued. NOTE(review): it is not delayed until the save /
      //            remove callbacks have run.
      //
      // In simulation mode every action is still logged, but nothing is saved or
      // removed. Errors are rethrown from inside the CouchDB callbacks.
      var updateCouchDB = function(docs, callback) {
          // Update master MFI record
          debug('Updating master MFI record');
          var mfi = docs.shift();
          couch.get(mfi._id, function(err, doc) {
              if (err) {
                  if (err.error === 'not_found') {
                      require('util').log('Inserting ' + mfi._id);
                      !simulate && couch.save(mfi._id, mfi, function(err, res) {
                          debug('Inserted', res);
                          if (err) throw new Error(err);
                      });
                  } else throw new Error(err);
              } else if (doc._rev) {
                  // Saving over an existing record requires its current revision
                  require('util').log('Updating ' + mfi._id);
                  !simulate && couch.save(mfi._id, doc._rev, mfi, function(err, res) {
                      debug('Updated', res);
                      if (err) throw new Error(err);
                  });
              }
          });

          // Get list of existing IDs in CouchDB
          debug('Getting list of existing IDs in CouchDB');
          couch.all({startkey: ['mfi-period', args.source, args.mfi].join('/'), endkey: ['mfi-period', args.source, args.mfi, '~'].join('/')}, function(err, ids) {
              if (err) throw new Error(err);

              // Remove outdated documents from CouchDB
              _.chain(ids).pluck('id').difference(_.pluck(docs, '_id')).map(function(id) {
                  return _.findWhere(ids, {id: id});
              }).each(function(doc) {
                  require('util').log('Removing ' + doc.id);
                  // Simulation mode must not delete anything either
                  !simulate && couch.remove(doc.id, doc.value.rev, function(err, res) {
                      debug('Removed', res);
                      if (err) throw new Error(err);
                  });
              });

              // Insert/update all documents for this MFI
              _.each(docs, function(doc) {
                  var update = _.findWhere(ids, {id: doc._id});
                  if (update) {
                      require('util').log('Updating ' + doc._id);
                      !simulate && couch.save(doc._id, update.value.rev, doc, function(err, res) {
                          debug('Updated', res);
                          if (err) throw new Error(err);
                      });
                  } else {
                      require('util').log('Inserting ' + doc._id);
                      !simulate && couch.save(doc._id, doc, function(err, res) {
                          debug('Inserted', res);
                          if (err) throw new Error(err);
                      });
                  }
              });

              callback();
          });
      };
    
      // Master MFI document; the later steps add the MySQL and Salesforce fields
      var mfi = {
          _id: 'mfi/' + args.source + '/' + args.mfi,
          type: 'mfi',
          source_id: args.source,
          mfi_id: args.mfi,
          updated: new Date()
      };

      // Pipeline: MySQL -> Salesforce -> peer groups -> CouchDB
      getMySQLData(mfi, function onMySQLData(master, periods) {
          getSalesforceData(master, periods, function onSalesforceData(record, periodDocs) {
              // Merge MFI metadata into each period (the period's own fields win)
              _.each(periodDocs, function(period, key) {
                  var merged = _.clone(record);
                  periodDocs[key] = _.extend(merged, period);
              });
              calculatePeerGroupData(periodDocs, function onPeerGroups(scored) {
                  // Convert to array for bulk updating, master record first
                  var batch = _.union([record], _.values(scored));
                  updateCouchDB(batch, function onCouchUpdated() {
                      require('util').log('Synchronization for ' + ['mfi', args.source, args.mfi].join('/') + ' finished');
                  });
              });
          });
      });
    

Questions

I would like to know:

  1. Why do these sub processes appear to freeze? (I cannot find any evidence that the ones that freeze are any different from the ones that execute and stop.)
  2. How can I script stopping a sub process that has been frozen for several minutes, so that I do not have to kill it manually?