/**
@overview A JavaScript library for remote communication with the OpenICE system.
@author Jeff Plourde <jeff@mdpnp.org>
@License BSD 2-Clause License
*/
"use strict";
var io = require('socket.io-client');
var Emitter = require('component-emitter');
// The OpenICE constructor is the module's single export.
module.exports = exports = OpenICE;
// Mix the component-emitter API (on/off/emit) into every prototype that fires events.
// OpenICE, Table, Row and Sample are function declarations further down this file;
// declarations are hoisted, so their prototypes already exist when these lines run.
Emitter(OpenICE.prototype);
Emitter(Table.prototype);
Emitter(Row.prototype);
Emitter(Sample.prototype);
/**
 * Builds the string key "<domain>-<partition>-<topic>" that identifies a table.
 *
 * Side effect: a missing (undefined or null) partition is replaced with the
 * empty string on the object that was passed in, so the caller sees the
 * normalised value after the call.
 * @param {object} data - Object containing domain, partition and topic attributes
 * @returns {string} The table key.
 * @access private
 */
function calcTableKey(data) {
  var partition = data.partition;
  if (partition === undefined || partition === null) {
    // A table without a partition is keyed (and stored back) as blank.
    partition = "";
  }
  data.partition = partition;
  return data.domain + '-' + partition + '-' + data.topic;
}
/**
 * One data sample belonging to a row, captured at a point in time.
 * Only msg.sourceTimestamp and msg.sample are read from the message.
 * @constructor
 * @access protected
 * @param {Row} row - The row that owns this sample.
 * @param {object} msg - The message carrying sourceTimestamp and sample.
 */
function Sample(row, msg) {
  /**
   * @public
   * @property {Row} row - The owning row.
   */
  this.row = row;
  /**
   * @public
   * @property {Date} sourceTimestamp - Time of the sample at its source, as received in the message.
   */
  this.sourceTimestamp = msg.sourceTimestamp;
  /**
   * @public
   * @property {object} data - The payload of the sample.
   */
  this.data = msg.sample;
}
/**
 * Renders the sample as "@<date> <JSON payload>".
 * @access public
 * @returns {string}
 */
Sample.prototype.toString = function() {
  var when = new Date(this.sourceTimestamp);
  var payload = JSON.stringify(this.data);
  return "@" + when + " " + payload;
};
/**
 * Announces that this sample has been dropped and then detaches every
 * listener registered on it.
 * @access private
 * @fires Sample#Sample:expire
 */
Sample.prototype.expire = function() {
  /**
   * Fired when a sample is removed from the row by expiration policy.
   * @event Sample#Sample:expire
   * @type {object}
   * @property {Sample} sample - The expired sample
   */
  var evt = {'sample': this};
  this.emit('expire', evt);
  // Calling off() with no arguments removes all listeners from this sample.
  this.off();
};
/**
 * Represents a data row for a unique instance of table data.
 * @constructor
 * @access protected
 * @param {Table} table - The parent table for this row.
 * @param {string} rowId - Unique identifier for this row.
 */
function Row(table, rowId) {
  /**
   * @access public
   * @property {Table} table - The parent table containing this row.
   */
  this.table = table;
  /**
   * @access public
   * @property {string} rowId - Unique identifier for this row.
   */
  this.rowId = rowId;
  /**
   * @access public
   * @property {object} keyValues - Invariant values (constitute the primary key) for this row, by field name.
   */
  this.keyValues = {};
  /**
   * @access public
   * @property {Sample[]} samples - Collection of data samples for this row, oldest first.
   */
  this.samples = [];
}
/**
 * Renders "<table> <rowId> <keyValues as JSON> <sample count>".
 * @access public
 * @returns {string}
 */
Row.prototype.toString = function() {
  return this.table + " " + this.rowId + " " + JSON.stringify(this.keyValues) + " " + this.samples.length;
};
/**
 * Adds a sample to the row, expiring the oldest samples once the row holds
 * more than table.openICE.maxSamples of them.
 * @access private
 * @param {object} data - data containing sample information
 * @fires Row#Row:expire
 * @fires Row#Row:sample
 */
