1 /**
  2  * @fileOverview Subscription Resource class definition
  3  */
  4 
  5 var Resource = require('./resource')
  6   , Message = require('./message')
  7   , _ = require('underscore')
  8   , async = require('async')
  9   ;
 10 
 11 /**
 12  * Represents a subscription in the spire api.
 13  *
 14  * <p>There are a few ways to get events from a subscription.
 15  *
 16  * <p>The first is to call <code>subscription.retrieveEvents</code> directly.
 17  * This is the most general method, and supports a number of options.
 18  *
 19  * <p>There are convenience methods <code>subscription.poll</code and
 20  * <code>subscription.longPoll</code> which wrap <code>retrieveEvents</code>.
 21  * The only difference is that <code>subscription.poll</code> has a timeout of
 22  * 0, so the request will always come back right away, while
 23  * <code>subscription.longPoll</code> has a 30 second timeout, so the request
 24  * will wait up to 30 seconds for new events to arrive before returning.
 25  *
 26  * <p>You can also use the <code>event</code> <code>message</code> <code>join</code> and <code>part</code> events to
 27  * listen for new events on the subscription.
 28  *
 29  * <p><pre><code>
 30  *    subscription.addListener('message', function (message) {
 31  *      console.log('Message received: ' + message.content);
 32  *    });
 33  *
 34  *    subscription.addListener('join', function (join) {
 35  *      console.log('Subscription joined: ' + join.subscription_name);
 36  *    });
 37  *
 38  *    subscription.addListener('part', function (part) {
 39  *      console.log('Subscription parted: ' + part.subscription_name);
 40  *    });
 41  *
 42  *    subscription.addListener('event', function (event) {
 43  *      // This fires for messages, joins, and parts.
 44  *      console.log('Received event!');
 45  *    });
 46  *
 47  *    subscription.startListening();
 48  * </code></pre>
 49  *
 50  * <p>By default this will get all events from the beginning of time.
 51  * If you only want messages created from this point forward, pass { min_timestamp: 'now' } in the options to `startListening`:
 52  *
 53  * <p><pre><code>
 54  *    subscription.startListening({ min_timestamp: 'now' });
 55  * </code></pre>
 56  *
 57  * @class Subscription Resource
 58  *
 59  * @constructor
 60  * @extends Resource
 61  * @param {object} spire Spire object
 62  * @param {object} data Subscription data from the spire api
 63  */
 64 function Subscription(spire, data) {
 65   /**
 66    * Reference to spire object.
 67    */
 68   this.spire = spire;
 69 
 70   /**
 71    * Actual data from the spire.io api.
 72    */
 73   this.data = data;
 74 
 75 
 76   /**
 77    * Timestamp of last event received.
 78    */
 79   this.last = null;
 80 
 81   /**
 82    * Whether the subscription is currently polling for events.
 83    * You should not change this value yourself.  Use
 84    * `subscription.startListening()` and
 85    * `subscription.stopListening()` instead.
 86    */
 87   this.listening = false;
 88 
 89   this.resourceName = 'subscription';
 90 }
 91 
 92 Subscription.prototype = new Resource();
 93 
 94 module.exports = Subscription;
 95 
 96 /**
 97  * Gets the name of the subscription.
 98  *
 99  * @returns {string} Name
100  */
101 Subscription.prototype.name = function () {
102   return this.data.name;
103 };
104 
105 /**
106  * Starts long polling for the subscription.
107  *
108  * <p>The <code>message</code> and <code>messages</code> events will fire when a
109  * request comes back with messages.  The <code>message</code> event will fire
110  * once per message, while the <code>messages</code> event fires every time a
111  * request comes back with more than one message.
112  *
113  * @example
114  * subscription.addListener('message', function (message) {
115  *   console.log('Message received: ' + message.content);
116  * });
117  *
118  * subscription.startListening();
119  *
120  * // Stop Listening after 100 seconds.
121  * setTimout(function () {
122  *   subscription.stopListening();
123  *  }, 100000);
124  *
125  * <p>By default this will get all events from the beginning of time.
126  * If you only want messages created from this point forward, pass { min_timestamp: 'now' } in the options to `startListening`:
127  *
128  * <p><pre><code>
129  *    subscription.startListening({ min_timestamp: 'now' });
130  * </code></pre>
131  * @param {object} [options] Optional options argument
132  * @param {number} [options.min_timestamp] Optional min_timestamp of events to receive
133  * @param {number} [options.max_timestamp] Optional max_timestamp of events to receive
134  * @param {number} [options.last] Optional last message (same as min_timestamp)
135  * @param {number} [options.delay] Optional delay
136  * @param {number} [options.timeout] Optional timeout
137  * @param {function (err, messages)} cb Callback
138  */
139 Subscription.prototype.startListening = function (opts) {
140   this.listening = true;
141   this._listen(opts);
142 };
143 
144 /**
145  * Stops listening on the subscription.
146  */
147 Subscription.prototype.stopListening = function () {
148   this.listening = false;
149 };
150 
151 /**
152  * Gets events for the subscription.
153  *
154  * <p>This method only makes one request.  Use
155  * <code>subscription.startListening</code> to poll repeatedly.
156  *
157  * @example
158  * subscription.retrieveEvents(function (err, events) {
159  *   if (!err) {
160  *     // `events` is a hash with `messages`, `joins`, and `parts` (each possibly empty)
161  *   }
162  * });
163  *
164  * @param {object} [options] Optional options argument
165  * @param {number} [options.min_timestamp] Optional min_timestamp of events to receive
166  * @param {number} [options.max_timestamp] Optional max_timestamp of events to receive
167  * @param {number} [options.last] Optional last message (same as min_timestamp)
168  * @param {number} [options.delay] Optional delay
169  * @param {number} [options.timeout] Optional timeout
170  * @param {function (err, messages)} cb Callback
171  */
172 Subscription.prototype.retrieveEvents = function (options, cb) {
173   var subscription = this;
174   var spire = this.spire;
175   if (!cb) {
176     cb = options;
177     options = {};
178   }
179 
180   var req = this.request('events', options, function (err, eventsData) {
181     if (err) {
182       if (subscription.listeners('error').length) {
183         subscription.emit('error', err);
184       }
185       return cb(err);
186     }
187 
188     if (eventsData.messages.length) {
189       var messagesData = eventsData.messages;
190       eventsData.messages = _(eventsData.messages).map(function (messageData) {
191         return new Message(spire, messageData);
192       });
193     }
194 
195     if (!subscription.listening) {
196       return cb(null, eventsData);
197     }
198 
199     var eventTypes = ["messages", "joins", "parts"];
200 
201     _.each(eventTypes, function (eventType) {
202       if (eventsData[eventType].length) {
203         subscription.emit(eventType, eventsData[eventType])
204       }
205     });
206 
207     var allEvents = _.reduce(eventTypes, function (_allEvents, eventType) {
208       return _allEvents.concat(eventsData[eventType]);
209     }, []);
210 
211     allEvents.sort(function (a, b) {
212       return a.timestamp - b.timestamp
213     });
214 
215     if (allEvents.length) {
216       process.nextTick(function () {
217         _.each(allEvents, function (event) {
218           subscription.emit(event.type, event);
219         });
220       });
221     }
222 
223     cb(null, eventsData);
224   });
225 
226   req.on('socket', function (socket) {
227     subscription.emit('socket', socket);
228   });
229 };
230 
231 /**
232  * Alias for subscription.retreiveEvents.
233  */
234 Subscription.prototype.get = Subscription.prototype.retreiveEvents;
235 
236 /**
237  * Gets new events for the subscription.  This method forces a 0 second
238  * timeout, so the request will come back immediately, but may have an empty
239  * array of events if there are no new ones.
240  *
241  * <p>This method only makes one request.  Use
242  * <code>subscription.startListening</code> to poll repeatedly.
243  *
244  * @example
245  * subscription.poll(function (err, events) {
246  *   if (!err) {
247  *     // `events.messages` is an array of messages (possably empty)
248  *   }
249  * });
250  *
251  * @param {object} [options] Optional options argument
252  * @param {number} [options.delay] Optional delay
253  * @param {function (err, events)} cb Callback
254  */
255 Subscription.prototype.poll = function (options, cb) {
256   if (!cb) {
257     cb = options;
258     options = {};
259   }
260   options.timeout = 0;
261   this.longPoll(options, cb);
262 };
263 
264 /**
265  * Gets new events for the subscription.
266  *
267  * <p>This method defaults to a 30 second timeout, so the request will wait up to
268  * 30 seconds for a new message to come in.  You can increase the wait time with
269  * the <code>options.timeout</code> paraameter.
270  *
271  * <p>This method only makes one request.  Use `subscription.startListening` to
272  * poll repeatedly.
273  *
274  * @example
275  * subscription.longPoll({ timeout: 60 }, function (err, events) {
276  *   if (!err) {
277  *     // `events.messages` is an array of messages (possably empty)
278  *   }
279  * });
280  *
281  * @param {object} [options] Optional options argument
282  * @param {number} [options.delay] Optional delay
283  * @param {number} [options.timeout] Optional timeout
284  * @param {function (err, events)} cb Callback
285  */
286 Subscription.prototype.longPoll = function (options, cb) {
287   var subscription = this;
288   if (!cb) {
289     cb = options;
290     options = {};
291   }
292 
293   options.last = this.last;
294   options.timeout = options.timeout || 30;
295 
296   this.retrieveEvents(options, function (err, events) {
297     if (err) return cb(err);
298     if (events.last) {
299       subscription.last = events.last;
300     }
301 
302     cb(null, events);
303   });
304 };
305 
306 /**
307  * Repeatedly polls for new events until `subscription.stopListening` is
308  * called.
309  *
310  * You should use `subscription.startListening` instead of calling this method
311  * directly.
312  */
313 Subscription.prototype._listen = function (opts) {
314   var subscription = this;
315   opts = opts || {};
316 
317   opts.min_timestamp = opts.min_timestamp || opts.last;
318 
319   if (typeof opts.min_timestamp !== 'undefined') {
320     this.last = opts.min_timestamp;
321   }
322   delete opts.min_timestamp;
323   delete opts.last;
324 
325   async.whilst(
326     function () { return subscription.listening; },
327     function (cb) {
328       optsClone = _.clone(opts);
329       subscription.longPoll(optsClone, cb);
330     },
331     function () {}
332   );
333 };
334 
335 /**
336  * Requests
337  *
338  * These define API calls and have no side effects.  They can be run by calling
339  *     this.request(<request name>);
340  */
341 
342 /**
343  * Gets the events for the subscription, according to various parameters.
344  * @name events
345  * @ignore
346  */
347 Resource.defineRequest(Subscription.prototype, 'events', function (options) {
348   options = options || {};
349 
350   reqOpts = {
351     timeout: options.timeout || 0,
352     delay: options.delay || 0,
353     limit: options.limit
354   };
355 
356   if (options.last) {
357     reqOpts.last = options.last;
358   }
359 
360   if (options.min_timestamp) {
361     reqOpts.min_timestamp = options.min_timestamp;
362   }
363 
364   if (options.max_timestamp) {
365     reqOpts.max_timestamp = options.max_timestamp;
366   }
367 
368   return {
369     method: 'get',
370     url: this.url(),
371     query: reqOpts,
372     headers: {
373       'Authorization': this.authorization('events'),
374       'Accept': this.mediaType('events')
375     }
376   };
377 });
378