1 /** 2 * @fileOverview Subscription Resource class definition 3 */ 4 5 var Resource = require('./resource') 6 , Message = require('./message') 7 , _ = require('underscore') 8 , async = require('async') 9 ; 10 11 /** 12 * Represents a subscription in the spire api. 13 * 14 * <p>There are a few ways to get events from a subscription. 15 * 16 * <p>The first is to call <code>subscription.retrieveEvents</code> directly. 17 * This is the most general method, and supports a number of options. 18 * 19 * <p>There are convenience methods <code>subscription.poll</code and 20 * <code>subscription.longPoll</code> which wrap <code>retrieveEvents</code>. 21 * The only difference is that <code>subscription.poll</code> has a timeout of 22 * 0, so the request will always come back right away, while 23 * <code>subscription.longPoll</code> has a 30 second timeout, so the request 24 * will wait up to 30 seconds for new events to arrive before returning. 25 * 26 * <p>You can also use the <code>event</code> <code>message</code> <code>join</code> and <code>part</code> events to 27 * listen for new events on the subscription. 28 * 29 * <p><pre><code> 30 * subscription.addListener('message', function (message) { 31 * console.log('Message received: ' + message.content); 32 * }); 33 * 34 * subscription.addListener('join', function (join) { 35 * console.log('Subscription joined: ' + join.subscription_name); 36 * }); 37 * 38 * subscription.addListener('part', function (part) { 39 * console.log('Subscription parted: ' + part.subscription_name); 40 * }); 41 * 42 * subscription.addListener('event', function (event) { 43 * // This fires for messages, joins, and parts. 44 * console.log('Received event!'); 45 * }); 46 * 47 * subscription.startListening(); 48 * </code></pre> 49 * 50 * <p>By default this will get all events from the beginning of time. 51 * If you only want messages created from this point forward, pass { min_timestamp: 'now' } in the options to `startListening`: 52 * 53 * <p><pre><code> 54 * subscription.startListening({ min_timestamp: 'now' }); 55 * </code></pre> 56 * 57 * @class Subscription Resource 58 * 59 * @constructor 60 * @extends Resource 61 * @param {object} spire Spire object 62 * @param {object} data Subscription data from the spire api 63 */ 64 function Subscription(spire, data) { 65 /** 66 * Reference to spire object. 67 */ 68 this.spire = spire; 69 70 /** 71 * Actual data from the spire.io api. 72 */ 73 this.data = data; 74 75 76 /** 77 * Timestamp of last event received. 78 */ 79 this.last = null; 80 81 /** 82 * Whether the subscription is currently polling for events. 83 * You should not change this value yourself. Use 84 * `subscription.startListening()` and 85 * `subscription.stopListening()` instead. 86 */ 87 this.listening = false; 88 89 this.resourceName = 'subscription'; 90 } 91 92 Subscription.prototype = new Resource(); 93 94 module.exports = Subscription; 95 96 /** 97 * Gets the name of the subscription. 98 * 99 * @returns {string} Name 100 */ 101 Subscription.prototype.name = function () { 102 return this.data.name; 103 }; 104 105 /** 106 * Starts long polling for the subscription. 107 * 108 * <p>The <code>message</code> and <code>messages</code> events will fire when a 109 * request comes back with messages. The <code>message</code> event will fire 110 * once per message, while the <code>messages</code> event fires every time a 111 * request comes back with more than one message. 112 * 113 * @example 114 * subscription.addListener('message', function (message) { 115 * console.log('Message received: ' + message.content); 116 * }); 117 * 118 * subscription.startListening(); 119 * 120 * // Stop Listening after 100 seconds. 121 * setTimout(function () { 122 * subscription.stopListening(); 123 * }, 100000); 124 * 125 * <p>By default this will get all events from the beginning of time. 126 * If you only want messages created from this point forward, pass { min_timestamp: 'now' } in the options to `startListening`: 127 * 128 * <p><pre><code> 129 * subscription.startListening({ min_timestamp: 'now' }); 130 * </code></pre> 131 * @param {object} [options] Optional options argument 132 * @param {number} [options.min_timestamp] Optional min_timestamp of events to receive 133 * @param {number} [options.max_timestamp] Optional max_timestamp of events to receive 134 * @param {number} [options.last] Optional last message (same as min_timestamp) 135 * @param {number} [options.delay] Optional delay 136 * @param {number} [options.timeout] Optional timeout 137 * @param {function (err, messages)} cb Callback 138 */ 139 Subscription.prototype.startListening = function (opts) { 140 this.listening = true; 141 this._listen(opts); 142 }; 143 144 /** 145 * Stops listening on the subscription. 146 */ 147 Subscription.prototype.stopListening = function () { 148 this.listening = false; 149 }; 150 151 /** 152 * Gets events for the subscription. 153 * 154 * <p>This method only makes one request. Use 155 * <code>subscription.startListening</code> to poll repeatedly. 156 * 157 * @example 158 * subscription.retrieveEvents(function (err, events) { 159 * if (!err) { 160 * // `events` is a hash with `messages`, `joins`, and `parts` (each possibly empty) 161 * } 162 * }); 163 * 164 * @param {object} [options] Optional options argument 165 * @param {number} [options.min_timestamp] Optional min_timestamp of events to receive 166 * @param {number} [options.max_timestamp] Optional max_timestamp of events to receive 167 * @param {number} [options.last] Optional last message (same as min_timestamp) 168 * @param {number} [options.delay] Optional delay 169 * @param {number} [options.timeout] Optional timeout 170 * @param {function (err, messages)} cb Callback 171 */ 172 Subscription.prototype.retrieveEvents = function (options, cb) { 173 var subscription = this; 174 var spire = this.spire; 175 if (!cb) { 176 cb = options; 177 options = {}; 178 } 179 180 var req = this.request('events', options, function (err, eventsData) { 181 if (err) { 182 if (subscription.listeners('error').length) { 183 subscription.emit('error', err); 184 } 185 return cb(err); 186 } 187 188 if (eventsData.messages.length) { 189 var messagesData = eventsData.messages; 190 eventsData.messages = _(eventsData.messages).map(function (messageData) { 191 return new Message(spire, messageData); 192 }); 193 } 194 195 if (!subscription.listening) { 196 return cb(null, eventsData); 197 } 198 199 var eventTypes = ["messages", "joins", "parts"]; 200 201 _.each(eventTypes, function (eventType) { 202 if (eventsData[eventType].length) { 203 subscription.emit(eventType, eventsData[eventType]) 204 } 205 }); 206 207 var allEvents = _.reduce(eventTypes, function (_allEvents, eventType) { 208 return _allEvents.concat(eventsData[eventType]); 209 }, []); 210 211 allEvents.sort(function (a, b) { 212 return a.timestamp - b.timestamp 213 }); 214 215 if (allEvents.length) { 216 process.nextTick(function () { 217 _.each(allEvents, function (event) { 218 subscription.emit(event.type, event); 219 }); 220 }); 221 } 222 223 cb(null, eventsData); 224 }); 225 226 req.on('socket', function (socket) { 227 subscription.emit('socket', socket); 228 }); 229 }; 230 231 /** 232 * Alias for subscription.retreiveEvents. 233 */ 234 Subscription.prototype.get = Subscription.prototype.retreiveEvents; 235 236 /** 237 * Gets new events for the subscription. This method forces a 0 second 238 * timeout, so the request will come back immediately, but may have an empty 239 * array of events if there are no new ones. 240 * 241 * <p>This method only makes one request. Use 242 * <code>subscription.startListening</code> to poll repeatedly. 243 * 244 * @example 245 * subscription.poll(function (err, events) { 246 * if (!err) { 247 * // `events.messages` is an array of messages (possably empty) 248 * } 249 * }); 250 * 251 * @param {object} [options] Optional options argument 252 * @param {number} [options.delay] Optional delay 253 * @param {function (err, events)} cb Callback 254 */ 255 Subscription.prototype.poll = function (options, cb) { 256 if (!cb) { 257 cb = options; 258 options = {}; 259 } 260 options.timeout = 0; 261 this.longPoll(options, cb); 262 }; 263 264 /** 265 * Gets new events for the subscription. 266 * 267 * <p>This method defaults to a 30 second timeout, so the request will wait up to 268 * 30 seconds for a new message to come in. You can increase the wait time with 269 * the <code>options.timeout</code> paraameter. 270 * 271 * <p>This method only makes one request. Use `subscription.startListening` to 272 * poll repeatedly. 273 * 274 * @example 275 * subscription.longPoll({ timeout: 60 }, function (err, events) { 276 * if (!err) { 277 * // `events.messages` is an array of messages (possably empty) 278 * } 279 * }); 280 * 281 * @param {object} [options] Optional options argument 282 * @param {number} [options.delay] Optional delay 283 * @param {number} [options.timeout] Optional timeout 284 * @param {function (err, events)} cb Callback 285 */ 286 Subscription.prototype.longPoll = function (options, cb) { 287 var subscription = this; 288 if (!cb) { 289 cb = options; 290 options = {}; 291 } 292 293 options.last = this.last; 294 options.timeout = options.timeout || 30; 295 296 this.retrieveEvents(options, function (err, events) { 297 if (err) return cb(err); 298 if (events.last) { 299 subscription.last = events.last; 300 } 301 302 cb(null, events); 303 }); 304 }; 305 306 /** 307 * Repeatedly polls for new events until `subscription.stopListening` is 308 * called. 309 * 310 * You should use `subscription.startListening` instead of calling this method 311 * directly. 312 */ 313 Subscription.prototype._listen = function (opts) { 314 var subscription = this; 315 opts = opts || {}; 316 317 opts.min_timestamp = opts.min_timestamp || opts.last; 318 319 if (typeof opts.min_timestamp !== 'undefined') { 320 this.last = opts.min_timestamp; 321 } 322 delete opts.min_timestamp; 323 delete opts.last; 324 325 async.whilst( 326 function () { return subscription.listening; }, 327 function (cb) { 328 optsClone = _.clone(opts); 329 subscription.longPoll(optsClone, cb); 330 }, 331 function () {} 332 ); 333 }; 334 335 /** 336 * Requests 337 * 338 * These define API calls and have no side effects. They can be run by calling 339 * this.request(<request name>); 340 */ 341 342 /** 343 * Gets the events for the subscription, according to various parameters. 344 * @name events 345 * @ignore 346 */ 347 Resource.defineRequest(Subscription.prototype, 'events', function (options) { 348 options = options || {}; 349 350 reqOpts = { 351 timeout: options.timeout || 0, 352 delay: options.delay || 0, 353 limit: options.limit 354 }; 355 356 if (options.last) { 357 reqOpts.last = options.last; 358 } 359 360 if (options.min_timestamp) { 361 reqOpts.min_timestamp = options.min_timestamp; 362 } 363 364 if (options.max_timestamp) { 365 reqOpts.max_timestamp = options.max_timestamp; 366 } 367 368 return { 369 method: 'get', 370 url: this.url(), 371 query: reqOpts, 372 headers: { 373 'Authorization': this.authorization('events'), 374 'Accept': this.mediaType('events') 375 } 376 }; 377 }); 378