Row.prototype.addSample = function(data) {
  var self = this;
  var sample = new Sample(this, data);
  sample.on('expire', function() {
    /**
     * Fired when a sample is removed from the row by expiration policy.
     * @event Row#Row:expire
     * @type {object}
     * @property {Row} row - The Row previously containing the Sample
     * @property {Sample} sample - The expired Sample
     */
    self.emit('expire', {'row':self, 'sample':sample});
  });
  this.samples.push(sample);
  // Trim only when the cap is exceeded so that exactly maxSamples samples are
  // kept. Comparing with >= would keep one fewer than the configured maximum
  // and, for a maximum of 1, expire the sample that was just added.
  // The length guard keeps shift() from returning undefined for a cap below zero.
  while (this.samples.length > 0 && this.samples.length > this.table.openICE.maxSamples) {
    this.samples.shift().expire();
  }
  /**
   * Fired when a sample is added to the row
   * @event Row#Row:sample
   * @type {object}
   * @property {Row} row - The Row containing the new sample
   * @property {Sample} sample - The newly added Sample
   */
  this.emit('sample', {'row':this, 'sample':sample});
};
/**
 * Expires every sample currently held by the row, oldest first, leaving the
 * row empty.
 * @access private
 */
Row.prototype.removeAllSamples = function() {
  while (this.samples.length > 0) {
    this.samples.shift().expire();
  }
};
/**
 * Represents a data table.
 * @constructor
 * @access protected
 * @param {OpenICE} openICE - The parent OpenICE connection.
 * @param {int} domain - The domain containing the table.
 * @param {string[]} partition - The partition containing the table.
 * @param {string} topic - The topic identifier for the table.
 */
function Table(openICE, domain, partition, topic) {
  /** @property {OpenICE} openICE - The parent OpenICE instances. */
  this.openICE = openICE;
  /** @property {int} domain - The domain containing this table. */
  this.domain = domain;
  /** @property {string[]} partition - The partition(s) containing this table. */
  this.partition = partition;
  /** @property {string} topic - The Topic identifying this table. */
  this.topic = topic;
  /** @property {object} rows - Rows stored by row identifier. */
  this.rows = {};
}
/**
 * Stores the schema on the table and announces it.
 * @param {object} schema - The schema to assign.
 * @fires Table#Table:schema
 */
Table.prototype.setSchema = function(schema) {
  this.schema = schema;
  /**
   * A schema has been (re)assigned to the table
   * @event Table#Table:schema
   * @type {object}
   * @property {Table} table - The table
   * @property {object} schema - The schema
   */
  this.emit('schema', {'table':this, 'schema':schema});
};
/**
 * Adds the row identified by data.identifier, creating it when it is not yet
 * known. An identifier that already exists reuses its Row; in both cases the
 * row's keyValues are replaced with data.sample and beforeadd/afteradd fire.
 * @param {object} data - Message with identifier and sample (the key values).
 * @fires Table#Table:beforeadd
 * @fires Table#Table:afteradd
 */
Table.prototype.addRow = function(data) {
  var self = this;
  var row = this.rows[data.identifier];
  if (null == row) {
    row = new Row(this, data.identifier);
    // Relay the row's sample events once, when the row is first created.
    row.on('sample', function(e) {
      /**
       * A Sample is added to a Row of the table
       * @event Table#Table:sample
       * @type {object}
       * @property {Table} table - The table
       * @property {Row} row - The row
       * @property {Sample} sample - The new sample
       */
      self.emit('sample', {'table':self, 'row':e.row, 'sample':e.sample});
    });
  }
  row.keyValues = data.sample;
  /**
   * A new row is about to be added to the table
   * @event Table#Table:beforeadd
   * @type {object}
   * @property {Table} table - The table
   * @property {Row} row - The new row
   */
  this.emit('beforeadd', {'table':this, 'row':row});
  this.rows[data.identifier] = row;
  /**
   * A new row was just added to the table
   * @event Table#Table:afteradd
   * @type {object}
   * @property {Table} table - The table
   * @property {Row} row - The new row
   */
  this.emit('afteradd', {'table': this, 'row':row});
};
/**
 * Removes the row identified by data.identifier; does nothing when no such
 * row exists.
 * @param {object} data - Object whose identifier names the row.
 * @fires Table#Table:beforeremove
 * @fires Table#Table:afterremove
 */
Table.prototype.removeRow = function(data) {
  var row = this.rows[data.identifier];
  if (null == row) {
    return;
  }
  /**
   * A row is about to be removed from the table
   * @event Table#Table:beforeremove
   * @type {object}
   * @property {Table} table - The table
   * @property {Row} row - The row about to be removed
   */
  this.emit('beforeremove', {'table':this, 'row':row});
  // Listeners are detached before the samples are expired, so the row's
  // listeners do not see expire events for the samples dropped here.
  row.off();
  row.removeAllSamples();
  delete this.rows[data.identifier];
  /**
   * A row has been removed from the table
   * @event Table#Table:afterremove
   * @type {object}
   * @property {Table} table - The table
   * @property {Row} row - The removed row
   */
  this.emit('afterremove', {'table':this, 'row':row});
};
/**
 * Removes every row currently in the table via removeRow.
 */
Table.prototype.removeAllRows = function() {
  // Object.keys takes a snapshot, so deleting rows while looping is safe.
  var rowIds = Object.keys(this.rows);
  for (var i = 0; i < rowIds.length; i++) {
    this.removeRow({identifier: rowIds[i]});
  }
};
/**
 * Renders "<domain> <partition> <topic> <schema>".
 * @returns {string}
 */
Table.prototype.toString = function() {
  return this.domain + " " + this.partition + " " + this.topic + " " + this.schema;
};
/**
 * Returns rows with matching values for the specified key fields.
 * A row matches when every own property of keys equals (loosely, ==) the
 * row's key value of the same name. An empty or omitted filter matches all rows.
 * @public
 * @param {object} [keys] - Key values to match.
 * @returns {Row[]} The matching rows, in the table's row enumeration order.
 */
Table.prototype.getRows = function(keys) {
  var hasOwn = Object.prototype.hasOwnProperty;
  var matchingRows = [];
  var rowIds = Object.keys(this.rows);
  for (var i = 0; i < rowIds.length; i++) {
    var row = this.rows[rowIds[i]];
    // A row added without key values is treated as having none.
    var values = row.keyValues || {};
    var match = true;
    // The loop variable must be declared: this file runs in strict mode, where
    // assigning to an undeclared name throws a ReferenceError.
    for (var key in keys) {
      // Loose comparison is retained so existing callers keep matching as
      // before (for example 5 against "5").
      if (hasOwn.call(keys, key) && keys[key] != values[key]) {
        match = false;
        break;
      }
    }
    if (match) {
      matchingRows.push(row);
    }
  }
  return matchingRows;
};
/**
 * Represents a connection back to the OpenICE system.
 *
 * Constructing an instance opens a socket.io-client connection to url and
 * wires the socket's events: incoming 'dds' messages are dispatched to the
 * matching table, and connect/reconnect/error/disconnect are translated into
 * this object's open, error and close events.
 * @constructor
 * @access public
 * @param {string} url - The URL to connect to the OpenICE system.
 */
function OpenICE(url) {
  /** @property {string} url - The URL of the remote OpenICE server. */
  this.url = url;
  /** @property {object} connection - The socket.io-client connection for url. */
  this.connection = new io(this.url);
  /** @property {object} tables - Tables hashed by table key string. */
  this.tables = {};
  /** @property {int} maxSamples - Max samples preserved for each row. */
  this.maxSamples = 100;
  /** @property {boolean} connected - Whether the socket is currently connected. */
  this.connected = false;
  var self = this;

  // Shared by 'connect' and 'reconnect': announce, then rebuild subscriptions.
  function handleOpen() {
    /**
     * The connection has been (re)opened
     * @event OpenICE#OpenICE:open
     * @type {object}
     * @property {OpenICE} openICE
     */
    self.emit('open', {'openICE':self});
    self.unsubscribeAll();
    self.subscribeAllTables();
    // Set last: a table created by an 'open' listener is still "not connected",
    // so createTable leaves its subscription to subscribeAllTables above.
    self.connected = true;
  }

  this.connection.on('dds', function(data) {
    // Find the table this message is addressed to.
    var tableKey = calcTableKey(data);
    var table = self.tables[tableKey];
    if (null == table) {
      console.log("Nonfatal unknown Table (tableKey="+tableKey+")");
      return;
    }
    var messageType = data.messageType;
    if ("Schema" === messageType) {
      table.setSchema(data.sample);
    } else if ("Add" === messageType) {
      table.addRow(data);
    } else if ("Remove" === messageType) {
      table.removeRow(data);
    } else if ("Sample" === messageType) {
      var row = table.rows[data.identifier];
      if (null == row) {
        console.log("No such row for sample");
        return;
      }
      row.addSample(data);
    } else {
      // Log the unrecognised type taken from the message itself; referring to
      // an undefined variable here would throw instead of logging.
      console.log("Unknown message:" + messageType);
    }
  });
  this.connection.on('connect', handleOpen);
  this.connection.on('reconnect', handleOpen);
  // The socket's reconnect_attempt, reconnecting, reconnect_error and
  // reconnect_failed events need no handling here.
  this.connection.on('error', function(err) {
    /**
     * An error has occurred
     * @event OpenICE#OpenICE:error
     * @type {object}
     * @property {OpenICE} openICE - The OpenICE object
     * @property {object} err - Further information about the error
     */
    self.emit('error', {'openICE':self, 'err':err});
    self.removeAllRows();
    self.connected = false;
  });
  this.connection.on('disconnect', function() {
    /**
     * The connection has been closed
     * @event OpenICE#OpenICE:close
     * @type {object}
     * @property {OpenICE} openICE
     */
    self.emit('close', {'openICE':self});
    self.removeAllRows();
    self.connected = false;
  });
}
/**
 * @returns {string} The URL this instance connects to.
 */
OpenICE.prototype.toString = function() {
  return this.url;
};
/**
 * Retrieves a table by identifying information.
 * If the table does not exist it is NOT created.
 * @public
 * @param {object} args - Contains attributes domain, partition, and topic identifying the table.
 * @returns {Table|undefined} The table, or undefined when none is registered under that key.
 */
OpenICE.prototype.getTable = function(args) {
  return this.tables[calcTableKey(args)];
};
/**
 * Creates a table with identifying information (or returns existing table if already created)
 * and requests table information from the server.
 * The subscription request is sent immediately only while connected; otherwise
 * the table is subscribed when the connection next opens.
 * @public
 * @param {object} args - Contains attributes domain, partition, and topic identifying the table.
 * @returns {Table}
 * @fires OpenICE#OpenICE:addtable
 */
OpenICE.prototype.createTable = function(args) {
  var tableKey = calcTableKey(args);
  var existing = this.tables[tableKey];
  if (null != existing) {
    return existing;
  }
  var self = this;
  var table = new Table(this, args.domain, args.partition, args.topic);
  table.on('sample', function(evt) {
    /**
     * A new sample has been added to a row of a table
     * @event OpenICE#OpenICE:sample
     * @type {object}
     * @property {OpenICE} openICE - The OpenICE object
     * @property {Table} table - The table
     * @property {Row} row - The row
     * @property {Sample} sample - The new sample
     */
    self.emit('sample', {'openICE':self, 'table':evt.table, 'row':evt.row, 'sample':evt.sample});
  });
  table.on('schema', function(evt) {
    /**
     * A schema has been (re)assigned to a table
     * @event OpenICE#OpenICE:schema
     * @type {object}
     * @property {OpenICE} openICE - The OpenICE object
     * @property {Table} table - The table
     * @property {object} schema - The new schema
     */
    self.emit('schema', {'openICE':self, 'table':evt.table, 'schema':evt.schema});
  });
  /**
   * A row will be removed from a table
   * @event OpenICE#OpenICE:beforeremove
   * @type {object}
   * @property {OpenICE} openICE - The OpenICE object
   * @property {Table} table - The table
   * @property {Row} row - The row to be removed
   */
  /**
   * A row has been removed from a table
   * @event OpenICE#OpenICE:afterremove
   * @type {object}
   * @property {OpenICE} openICE - The OpenICE object
   * @property {Table} table - The table
   * @property {Row} row - The removed row
   */
  /**
   * A row will be added to a table
   * @event OpenICE#OpenICE:beforeadd
   * @type {object}
   * @property {OpenICE} openICE - The OpenICE object
   * @property {Table} table - The table
   * @property {Row} row - The row to be added
   */
  /**
   * A row has been added to a table
   * @event OpenICE#OpenICE:afteradd
   * @type {object}
   * @property {OpenICE} openICE - The OpenICE object
   * @property {Table} table - The table
   * @property {Row} row - The added row
   */
  // The four row lifecycle events are relayed identically, under the same name.
  ['beforeremove', 'afterremove', 'beforeadd', 'afteradd'].forEach(function(name) {
    table.on(name, function(evt) {
      self.emit(name, {'openICE':self, 'table':evt.table, 'row':evt.row});
    });
  });
  this.tables[tableKey] = table;
  /**
   * A table has been added
   * @event OpenICE#OpenICE:addtable
   * @type {object}
   * @property {OpenICE} openICE - The OpenICE object
   * @property {Table} table - The added table
   */
  this.emit('addtable', {'openICE':this, 'table':table});
  if (this.connected) {
    this.subscribe(table);
  }
  return table;
};
/**
 * Sends a 'Subscribe' request for the table over the connection.
 * @param {Table} table - The table to subscribe to.
 */
OpenICE.prototype.subscribe = function(table) {
  var message = {messageType:'Subscribe', domain:table.domain, partition:table.partition, topic:table.topic};
  this.connection.emit('dds', message);
};
/**
 * Destroys every registered table via destroyTable.
 */
OpenICE.prototype.destroyAllTables = function() {
  // Object.keys takes a snapshot, so tables can be deleted while looping.
  Object.keys(this.tables).forEach(function(tableKey) {
    this.destroyTable(this.tables[tableKey]);
  }, this);
};
/**
 * Removes all rows from every registered table; the tables themselves stay.
 */
OpenICE.prototype.removeAllRows = function() {
  Object.keys(this.tables).forEach(function(tableKey) {
    this.tables[tableKey].removeAllRows();
  }, this);
};
/**
 * Sends a 'Subscribe' request for every registered table.
 */
OpenICE.prototype.subscribeAllTables = function() {
  Object.keys(this.tables).forEach(function(tableKey) {
    this.subscribe(this.tables[tableKey]);
  }, this);
};
/**
 * Sends a single 'UnsubscribeAll' request over the connection.
 */
OpenICE.prototype.unsubscribeAll = function() {
  this.connection.emit('dds', {messageType:'UnsubscribeAll'});
};
/**
 * Destroys a table with identifying information (or no op if it does not exist)
 * and requests that the server stop sending information about the table.
 * The unsubscribe request is only sent while connected.
 * @public
 * @param {object} args - Contains attributes domain, partition, and topic identifying the table.
 * @returns {Table|undefined} The destroyed table, or undefined when none matched.
 * @fires OpenICE#OpenICE:removetable
 */
OpenICE.prototype.destroyTable = function(args) {
  var tableKey = calcTableKey(args);
  var table = this.tables[tableKey];
  if (null != table) {
    if (this.connected) {
      this.unsubscribe(table);
    }
    table.removeAllRows();
    delete this.tables[tableKey];
    /**
     * A table has been removed
     * @event OpenICE#OpenICE:removetable
     * @type {object}
     * @property {OpenICE} openICE - The OpenICE object
     * @property {Table} table - The removed table
     */
    this.emit('removetable', {'openICE':this, 'table':table});
  }
  return table;
};
/**
 * Sends an 'Unsubscribe' request for the table over the connection.
 * @param {Table} table - The table to unsubscribe from.
 */
OpenICE.prototype.unsubscribe = function(table) {
  var message = {messageType:'Unsubscribe', domain:table.domain, partition:table.partition, topic: table.topic};
  this.connection.emit('dds', message);
};
/**
 * Disconnects the underlying connection.
 */
OpenICE.prototype.close = function() {
  this.connection.disconnect();
};